LoPy and Time-based Events
-
Dear All,
I am trying to implement the following functionality:
A PIN goes HIGH for X seconds and then goes LOW for Y seconds. This process is repeated for a certain period of time W. Any hints what is the best approach to implement the aforementioned functionality?
Thanks and regards!
-
@fuzzy OK. here is a variant of the fist version which counts down the total duration every second:
from machine import Pin, Timer, idle DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_DURATION = const(10) class BLINK: def __init__(self, pin, high = DEFAULT_HIGH, low = DEFAULT_LOW, duration = DEFAULT_DURATION): self.start(pin, high, low, duration) def start(self, pin, high, low, duration): self.pin = pin self.times = [high, low] self.duration = duration self.state = 0 self.count = self.times[self.state] self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, 1.0, periodic=True) def timerCallback(self, timer): self.duration -= 1 if self.duration <= 0: self.pin(0) # or whatever final state you like self.stop() else: self.count -= 1 if self.count <= 0: self.state = 1 - self.state self.count = self.times[self.state] self.pin(self.state) def stop(self): self.timer.cancel() def run(): pin = Pin("P5", mode=Pin.OUT) blink = BLINK(pin, 1, 1, 10) while blink.duration > 0: idle() run()
-
@fuzzy The way it's written at the moment is to reduce cycles by one after every high/low transition, not every second. Given that, the line you point at is perfectly right, but maybe not obvious. self.state toggles between 0 and 1. So every second time this line is executed, 0 is subtracted from self.cycles.
If you have a duration in seconds, you must determine cycles as:
cycles = duration // (low + high)
If that value is 0, the loop will still be executed at least once.
-
@robert-hh Thanks again! I tested the first variant (without threading, justing calling the function once). I think there is something wrong with the way
cycles
is reduced. The expected behaviour is to havecycles
reducing every 1 second. But I do not think this is the case, but I might be wrong.The expected behaviour is to having the pin
pin
LOW forlow
seconds and then the same pin HIGH forhigh
seconds. This change between LOW and HIGH needs to last in totalduration
seconds.Maybe the issue is with the following line of code.
self.cycles -= self.state
What do you think?
(lots of) thanks!
-
@fuzzy Hello, I made a short test code around both variants, and they seem to run. Variant 1 with the ISR timer:
# from machine import Pin, Timer, idle, RTC from time import sleep_ms, sleep import _thread DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_CYCLES = const(10) PUMP_PIN = "P5" class BLINK: def __init__(self, pin, high = DEFAULT_HIGH, low = DEFAULT_LOW, cycles = DEFAULT_CYCLES): self.start(pin, high, low, cycles) def start(self, pin, high, low, cycles): self.pin = pin self.times = [high, low] self.cycles = cycles self.state = 0 self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, self.times[self.state], periodic=False) def stop(self): self.timer.cancel() def timerCallback(self, timer): self.cycles -= self.state if self.cycles > 0: self.state = 1 - self.state self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, self.times[self.state], periodic=False) def getSchedules(x): return (1,2,5,5) def switchSchedule(low, high, duration): rtc = RTC() print("Switch THREAD STARTED!") pin = Pin(PUMP_PIN, mode=Pin.OUT) blink = BLINK(pin, low, high, duration) while True: while blink.cycles > 0: idle() print('Switch cycle terminated!') onTime, offTime, program, duration = getSchedules(rtc.now()) blink.start(pin, onTime, offTime, duration) def otherSchedule(time): print("Enters other Thread") while True: sleep(time) print("Other thread is alive") def run(): id1 = _thread.start_new_thread(otherSchedule, [2]) id2 = _thread.start_new_thread(switchSchedule, (1,2,5)) run()
Variant 2 with just a loop and sleeps:
# from machine import Pin, Timer, idle, RTC from time import sleep_ms, sleep import _thread DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_CYCLES = const(10) PUMP_PIN = "P5" def getSchedules(x): return (1,2,5,5) def switchSchedule(low, high, duration): print("Switch THREAD STARTED!") rtc = RTC() pin = Pin(PUMP_PIN, mode=Pin.OUT) while True: for _ in range (duration): pin(0) sleep(low) pin(1) sleep(high) print('Switch cycle terminated!') onTime, offTime, program, duration = getSchedules(rtc.now()) def otherSchedule(time): print("Enters other Thread") while True: sleep(time) print("Other thread is alive") def run(): id1 = _thread.start_new_thread(otherSchedule, [2]) id2 = _thread.start_new_thread(switchSchedule, (1,2,5)) run()
-
@fuzzy Try using a short sleep_ms, like sleep_ms(100) instead of idle() in the inner loop.
-
@robert-hh hi! I tried the code, but now due to the
while True:
loop in the function no other code of my program is executed. Any ideas?Thanks (once more...)!
-
@robert-hh You are right. It is better this way. Will give it a try and let you know.
-
@fuzzy Sorry no. I do not full understand your intentikon. One additional note to using the BLINK example.
Once you created the blink object with blink = BLINK(...) and it expired, you can restart this object again with:
blink.start(pin, low, high, duration)
That means, you do not have to create a new thread to start over a new switchSchedule cycle. You can do something like:def switchSchedule(low, high, duration): print("Switch THREAD STARTED!") pin = Pin(PUMP_PIN, mode=Pin.OUT) blink = Blink(pin, low, high, duration) while True: while blink.cycles > 0: idle() print('Switch cycle terminated!') onTime, offTime, program, duration = getSchedules(rtc.now()) blink.start(pin, onTime, offTime, duration)
And you can stop it anytime with either calling blink.stop() of setting blink.cycles = 0.
-
@robert-hh Thanks... once more! Can you please explain (if possible by an example) how this (the program logic outlined in my previous email) can be achieved in the main loop?
Thanks!
-
@fuzzy You can do that, but do not start a new thread within a running thread, because then you will run out of memory very soon. use a main loop which starts threads, (or the BLINK), if required at all. And you can use global symbols for communication.
Besides that:
BLINK, since interrupt driven, does not require an own thread. Once started, it will run independent of any thread. The thread you have is practically empty. If it should run forever, just chose a large number for duration, like 1e32.
If you want to use threading for it, you can use a much simpler implementation, not using BLINK, like:def switchSchedule(low, high, duration): from time import sleep print("THREAD RE-STARTED!") pin = Pin(PUMP_PIN, mode=Pin.OUT) for i in range(duration): pin(0) sleep(low) pin(1) sleep(high) pin(0) # Drop that line, if the final state should be 1 print('THREAD switchSchedule EXITED!')
-
@robert-hh Yes, I call them recursively, but I have the felling that this is not a good idea... My program logic is as follows:
1. Read schedule parameters from flash (LOW_duration, HIGH_duration, CYCLES_duration) 2. According to RTC.now() check which one of the schedules is active and get the parameters. 3. Call BLINK passing the parameters. 4. When BLINK is over, read again the schedules and get the parameters of the new active schedule. 5. Call BLINK passing the parameters. 6. ... and so on...
If there is no active schedule at the moment, the check for the next active schedule needs to happen every minute.
In parallel other tasks take place.
Any hints/ideas on how to implements this? I thought using threading would be a good idea (but probably is not...)
Thanks and regards!
-
@fuzzy At the moment, you do not get notified. Depending on whether it's the first or second implementation, the timer expires either when cycles is Zero, or at every on/of event. But to tell whether the logic behind BLINK is finished, you have to look at the cycles value.
In your code example above, the indentation seems a little bit off, making the code look like you start threads recursively.
-
@robert-hh as you correct state I need to restart the timer once it expires. But do I get notified (without checking the "cycles'" value of the class)?
What I tried to do is illustrated as snippet of code below.
def switchSchedule(low, high, duration): print("THREAD RE-STARTED!") pin = Pin(PUMP_PIN, mode=Pin.OUT) blink = Blink(pin, low, high, duration) while blink.cycles > 0: idle() print('THREAD switchSchedule EXITED!') onTime, offTime, program, duration = getSchedules(rtc.now()) _thread.start_new_thread(switchSchedule, (onTime, offTime, duration)) _thread.start_new_thread(switchSchedule, (onTime, offTime, duration))
However, I notice that once the the first thread terminates and tries to create the new one, I get an error.
Unhandled exception in thread started by <function switchSchedule at 0x3ffeba30>
Traceback (most recent call last):
File "main.py", line 127, in switchSchedule
MemoryError: memory allocation failed, allocating 5120 bytesAny hints?
Thanks and regards!
-
@fuzzy Just for fun, I made a simpler version of that blink example, allowing now to set arbitrary numbers for the high/low values.
# from machine import Pin, Timer, idle DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_CYCLES = const(10) class BLINK: def __init__(self, pin, high = DEFAULT_HIGH, low = DEFAULT_LOW, cycles = DEFAULT_CYCLES): self.start(pin, high, low, cycles) def start(self, pin, high, low, cycles): self.pin = pin self.times = [high, low] self.cycles = cycles self.state = 0 self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, self.times[self.state], periodic=False) def stop(self): self.timer.cancel() def timerCallback(self, timer): self.cycles -= self.state if self.cycles > 0: self.state = 1 - self.state self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, self.times[self.state], periodic=False) def run(): pin = Pin("P5", mode=Pin.OUT) blink = BLINK(pin, 2, 4, 10) while blink.cycles > 0: idle() run()
-
@fuzzy The on-off timer, once started, does not require an own thread, since it is interrupt controlled and will run anyhow in the background.
I updated the code a little bit. If expired, you can restart the timer again with blink.start(pin, high, low, cycles). If you need more than one pin to toggle, use multiple instances of BLINK. if you want to stop it early, use blink.stop().# from machine import Pin, Timer, idle DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_CYCLES = const(10) class BLINK: def __init__(self, pin, high = DEFAULT_HIGH, low = DEFAULT_LOW, cycles = DEFAULT_CYCLES): self.start(pin, high, low, cycles) def start(self, pin, high, low, cycles): self.pin = pin self.times = [high, low] self.cycles = cycles self.state = 0 self.count = self.times[self.state] self.pin(self.state) self.timer = Timer.Alarm(self.timerCallback, 1.0, periodic=True) def stop(self): self.timer.cancel() def timerCallback(self, timer): self.count -= 1 if self.count <= 0: self.cycles -= self.state if self.cycles <= 0: self.stop() else: self.state = 1 - self.state self.count = self.times[self.state] self.pin(self.state) def run(): pin = Pin("P5", mode=Pin.OUT) blink = BLINK(pin, 2, 4, 10) while blink.cycles > 0: idle() run()
-
@robert-hh Thanks Robert. Since during this process, I need also to send data every send to an MQTT broker, do you think it would be a good idea to put these two processes (on-ff and MQTT) in two different threads using threading?
Thanks once more for your time and support!
-
@fuzzy blink.loop is the down-counter for the number of on-off cycles. Once that is zero, this number has been performed.
And these two statements you refer to simply wait until it's done. You could do anything else in between, but for this simple test it's just waiting.
The call to idle() tells the scheduler that the actual task has nothing to do and something else could be done. I could have written 'pass' instead, but calling idle() ind that case is the better choice.
-
@robert-hh Great! Thanks a lot Robert. Just a question, so I understand the logic behind this. What is the purpose of the following block of code?
while blink.loop > 0: idle()
Thanks and regards!
-
@fuzzy You can achieve that with a periodic timer and a callback, which counts the timer events. I have nailed together a short piece of code showing the principle.
from machine import Pin, Timer, idle DEFAULT_HIGH = const(2) DEFAULT_LOW = const(4) DEFAULT_LOOP = const(10) class BLINK: def __init__(self, pin, high = DEFAULT_HIGH, low = DEFAULT_LOW, loop = DEFAULT_LOOP): self.set_values(pin, high, low, loop) self.timer = Timer.Alarm(self.timerCallback, 1.0, periodic=True) def set_values(self, pin, high, low, loop): self.pin = pin self.times = [high, low] self.loop = loop self.state = 0 self.count = self.times[self.state] self.pin(self.state) def timerCallback(self, timer): self.count -= 1 if self.count <= 0: self.loop -= 1 if self.loop <= 0: self.stop() else: self.state = 1 - self.state self.count = self.times[self.state] self.pin(self.state) def stop(self): self.timer.cancel() def run(): pin = Pin("P5", mode=Pin.OUT) blink = BLINK(pin, 2, 4, 10) while blink.loop > 0: idle() run()