Django中使用Celery+RabbitMQ实现异步处理 | 阿小信的博客
阿小信大人的头像
Life is short (You need Python) Bruce Eckel

Django中使用Celery+RabbitMQ实现异步处理2014-09-02 06:38

Celery是一个分布式的任务队列,需要用其他后端作为Broker来存储消息,可选的很多,Redis,Django ORM什么的都可以,默认是用RabbitMQ来作Broker的。RabbitMQ是一个消息队列系统。

其实rq更加简单易用。https://github.com/nvie/rq

安装

需要安装RabbitMQ、Celery和Django-celery

Celery和Django-celery的安装直接pip就好

修改settings.py

在INSTALLED_APPS中加入app:

INSTALLED_APPS = (
  ...
  'djcelery',      
  'main',             # startapp
}

添加RabbitMQ的配置:

import djcelery
djcelery.setup_loader()

BROKER_URL = 'amqp://guest:guest@localhost:5672/'

#或者
#BROKER_HOST = "localhost"
#BROKER_PORT = 5672
#BROKER_USER = "guest"
#BROKER_PASSWORD = "guest"
#BROKER_VHOST = "/"

如果使用mod_wsgi部署Django,在.wsgi模块中也要有:

import djcelery
djcelery.setup_loader()
创建数据库
$ python manage.py syncdb #default
$ python manage.py migrate djcelery #for south
定义和调用tasks

在app(main)目录下新建文件tasks.py

from celery import task

@task()
def add(x, y):
    return x + y

开启worker:

$ python manage.py celery worker --loglevel=info
$ #OR
$ python manage.py celeryd -l info --settings=settings

和celery本身的命令一样,只不过前面是由python manage.py启动的。这样也行:

$ python manage.py celery help

调用任务:

$ python manage.py shell
In [1]: from main.tasks import add

In [2]: a=add.delay(1,1)

In [3]: a.ready() #worker未开启
Out[3]: False

In [4]: a=add.delay(1,1) #开启worker,重新执行

In [5]: a.ready()
Out[5]: True

In [9]: a.get() #Waits until the task is done and returns the retval.
Out[9]: 2

In [10]: a.successful()
Out[10]: True

最好是从manage.py的shell进入python交互界面,否则报错

NotImplementedError: No result backend configured.  Please see the documentation for more information.

实例测试:假设有主站www,有一个索引查询站index,由于索引大,主站用户数量也大,并发量高,在www上为了更好的快速查询,使用消息队列实现异步操作。

在www的app下新建tasks.py:

#-*-coding:utf-8-*-
__author__ = 'ashin'

from celery.task import Task
from celery.registry import tasks
import requests

class RequestCindexTask(Task):

    def run(self, key, field='user', **kwargs):

        if field == 'user':
            r = requests.get("http://127.0.0.1:8000/user/search?key=%s"%key)
            return r.text
        elif field == 'title':
            pass

tasks.register(RequestCindexTask)

view中查询调用处改为:

#-*- coding:utf-8 -*-

from django.http import HttpResponse

from main.tasks import RequestCindexTask

def search(request, nick=u"压力", field="user"):
    r = RequestCindexTask.delay(nick, field)
    print r.get()
    return HttpResponse("ok")

运行index:$ python app.py 8000 运行www:$ python manage.py runserver 8001 运行celery:$ python manage.py celeryd -l info --settings=settings

访问www中可以调用search的url,可在www的后台看到返回结果,返回值必须用get()获得,不能使用result

如果您觉得从我的分享中得到了帮助,并且希望我的博客持续发展下去,请点击支付宝捐赠,谢谢!

若非特别声明,文章均为阿小信的个人笔记,转载请注明出处。文章如有侵权内容,请联系我,我会及时删除。

#Python#   #django #queue
分享到:
阅读[11094] 评论[1]

你可能也感兴趣的文章推荐

本文最近访客

网友54.*.*.22[火星]2018-08-21 08:26
网友220.*.*.122[北京]2018-08-21 08:22
网友54.*.*.25[法国]2018-08-21 08:18
网友46.*.*.82[火星]2018-08-21 08:17

发表评论

#1 网友14.*.*.32[深圳]5394 :
hello 我也碰到了 No result backend configured. 这种问题 想问一下你当初是不是在eclipse下编辑的
2015-08-27 17:36 回复