How to use asyncio with multiprocessing in Python

Python’s multiprocessing module gives you great tools for writing applications with clear logic.

But sometimes you want to convert old code that uses an external process, so that it launches many instances of that process simultaneously.

You can rewrite it completely to use the multiprocessing module.

Or you can use asyncio and leave the old code intact. This way you don’t have to spend a lot of time debugging new application logic.

I’ll show you how.

import asyncio
import random
import subprocess

MAX_PROCESSES = 3  # Run at most 3 external processes at the same time.


async def worker(task_id, semaphore):
    """
    We could use a more straightforward producer-consumer pattern:
        * the producer puts tasks into the queue
        * the worker waits for tasks in the queue

    But for this tiny code snippet that would produce too much boilerplate.
    """
    delay = random.random()
    print('>'*5, f'task {task_id} starts with delay {delay:.1f} seconds')
    try:
        process = await asyncio.create_subprocess_exec(
            'sleep', str(delay),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # communicate() reads both pipes and waits for the process to exit.
        (output, err) = await process.communicate()
        status = process.returncode

        print('<'*5, f'task {task_id} finished with status {status}')
        print(f'Stdout: {output}, Stderr: {err}')
    finally:
        # Free one slot so that main() can start the next worker.
        await semaphore.get()


async def main():
    # The queue holds one item per running worker, so maxsize is the limit.
    semaphore = asyncio.Queue(maxsize=MAX_PROCESSES)
    tasks = []
    for task_id in range(6):
        # It doesn't matter what we put in the queue: we use it as a semaphore.
        # put() waits while the queue is full, i.e. while MAX_PROCESSES workers run.
        await semaphore.put(task_id)
        tasks.append(asyncio.create_task(worker(task_id, semaphore)))
    # All the tasks are scheduled at this point, but not all of them are done.
    await asyncio.gather(*tasks)  # Wait for all the tasks to finish.


asyncio.run(main())

Output:

>>>>> task 0 starts with delay 0.9 seconds
>>>>> task 1 starts with delay 0.4 seconds
>>>>> task 2 starts with delay 0.6 seconds
<<<<< task 1 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 3 starts with delay 0.3 seconds
<<<<< task 3 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 2 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 4 starts with delay 0.9 seconds
>>>>> task 5 starts with delay 0.3 seconds
<<<<< task 0 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 5 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 4 finished with status 0
Stdout: b'', Stderr: b''

Process finished with exit code 0

This code runs 6 tasks with at most three instances of the external process running simultaneously. I enforced that limit with the help of asyncio.Queue.
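The same limit can also be expressed with asyncio.Semaphore, the standard-library primitive meant for this job. Below is a minimal sketch of that alternative, not the approach used above. The names `run_limited`, `run_all` and the `stats` dictionary are made up for illustration, and the external command is assumed to be a Python one-liner that sleeps briefly (used instead of `sleep` so the sketch does not depend on that binary).

```python
import asyncio
import sys

MAX_PROCESSES = 3  # same limit as in the example above
# Stand-in for the external command (assumption): a short Python one-liner.
COMMAND = [sys.executable, '-c', 'import time; time.sleep(0.1)']


async def run_limited(semaphore, stats):
    # "async with" takes a slot on entry and releases it on exit,
    # even if the body raises an exception.
    async with semaphore:
        stats['running'] += 1
        stats['peak'] = max(stats['peak'], stats['running'])
        try:
            process = await asyncio.create_subprocess_exec(*COMMAND)
            return await process.wait()
        finally:
            stats['running'] -= 1


async def run_all(count):
    semaphore = asyncio.Semaphore(MAX_PROCESSES)
    stats = {'running': 0, 'peak': 0}  # bookkeeping only, to observe the limit
    codes = await asyncio.gather(
        *(run_limited(semaphore, stats) for _ in range(count))
    )
    return codes, stats['peak']


if __name__ == '__main__':
    codes, peak = asyncio.run(run_all(6))
    print(f'exit codes: {codes}, peak concurrency: {peak}')
```

One difference worth noting: here all the tasks are created up front and wait on the semaphore, while the queue-based version creates each task only when a slot is free.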

When an external process finishes, your code can process its result. The next process then starts automatically, so at any moment at most three processes are running.
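To make "process the result" more concrete, here is a small sketch that handles each result as soon as its process exits, using asyncio.as_completed. The helper names `run_command` and `collect` are hypothetical, the external command is assumed to be a Python one-liner that prints one line, and the concurrency limit is left out to keep the sketch short.

```python
import asyncio
import sys


async def run_command(task_id):
    # Hypothetical external command: a Python one-liner that prints its id.
    process = await asyncio.create_subprocess_exec(
        sys.executable, '-c', f'print("result of task {task_id}")',
        stdout=asyncio.subprocess.PIPE,
    )
    output, _ = await process.communicate()
    return task_id, process.returncode, output.decode().strip()


async def collect(count):
    results = {}
    tasks = [asyncio.create_task(run_command(i)) for i in range(count)]
    # as_completed() yields awaitables in the order the tasks finish,
    # so each result can be handled without waiting for the slower ones.
    for finished in asyncio.as_completed(tasks):
        task_id, status, text = await finished
        results[task_id] = (status, text)
    return results


if __name__ == '__main__':
    for task_id, (status, text) in sorted(asyncio.run(collect(3)).items()):
        print(task_id, status, text)
```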

As you can see, the code is pretty straightforward and looks much like the synchronous code you are presumably converting.
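For comparison, this is roughly what a synchronous counterpart might look like. It is only a guess at the kind of old code being converted: `run_sync` and `main_sync` are hypothetical names, and the external command is again assumed to be a short Python one-liner.

```python
import subprocess
import sys


def run_sync(task_id):
    # Blocks until the external process exits, so tasks run one after another.
    completed = subprocess.run(
        [sys.executable, '-c', 'import time; time.sleep(0.1)'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(f'task {task_id} finished with status {completed.returncode}')
    return completed.returncode


def main_sync(count):
    return [run_sync(task_id) for task_id in range(count)]


if __name__ == '__main__':
    main_sync(3)
```

The body of the asyncio worker keeps the same shape: start the process, wait for it, read the status. The main differences are the `await` keywords and the slot bookkeeping.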

So you can focus on your application logic and avoid over-complicating it with multiprocessing boilerplate.

Written on February 22, 2019