celery+rabbitmq+redis 分布任务队列探索(一)

首先我们说一下celery是什么以及它的组成:

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。

celery由5个主要组件组成: 
producer: 任务发布者, 通过调用API向celery发布任务的程序 
celery beat: 任务调度, 根据配置文件发布定时任务 
worker: 实际执行任务的程序 
broker: 接受任务消息,存入队列再按顺序分发给worker执行 
backend: 存储结果的服务器

预期目标:

在Django中生成任务(生产者),通过rabbitmq记录消息,celery worker执行mq中任务(消费者),回写结果进redis。机器A:windows部署Django(1.10.2)、Celery(3.1.7),机器B:centos部署rabbitmq(3.5.3)、redis(3.2.8)。机器C:windows部署Celery(3.1.7)、redis(3.2.8)

实际结果:机器A:生成任务发送到机器B的MQ,然后机器C执行任务。

1、安装

机器A:

按照既定的软件列表安装Django、Celery:

pip install django==1.10.2

因celery4.0以上版本不支持windows 所以,我们先安装3.1.7 做一个调试用

pip install celery==3.1.7

因会对redis进行一些操作,所以我们这里也安装上redis

pip install redis

机器B:

通过源码安装redis 跟 rabbitmq。此处安装省略,可以参考我之前的MAC下redis 及rabbitmq安装手册。

因为centos6.5对于如此干净的环境,源码安装redis可能还好。但是rabbitmq的安装因为要依赖的比较多,比如erlang,所以这块的安装需要大家特别注意一下。在这里就省略不说了。。。

机器C:

对应的安装 celery  、redis

2.代码调试(python)

首先我们在机器A的Django中新建工程。

然后在app下创建一个目录queue,然后分别创建三个文件。celeryconfig.py、tasks.py、testcelery.py

celeryconfig.py 根据名字我们就可以知道,主要是用来做一个celery通用配置的操作。原本可以放在django的settings.py文件中,但是此处为了我们能够更好的理解celery,就单独拎出来了。

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: celeryconfig.py
@time: 2017/3/21 21:38
"""

from kombu import Exchange, Queue

BROKER_URL = 'amqp://autotest:autotest@10.83.14.245//'  # 消息存储数据存储rabbitmq
CELERY_RESULT_BACKEND = 'redis://10.83.14.245/0'  # 消息执行后的结果包括函数返回值的数据存储在仓库0

CELERY_QUEUES = (  # 定义任务队列
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("for\_task\_realtime", Exchange("for\_task\_realtime"), routing_key="task\_realtime"),
    Queue("for\_task\_timer", Exchange("for\_task\_timer"), routing_key="task\_timer"),
    Queue("for\_task\_monitor", Exchange("for\_task\_monitor"), routing_key="task\_monitor")
)

CELERY_ROUTES = {  # 定义routes用来决定不同的任务去哪一个queue
    # tasks.taskrealtime的消息会进入for\_task\_realtime队列
    'tasks.taskrealtime': {"queue": "for\_task\_realtime", "routing\_key": "task\_realtime"},
    # tasks.tasktimer的消息会进入for\_task\_timer队列
    'tasks.tasktimer': {"queue": "for\_task\_timer", "routing\_key": "task\_timer"},
    # tasks.taskmonitor的消息会进入for\_task\_monitor队列
    'tasks.taskmonitor': {"queue": "for\_task\_monitor", "routing\_key": "task\_monitor"},
}

首先设置了BROKER_URL 以及CELERY_RESULT_BACKEND ,接下来定义了三个Message Queue,并且指明了Queue对应的Exchange以及routing_key的值。

tasks.py 则是我们具体任务执行的方法,在这里会定义对应之前的消息生产函数:

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: tasks.py
@time: 2017/3/21 21:16
"""

from celery import Celery
import subprocess

app = Celery()
app.config_from_object("celeryconfig")


@app.task
def taskrealtime(x, y):
    return (x + y), 'taskrealtime'

@app.task
def tasktimer(x, y):
    return (x + y), 'tasktimer'


@app.task
def taskmonitor(x, y):
    return (x + y), 'taskmonitor'

为区分是调用的不同的queue,我在每个方法的后面分别加上了方法名如:taskmonitor

testcelery.py 是我们具体要用来调用的文件。

# -*- coding:utf-8 -*-

"""
@version: python2.7
@license: Apache Licence 
@software: PyCharm
@file: testcelery.py
@time: 2017/3/21 19:45
"""

from tasks import *

r = taskrealtime.delay(1, 1)
print r,r.get()

r = tasktimer.delay(2, 2)
print r,r.get()

r = taskmonitor.delay(3, 3)
print r,r.get()

在这里,我分别对三个方法进行了输出操作,便于在前端查看。

3.启动服务

首先启动redis、rabbitmq,具体的启动方法,在之前的文章中已经做了对应的操作。

但是接下来我们要启动celery-work去做消费。

拷贝tasks.py、celeryconfig.py文件到机器C中桌面建立的queue中,

然后cd进入到该层级celery worker -c 4 --loglevel=info -A tasks

C:\Users\Administrator\Desktop\queue>celery worker -c 4 --loglevel=info -A tasks

[2017-03-22 00:24:53,770: WARNING/MainProcess] c:\python27\lib\site-packages\cel ery\apps\worker.py:159: CDeprecationWarning: Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers the ability to execute any command. It's important to secure your broker from unauthorized access when using pickle, so we think that enabling pickle should require a deliberate action and not be the default choice.

If you depend on pickle then you should set a setting to disable this warning and to be sure that everything will continue working when you upgrade to Celery 3.2::

>>CELERY\_ACCEPT\_CONTENT = \['pickle', 'json', 'msgpack', 'yaml'\]

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@WIN-QHB59C6FF53 v3.1.7 (Cipater) ---- * ----- --- * *** * -- Windows-2008ServerR2-6.1.7601-SP1 -- * - --- - ** ---------- [config] - ** ---------- .> app: __main__:0x32cc7b8 - ** ---------- .> transport: amqp://autotest:*@10.83.14.245:5672// - ** ---------- .> results: redis://10.83.14.245/0 - *** --- * --- .> concurrency: 4 (prefork) -- * ---- --- ***** ----- [queues] -------------- .> default exchange=default(direct) key=default .> for_task_monitor exchange=for_task_monitor(direct) key=task_m onitor .> for_task_realtime exchange=for_task_realtime(direct) key=task _realtime .> for_task_timer exchange=for_task_timer(direct) key=task_tim er

[tasks] . tasks.taskmonitor . tasks.taskrealtime . tasks.tasktimer

[2017-03-22 00:24:53,811: INFO/MainProcess] Connected to amqp://autotest:**@10.8 3.14.245:5672// [2017-03-22 00:24:53,844: INFO/MainProcess] mingle: searching for neighbors [2017-03-22 00:24:54,868: INFO/MainProcess] mingle: all alone [2017-03-22 00:24:54,901: WARNING/MainProcess] celery@WIN-QHB59C6FF53 ready.

我们可以看到,celery woker已经启动。接下来,运行机器A中的testcelery.py,可以看到机器A中输出的打印信息如下:

33361d05-89c6-408e-b491-88c72a89519b (2, 'taskrealtime') 3540dc5f-69a1-45d5-a421-7dfe95af40aa (4, 'tasktimer') 0173d003-8807-4e2b-b9d0-591efa36c05d (6, 'taskmonitor')

这就是对于celery这个分布式任务调度系统的一个基本操作。希望对大家有所帮助,接下来,我们会进一步的探索celery更多的玩法。


评论(0 ) 点赞(15)


暂未登录,请登录之后发表评论。 QQ