不过艰辛的路显然不是光靠勇气就能踏开的,路上满是崴脚的石头。

Celery 入门知识

简单案例

使用redis+celery3.1.5版本,django为2.1。0版本

首先新建一个项目

django-admin startproject Test_Celery

cd Test_Celery

python3 manage.py startapp app

修改文件

Test_Celery/celery.py

#!/usr/bin/env python3
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Test_Celery.settings')  # 设置django环境
CELERY_BROKER_URL = 'redis://192.168.1.102:6379/0' # Broker配置,使用Redis作为消息中间件

CELERY_RESULT_BACKEND = 'redis://192.168.1.102:6379/0' # BACKEND配置,这里使用redis

CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案

app = Celery('Test_Celery',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)

app.config_from_object('django.conf:settings')
# 在settings中写配置

# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 发现任务文件每个app下的task.py

Test_Celery/urls.py

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/',include('app.urls'))
]

app/views.py

# -*- encoding: utf-8 -*-
from django.shortcuts import render

# Create your views here.
from django.http import JsonResponse
from . import tasks

# Create your views here.

def index(request,*args,**kwargs):
    res=tasks.add.delay(1,3)
    #任务逻辑
    return JsonResponse({'status':'successful','task_id':res.task_id})

app/tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y

app/urls.py

from django.urls import path
from .views import index

urlpatterns = [path('',index)]

使用命令

首先启动redis数据库

C:\CODE\Learn_Python\Learn_Django\Test_Celery\Test_Celery>celery -A Test_Celery worker

python3 manage.py runserver

访问网址

127.0.0.1:8000/index

官方DEMO

案例完善

上面可能不方便理解,但是确是我一点一点根据官方文档摸索…下面是按照杨仕航老师博客内容进行优化

还是使用一样的项目结构,文件如下

Test_Celery/init.py

# coding:utf-8
from __future__ import absolute_import, unicode_literals

# 引入celery实例对象
from .celery import app as celery_app

Test_Celery/celery.py

# coding:utf-8
from __future__ import absolute_import, unicode_literals

from celery import Celery
from django.conf import settings
import os

# 获取当前文件夹名,即为该Django的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
project_settings = '%s.settings' % project_name

# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)

# 实例化Celery
app = Celery(project_name)

# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings')

# Celery加载所有注册的应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# CELERY_BROKER_URL = 'redis://192.168.1.102:6379/0' # Broker配置,使用Redis作为消息中间件
#
# CELERY_RESULT_BACKEND = 'redis://192.168.1.102:6379/0' # BACKEND配置,这里使用redis

Test_Celery/settings.py

再最后加上如下内容

STATIC_URL = '/static/'

# celery settings
# celery中间人 redis://redis服务所在的ip地址:端口/数据库号
BROKER_URL = 'redis://localhost:6379/0'
# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# celery内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = TIME_ZONE

Test_Celery/urls.py

from django.contrib import admin
from django.urls import path,include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('index/',include('app.urls'))
]

app/tasks.py

# coding:utf-8
from celery.decorators import task
import time


@task
def sendmail(email):
    print('start send email to %s' % email)
    time.sleep(5)  # 休息5秒
    print('success')
    return True

app/urls.py

from django.urls import path
from .views import home
urlpatterns = [path('',home)]

app/views.py

# coding:utf-8
from django.shortcuts import render
from django.http import HttpResponse
import random
from .tasks import sendmail  # 引用tasks.py文件的中sendmail方法
import json


def home(request):
    # 耗时任务,发送邮件(用delay执行方法)
    sendmail.delay('test@test.com')

    # 其他行为

    data = list(random.sample(range(100),20))
    return HttpResponse(json.dumps(data), content_type='application/json')

执行命令

C:\CODE\Learn_Python\Learn_Django\Test_Celery\Test_Celery>celery -A Test_Celery worker -l info

python3 manage.py runserver

访问网址

http://127.0.0.1:8000/index/

添加定时任务

定时执行任务。在实例代码的基础上写个测试方法:

#coding:utf-8
from celery.decorators import periodic_task

@periodic_task(run_every=10)
def some_task():
    print('periodic task test!!!!!')
    time.sleep(5)
    print('success')
    return True

即每十秒钟执行一次任务,还能这么写

run_every=datetime.timedelta(hours=1, minutes=15, seconds=40)
即 1小时15分钟40秒 运行一次