Hello World for RabbitMQ
2014-09-02 06:37

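Download and Installation

Download the RabbitMQ package for your platform from http://www.rabbitmq.com/download.html. On Ubuntu, once the deb package is installed, the RabbitMQ server is already running in the background. To start and stop the server: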

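$ sudo invoke-rc.d rabbitmq-server start   # start the service
$ sudo invoke-rc.d rabbitmq-server stop    # stop the service
$ sudo rabbitmq-server                     # run in the foreground
$ sudo rabbitmq-server -detached           # run in the background
$ sudo rabbitmqctl stop                    # stop the server

Python Client Library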

Install pika: sudo pip install pika

Hello World

Send a message to the queue (sendhello.py):

#-*- coding:utf-8 -*-
import pika

#establish a connection with RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# create a queue to which the message will be delivered, let's name it hello
channel.queue_declare(queue='hello')

# Publish through the default exchange, identified by an empty string.
# This exchange is special: it lets us say exactly which queue the message
# should go to, by putting the queue name in the routing_key parameter.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")

connection.close()

List the queues in RabbitMQ:

sudo rabbitmqctl list_queues
Listing queues ...
hello   1
...done.
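
If you want to check the queue depth from a script rather than by eye, the listing above can be parsed. The following is a minimal sketch, assuming the output has the shape shown here (one `name count` row per queue, surrounded by banner lines); `parse_list_queues` is a hypothetical helper, not part of RabbitMQ or pika, and other RabbitMQ versions may print different banner lines.

```python
def parse_list_queues(output):
    """Map queue name -> message count from `rabbitmqctl list_queues` text.

    Assumption: each data row is "<name> <count>" and queue names contain
    no whitespace. Banner lines such as "Listing queues ..." and
    "...done." do not match that shape and are skipped.
    """
    queues = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) == 2 and parts[1].isdigit():
            queues[parts[0]] = int(parts[1])
    return queues


# The sample text is the listing shown above.
sample = "Listing queues ...\nhello   1\n...done.\n"
print(parse_list_queues(sample))
```

In practice the text could come from something like `subprocess.check_output(["sudo", "rabbitmqctl", "list_queues"])`, decoded to a string before parsing.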

Receive messages from the queue (receivehello.py):

#-*- coding:utf-8 -*-
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()

channel.queue_declare(queue='hello')

# pika calls this function whenever a message is received
def callback(ch, method, properties, body):
    # body arrives as bytes under Python 3; decode it for display
    print(" [x] Received %r" % (body.decode(),))

# tell RabbitMQ that this callback should receive messages from the hello queue
# (pika 1.0+ signature; pika 0.x used basic_consume(callback, queue='hello', no_ack=True))
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

# enter a never-ending loop that waits for data and runs callbacks as needed
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Run receivehello.py; it prints:

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'

Running sendhello.py again makes the receiver print another line:

[x] Received 'Hello World!'

Worker Queue

Start multiple workers to process tasks.

Send messages (task.py):

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode=2,  # 2 marks the message persistent; any other value makes it transient
                      ))
print(" [x] Sent %r" % (message,))
connection.close()
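
The message text doubles as a fake workload: worker.py below sleeps one second for every dot in the body. As a rough illustration of that convention, here is a small sketch that needs no RabbitMQ server. The helper names `build_message` and `work_seconds` are made up for this example; they only mirror the two expressions used in task.py and worker.py.

```python
def build_message(args):
    # Same rule as task.py: join the command-line arguments,
    # or fall back to a default message when there are none.
    return ' '.join(args) or "Hello World!"


def work_seconds(message):
    # Same rule as worker.py: one second of simulated work per dot.
    return message.count('.')


for args in ([], ["First", "task."], ["Slow", "task....."]):
    message = build_message(args)
    print("%r -> %d second(s) of work" % (message, work_seconds(message)))
```

So `python task.py Slow task.....` would keep a worker busy for about five seconds, while a message without dots is handled immediately.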

Receive messages (worker.py):

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
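print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    # body arrives as bytes under Python 3; decode it before counting dots
    message = body.decode()
    print(" [x] Received %r" % (message,))
    time.sleep(message.count('.'))  # one second of simulated work per dot
    print(" [x] Done")
    # acknowledge only after the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)

# do not hand this worker a new message until it has acked the previous one
channel.basic_qos(prefetch_count=1)
# pika 1.0+ signature; pika 0.x used basic_consume(callback, queue='task_queue')
channel.basic_consume(queue='task_queue',
                      on_message_callback=callback)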

channel.start_consuming()

Start several workers and run task.py a number of times; the workers each take a share of the tasks.
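
To see why worker.py sets prefetch_count=1, it helps to compare two dispatch policies on paper. The sketch below is a toy in-memory model, not RabbitMQ itself. It assumes that without a prefetch limit tasks are handed out strictly in turn, and that with prefetch_count=1 each task goes to whichever worker becomes free first. The function names and the sample durations are invented for illustration.

```python
import heapq


def round_robin_finish_times(durations, n_workers):
    """Task i goes to worker i % n_workers, however busy that worker is."""
    finish = [0] * n_workers
    for i, seconds in enumerate(durations):
        finish[i % n_workers] += seconds
    return finish


def fair_dispatch_finish_times(durations, n_workers):
    """Each task goes to the worker that becomes free earliest,
    which is roughly what prefetch_count=1 plus manual acks achieves."""
    free_at = [(0, w) for w in range(n_workers)]  # (time free, worker index)
    heapq.heapify(free_at)
    finish = [0] * n_workers
    for seconds in durations:
        start, worker = heapq.heappop(free_at)
        finish[worker] = start + seconds
        heapq.heappush(free_at, (finish[worker], worker))
    return finish


# Alternating slow (5 dots) and quick (1 dot) tasks, two workers.
durations = [5, 1, 5, 1]
print("round robin:", round_robin_finish_times(durations, 2))
print("fair       :", fair_dispatch_finish_times(durations, 2))
```

In this model, round robin gives every slow task to the same worker, which finishes at 10 seconds while the other is idle after 2; fair dispatch lets both workers finish at 6 seconds.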

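Unless otherwise stated, these articles are 阿小信's personal notes. Please credit the source when reposting. If any content infringes your rights, contact me and I will remove it promptly.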
