celery的简单介绍

Posted by mjTree on April 20, 2024

更新于:2024-04-20 20:00

一、Celery简介

Celery 是一个基于Python开发的分布式异步消息队列,可以实现任务的异步处理,适合一些并行任务。Celery采用多进程方式,能够有效利用多核 CPU,并且Celery封装了常见任务队列的各种操作。

Celery具备如下几个显著的优点:

简单:熟悉 Celery 的工作流程后,配置使用简单。
高可用:当任务执行失败或发生连接中断,Celery 会自动尝试重新执行任务。
快速:一个单进程的 Celery 每分钟可处理上百万个任务,并且只需要毫秒级的延迟。
灵活:几乎 Celery 的各个组件都可以被扩展和定制。

二、Celery基本概念

Celery主要有4个核心概念,分别是 broker、backend、worker、task,下面我们简单说明一下。

1、broker

broker 是消息代理,用于接受生产者发送的任务消息,存进队列并按序分发给消费者。Celery本身没有消息存储功能,需要借用第三方的消息中间件,因此 broker 也被叫做消息中间件。常见的 broker 有 Redis、RabbitMQ、ZooKeeper 等,但官方推荐使用 Redis 和 RabbitMQ。

2、backend

backend 是用于存储结果的地方,因为有时候队列中的任务的执行结果或者执行状态需要被生产者知道,所以需要一个存储这些结果的位置。Celery支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式存储,实际使用按使用环境选择。

3、worker

worker 表示执行任务的消费者,用于处理队列中的任务。

4、task

task 表示在队列中执行的任务,由生产者将其放入队列中,然后交由 worker 处理。

CeleryArchitecture

上图是 Celery 的架构图,包含了生产者、broker、worker 以及 backend。生产者把任务(task)发布到消息中间件(broker)中,然后由消费者(worker)将其取出并进行处理,然后将其处理结果放到 backend,最后由生产者取出结果。

三、demo展示

pip3 install celery==5.2.7

下面代码表示一个消费者,通过 redis 做为 broker 和 backend,注册一个名为 send_email 的任务队列去接收消息。

from celery import Celery
import time

queue_name = 'send_email'
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
celery_consumer = Celery(queue_name, backend=backend, broker=broker)


@celery_consumer.task
def process_message(task):
    name = task.get('name', '')
    time.sleep(3)
    print('向%s发送邮件完成' % name)
    return 'success'


if __name__ == "__main__":
    worker = celery_consumer.Worker()
    worker.start()

下面代码表示一个生产者,通过 celery 把消息发送到对应的任务队列中,获得对应任务的 id,后续可以借助这个 id 查询处理的结果。

from celery import Celery
from celery.result import AsyncResult

queue_name = 'send_email'
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
celery_producer = Celery(queue_name, backend=backend, broker=broker)


def send_message(message):
    result_task = celery_producer.send_task(f'{queue_name}.process_message', args=[message])
    async_result = AsyncResult(id=result_task.id, app=celery_producer)
    response = async_result.get(timeout=10)
    print("response is ", response)


send_message({'name': 'mjtree'})

四、小结

本篇文章只是入门级的介绍了一些Celery框架的知识点,介绍了核心概念并且通过架构图展示工作流程,最后通过简单的代码展示如何快速上手使用。