Pinging a second device on the LAN - how to?



  • I have a pair of WiPys that I have been able to install on a common network with static IP addresses. This is a private network, I have no need to access the Internet. I would like to use ping to verify that I can "see" one WiPy from the other (and vice versa). Ultimately these devices will be sharing information between themselves (and other WiPys on the network). Although I have read that uping is the ping equivalent in Micropython I don't seem to be able to load it onto my WiPys. When I attempt to import uping the system complains that no such module exists. I guess I need to add this to /flash but where do I get the necessary file(s)?

    Some help would be much appreciated!



  • @nigelwattrus The original uping.py code is here: https://gist.github.com/shawwwn/91cc8979e33e82af6d99ec34c38195fb
    I had to adapt it for the Pycom MicroPython variant. Code below:

    # µPing (MicroPing) for MicroPython
    # copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
    # License: MIT
    
    # Internet Checksum Algorithm
    # Author: Olav Morken
    # https://github.com/olavmrk/python-ping/blob/master/ping.py
    # @data: bytes
    def checksum(data):
        """Return the 16-bit Internet checksum of *data*.

        The data is treated as a sequence of big-endian 16-bit words (padded
        with one zero byte when its length is odd). The words are added up,
        carries above 16 bits are folded back into the low 16 bits, and the
        one's complement of the result is returned.
        """
        # Pad to an even length so the data splits into whole 16-bit words.
        if len(data) % 2:
            data += b'\0'

        # Add up the big-endian 16-bit words.
        total = 0
        for i in range(0, len(data), 2):
            total += data[i] * 256 + data[i + 1]

        # Fold any carry above bit 15 back into the low 16 bits.
        while total > 0xffff:
            total = (total >> 16) + (total & 0xffff)

        return ~total & 0xffff
    
    def ping(host, count=4, timeout=5000, interval=500, quiet=False, size=64):
        """Send ICMP echo requests to *host* and count the replies.

        Timing is driven by a loop counter that advances once per loop pass,
        each pass sleeping 1 ms, so ``timeout`` and ``interval`` are only
        approximately milliseconds.

        Args:
            host: Host name or IP address, resolved with ``usocket.getaddrinfo``.
            count: Number of echo requests to send.
            timeout: Loop ticks (~ms) to wait since the last successful send
                before giving up; ``timeout/1000`` is also used as the socket
                timeout in seconds.
            interval: Loop ticks (~ms) to wait before each send.
            quiet: When true, nothing is printed.
            size: Total ICMP packet size in bytes; must be at least 16.

        Returns:
            A tuple ``(n_trans, n_recv)``: packets transmitted and matching
            echo replies received.

        The socket is always closed, even when resolving, connecting, sending
        or receiving raises.
        """
        import utime
        import uselect
        import uctypes
        import usocket
        import ustruct
        import uos

        # prepare packet
        assert size >= 16, "pkt size too small"
        pkt = b'Q'*size
        pkt_desc = {
            "type": uctypes.UINT8 | 0,
            "code": uctypes.UINT8 | 1,
            "checksum": uctypes.UINT16 | 2,
            "id": (uctypes.ARRAY | 4, 2 | uctypes.UINT8),
            "seq": uctypes.INT16 | 6,
            "timestamp": uctypes.UINT64 | 8,
        } # packet header descriptor
        # The header fields are written straight into pkt's memory through
        # this uctypes view.
        h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
        h.type = 8 # ICMP_ECHO_REQUEST
        h.code = 0
        h.checksum = 0
        h.id[0:2] = uos.urandom(2)
        h.seq = 1

        n_trans = 0
        n_recv = 0

        # init socket
        sock = usocket.socket(usocket.AF_INET, usocket.SOCK_RAW, 1)
        try:
            sock.setblocking(0)
            sock.settimeout(timeout/1000)
            addr = usocket.getaddrinfo(host, 1)[0][-1][0] # ip address
            sock.connect((addr, 1))
            if not quiet:
                print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))

            seqs = list(range(1, count+1)) # [1,2,...,count] still awaiting a reply
            c = 1 # sequence number of the next request
            t = 0 # loop ticks since the last successful send
            finish = False
            while t < timeout:
                if t == interval and c <= count:
                    # send packet; the checksum is computed with its own
                    # field zeroed
                    h.checksum = 0
                    h.seq = c
                    h.timestamp = utime.ticks_us()
                    h.checksum = checksum(pkt)
                    if sock.send(pkt) == size:
                        n_trans += 1
                        t = 0 # reset timeout
                    else:
                        # not fully sent: do not wait for a reply to it
                        seqs.remove(c)
                    c += 1

                # recv packet: drain everything that is readable right now
                while 1:
                    socks, _, _ = uselect.select([sock], [], [], 0)
                    if not socks:
                        break
                    resp = socks[0].recv(4096)
                    resp_mv = memoryview(resp)
                    # The ICMP header is read at offset 20 of the response
                    # (presumably after a 20-byte IPv4 header without
                    # options -- TODO confirm).
                    h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
                    # TODO: validate checksum (optional)
                    seq = h2.seq
                    if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
                        t_elapsed = (utime.ticks_us()-h2.timestamp) / 1000
                        ttl = ustruct.unpack('!B', resp_mv[8:9])[0] # time-to-live
                        n_recv += 1
                        if not quiet:
                            print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elapsed))
                        seqs.remove(seq)
                        if not seqs:
                            # every outstanding request has been answered
                            finish = True
                            break

                if finish:
                    break

                utime.sleep_ms(1)
                t += 1
        finally:
            # close, also on errors, so the socket is never leaked
            sock.close()

        if not quiet:
            print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
        return (n_trans, n_recv)
    

    Call it for instance with:

    # Import ping() from the uping module and ping a host with the default
    # settings (4 requests, results printed). ping() returns a tuple
    # (packets transmitted, packets received).
    from uping import ping
    ping("www.google.com")


Log in to reply
 

Pycom on Twitter