问题描述

某段时间, 正常运行的celery定时任务就会忽然停止运行, celery_beatcelery_worker 进程并无异常

首先做了以下排查

  1. task 是否抛出异常导致整个task没有产生正常结果?
  2. 消息队列是否异常导致task丢失?
  3. 是否性能问题导致task处理不过来?

在明确排除了以上原因后, 开始了深入的问题分析

分析

检查日志

查看日志发现celery_worker 最后接收了一些任务后就没有继续接收, 且这些任务并有正常返回result, 定时器执行的时候, 发送出去的任务, 也是没有接收到

1
2
3
4
5
6
(venv) [webapi@VM_231_194_centos log]$ tail -f worker_err.log 
[2018-12-12 13:22:09,110: INFO/MainProcess] Received task: task.send_mail[648a580f-dae1-4a8e-9adc-924183e8226c]
[2018-12-12 13:22:09,374: INFO/MainProcess] Received task: task.send_mail[02afd467-4e13-4420-a769-d05c1e003a6c]
[2018-12-12 13:23:10,959: INFO/MainProcess] Received task: task.send_mail[7e6f5656-d222-4095-81d4-ca56b86daed5]
[2018-12-12 13:23:11,285: INFO/MainProcess] Received task: task.send_mail[69d1dd2a-9eae-4d2d-b556-f933243b3bfb]
[2018-12-12 13:30:05,584: INFO/MainProcess] Received task: task.send_mail[bbc271a1-b88b-4b9c-b2d8-e446f26ddb96]

此celery_worker 除了执行schedule调度的定时任务以外, 还有实时的异步任务例如发送邮件

检查状态

stats 可以得出一些信息, worker 跑了4个子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
(venv) [webapi@VM_231_194_centos current]$ celery -A celery_worker:celery inspect stats
{
"pid": 22644,
"pool": {
"max-concurrency": 4,
"max-tasks-per-child": "N/A",
"processes": [
22655,
22657,
22666,
22667
],
"put-guarded-by-semaphore": false,
"timeouts": [
0,
0
],
"writes": {
"all": "24.99%, 25.01%, 25.01%, 24.99%",
"avg": "25.00%",
"inqueues": {
"active": 0,
"total": 4
},
"raw": "1196, 1197, 1197, 1196",
"total": 4786
}
}
}

通过reserved 查看目前接收等待处理的task数量是16, 粗略地估算了一下task大小(与传递的数据相关), 16个任务大小已经接近64K

1
2
(venv) [webapi@VM_231_194_centos current]$ celery -A celery_worker:celery inspect reserved | grep -v "OK" | wc -l
16

通过active 找到目前worker 只有一个进行中的任务, 并且这个任务已经超时了非常长的时间

1
2
(venv) [webapi@VM_231_194_centos current]$ celery -A celery_worker:celery inspect active | grep -v "OK" | wc -l
1

思考过程

celery_worker 是多进程来接收处理任务, 返回任务的结果, 问题是出在active的任务明明只有一个, 为什么所有子进程都没法处理其他已经接收的任务呢?这会不会是worker的消费模型导致?从官网的资料入手, 里面有prefork-pool-prefetch-settings 说明

  1. worker 根据配置fork一些子进程
  2. 父进程通过消息队列获取任务
  3. 父进程通过管道向子进程发送任务
  4. 子进程接收处理任务并返回结果

prefectch-settings 是让父进程往pipe里面异步发送task , pipe缓冲区大小默认64KB(部分系统可能是1MB), 如果子进程A执行长时间某个任务, 这时候pipe 来了一些task是需要A接收的, 如果刚好缓冲区满了, 主进程发送给A时被阻塞那么所有其他子进程都无法通过pipe 接收任务

以下是伪代码演示

1
2
3
4
5
6
7
8
9
10
11
-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send task T3 to process A
# A still executing T1, T3 stuck in local buffer and won't start until
# T1 returns, and other queued tasks won't be sent to idle processes
<- T1 complete sent by process A
# A executes T3

流程图如下

1
2
3
4
5
6
7
8
9
graph LR
父进程 --> |1.发送任务A1| 管道A{管道A}
父进程 --> |8.发送任务A2, 缓冲区满, 阻塞父进程| 管道A{管道A}
父进程 --> |4.发送任务B1| 管道B{管道B}
管道A -->|2.接收任务A1| A[子进程A]
管道B -->|5.接收任务B1| B[子进程B]
A --> |3.处理任务| 子进程A的IO
B --> |6.处理任务| 子进程B的IO
子进程B的IO --> |7.返回结果| 消息队列

解决方案

worker 提供-Ofair 参数来禁用这种prefetch-settings

启动这个参数后, 主进程只会向当前可用的PIPE 发送数据

以下是伪代码演示

1
2
3
4
5
6
7
8
9
10
11
-> send task T1 to process A
# A executes T1
-> send task T2 to process B
# B executes T2
<- T2 complete sent by process B

-> send T3 to process B
# B executes T3

<- T3 complete sent by process B
<- T1 complete sent by process A

上面只是避开来遇到长期阻塞的子进程,极端的情况下有可能全部子进程都被阻塞, 所以我们针对task应该要有超时设置来保证这个错误能被正确处理. 由于问题是因为send_mail 这个任务引起, 所以我们要对这个任务进行时间限制, 只要超出来时间, 就自动抛出SoftTimeLimitExceeded 而不是无限期等待

1
2
3
4
5
@celery.task(name='task.send_mail',soft_time_limit=20)
def async_send_email(title, html_content, recipients):
msg = Message(title, recipients=recipients)
msg.html = html_content
mail.send(msg)

参考资料

celery optimizing

celery userguide