python 中实现简单的基于 Redis 后端的任务消息队列

了解基本的高并发需求下实现异步的任务队列模型

任务队列是什么?

任务队列是用于在多线程或多进程或多服务器间分发工作的实现机制。
任务作为任务队列的基本工作单元,专用的工作进程(Worker) 用于不断从任务队列里取出并执行任务。
任务队列的实现需要消息中间件方案支持,客户端通过消息中间件发送任务消息,消息中间件发送任务给 Worker 处理。

为什么需要任务队列?

实际应用场景中,有些任务(次要任务)不需要被及时性处理或者需要处理一些计划任务(某个时间点执行任务或者周期性执行任务),这时候为了提高系统的体验,性能,这些任务需要通过一种机制去异步处理,从而避免影响主业务(那些必须同步操作的业务),如何异步处理这些任务?异步操作适合通过队列来实现,队列是个 FIFO 数据结构,这样能将大量的任务进行队列化,按顺序一个个处理,当然也可以实现优先级队列处理那些优先级高的任务。

比如电商项目中的下单业务,用户下单,服务器需要处理一系列关联的业务操作,库存,订单,发短信,发邮件,还有其他特定需求的相关业务,比如发红包,积分等等。这么多的业务难道需要同步式地一个个处理?肯定不是的,每一个任务都会花费一定的处理时间,然而对于下单这个操作来说,作为主要的是去库存拿货并且生存订单,其他次要操作没必要同步等待操作完成才去响应下单请求,发短信,邮件,积分操作及时性不高,可以异步去处理,从而系统能够及时响应用户下单请求,提升系统的体验和高并发能力。

简单的基于 Redis 后端手工实现任务队列

Python 中 的使用标准 queue 模块就可以建立多进程使用的队列,但是使用 redis 和 redis-queue(rq)模块使这一操作更加简单。

Redis 中实现队列的最佳数据结构为 redis list
首先在 redis_queue.py 模块中实现 Redis 队列模型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import redis

