Unlock the Secrets of Concurrency: Mastering Micropython Models!

Preface

Concurrency is hard because computers like doing just one thing at a time. This is an inconvenience to humans, who want them to do everything all at once, but struggle to put that in terms that are straightforward for a machine to understand. This is especially bad in circumstances like my Raspberry Pi Pico LED project, where the correct answer is “do everything you can all at once, but especially this one part over here because it needs to be ‘realtime’. Also do it on a limited instruction set.” 🙁

Assumptions

The following technical assumptions apply throughout this document. The information here might not be valid outside of them:

  • The target microcontroller is the Raspberry Pi Pico W
  • You’re using MicroPython version 1.20 or later

Rationale

Stated Purpose

For the purposes of my LED project (or any networked IoT-type device), there are inevitably going to be many concurrent operations that must be handled at once. This must be carefully managed when using low-cost SoCs that don’t ship with a full OS and its process scheduler. I believe this is a problem that can be solved for general purposes with a single pattern.

Hardware capabilities

The Raspberry Pi Pico W is a wifi-enabled version of the Pi Pico. The Pico is powered by a dual-core Arm Cortex-M0+ processor clocked at up to 133 MHz.

The ‘W’ at the end of the name adds an on-board single-band 2.4GHz wireless interface (802.11n), implemented with the Infineon CYW43439.

The CLK is shared with the VSYS monitor, so VSYS can only be read via the ADC when no SPI transaction is in progress. The Infineon CYW43439 DIN/DOUT and IRQ all share one pin on the RP2040, so it is likewise only safe to check for IRQs when no SPI transaction is in progress. The interface typically runs at 33MHz. [2]
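As a side note on that shared-pin constraint, converting a raw VSYS reading is straightforward once you do get a safe window to sample it. Below is a hedged sketch assuming the standard 3:1 divider onto ADC channel 3 described in the datasheet; the on-device lines are illustrative MicroPython, shown as comments, and the function name is mine:

```python
def vsys_from_raw(raw, vref=3.3):
    """Convert a 16-bit ADC reading of VSYS/3 to volts.

    The Pico routes VSYS through a 3:1 divider onto ADC channel 3,
    and read_u16() scales results into a 0..65535 range.
    """
    return raw * 3 * vref / 65535

# On-device (MicroPython) usage would look roughly like:
#   from machine import ADC
#   raw = ADC(3).read_u16()   # only safe when no SPI transaction is in flight
#   volts = vsys_from_raw(raw)
print(vsys_from_raw(65535))  # full-scale reading corresponds to roughly 9.9 V
```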

asyncio on rp2

The below is from [1] and has been edited for conciseness. This section discusses the uasyncio module before it was integrated into MicroPython’s core feature set. As far as I can tell, it remains relevant.

There is no common GIL. This means that under some conditions Python built-in objects can be corrupted.

In the code sample there is a risk of the uasyncio task reading the dict at the same moment as it is being written. Updating a dictionary data entry is atomic: there is no risk of corrupt data being read. In the code sample a lock is only required if mutual consistency of the three values is essential.

In the absence of a GIL some operations on built-in objects are not thread safe. For example adding or deleting items in a dict. This extends to global variables because these are implemented as a dict. See Globals.

The observations in [1] §1.3 regarding user-defined data structures and uasyncio interfacing apply:

This protection does not extend to user defined data structures. The fact that a dictionary won’t be corrupted by concurrent access does not imply that its contents will be mutually consistent. In the code sample in section 1, if the application needs mutual consistency between the dictionary values, a lock is needed to ensure that a read cannot be scheduled while an update is in progress.

The above means that, for example, calling uasyncio.create_task from a thread is unsafe as it can destroy the mutual consistency of uasyncio data structures.

Code running on a thread other than that running uasyncio may block for as long as necessary (an application of threading is to handle blocking calls in a way that allows uasyncio to continue running).
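A minimal sketch of that threading application, using a simple polled result so it also runs under CPython (MicroPython additionally offers `asyncio.ThreadSafeFlag` as a purpose-built primitive for this; the names below are mine, not from [1]):

```python
import _thread
import asyncio
import time

result = {"value": None}  # written by the worker thread, read by asyncio

def blocking_read():
    # Stands in for a blocking driver or network call that would
    # otherwise stall the asyncio scheduler.
    time.sleep(0.05)
    result["value"] = 42

async def waiter():
    _thread.start_new_thread(blocking_read, ())
    # The asyncio loop keeps running other tasks while we poll for the result.
    while result["value"] is None:
        await asyncio.sleep(0.01)
    return result["value"]

print(asyncio.run(waiter()))  # prints 42 once the thread finishes
```

Note that per the quoted text above, writing a single dict entry is atomic, so no lock is needed for this one-value handoff.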

Code running on a core other than that running uasyncio may block for as long as necessary.
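Tying the above together: when several values must stay mutually consistent across the thread boundary, guard every read and write with a lock. A hedged sketch under those rules (the shared state and function names are hypothetical, and `acquire`/`release` are used explicitly since that is the documented `_thread` lock API):

```python
import _thread
import time

# Hypothetical shared state: three channel values that must stay consistent.
state = {"r": 0, "g": 0, "b": 0}
state_lock = _thread.allocate_lock()

def writer():
    # On the Pico this would run on core 1 via _thread.start_new_thread().
    for i in range(1, 4):
        state_lock.acquire()
        try:
            # All three writes happen under the lock, so a reader can
            # never observe a half-updated colour.
            state["r"] = i
            state["g"] = i
            state["b"] = i
        finally:
            state_lock.release()
        time.sleep(0.01)

def read_snapshot():
    # Readers take the same lock to get a mutually consistent view.
    state_lock.acquire()
    try:
        return (state["r"], state["g"], state["b"])
    finally:
        state_lock.release()

_thread.start_new_thread(writer, ())
time.sleep(0.1)
r, g, b = read_snapshot()
print(r == g == b)  # True: the snapshot is internally consistent
```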

A tale of two loops

In the LED project (and in broader use cases for the platform itself), there are two main operational loops that need to happen simultaneously. The first is a general operation loop that will be run through (u)asyncio. This handles nearly all of the systems-level processes, including the following:

  • Network interrupts
  • Button handling
  • DNS queries / mDNS advertising
  • Configuration management
  • HTTP server command handling
    • and supporting tasks thereof
  • Application initialization and lifecycle management
  • Any other trivial detail

Meanwhile the second loop must be a near-realtime loop dedicated to the application logic itself. This loop is said to require being ‘near-realtime’ because, at least in the case of driving LED chains, fancy effects will appear stuttery or delayed if they have to time-slice with all of the ‘platform’ features listed above. The goal here is said to be ‘near-realtime’ because this loop will time-slice for concurrency as well, but only among application-specific operations.

I guess in a particular way you could maybe call that a “priority thread”?
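The time-slicing idea for that application loop can be sketched as cooperative generators stepped round-robin; the effect names here are illustrative, not from the project:

```python
def blink_effect():
    """One LED effect, written as a generator: each next() is one small step."""
    frame = 0
    while True:
        frame += 1                   # advance the effect by one frame
        yield ("blink", frame)

def fade_effect():
    level = 0
    while True:
        level = (level + 5) % 256    # step the brightness ramp
        yield ("fade", level)

# The near-realtime loop steps every effect once per pass, so no single
# effect can starve the others -- but nothing outside this loop gets a turn,
# which is why the 'platform' work lives in the asyncio loop instead.
effects = [blink_effect(), fade_effect()]
for _ in range(3):                   # three passes of the loop
    for fx in effects:
        print(next(fx))
```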

Gotchas

The following error is thrown when trying to start more than one _thread at a time on a Pico. With only one extra core available, this illustrates the need to schedule and run many complex tasks across just two cores.

Traceback (most recent call last):
  File "main.py", line 98, in <module>
  File "main.py", line 95, in main
  File "asyncio/core.py", line 1, in run
  File "asyncio/core.py", line 1, in run_until_complete
  File "asyncio/core.py", line 1, in run_until_complete
  File "main.py", line 63, in amain
OSError: core1 in use

Implementation

Code


# Copyright 2023 Brad Arnett
# MIT License.  Just google it if you don't know.

import _thread
import time
import asyncio
import gc

class AppLoopManager:
    """
    Manages an event loop on a separate thread, allowing functions to be added to the loop for
    regular execution. Runs the loop functions on a second thread (core 1), independently of
    the main thread.
    """
    def __init__(self):
        self.workers = []
        self.gens = []
        self.started = False

    def create_worker(self, worker):
        try:
            self.gens.append(worker.loop())  # call loop() to capture its generator (not a function ref)
            self.workers.append(worker)
        except Exception as e:
            print(f"{e} thrown during create_worker()")
        
    def start_thread(self):
        """Starts the event loop in a new thread."""
        print("Starting event loop thread")
        self.started = True
        _thread.start_new_thread(self._loop, ())

    def stop_thread(self):
        """Stops the event loop thread."""
        print("Stopping event loop thread")
        # Clearing the flag lets _loop() fall out of its while loop, ending the
        # thread naturally. Calling _thread.exit() here would exit the *calling*
        # thread, not the loop thread.
        self.started = False

    def _loop(self):
        """The internal loop that runs all functions added to the event loop."""
        print("Event loop thread running")
        while self.started:
            for gen in self.gens:
                try:
                    next(gen)
                    time.sleep(0.001)  # Small sleep to prevent CPU hogging
                except StopIteration:
                    print("DEBUG: StopIteration thrown")
                except Exception as e:
                    print(f"Exception in event loop function: {e}")
            gc.collect()

class AppWorker:
    """
    Represents a worker that executes an external function within its own internal loop.
    Designed to perform work as a part of the AppLoopManager's loop.
    """
    def __init__(self):
        self.external_func = None
        self.function_args = None

    def set_external_func(self, external_func, function_args=None):
        """Sets the external function to be called within the internal loop."""
        self.function_args = function_args
        self.external_func = external_func

    def loop(self):
        """The generator function that calls the external function in a loop."""
        while True:
            if self.external_func:
                self.external_func(self, self.function_args)
            yield  # Yield control to allow other functions in the event loop to run


class AppTask:
    """
    A class representing a task to be run within an AppWorker's internal loop.
    Contains an example function that simulates CPU work and keeps count of its invocations.
    """
    def __init__(self):
        self.counter = 0

    def run(self, caller, args):
        """Simulates task execution."""
        time.sleep(1)  # Simulates work being done
        self.counter += 1
        print(f"Worker {args[0]}, {caller}: Run count {self.counter}")


async def say_after(delay, what):
    """An asynchronous function that prints a message after a delay."""
    await asyncio.sleep(delay)
    print(what)


async def main_async_loop():
    """Runs the main asynchronous loop with asyncio tasks."""
    print("Starting asyncio main loop")
    while True:
        await asyncio.gather(
            say_after(5, "Main loop work: 5 seconds"),
            say_after(2, "Main loop work: 2 seconds")
        )


def main():
    """
    Sets up and starts the entire application, including the event loop manager
    and the asynchronous loop.
    """

    # Initialize the event loop manager
    manager = AppLoopManager()
    manager.start_thread()

    # Create task runners
    task_runner_1 = AppTask()
    task_runner_2 = AppTask()

    manager.create_worker(AppWorker())
    manager.create_worker(AppWorker())

    manager.workers[0].set_external_func(task_runner_1.run, ("Worker 0",))
    manager.workers[1].set_external_func(task_runner_2.run, ("Worker 1",))

    # Start the asyncio loop on the main thread
    asyncio.run(main_async_loop())

if __name__ == "__main__":
    main()

Example output

MPY: soft reboot
Starting event loop thread
Event loop thread running
Starting asyncio main loop
Worker Worker 1, <AppWorker object at 20016630>: Run count 1
Main loop work: 2 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 1
Worker Worker 1, <AppWorker object at 20016630>: Run count 2
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 2
Main loop work: 5 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 3
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 3
Main loop work: 2 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 4
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 4
Worker Worker 1, <AppWorker object at 20016630>: Run count 5
Main loop work: 5 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 5
Worker Worker 1, <AppWorker object at 20016630>: Run count 6
Main loop work: 2 seconds
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 6
Worker Worker 1, <AppWorker object at 20016630>: Run count 7
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 7
Main loop work: 5 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 8
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 8
Main loop work: 2 seconds

In the above output, the workers are run on the second core, while the asyncio loop is on the first core. The workers each call a method of a class that maintains its own internal counter. Since both fire after the same duration (1 second), the two counts stay in lock-step. The generators are never reset because they are effectively infinite loops; the while True / yield pattern should never raise StopIteration unless something has gone wrong. Meanwhile, the “Main loop work” (asyncio) simulates two tasks that take 2 seconds and 5 seconds, handled asynchronously.

For the purposes of this example, “Main loop work” is done via an asyncio.gather(), so the main loop will block until the longer task is complete. We can inspect an individual interval here and make sure that the tasks are behaving properly:

Worker Worker 1, <AppWorker object at 20016630>: Run count 3
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 3
Main loop work: 2 seconds
Worker Worker 1, <AppWorker object at 20016630>: Run count 4
Worker Worker 0, <AppWorker object at 20018bb0>: Run count 4
Worker Worker 1, <AppWorker object at 20016630>: Run count 5
Main loop work: 5 seconds

This is one interval, cherry-picked to align with the asyncio.gather() call: it starts after a 5-second printout and runs until the next one. As stated, each worker takes one second per execution. There are 2 worker executions before the “2 seconds” printout, and 3 more between the 2-second and 5-second printouts. This looks like it’s doing what it’s supposed to, and I should now be able to drive multiple chains of LEDs from the second thread without network operations slowing the process down!

Reference

[1] https://github.com/peterhinch/micropython-async/blob/master/v3/docs/THREADING.md

[2] https://www.raspberrypi.com/documentation/microcontrollers/raspberry-pi-pico.html
