Как использовать asyncio и multiprocessing в Python

Модуль multiprocessing module позволяет вам писать компактные приложения использующие множество процессов.

Но иногда у вас есть старый код который запускал один процесс и вы хотите чтобы теперь он запускал несколько одновременно.

Вы можете полностью переписать его используя модуль multiprocessing.

Но тогда вам придется заново отлаживать это по сути новое приложение.

Я предлагаю иной подход - чуть модифицировать ваш старый код, добавив в него asyncio. В итоге ваше приложение по сути останется тем же, вам не потребуется долго отлаживать его заново.

import asyncio
import subprocess
import random

semaphore = asyncio.Queue(maxsize=3-1)  # Max 3 processes


async def worker(id):
    """
    We could use more straightforward consumer-producer pattern:
        * producer puts tasks into the queue
        * worker waits for tasks in the queue

    But for this tiny code sniped that would produce too much boilerplates.
    """
    delay = random.random()
    print('>'*5, f'task {id} starts with delay {delay:.1} seconds')
    process = await asyncio.create_subprocess_exec(
        'sleep', str(delay),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    (output, err) = await process.communicate()
    status = await process.wait()

    print('<'*5, f'task {id} finished with status {status}')
    print(f'Stdout: {output}, Stderr: {err}')
    await semaphore.get()


async def main(loop):
    for task_id in range(6):
        await semaphore.put(task_id)  # It does'n matter what we put in the queue. We use it as semaphore.
        loop.create_task(worker(task_id))
    # all the tasks are scheduled at the moment but not all done

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks()))  # Wait for all tasks in the loop.
>>>>> task 0 starts with delay 0.9 seconds
>>>>> task 1 starts with delay 0.4 seconds
>>>>> task 2 starts with delay 0.6 seconds
<<<<< task 1 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 3 starts with delay 0.3 seconds
<<<<< task 3 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 2 finished with status 0
Stdout: b'', Stderr: b''
>>>>> task 4 starts with delay 0.9 seconds
>>>>> task 5 starts with delay 0.3 seconds
<<<<< task 0 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 5 finished with status 0
Stdout: b'', Stderr: b''
<<<<< task 4 finished with status 0
Stdout: b'', Stderr: b''

Process finished with exit code 0

Этот код запускает 6 задач в максимум трех одновременно работающих экземплярах внешнего процесса. Я ограничиваю это с помощью asyncio.Queue.

Когда процесс завершается вы можете обработать его результаты. Следующий процесс запустится автоматически так что в любой момент времени будет работать одновременно максимум процесса.

Как видите, код очень прост и содержит практически только прикладную логику.

Он выглядит как синхронный. То есть как предположительный старый код, который запускал только один процесс и который мы теперь конвертировали в асинхронный.

Опубликовано February 22, 2019