Python的异步调用有很多种方式,其中Celery是一种简单有效的异步执行队列,分分钟上手。
关于Django+Celery的组合资料比较少,搜到的都是在Django中使用django-celery来实现的,但是这样做有个问题:
如果分布式部署,其他客户端worker也要一套django环境?
由于Celery的工作原理比较容易理解,简单来说就是向broker(我这里使用rabbitMQ)队列中增加一项任务,包括任务名称和参数,然后依次执行。执行并不在添加任务的主机,而是在worker上。因此Django server和worker client本身就应该是彼此独立的,甚至说直接向rabbitMQ添加一条记录就能向Celery发起一次异步任务请求。
在Django可以直接调用Celery,唯一要注意的就是task的名称需要保持一致,具体如下。
0x01 Worker Client编写
编写celery worker,实现执行任务的函数。
目录结构如下:
tasks
├── __init__.py
└── task.py
task.py代码如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # file: tasks/task.py from celery import Celery import time app = Celery() app.conf.update( CELERY_TASK_SERIALIZER='json', CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Asia/Shanghai', BROKER_URL='' # broker地址 ) @app.task def plus(x, y): time.sleep(15) print '-' * 50 print 'x + y = %s' % (x + y) return x + y @app.task def subtract(x, y): time.sleep(15) print '-' * 50 print 'x - y = %s' % (x - y) return x - y
定义了两个函数,一个plus,一个subtract,其中使用sleep(15)来增加执行时间方便看效果。
注意这时候两个任务的完整名称应该是tasks.task.plus和tasks.task.subtract,在django server端应该与之对应。
选择这样的目录结构其实也是为了方便server端编写。
还可以为任务指定名字,这样就可以不拘泥于目录结构。使用方法是@app.task(name="task.name"),通过提供name参数为任务指定名称。
然后使用如下命令启动worker:
# 启动worker celery worker -A tasks.task
0x02 Django Server编写
建立两个应用,分别为web应用和tasks。
我的目录结构如下:
向settings中添加study和tasks两个应用,并向urls中添加访问请求如下:
urlpatterns = [ url(r'^admin/', include(admin.site.urls)), url(r'^index/$', 'study.views.index'), url(r'^show/$', 'study.views.show') ]
其中study.views.index方法为添加任务的页面,study.views.show为celery worker/task信息展示页面。
向tasks应用添加异步执行的任务,如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # file: tasks/task.py from celery import Celery app = Celery() app.conf.update( CELERY_TASK_SERIALIZER='json', CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Asia/Shanghai', BROKER_URL='' # broker地址 ) @app.task def plus(x, y): pass @app.task def subtract(x, y): pass
可以看到在Django server端,并没有真正编写plus和subtract的实现,只是增加了一个任务别名。这个别名与worker中的一致,都是tasks.task.plus。其实在study中建立也是可以的,只要server和client对应上就好。为了结构清晰,还是新建一个应用好一些。
在study.views中异步调用。
#!/usr/bin/env python # -*- coding: utf-8 -*- # file: study/views.py from django.shortcuts import render_to_response from tasks.task import plus, subtract, app def index(req): task_ids = [] plus_task = plus.delay(3, 8) task_ids.append(plus_task.id) subtract_task = subtract.delay(11, 5) task_ids.append(subtract_task.id) return render_to_response('index.html', {'task_ids': task_ids}) def show(req): workers = app.control.inspect() return render_to_response('show.html', {'workers': workers})
在index中使用plus.delay(3, 8)异步执行了plus函数,当然现在plus的实现还没有,一会在worker client编写。
在show中调用了celer app的一些api,用来显示worker和tasks信息。
templates/index.html
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <title>Django Celery Index</title> </head> <body> <h1>Index</h1> 向Celery添加了两条任务,如下:<br> <ul> {% for task_id in task_ids %} <li>{{ task_id }}</li> {% endfor %} </ul> </body> </html>
templates/show.html
<!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8"> <title>Django Celery Workers</title> </head> <body> <h1>Workers</h1> <table> <tr class="info"><td colspan="3">Worker列表</td></tr> {% for worker, tasks in workers.registered.items %} <tr class="success"><td colspan="3">{{ worker }}</td></tr> {% for task in tasks %} <tr><td>{{ forloop.counter }}</td><td>{{ task }}</td></tr> {% endfor %} {% endfor %} <tr class="info"><td colspan="3">当前执行任务</td></tr> {% for woker, tasks in workers.active.items %} {% for task in tasks %} <tr><td><strong>{{ task.hostname }}</strong></td><td>{{ task.name }}{{ task.args }}</td><td>{{ task.id }}</td></tr> {% endfor %} {% endfor %} </table> </body> </html>
0x03 执行效果
其实就是Django的celery和worker的celery都连到了同一个broker,Django的celery不执行具体任务,只添加任务请求,具体执行由worker操作。这样的代码组织结构看起来更加清晰,同时可以使用celery的多项api可以在Django Server端编写自己的任务管理。
留言交流