不过艰辛的路显然不是光靠勇气就能踏开的,路上满是崴脚的石头。
简单案例
使用redis+celery3.1.5版本,django为2.1。0版本
首先新建一个项目
django-admin startproject Test_Celery
cd Test_Celery
python3 manage.py startapp app
修改文件
Test_Celery/celery.py
#!/usr/bin/env python3
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Test_Celery.settings') # 设置django环境
CELERY_BROKER_URL = 'redis://192.168.1.102:6379/0' # Broker配置,使用Redis作为消息中间件
CELERY_RESULT_BACKEND = 'redis://192.168.1.102:6379/0' # BACKEND配置,这里使用redis
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
app = Celery('Test_Celery',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)
app.config_from_object('django.conf:settings')
# 在settings中写配置
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# 发现任务文件每个app下的task.py
Test_Celery/urls.py
from django.contrib import admin
from django.urls import path,include
urlpatterns = [
path('admin/', admin.site.urls),
path('index/',include('app.urls'))
]
app/views.py
# -*- encoding: utf-8 -*-
from django.shortcuts import render
# Create your views here.
from django.http import JsonResponse
from . import tasks
# Create your views here.
def index(request,*args,**kwargs):
res=tasks.add.delay(1,3)
#任务逻辑
return JsonResponse({'status':'successful','task_id':res.task_id})
app/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y
app/urls.py
from django.urls import path
from .views import index
urlpatterns = [path('',index)]
使用命令
首先启动redis数据库
C:\CODE\Learn_Python\Learn_Django\Test_Celery\Test_Celery>celery -A Test_Celery worker
python3 manage.py runserver
访问网址
127.0.0.1:8000/index
案例完善
上面可能不方便理解,但是确是我一点一点根据官方文档摸索…下面是按照杨仕航老师博客内容进行优化
还是使用一样的项目结构,文件如下
Test_Celery/init.py
# coding:utf-8
from __future__ import absolute_import, unicode_literals
# 引入celery实例对象
from .celery import app as celery_app
Test_Celery/celery.py
# coding:utf-8
from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
# 获取当前文件夹名,即为该Django的项目名
project_name = os.path.split(os.path.abspath('.'))[-1]
project_settings = '%s.settings' % project_name
# 设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', project_settings)
# 实例化Celery
app = Celery(project_name)
# 使用django的settings文件配置celery
app.config_from_object('django.conf:settings')
# Celery加载所有注册的应用
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# CELERY_BROKER_URL = 'redis://192.168.1.102:6379/0' # Broker配置,使用Redis作为消息中间件
#
# CELERY_RESULT_BACKEND = 'redis://192.168.1.102:6379/0' # BACKEND配置,这里使用redis
Test_Celery/settings.py
再最后加上如下内容
STATIC_URL = '/static/'
# celery settings
# celery中间人 redis://redis服务所在的ip地址:端口/数据库号
BROKER_URL = 'redis://localhost:6379/0'
# celery结果返回,可用于跟踪结果
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# celery内容等消息的格式设置
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# celery时区设置,使用settings中TIME_ZONE同样的时区
CELERY_TIMEZONE = TIME_ZONE
Test_Celery/urls.py
from django.contrib import admin
from django.urls import path,include
urlpatterns = [
path('admin/', admin.site.urls),
path('index/',include('app.urls'))
]
app/tasks.py
# coding:utf-8
from celery.decorators import task
import time
@task
def sendmail(email):
print('start send email to %s' % email)
time.sleep(5) # 休息5秒
print('success')
return True
app/urls.py
from django.urls import path
from .views import home
urlpatterns = [path('',home)]
app/views.py
# coding:utf-8
from django.shortcuts import render
from django.http import HttpResponse
import random
from .tasks import sendmail # 引用tasks.py文件的中sendmail方法
import json
def home(request):
# 耗时任务,发送邮件(用delay执行方法)
sendmail.delay('test@test.com')
# 其他行为
data = list(random.sample(range(100),20))
return HttpResponse(json.dumps(data), content_type='application/json')
执行命令
C:\CODE\Learn_Python\Learn_Django\Test_Celery\Test_Celery>celery -A Test_Celery worker -l info
python3 manage.py runserver
访问网址
http://127.0.0.1:8000/index/
添加定时任务
定时执行任务。在实例代码的基础上写个测试方法:
#coding:utf-8
from celery.decorators import periodic_task
@periodic_task(run_every=10)
def some_task():
print('periodic task test!!!!!')
time.sleep(5)
print('success')
return True
即每十秒钟执行一次任务,还能这么写
run_every=datetime.timedelta(hours=1, minutes=15, seconds=40)
即 1小时15分钟40秒 运行一次