Method To Take Control of Cell Modem



  • @peterp Sorry to take so long to respond. I wanted to finish updating my posts showing the new code that seems to be working well. Soon I should be able to post the entire refined code. Maybe it will help others with the same problems we have had.

    The reason I went down this development path was as described originally:

    "I currently have a program working in production using Pycom MicroPython V1.20.0.rc13 on many units. When I try to run the same program on a newly purchased GPy running Pycom MicroPython 1.20.2.r3, the program crashes (20 sec watchdog timeout) when I do an AT command ('AT+CEREG?') after disconnecting from a successful transmission. How do I troubleshoot such a crash?"

    Our original reason for picking Pycom and GPy was the thinking that all the modem complexity would be handled by the high level LTE module. It's only a guess but our thinking that since the crashes always happened during times communicating with the modem, the LTE module was the likely source of the problem.

    The new method seems to be superior from the standpoint of interference with multiple tasks. Since we always call non-blocking uart commands, the code should never get hung in the C code for any period. A simple example of the LTE module code freezing multi-tasking is to do a simple modem reset. Reading and writing to the file storage also has a minor affect. The real reason for the development was to try to eliminate program crashes. Only longer term testing will show whether we have achieved that goal.

    One concern I do have is whether I am properly initializing the cell modem. Any input from anybody out there that is more familiar with properly setting up the LTE modem would be appreciated. It seems to work fine at this point; however, I'm not sure whether it's working because the GPy I'm testing with has run the LTE initialization at some point in the past and that has possibly setup the non-volatile memory of the modem to the proper settings that are needed.

    One drawback of using this method of transmission is it is not compatible with WIFI connections. That is not an issue for us at this time. Maybe later we can research how to do some form of communication that is compatible with both WIFI and direct AT commands. I assume that involves using the socket commands of the modem, but this is an area I'm not sure about at this time.



  • Here's some of the higher level code that uses the modem module.

    import globals as g
    import modem
    from poll import poll
    import pycom
    import sys
    import uos
    import util
    from util import log
    from utime import sleep_ms
    import wifi
    
    #####################################################################
    #                         Communication Task
    #####################################################################
    def com_task():
        try:
            sleep_ms(500)
            log("com_task: started")
            
            wifi.deinit()
            modem.reset_modem()
            modem.init_modem()
    
            g.poll_host = True      # triggers poll at startup
    
        except Exception as ex:
            sys.print_exception(ex)
    
        while True:
            try:
                if modem_ready():
                    if ready_to_poll():
                        modem.set_apn()
                        transmit()
    
                sleep_ms(2000)
    
            except Exception as ex:
                sys.print_exception(ex)
    
    
    #####################################################################
    #                           Modem Ready
    #####################################################################
    def modem_ready():
        try:
            log("** check for modem ready")
    
            # get modem signal strength
            g.oStatus.cell_signal = modem.signal_strength()
            
            # check modem registration
            r = modem.check_registration()
            if (r == 1) or (r == 5):
                # registered
                g.com_secs = 0
    
                # handle SMS messages
                r = modem.sms_read()
                if len(r) > 1:
                    modem.sms_process(r)
                    modem.sms_delete()
    
                return True
            
            reset = False
    
            if g.com_secs > (10*60):
                # unable to attach in 10 mins, try resetting modem
                reset = True
                
            if modem.restarted():
                # modem has crashed / restarted, try resetting modem
                reset = True
    
            if reset:    
                modem.reset_modem()
                modem.init_modem()
                g.com_secs = 0
    
            return False
    
        except Exception as ex:
            sys.print_exception(ex)
        
        return False
    
    

  • Global Moderator

    Hey @tlanier could you elaborate a bit on this point? How to reproduce this freezing?

    Task switching no longer freezes during modem communication.



  • The new at(cmd, filter="") function is now improved such that unsolicited responses should have no affect. It also allows you to specify the response you are interested in getting back by setting the filter parameter. This simplifies the coding of the rest of the modem module.

    #####################################################################
    #                     Send AT Command To Modem
    #####################################################################
    def at(cmd, filterx=""):
        try:
            if uart.any() > 0:
                # get unsolicited modem response 
                a = at_response()
    
                for s in a:
                    at_process(s)
    
            log("  modem command: {}".format(cmd))
    
            # send command to modem
            n = uart.write(cmd + '\r')
            if n != len(cmd) + 1:
                log("error sending command to modem: n = {}".format(n))
                return []
    
            a = at_response(filterx)
            return a
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response lines in array
    
    
    #####################################################################
    #                     Filter Modem Response
    #####################################################################
    def at_filter(a, filterx):
        # a is passed as reference value
        if filterx == "":
            return
    
        # filter response
        n = len(filterx)
        i = len(a)
        while i > 0:
            i = i - 1
            if a[i][0:n] != filterx:
                # remove lines that do not start with filter value
                at_process(a[i])
                a.pop(i)
    
    
    #####################################################################
    #                     Process Unsolicited Response
    #####################################################################
    def at_process(s):
        if s == 'OK':
            pass
        else:
            log("unsolicited modem response: {}".format(s))
        return
    
    
    #####################################################################
    #                       Get AT Command Response
    #####################################################################
    def at_response(filterx=""):
        try:
            t0 = ticks_ms()
            
            while True:
                if uart.any() > 0:
                    # insure entire response has been received
                    sleep_ms(50)
                    break
    
                t1 = ticks_ms()
                tx = ticks_diff(t1, t0)
    
                if tx > 10000:
                    log("timeout error - no modem response after 10 secs")
                    return []
    
                sleep_ms(100)
    
            b = uart.read()
            s = str(b, 'utf-8')
            a = s.split('\r\n')
            a = list(filter(None, a))
    
            log("  modem response: {}".format(a))
            at_filter(a, filterx)
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response lines in array
    


  • @jcaron Thanks for the suggestion. I have been thinking about that. I would need to make the module more general purpose and less specific to my application. My first priority is to make our application more robust and error free.

    I've thought about creating a general purpose do nothing application that implements multi-threading, a timer interrupt service routine, and the modem module to show how everything fits together. Someone could take the template application and create a full application in no time.

    Including our method of program updates would also be useful I think.



  • @tlanier you should probably create a GitHub or GitLab repo for your code...



  • A new method has been added to the modem module. It is desirable to use the ICCID to determine which APN to set. One issue to deal with is apparently the ICCID is not valid until registration with the tower is done. The set_apn() method should be called right before doing the attach().

    To reduce the wear of writing to the non-volatile memory, I read the current value of the APN and skip the write if not necessary. I wonder if the modem does this type of checking internally? Ideally we could just set the APN once at program start, but we can't read the ICCID at that point for some reason. Even though the manual says when CFUN=4 or CFUN=1 access to the SIM is available, reading the ICCID does not work until after registration.

    To get some context as to how the modem module can be used, I've included example code for a modem_ready() method that uses the modem module.

    There is still work to be done with the basic at() and get_modem_response() methods. Unsolicited responses are causing problems in some cases. That will be my next area to concentrate on. There seems to be two paths to take when talking to a modem. Path #1 sends a command to the modem and then waits a period of time and assumes the modem has had time to respond completely. Path #2 sends a command to the modem and then watches the data being returned looking for (1) OK, or (2) ERROR anywhere in the response. The responses then must be filtered to only return the particular response line that is desired.

    #####################################################################
    #                           Set APN
    #####################################################################
    def set_apn():
        try:
            log("set APN")
    
            # ICCID is only valid after registration 
            r = iccid()[0:5]
            if r == "89014":
                apn = g.oIni.apn_89014   # Kore AT&T SIM
            elif r == "89012":
                apn = g.oIni.apn_89012   # Kore T-Mobile SIM
            elif r == "89445":
                apn = g.oIni.apn_89445   # Hologram SIM
            else:
                apn = g.oIni.apn_89014   # Kore AT&T SIM
    
            g.oIni.apn = apn
            log("APN: {0}".format(apn))
    
            # try to reduce wear of non-volatile memory
            a = at('AT+CGDCONT?')
            # response --> ['+CGDCONT: 1,"IP","10569.mcs",,,,0,0,0,0,0,0', ...]
            s = a[0]
            a = s.split(",")
            if (a[0] == '+CGDCONT: 1') and (a[1] == '"IP"') and (a[2] == '"'+apn+'"'):
                return
    
            # apn needs setting
            a = at('AT+CGDCONT=1,"IP","{0}"'.format(apn))
            if a[0] != 'OK':
                log("error setting APN")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                           Modem Ready
    #####################################################################
    def modem_ready():
        try:
            log("** check for modem ready")
    
            # get modem signal strength
            g.oStatus.cell_signal = modem.signal_strength()
            
            # handle SMS messages
            r = modem.sms_read()
            if len(r) > 1:
                modem.sms_process(r)
                modem.sms_delete()
    
            # check modem registration
            r = modem.check_registration()
            if (r == 1) or (r == 5):
                # wait until registered to set APN
                modem.set_apn()
                
                modem.attach()
                if modem.isattached():
                    g.com_secs = 0
                    return True
            
            reset = False
    
            if g.com_secs > (10*60):
                # unable to attach in 10 mins, try resetting modem
                reset = True
                
            if modem.restarted():
                # modem has crashed / restarted, try resetting modem
                reset = True
    
            if reset:    
                modem.reset_modem()
                modem.init_modem()
                g.com_secs = 0
    
            return False
    
        except Exception as ex:
            sys.print_exception(ex)
        
        return False
    
    
    


  • @jcaron Agreed. Also, just wanted to confirm about the MTU. Would the MTU for NB-IoT be 680 bits (85 bytes) DL and 1000 bits (125 bytes) UL. Referred Section 8.2 NB-IoT Deployment Guide



  • @bitvijays In my case the response to a HTTP Post is a large file (208K file used to update the program). The program hangs in a loop retrieving whatever is in the UART receive buffer until the correct number of bytes is received (or timeout occurs).



  • @bitvijays HTTP is based on TCP, so any HTTP payload would indeed be sent as multiple packets (the size and number of which depend on the path MTU). TCP normally makes use of windowing, so it would not even necessarily send all at once, it would wait for ACKs along the way and send more as the first ACKs are received.

    It's interesting how this is all built into the modem itself...



  • @tlanier said in Method To Take Control of Cell Modem:

    SQNHTTPSND

    Hey @tlanier Hope you are doing well. Regarding "Tested with file size > 128,000 bytes" Just curious, how it is able to send such large payload at one go? Is it using fragmentation ? and divide the payload into multiple packets? and then send?



  • @iot2020, thanks for the suggestions. It's interesting that when I search the Monarch-LR5110 manual neither of the AT!= commands are found.



  • @tlanier said in Method To Take Control of Cell Modem:

    I need to understand what AT commands are needed to properly initialize the modem and set the APN based on the SIM card. This is an area I'm currently fuzzy about. Do bands need to be set? This is stuff that is handled during the creation of the LTE module which I no longer want to use. The answers may come from studying the C code of the LTE module. Currently the code just sets CFUN=1 and everything magically works.

    I use this for the first time initializing:

    send_cmd('AT+CGDCONT=1,"IP","cdp.iot.t-mobile.nl"') # set context 1 to ip and apn to cdp.iot.t-mobile.nl
    send_cmd("AT+SQNAUTOINTERNET=1") # maybe not needed for you use case?
    send_cmd('AT!="clearscanconfig"') # clear band scan config
    send_cmd('AT!="RRC:addScanBand band=8"') # scan for band 8
    send_cmd('AT+COPS=1,2,"20416"') # manually register to plmn 20416 (tmobile nl)    
    #send_cmd('AT+COPS=0') # automatically choose plmn
    


  • Improved http_post() function

    This version can receive large files. Tested with file size > 128,000 bytes.

    #####################################################################
    #                            HTTP Post          
    #####################################################################
    def http_post(uri, header, tx_data):
        # --> AT+SQNHTTPSND=<prof_id>,<command>,<resource>,<data_len>[,<post_param>[,<extra_header_line>]]
        # xx> POST <uri> HTTP/1.1
        # xx> <header>
        # xx> Content-Length: <tx_size>
        # xx> Content-Type: application/octet-stream
        # xx> <blank line>
        # <-- '> '
        # --> <tx_data bytes>
        # <-- OK
        # <-- +SQNHTTPRING: <prof_id,<http_status_code>,<content_type>,<rx_size>
        # --> AT+SQNHTTPRCV=1,0\r
        # <-- /r/n<<<
        # <-- <rx_data bytes>
        # <-- OK
        try:
            log("HTTP post")
    
            while True:
                # note: init_modem() sets configuration "AT+SQNHTTPCFG=..." at startup
                tx_size = len(tx_data)
    
                # log("writing message: {0}".format(tx_data))
                log("tx_data length: {0}".format(tx_size))
    
                # AT+SQNHTTPSND=<prof_id>,<command>,<resource>,<data_len>[,<post_param>[,<extra_header_line>]]
                # prof_id=1, command=0 (POST), <uri>, <tx_size>, post_parm="2" (application/octet-stream), header="Host: <host_name>" 
    
                a = at('AT+SQNHTTPSND=1,0,"{0}",{1},"2","{2}"'.format(uri, tx_size, header))
                if a[0] != '> ':
                    break   # error
    
                uart.write(tx_data)
                a = get_modem_response()
                if a[0] != 'OK':
                    break   # error
    
                # modem responds with "+SQNHTTPRING: <prof_id,<http_status_code>,<content_type>,<data_size>"
                a = get_modem_response()
                s = a[0]
                if s[0:13] != "+SQNHTTPRING:":
                    break   # error
    
                a = s.split(",")
                http_status_code = int(a[1])
                if http_status_code != 200:
                    break   # error
    
                rx_size = int(a[3])
                log("rx_data length: {0}".format(rx_size))
    
                # get response
                cmd = 'AT+SQNHTTPRCV=1,0\r'
                log("  modem command: {0}".format(cmd))
                uart.write(cmd)
    
                r = wait_rx(5, 10000)   # wait for b'\r\n<<<'
                if r == False:
                    break   # error
    
                b = uart.read(5)
                # log(" modem response: {0}".format(b))
                if b[2:5] != b'<<<':
                    break   # error
    
                t0 = ticks_ms()
            
                b = b''
    
                n = rx_size     # no. bytes remaining to be read
                while True:
                    if uart.any() > 0:
                        log("  attempt to read {0} bytes".format(n))
                        bx = uart.read(n)
                        log(" bytes read: {0}".format(len(bx)))
                        # log(" modem response: {0}".format(bx))
                        b = b + bx
                        if len(b) == rx_size:
                            break   # entire message received
                        n = rx_size - len(b)
                        t0 = ticks_ms()     # reset timeout
    
                    t1 = ticks_ms()
                    tx = ticks_diff(t1, t0)
                    if tx > 10000:
                        log("timeout error receiving response")
                        return b''
    
                    sleep_ms(400)
    
                log(" total bytes read: {0}".format(len(b)))
                # log(" post response: {0}".format(b))
    
                a = get_modem_response()
                if a[0] != 'OK':
                    break   # error
    
                # log(b)
                return b
    
        except Exception as ex:
            sys.print_exception(ex)
    
        # error
        return b''
    
    
    
    


  • Work Left To Be Done

    • I need to understand what AT commands are needed to properly initialize the modem and set the APN based on the SIM card. This is an area I'm currently fuzzy about. Do bands need to be set? This is stuff that is handled during the creation of the LTE module which I no longer want to use. The answers may come from studying the C code of the LTE module. Currently the code just sets CFUN=1 and everything magically works.


  • It Works!!!

    I have successfully eliminated the need to use the LTE module. All cell modem communication is done by talking directly to the Sequans modem using AT commands. Task switching no longer freezes during modem communication. Hopefully this will also be more reliable.

    Thanks go to "iot2020" and "jcaron" who showed how to initialize the uart to talk to the modem.

    Also, thanks go to the following github site for one of the only examples I found of using the HTTP AT commands for the Sequans modem.
    https://github.com/Avnet/sequans_python/blob/master/http_demo.py

    Here's the modem technical reference manual I used.
    https://docs.pycom.io/gitbook/assets/Monarch-LR5110-ATCmdRefMan-rev6_noConfidential.pdf

    Any comments or suggestions are welcome. All modem communication code is now in Python for everyone to see and improve. Below is a list of the currently implemented functions. Currently they have been written to work in my specific application. They could be rewritten to be more general purpose. For instance in my application I don't even need the HTTP Get function, but an example test is included. All my communication is done using the HTTP Post function.

    All printing is done by calling a utility log function that I have in my application. Here's a simple implementation.

    def log(m):
        print(m)
    

    List of Functions in Modem Module:

    Send AT Command To Modem
    def at(cmd)

    Attach Cellular Modem To Packet Domain Service
    def attach()

    Check Registration
    def check_registration()

    Detach Cellular Modem From Packet Domain Service
    def detach()

    Disable LTE Modem
    def disable_modem()

    Enable Modem
    def enable_modem()

    Get CFUN - Modem Level of Functionality
    def get_cfun()

    Get Modem Response
    def get_modem_response()

    Get SMS Mode
    def get_sms_mode()

    HTTP Post
    def http_post(uri, header, m)

    Get SIM Card ICCID Number
    def iccid()

    Get Modem IMEI Number
    def imei()

    Initialize Modem
    def init_modem()

    Initialize Uart Connected To LTE Modem
    def init_uart()

    Is Cellular Modem Attached To Packet Domain Service
    def isattached()

    Reset Modem
    def reset_modem()

    Check Modem For Restart/Crash
    def restarted()

    Set SMS Mode To Text
    def set_sms_mode()

    Get Signal Strength
    def signal_strength()

    Delete Read, Sent, and Unsent SMS Messages
    def sms_delete_all()

    Process SMS Messages
    def sms_process(r)

    Read Unread SMS Messages
    def sms_read()

    Read All SMS Messages
    def sms_read_all()

    Wait For N Received Chars
    def wait_rx(n, timeout=10000)

    Wait For Specific Modem Response
    def wait_response(response, msecs=10000)

    Test #1
    def test1()

    Example HTTP Get
    def test2()

    Example HTTP Post
    def test3()

    import globals as g
    from machine import UART
    import sys
    import util
    from util import log
    from utime import sleep_ms, ticks_ms, ticks_diff
    
    uart = None
    
    #####################################################################
    #                     Send AT Command To Modem
    #####################################################################
    def at(cmd):
        try:
            if uart.any() > 0:
                # get unsolicited modem response 
                a = get_modem_response()
    
                if len(a) > 0:
                    log("unsolicited modem response: {}".format(a))
                
                # process unsolicited results here
    
            log("  modem command: {}".format(cmd))
    
            # send command to modem
            n = uart.write(cmd + '\r')
            if n != len(cmd) + 1:
                log("error sending command to modem: n = {}".format(n))
                return []
    
            a = get_modem_response()
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response lines in array
    
    
    #####################################################################
    #             Attach Cellular Modem To Packet Domain Service
    #####################################################################
    def attach():
        try:
            log("attach modem to packet domain service")
            a = at('AT+CGATT=1')
            if a[0] != 'OK':
                log("error attaching modem")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                       Check Registration
    #####################################################################
    def check_registration():
        # return 0-5, 99-unknown
        # response: ['+CEREG: 2,4', 'OK']
        # response: ['+CEREG: 2,1,"2C29","0B54400F",7', 'OK']
        # 2,0 - not registered, not searching
        # 2,1 - registered, home network
        # 2,2 - not registered, searching
        # 2,3 - registration denied
        # 2,4 - unknown (out of coverage)
        # 2,5 - registered, roaming
    
        try:
            while True:
                log("check registration")
    
                a = at('AT+CEREG?')
                if a[1] != 'OK':
                    break
    
                s = a[0]
                if (s[0:7] != "+CEREG:"):
                    break
                
                a = s.split(",")
    
                if (a[0] != "+CEREG: 2"):
                    break
                
                reg = int(a[1])
                if reg == 1:
                    log("registered - home network")
                elif reg == 5:
                    log("registered - roaming")
                else:
                    log("not registered")
    
                return reg
    
        except Exception as ex:
            log("check registration exception")
            sys.print_exception(ex)
    
        # unknown registration
        log("error checking registration")
        return 99
    
    
    #####################################################################
    #             Detach Cellular Modem From Packet Domain Service
    #####################################################################
    def detach():
        try:
            log("detach modem from packet domain service")
            a = at('AT+CGATT=0')
            if a[0] != 'OK':
                log("error detaching modem")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                       Disable LTE Modem
    #####################################################################
    def disable_modem():
        try:
            log("disable modem")
            at("AT+CFUN=4")     # disable both transmit and receive RF circuits
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                       Enable Modem
    #####################################################################
    def enable_modem():
        try:
            log("enabling modem")
            a = at("AT+CFUN=1")     # full functionality
            if a[0] != 'OK':
                log("error enabling modem")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                Get CFUN - Modem Level of Functionality
    #####################################################################
    def get_cfun():
        # 0 - minimum functionality
        # 1 - full functionality
        # 4 - radios disabled but access to SIM allowed
        # -1 - unknown
        # note: if modem does +SHUTDOWN/+SYSSTART, CFUN will be reset to 0
        try:
            while True:
                a = at('AT+CFUN?')
                # response: ['+CFUN: 1', 'OK']
    
                if a[1] != 'OK':
                    break
    
                s = a[0]
                if (s[0:6] != "+CFUN:"):
                    break
                
                cfun = int(s[7])
                return cfun
    
        except Exception as ex:
            sys.print_exception(ex)
    
        return -1
    
    
    #####################################################################
    #                     Get Modem Response
    #####################################################################
    def get_modem_response():
        try:
            t0 = ticks_ms()
            
            while True:
                if uart.any() > 0:
                    # insure entire response has been received
                    sleep_ms(50)
                    break
    
                t1 = ticks_ms()
                tx = ticks_diff(t1, t0)
    
                if tx > 10000:
                    log("timeout error - no modem response after 10 secs")
                    return []
    
                sleep_ms(100)
    
            b = uart.read()
            s = str(b, 'utf-8')
            a = s.split('\r\n')
            a = list(filter(None, a))
    
            log("  modem response: {}".format(a))
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response lines in array
    
    
    #####################################################################
    #                           Get SMS Mode
    #####################################################################
    def get_sms_mode():
        # return 0-PDU mode or unknown, 1-text mode
    
        try:
            log("get SMS mode")
            a = at('AT+CMGF?')
            # response: ['+CMGF: 1', 'OK']
            if (len(a) < 2) or (a[1] != 'OK'):
                return 0    # mode unknown
    
            s = a[0]
    
            if (s[0:6] != '+CMGF:'):
                return 0    # state unknown
            
            mode = int(s[7])
            return mode
    
        except Exception as ex:
            sys.print_exception(ex)
    
        return 0    # mode unknown
    
    
    #####################################################################
    #                            HTTP Post          
    #####################################################################
    def http_post(uri, header, m):
        # POST <uri> HTTP/1.1
        # <header>
        # Content-Length: <len(m)>
        # Content-Type: application/octet-stream
        # <blank line>
        try:
            log("HTTP post")
    
            while True:
                # note: init_modem() is used to do "AT+SQNHTTPCFG=..."
                n = len(m)
    
                # log("writing message: {0}".format(m))
                log("post message length: {0}".format(n))
    
                # AT+SQNHTTPSND=<prof_id>,<command>,<resource>,<data_len>[,<post_param>[,<extra_header_line>]]
                # prof_id=1, command=0 (POST), post_parm="2" (application/octet-stream), header="Host: <host_name>" 
    
                a = at('AT+SQNHTTPSND=1,0,"{0}",{1},"2","{2}"'.format(uri, n, header))
                if a[0] != '> ':
                    break
    
                uart.write(m)
                a = get_modem_response()
                if a[0] != 'OK':
                    break
    
                # modem responds with "+SQNHTTPRING: <prof_id,<http_status_code>,<content_type>,<data_size>"
                a = get_modem_response()
                s = a[0]
                if s[0:13] != "+SQNHTTPRING:":
                    break
    
                a = s.split(",")
                n = int(a[3])
                log("response message length: {0}".format(n))
    
                # get response
                cmd = 'AT+SQNHTTPRCV=1,0\r'
                log("  modem command: {0}".format(cmd))
                uart.write(cmd)
    
                wait_rx(5, 10000)   # wait for b'\r\n<<<'
                b = uart.read(5)
                # log(" modem response: {0}".format(b))
                if b[2:5] != b'<<<':
                    break
    
                wait_rx(n, 10000)   # wait for response
                b = uart.read(n)
                log(" modem response: {0}".format(b))
    
                a = get_modem_response()
                if a[0] != 'OK':
                    break
    
                log(b)
                return b
    
        except Exception as ex:
            sys.print_exception(ex)
    
        # some type of error occurred
        return b''
    
    
    #####################################################################
    #                     Get SIM Card ICCID Number
    #####################################################################
    def iccid():
        # modem must be registered to network to get ICCID
        try:
            log("get SIM card ICCID")
            a = at("AT+SQNCCID?")
            
            # example response: ['+SQNCCID: "89014103271203065543",""', 'OK']
            if a[1] != 'OK':
                return ''
    
            s = a[0]
            if s[0:9] != '+SQNCCID:':
                return ''
    
            a = s.split(",")
            n = len(a[0])
            r = a[0][11:n-1]
    
        except Exception as ex:
            sys.print_exception(ex)
            r = ''
    
        log("ICCID = {0}".format(r))
        return r    # ICCID as string
    
    
    #####################################################################
    #                       Get Modem IMEI Number
    #####################################################################
    def imei():
        try:
            log("get modem IMEI number")
            a = at("AT+CGSN=1")
            
            # example response: ['+CGSN: "354347094028575"', 'OK']
            if a[1] != 'OK':
                return ''
    
            s = a[0]
            if s[0:6] != '+CGSN:':
                return ''
    
            n = len(s)
            r = s[8:n-1]
    
        except Exception as ex:
            sys.print_exception(ex)
            r = ''
    
        log("IMEI = {0}".format(r))
        return r    # IMEI as string
    
    
    #####################################################################
    #                       Initialize Modem
    #####################################################################
    def init_modem():
        try:
            log("initializing modem")
    
            if uart == None:
                init_uart()
    
            enable_modem()
    
            # set profile #1 parameters for HTTP connection
            # prof_id=1 (profile), ip, port, auth_type=0 (None), username="", password="", 
            # ssl_enabled=0, timeout, cid=1 (PDN Context Identifier)
            server = g.oIni.host_ip     # note: host_name could also be used (auto DNS lookup)
            port = g.oIni.host_port
            timeout = 10    # secs to wait for response from server
            cmd = 'AT+SQNHTTPCFG=1,"{0}",{1},0,"","",0,{2},1'.format(server, port, timeout)
            a = at(cmd)
            if a[0] != 'OK':
                log("error initializing modem")
    
            set_sms_mode()
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #               Initialize Uart Connected To LTE Modem
    #####################################################################
    def init_uart():
        global uart
        # pins=(TXD, RXD, RTS, CTS)
        # FiPy: pins=('P20', 'P18', 'P19', 'P17')
        # GPy:  pins=('P5', 'P98', 'P7', 'P99')
        log("initializing UART")
        # rx_buffer_size: must be > 127, 200000 works, 250000 crashes, http_post seems to work with any size
        uart = UART(1, baudrate=921600, bits=8, parity=None, stop=1, timeout_chars=2, pins=('P5', 'P98', 'P7', 'P99'), rx_buffer_size=512)
    
    
    #####################################################################
    #           Is Cellular Modem Attached To Packet Domain Service
    #####################################################################
    def isattached():
        # returns: True/False
        try:
            a = at('AT+CGATT?')
            if a[0][0:9] == '+CGATT: 1':
                return True
    
        except Exception as ex:
            sys.print_exception(ex)
    
        return False
    
    
    #####################################################################
    #                           Reset Modem
    #####################################################################
    def reset_modem():
        try:
            log("reseting modem...")
            a = at('AT^RESET')
            # generates +SHUTDOWN ... +SYSSTART responses
            if a[0] != 'OK':
                log("error resetting modem")
    
            if a[1] != '+SHUTDOWN':
                log("error resetting modem")
    
            a = get_modem_response()
            if a[0] != '+SYSSTART':
                log("error resetting modem")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                  Check Modem For Restart/Crash
    #####################################################################
    def restarted():
        # return True if modem has crashed / restarted
        try:
            # if CFUN != 1, modem has crashed / restarted
            r = get_cfun()
            if r != 1:
                return True
    
            # if SMS text mode is reset to zero, modem has crashed / restarted
            mode = get_sms_mode()
            if mode != 1:
                return True
    
            return False
    
        except Exception as ex:
            sys.print_exception(ex)
    
        return True
    
    
    #####################################################################
    #                       Set SMS Mode To Text
    #####################################################################
    def set_sms_mode():
        try:
            log("set SMS mode to text")
            a = at('AT+CMGF=1')     # set SMS mode to text
            if a[0] != 'OK':
                log("error setting SMS mode to text")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                       Get Signal Strength
    #####################################################################
    def signal_strength():
        # 0-31, 99-unknown
        try:
            while True:
                log("check signal strength")
                a = at('AT+CSQ')
                # example response: ['+CSQ: 18,99', 'OK'] where 18 is signal strength
                if a[1] != 'OK':
                    break
    
                s = a[0]
                if (s[0:5] != "+CSQ:"):
                    break
                
                n = len(s)
                s = s[6:n-1]
                a = s.split(",")
                cell_signal = int(a[0])
    
                g.oStatus.cell_signal = cell_signal
                log("signal strength = {0}".format(cell_signal))
                return cell_signal
    
        except Exception as ex:
            sys.print_exception(ex)
    
        # unknown signal strength
        g.oStatus.cell_signal = 99
        log("signal strength unknown")
        return 99
    
    
    #####################################################################
    #             Delete Read, Sent, and Unsent SMS Messages
    #####################################################################
    def sms_delete():
        try:
            log("delete all READ, SENT, and UNSENT SMS messages")
            a = at('AT+CMGD=1,3')
            if a[0] != 'OK':
                log("error trying to delete read, sent, and unsent SMS messages")
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                     Delete All SMS Messages
    #####################################################################
    def sms_delete_all():
        # doesn't work until some time after lte.attach() is called
        try:
            log("delete all SMS messages")
            a = at('AT+CMGD=1,4')
            if a[0] != 'OK':
                log("error trying to delete all SMS messages")
            
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                     Process SMS Messages
    #####################################################################
    def sms_process(r):
        # example response:
        # ['+CMGL: 1,"REC READ","+19999999999",,"19/01/07,09:16:53-20"', 'Test3', '+CMGL: 2,"REC READ","+19999999999",,"19/01/07,11:12:30-20"', 'Test4', 'OK']
        
        try:
            for m in r:
                if (m[0:5] == "+CMGL"):
                    continue
                
                # process SMS messages
                
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                      Read Unread SMS Messages
    #####################################################################
    def sms_read():
        # response example: ['+CMGL: 1,"REC UNREAD","+1999999999",,"19/01/07,09:16:53-20"', 'Test3', 'OK']
        a = at('AT+CMGL="REC UNREAD"')
        return a
        
    
    #####################################################################
    #                       Read All SMS Messages
    #####################################################################
    def sms_read_all():
        # response example:
        # ['+CMGL: 1,"REC READ","+19999999999",,"19/01/07,09:16:53-20"', 'Test3', '+CMGL: 2,"REC READ","+19999999999",,"19/01/07,11:12:30-20"', 'Test4', 'OK']
        a = at('AT+CMGL="ALL"')
        return a
    
    
    #####################################################################
    #                       Wait For N Received Chars          
    #####################################################################
    def wait_rx(n, timeout=10000):
        # timeout in msecs
        t0 = ticks_ms()
        while True:
            if uart.any() >= n:
                break
    
            t1 = ticks_ms()
            tx = ticks_diff(t1, t0)
            if tx > timeout:
                return False
        
        return True
    
    
    #####################################################################
    #                    Wait For Specific Modem Response          
    #####################################################################
    def wait_response(response, msecs=10000):
        log('wait up to {0} msecs for modem response: {1}'.format(msecs, response))
        t0 = ticks_ms()
        while True:
            try:
                a = at('AT')
    
                for x in a:
                    if x == response:
                        log('****modem response found****: ' + response)
                        return True
    
                t1 = ticks_ms()
                tx = ticks_diff(t1, t0)
                log('{0} msecs'.format(tx))
    
                if tx > msecs:
                    log('modem response not found: ' + response)
                    return False
    
                sleep_ms(500)
    
            except Exception as ex:
                sys.print_exception(ex)
                
    
    #####################################################################
    #                           Test #1
    #####################################################################
    def test1():
        try:
            log("test basic modem functions")
    
            if uart == None:
                init_uart()
            
            while True:
                # reset_modem()
    
                enable_modem()
                set_sms_mode()
    
                imei()
                iccid()
    
                while True:
                    r = check_registration()
                    if (r == 1) or (r == 5):
                        # modem registered
                        iccid()
                        attach()
                        return
    
                    if restarted():
                        print('MODEM crash/restart detected')
                        return
    
                    signal_strength()
    
                    sleep_ms(1000)
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                         Example HTTP Get          
    #####################################################################
    def test2():
        try:
            log("test HTTP get")
    
            at('AT+SQNHTTPCFG=1,"httpbin.org",80,0,"","",0,120,1')
            # some sites require "User-Agent" header
            #   ex: at('AT+SQNHTTPQRY=1,0,"/","User-Agent: Test1"')
            at('AT+SQNHTTPQRY=1,0,"/get"')
            get_modem_response()
            at('AT+SQNHTTPRCV=1,0')
    
        except Exception as ex:
            sys.print_exception(ex)
    
    
    #####################################################################
    #                         Example HTTP Post          
    #####################################################################
    def test3(size=10):
        try:
            log("test HTTP post")
    
            while True:
                a = at('AT+SQNHTTPCFG=1,"httpbin.org",80,0,"","",0,120,1')
                if a[0] != 'OK':
                    break
    
                m = "X"*size
                log("writing message: {0}".format(m))
                log("post message length: {0}".format(size))
                
                a = at('AT+SQNHTTPSND=1,0,"/post",{0}'.format(size))
                if a[0] != '> ':
                    break
    
                uart.write(m)
                a = get_modem_response()
                if a[0] != 'OK':
                    break
    
                # modem responds with "+SQNHTTPRING: <prof_id,<http_status_code>,<content_type>,<data_size>"
                a = get_modem_response()
                s = a[0]
                if s[0:13] != "+SQNHTTPRING:":
                    break
    
                a = s.split(",")
                n = int(a[3])
                log("response message length: {0}".format(n))
    
                # get response
                cmd = 'AT+SQNHTTPRCV=1,0\r'
                log("  modem command: {0}".format(cmd))
                uart.write(cmd)
    
                wait_rx(5, 10000)   # wait for b'\r\n<<<'
                b = uart.read(5)
                log(" modem response: {0}".format(b))
                if b[2:5] != b'<<<':
                    break
    
                wait_rx(n, 10000)   # wait for response
                b = uart.read(n)
                log(" modem response: {0}".format(b))
    
                a = get_modem_response()
                if a[0] != 'OK':
                    break
    
                log(b)
                return b
    
        except Exception as ex:
            sys.print_exception(ex)
    
        # some type of error occurred
        return b''
    
    


  • Ok, I made a little more progress today.

    One of the problematic aspects of coding modem communication routines is knowing what to do when errors occur and how to reliably handle unsolicited modem responses. Currently I've decided against throwing exceptions in the functions. The calling program will need to decide what to do if errors occur.

    I've upgraded the at() function to look for unsolicited responses before transmitting the requested command. If anything is in the RX buffer, it is assumed to be an unsolicited response. There is always a remote chance that an unsolicited response will arrive during the transmission of the command. This will mess up the expected response received.

    I will later try to identify specific unsolicited responses the modem may emit. Currently I've noticed it emits a "+CEREG" on occassion. This can be turned off if we decide to later.

    from machine import UART
    from utime import sleep_ms, ticks_ms, ticks_diff
    
    uart = None
    
    #####################################################################
    #               Initialize Uart Connected To LTE Modem
    #####################################################################
    def init_uart():
        global uart
    
        # FiPy: pins=('P20', 'P18', 'P19', 'P17')
        # GPy:  pins=('P5', 'P98', 'P7', 'P99')
        uart = UART(1, baudrate=921600, pins=('P5', 'P98', 'P7', 'P99'), timeout_chars=2)
    
    
    #####################################################################
    #                     Send AT Command To Modem
    #####################################################################
    def at(cmd):
        try:
            if uart.any() > 0:
                # get unsolicited modem response 
                a = get_modem_response()
    
                if len(a) > 0:
                    print("unsolicited modem response: {}".format(a))
                
                # process unsolicited results here
    
            print("modem command: {}".format(cmd))
    
            # send command to modem
            n = uart.write(cmd + '\r\n')
            if n != len(cmd) + 2:
                print("error sending command to modem: n = {}".format(n))
                return []
    
            a = get_modem_response()
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response
    
    
    #####################################################################
    #                     Get Modem Response
    #####################################################################
    def get_modem_response():
        try:
            t0 = ticks_ms()
            
            while True:
                if uart.any() > 0:
                    # insure entire response has been received
                    sleep_ms(1)
                    break
    
                t1 = ticks_ms()
                tx = ticks_diff(t1, t0)
    
                if tx > 10000:
                    print("timeout error - no modem response after 10 secs")
                    return []
    
                sleep_ms(100)
    
            r = uart.read()
            s = str(r, 'utf-8')
            a = s.split('\r\n')
            a = list(filter(None, a))
    
            print("modem response: {}".format(a))
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a    # return modem response
    
    
    #####################################################################
    #                       Test Modem Module
    #####################################################################
    def test():
        init_uart()
        r = at("ATZ")
        if (r[0] != 'OK'):
            print("error")
    
        r = at("AT+CEREG?")
        if (r[1] != 'OK'):
            print("error")
    
        s = iccid()
        print("ICCID: " + s)
    
        s = imei()
        print("IMEI: " + s)
    
        n = signal_strength()
        print("Signal Strength: " + str(n))
    
        print("End of test")
    
    #####################################################################
    #                     Get SIM Card ICCID Number
    #####################################################################
    def iccid():
        try:
            a = at("AT+SQNCCID?")
            
            # example response: ['+SQNCCID: "89014103271203065543",""', 'OK']
            if a[1] != 'OK':
                return ''
    
            s = a[0]
            if s[0:10] != '+SQNCCID: ':
                return ''
    
            a = s.split(",")
            n = len(a[0])
            r = a[0][11:n-1]
    
        except Exception as ex:
            sys.print_exception(ex)
            r = ''
    
        return r
    
    
    
    #####################################################################
    #                       Get Modem IMEI Number
    #####################################################################
    def imei():
        try:
            a = at("AT+CGSN=1")
            
            # example response: ['+CGSN: "354347094028575"', 'OK']
            if a[1] != 'OK':
                return ''
    
            s = a[0]
            if s[0:7] != '+CGSN: ':
                return ''
    
            n = len(s)
            r = s[8:n-1]
    
        except Exception as ex:
            sys.print_exception(ex)
            r = ''
    
        return r
    
    
    #####################################################################
    #                       Get Signal Strength
    #####################################################################
    def signal_strength():
        # 0-31, 99-unknown
        try:
            while True:
                a = at('AT+CSQ')
                # example response: ['+CSQ: 18,99', 'OK'] where 18 is signal strength
                if a[1] != 'OK':
                    break
    
                s = a[0]
                if (s[0:6] != "+CSQ: "):
                    break
                
                n = len(s)
                s = s[6:n-1]
                a = s.split(",")
                cell_signal = int(a[0])
    
                return cell_signal
    
        except Exception as ex:
            sys.print_exception(ex)
    
        # unknown signal strength
        return 99
    


  • Ok, here's my first bit of code to send AT commands. Hopefully I'll get more done tomorrow. Comments and suggestions are welcome. I'm thinking about making the at() function check the last result line for "OK" and throwing an exception if it is different.

    from machine import UART
    from utime import sleep_ms, ticks_ms, ticks_diff
    
    uart = None
    
    #####################################################################
    #               Initialize Uart Connected To LTE Modem
    #####################################################################
    def init_uart():
        global uart
    
        # FiPy: pins=('P20', 'P18', 'P19', 'P17')
        # GPy:  pins=('P5', 'P98', 'P7', 'P99')
        uart = UART(1, baudrate=921600, pins=('P5', 'P98', 'P7', 'P99'), timeout_chars=2)
    
    
    #####################################################################
    #                     Send AT Command To Modem
    #####################################################################
    def at(cmd):
        try:
            print("modem command: {}".format(cmd))
    
            n = uart.write(cmd + '\r\n')
            if n != len(cmd) + 2:
                print("error sending command to modem: n = {}".format(n))
                return []
    
            t0 = ticks_ms()
            
            while True:
                if uart.any() > 0:
                    sleep_ms(1) # insure entire response has been received from modem
                    break
    
                t1 = ticks_ms()
                tx = ticks_diff(t1, t0)
    
                if tx > 10000:
                    print("timeout error waiting for response from modem")
                    return []
    
                sleep_ms(100)
    
            r = uart.read()
            s = str(r, 'utf-8')
            a = s.split('\r\n')
            a = list(filter(None, a))
    
            print("response: {}".format(a))
    
        except Exception as ex:
            sys.print_exception(ex)
            a = []
    
        return a
    
    #####################################################################
    #                       Test Modem Module
    #####################################################################
    def test():
        init_uart()
        r = at("ATZ")
        if (r[0] != 'OK'):
            print("error")
    
        r = at("AT+CEREG?")
        if (r[1] != 'OK'):
            print("error")
     
    
    


  • Thanks for the help; it works! Now let me see if I can build a complete communication module that can do HTTP gets and puts with nothing but AT commands.

    from machine import UART
    # Open serial connection to LTE modem
    # FiPy: pins=('P20', 'P18', 'P19', 'P17')
    # GPy:  pins=('P5', 'P98', 'P7', 'P99')
    uart = UART(1, baudrate=921600, pins=('P5', 'P98', 'P7', 'P99'), timeout_chars=1000)
    uart.write("AT" + "\r\n")
    r = uart.read()
    print(r)    # b'\r\nOK\r\n'
    uart.deinit()```


  • I communicate with the LTE modem through UART, it seems faster and more reliable than the LTE() class... I don't know if the pins on a GPy are different from the ones on my FiPy...

    from machine import UART
    # Open serial connection to LTE modem
    serial = UART(1, baudrate=921600, pins=('P20', 'P18', 'P19', 'P17'), timeout_chars=1)
    # For example write AT to modem
    serial.write("AT" + '\r\n')
    # Read back the response from the modem
    serial.read()
     # Deinit uart to lte modem
    serial.deinit()
    

Log in to reply
 

Pycom on Twitter