一见钟情不是个魔法,它是命运。
内置 sched 模块
sched相当于一个延时处理任务,实用方法也非常简单。
import sched
import time
scheduler = sched.scheduler(time.time, time.sleep)
# 实例化调度器,使用默认参数
t1 = time.time()
# 当前时间点为 t1
def run(msg):
print('当前时间为:{}S后~ 执行任务为:{}'.format(time.time()-t1,msg))
print ('开始运行时间为:{}'.format(time.time()))
scheduler.enter(2,1, run,('两秒后,执行优先级等级为 1 的任务',))
scheduler.enter(5,2, run,('五秒后,执行优先级等级为 2 的任务',))
# 这里的参数为:5秒后执行任务,任务优先级为2,执行的任务函数为run,函数接受的参数为('五秒后,执行优先级等级为 2 的任务')
#运行调度器
scheduler.run()
需要注意的是,scheduler并不是无限循环的,只会执行一次就退出了。
第三方 schedule 库
schedule是对sched进行下一步优化,使用简单明了的一个第三方定时任务库,需要先pip安装一下。
上手案例
使用案例如下,代码来自简书案例
import schedule
import time
# 定义你要周期运行的函数
def job():
print("I'm working...")
schedule.every(10).minutes.do(job) # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job) # 每隔 1 小时运行一次 job 函数
schedule.every().day.at("10:30").do(job) # 每天在 10:30 时间点运行 job 函数
schedule.every().monday.do(job) # 每周一 运行一次 job 函数
schedule.every().wednesday.at("13:15").do(job) # 每周三 13:15 时间点运行 job 函数
schedule.every().minute.at(":17").do(job) # 每分钟的 17 秒时间点运行 job 函数
while True:
schedule.run_pending() # 运行所有可以运行的任务
time.sleep(1)
可以看到还是非常简洁明了。
着重记录一下要用得到的API
传递参数
import schedule
def run(msg):
print('接受参数为:{}'.format(msg))
schedule.every(1).seconds.do(run, msg='10')
while 1:
schedule.run_pending()
执行并发任务
三个任务执行如下:
import schedule,time
def job1(msg):
time.sleep(1)
print('任务1接受参数为:{}'.format(msg))
def job2(msg):
time.sleep(2)
print('任务2接受参数为:{}'.format(msg))
def job3(msg):
time.sleep(3)
print('任务3接受参数为:{}'.format(msg))
schedule.every(1).seconds.do(job1,msg='任务1')
schedule.every(3).seconds.do(job2,msg='任务2')
schedule.every(1).seconds.do(job3,msg='任务3')
while 1:
schedule.run_pending()
运行结果:
任务1接受参数为:任务1
任务3接受参数为:任务3
任务2接受参数为:任务2
任务1接受参数为:任务1
任务3接受参数为:任务3
任务1接受参数为:任务1
......
可以看到是顺序执行,如果要并发执行多个任务,使用threading完成
# -*- coding:utf-8 -*-
import schedule,time
import threading
def job1(msg):
time.sleep(1)
print('任务1接受参数为:{}'.format(msg))
def job2(msg):
time.sleep(2)
print('任务2接受参数为:{}'.format(msg))
def job3(msg):
time.sleep(3)
print('任务3接受参数为:{}'.format(msg))
def ensure_schedule_1():
threading.Thread(target=job1,args=(1,)).start()
def ensure_schedule_2():
threading.Thread(target=job2,args=(2,)).start()
def ensure_schedule_3():
threading.Thread(target=job3,args=(3,)).start()
def run():
schedule.every(1).seconds.do(ensure_schedule_1)
schedule.every(1).seconds.do(ensure_schedule_2)
schedule.every(1).seconds.do(ensure_schedule_3)
while 1:
schedule.run_pending()
run()
具体用法可以参考官方文档官方文档