Cola de tareas en Python (I)

Después de unos meses trabajando en Go se llegan a extrañar las abstracciones del lenguaje para concurrencia. Hoy por ejemplo necesitaba hacer una cola de tareas en Python utilizando AsyncIO. En Go esto sigue una estructura sencilla:

// Este es la gorutina que procesa los trabajos
func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}
// Creamos un canal a donde enviar los datos
jobChan := make(chan Job, 10)
// Arrancamos la gorutina
go worker(jobChan)
// Enviamos datos para un trabajo, esto puede ser desde cualquier
// gorutina
jobChan <- job
//Indicamos que ya no vamos a procesar más datos
close(jobChan)

Veamos si podemos lograr un equivalente en Python:

# Primer intento
import asyncio
async def worker(queue):
    while True:
		process(await queue.get())
		# Indicamos que se procesó el trabajo
		queue.task_done()

# Creamos la cola de mensajes
queue = asyncio.Queue(10)
# arrancamos el worker
worker_task = asyncio.create_task(worker(queue))
# Enviamos mensajes a la cola
await queue.put(x)

Hasta acá todo parece bien, pero: ¿Cómo indicamos que no se van a procesar más datos?

En Go la operación close sobre un canal hace que el ciclo termine después de procesar cualquier elemento pendiente. En Python a primera vista podemos cancelar la tarea creada en worker_task pero eso nos deja con la posibilidad de que varias tareas se queden en la cola. Por otro lado, la corutina join() de asyncio.Queue nos permite esperar a que ya no existan elementos en la cola, aunque no garantiza que worker se detenga.

La solución es utilizar una mezcla de los dos:

await queue.join() # Esperar a que se procesen los pendientes
worker_task.cancel() # Cancelar la tarea