class RedisQueue(object):
    def __init__(self, name, namespace='queue', **redis_kwargs):
       # redis的默认参数为:host='localhost', port=6379, db=0, 其中db为定义redis database的数量
       self.__db= redis.Redis(**redis_kwargs)
       self.key = '%s:%s' %(namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # 返回队列里面list内元素的数量

    def put(self, item):
        self.__db.rpush(self.key, item)  # 添加新元素到队列最右方

    def get_wait(self, timeout=None):
        # 返回队列第一个元素,如果为空则等待至有元素被加入队列(超时时间阈值为timeout,如果为None则一直等待)
        item = self.__db.blpop(self.key, timeout=timeout)
        # if item:
        #     item = item[1]  # 返回值为一个tuple
        return item

    def get_nowait(self):
        # 直接返回队列第一个元素,如果队列为空返回的是None
        item = self.__db.lpop(self.key)  
        return item

接着,测试一下
input.py

1
2
3
4
5
6
7
8
from redis_queue import RedisQueue
import time

q = RedisQueue('rq')  # 新建队列名为rq
for i in range(5):
    q.put(i)
    print "input.py: data {} enqueue {}".format(i, time.strftime("%c"))
    time.sleep(1)

output.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
from redis_queue import RedisQueue
import time

q = RedisQueue('rq')
while 1:
    result = q.get_nowait()
    if not result:
        break
    print "output.py: data {} out of queue {}".format(result, time.strftime("%c"))
    time.sleep(2)

创建 bash 脚本 test_run.sh 运行这两个测试模块

1
2
python input.py &
python output.py &

最后运行脚本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
input.py: data 0 enqueue Fri Nov 10 10:51:37 2017
output.py: data 0 out of queue Fri Nov 10 10:51:37 2017
input.py: data 1 enqueue Fri Nov 10 10:51:38 2017
output.py: data 1 out of queue Fri Nov 10 10:51:38 2017
input.py: data 2 enqueue Fri Nov 10 10:51:39 2017
output.py: data 2 out of queue Fri Nov 10 10:51:39 2017
input.py: data 3 enqueue Fri Nov 10 10:51:40 2017
output.py: data ('queue:rq', '3') out of queue Fri Nov 10 10:51:40 2017
input.py: data 4 enqueue Fri Nov 10 10:51:41 2017
output.py: data ('queue:rq', '4') out of queue 1510282301.69

基于 RQ+Redis 实现

队列里面可以添加任意object,因此可以添加函数到多个队列来实现多线程并发的效果。
首先,建立一个rq work进程(写在worker.py脚本中)来监听队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import redis
from rq import Worker, Queue, Connection

listen = ['default']
redis_url = "redis://localhost:6379"  # redis server 默认地址
conn = redis.from_url(redis_url)

def square_function(x):
    return x*x

if __name__ == '__main__':
    with Connection(conn):  # 建立与redis server的连接
        worker = Worker(list(map(Queue, listen)))  # 建立worker监听给定的队列
        worker.work()

然后python worker.py启动redis server

test.py文件中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
from rq import Queue
from rq.job import Job
from worker import square_function, conn 
import time

q = Queue(connection=conn)

job = q.enqueue_call(square_function, args=(5, ), result_ttl=5000)   # 保存结果5000s
job_id = job.get_id()
print job_id

result1 = Job.fetch(job_id, connection=conn)
print result1.is_finished

time.sleep(1)  # 等待队列里任务完成

result2 = Job.fetch(job_id, connection=conn)
print result2.return_value

上面文件的输出结果为:

1
2
3
98dc6f58-9333-48f7-88c1-c4de1cc9e5cf  # job id
False # 任务尚未完成
25 # 任务完成,输出结果

注: 调用的square_function不允许和任务发起在同一个脚本,否则会报错ValueError: Functions from the main module cannot be processed by workers

当与flask一起使用时:
app.py文件中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from rq import Queue
from rq.job import Job
from worker import conn square_function

from flask import Flask, request

app = Flask(__name__)

q = Queue(connection=conn) # 建立与Redis server的连接并初始化一个队列

@app.route("/", methods=['POST'])
def index():
    x = request.get_data()  # string 类型
    job = q.enqueue_call(square_function, args=(int(x), ), result_ttl=5000)  # 最后的参数为RQ保留结果的时间
    return job.get_id()  # 返回job的id

@app.route('/result/<job_key>', methods=['GET'])
def get_results(job_key):
    job = Job.fetch(job_key, connection=conn) # 获取根据job_id获取任务的返回值
    if job.is_finished: # 检验是否完成
        return str(job.result), 200
    else:
        return "Wait!", 202

if __name__ == "__main__":
    app.run('0.0.0.0', port=5000)

python app.py启动flask服务

test.py文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
import requests
import time

post_url = "http://localhost:5000"
post_result = requests.post(post_url, data={'x': 2})
job_id = post_result.content
print job_id

time.sleep(1)

get_url = "http://localhost:5000/result/{}".format(job_id)
get_result = requests.get(get_url)
print get_result.content

获得的结果如下:

1
2
067306e9-f13b-4b6a-93dc-2f5b457a78b7  # job id
4  # job返回值

Celery

  • Celery python 端的异步任务队列框架,支持分布式。celery不实现消息的发送和接收和队列操作,需要依赖三方的消息中间件使用,只是说使得在 python 端实现异步任务更加简单,分布式方案解决多服务器应用实例间的交互,单机中支持多进程或者协程高并发操作,支持多个消息中间件后端和任务处理完后的结果的存储后端。支持 RabbitMQ(基于AMQP协议,推荐),Redis,SQS(亚马逊消息中间件),结果存储后端支持 Redis,Memcached 等等,以及一些 ORM 框架(SQLAlchemy, Django ORM)。
  • Kombu 基于 AMQ 协议来简化 python 端消息传递的库,是一个通用的处理库,也就是支持多种消息传输实现,基于 AMQP 和 非 AMQP 协议的消息中间件后端。可以理解为数据库中的 ORM 支持可插拔的多数据库后端,只需提供不同的驱动。Celery 依赖 Kombu。
  • RabbitMQ 基于AMQP协议实现的消息中间件服务器端,可以理解为数据库界的 MySQL。

参考连接:

Flask by Example – Implementing a Redis Task Queue
Simple Python Queue with Redis
celery 一个基于消息传递的分布式异步任务/作业队列框架,比 rq 更加高级,支持多个消息中间件和任务处理的结果的存储后端。
我为什么要选择RabbitMQ ,RabbitMQ简介,各种MQ选型对比
rq 基于 Redis 的异步任务队列实现库
在 Flask 中使用 Redis Queue 实现异步任务
Python—操作redis
解决python rq获取返回结果和异常的问题