在Django中使用Celery

这篇日志发布时间已经超过一年,许多内容可能已经失效,请读者酌情参考。

Python的异步调用有很多种方式,其中Celery是一种简单有效的异步执行队列,分分钟上手。

关于Django+Celery的组合资料比较少,搜到的都是在Django中使用django-celery来实现的,但是这样做有个问题:

如果分布式部署,其他客户端worker也要一套django环境?

由于Celery的工作原理比较容易理解,简单来说就是向broker(我这里使用rabbitMQ)队列中增加一项任务,包括任务名称和参数,然后依次执行。执行并不在添加任务的主机,而是在worker上。因此Django server和worker client本身就应该是彼此独立的,甚至说直接向rabbitMQ添加一条记录就能向Celery发起一次异步任务请求。

在Django可以直接调用Celery,唯一要注意的就是task的名称需要保持一致,具体如下。

0x01 Worker Client编写

编写celery worker,实现执行任务的函数。

目录结构如下:

tasks

├── __init__.py


└── task.py


task.py代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# file: tasks/task.py

from celery import Celery
import time

app = Celery()

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Asia/Shanghai',
    BROKER_URL='' # broker地址
)

@app.task
def plus(x, y):
    time.sleep(15)
    print '-' * 50
    print 'x + y = %s' % (x + y)
    return x + y

@app.task
def subtract(x, y):
    time.sleep(15)
    print '-' * 50
    print 'x - y = %s' % (x - y)
    return x - y

定义了两个函数,一个plus,一个subtract,其中使用sleep(15)来增加执行时间方便看效果。

注意这时候两个任务的完整名称应该是tasks.task.plus和tasks.task.subtract,在django server端应该与之对应。

选择这样的目录结构其实也是为了方便server端编写。

还可以为任务指定名字,这样就可以不拘泥于目录结构。使用方法是@app.task(name="task.name"),通过提供name参数为任务指定名称。

然后使用如下命令启动worker:

# 启动worker
celery worker -A tasks.task

0x02 Django Server编写

建立两个应用,分别为web应用和tasks。

我的目录结构如下:

django目录结构

向settings中添加study和tasks两个应用,并向urls中添加访问请求如下:

urlpatterns = [
    url(r'^admin/', include(admin.site.urls)),
    url(r'^index/$', 'study.views.index'),
    url(r'^show/$', 'study.views.show')
]

其中study.views.index方法为添加任务的页面,study.views.show为celery worker/task信息展示页面。

向tasks应用添加异步执行的任务,如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# file: tasks/task.py

from celery import Celery

app = Celery()

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Asia/Shanghai',
    BROKER_URL='' # broker地址
)

@app.task
def plus(x, y):
    pass

@app.task
def subtract(x, y):
    pass

可以看到在Django server端,并没有真正编写plus和subtract的实现,只是增加了一个任务别名。这个别名与worker中的一致,都是tasks.task.plus。其实在study中建立也是可以的,只要server和client对应上就好。为了结构清晰,还是新建一个应用好一些。

在study.views中异步调用。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# file: study/views.py

from django.shortcuts import render_to_response
from tasks.task import plus, subtract, app


def index(req):
    task_ids = []
    plus_task = plus.delay(3, 8)
    task_ids.append(plus_task.id)

    subtract_task = subtract.delay(11, 5)
    task_ids.append(subtract_task.id)

    return render_to_response('index.html', {'task_ids': task_ids})

    
def show(req):
    workers = app.control.inspect()
    return render_to_response('show.html', {'workers': workers})

在index中使用plus.delay(3, 8)异步执行了plus函数,当然现在plus的实现还没有,一会在worker client编写。

在show中调用了celer app的一些api,用来显示worker和tasks信息。

templates/index.html

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title>Django Celery Index</title>
</head>
<body>
<h1>Index</h1>
向Celery添加了两条任务,如下:<br>
<ul>
{% for task_id in task_ids %}
    <li>{{ task_id }}</li>
{% endfor %}
</ul>
</body>
</html>

templates/show.html

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title>Django Celery Workers</title>
</head>
<body>
<h1>Workers</h1>
<table>
<tr class="info"><td colspan="3">Worker列表</td></tr>
{% for worker, tasks in workers.registered.items %}
    <tr class="success"><td colspan="3">{{ worker }}</td></tr>
 {% for task in tasks %}
    <tr><td>{{ forloop.counter }}</td><td>{{ task }}</td></tr>
 {% endfor %}
{% endfor %}
<tr class="info"><td colspan="3">当前执行任务</td></tr>
{% for woker, tasks in workers.active.items %}
    {% for task in tasks %}
    <tr><td><strong>{{ task.hostname }}</strong></td><td>{{ task.name }}{{ task.args }}</td><td>{{ task.id }}</td></tr>
 {% endfor %}
{% endfor %}
</table>
</body>
</html>

0x03 执行效果

执行效果

celery执行效果.png

其实就是Django的celery和worker的celery都连到了同一个broker,Django的celery不执行具体任务,只添加任务请求,具体执行由worker操作。这样的代码组织结构看起来更加清晰,同时可以使用celery的多项api可以在Django Server端编写自己的任务管理。



留言交流

没有评论
点击换图