Python受欢迎的原因之一就是其计算生态丰富,据不完全统计,Python 目前为止有约13万+的第三方库。
本系列将会陆续整理分享一些有趣、有用的第三方库。
-
通过百度网盘获取:
链接:https://pan.baidu.com/s/1FSGLd7aI_UQlCQuovVHc_Q?pwd=mnsj
提取码:mnsj
-
前往GitHub获取:
https://github.com/returu/Python_Ecosystem
-
任务持久化(在重启后仍能记住调度计划); -
精确计时(毫秒级精度的执行); -
并发执行(多线程运行任务); -
本地化功能(工作日或节假日)。
pip install schedule
https://github.com/dbader/schedule
-
1、定时执行任务:
schedule使用友好的语法周期性地运行 Python 函数或任何其他可调用对象。
以下是一个简单的示例,展示如何使用schedule库创建一个每隔10秒执行一次的任务:
import schedule
import time
# 定义函数
def job():
print("Hello, this is a scheduled task!")
# 设置定时任务,即每隔 10 秒执行一次 job 函数
schedule.every(10).seconds.do(job)
# 通过一个循环来保持调度器运行
while True:
# schedule.run_pending() 会检查所有已安排的任务,如果有任务到了执行时间,就会执行该任务
schedule.run_pending()
# time.sleep(1) 是为了避免 CPU 过度占用
time.sleep(1)
# 持续输出:Hello, this is a scheduled task!
-
任务函数:
def greet(name):
print(f"Hello, {name}!")
# 将 name="World" 作为关键字参数传递给 greet 函数
schedule.every(5).seconds.do(greet, name="World")
while True:
schedule.run_pending()
time.sleep(1)
# 持续输出:Hello, World!
-
时间单位:
schedule支持多种时间单位,包括秒、分钟、小时、天、周等。
schedule.every(10).minutes.do(job) # 每隔10分钟运行一次
schedule.every().hour.do(job) # 每小时运行一次
schedule.every().day.at("10:30").do(job) # 每天上午10:30运行一次
schedule.every().monday.do(job) # 每周一运行一次
schedule.every().wednesday.at("13:15").do(job) # 每周三下午13:15运行一次
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job) # 每天12:42(阿姆斯特丹时区)运行一次
schedule.every().minute.at(":17").do(job) # 每分钟的第17秒运行一次
-
获取下一次执行的时间:
import schedule
import time
def job():
print("Hello, this is a scheduled task!")
schedule.every(5).seconds.do(job)
while True:
# 获取下一次任务计划运行前的秒数
n = schedule.idle_seconds()
# 精确地睡眠到任务运行时间
time.sleep(n)
schedule.run_pending()
# 持续输出:Hello, this is a scheduled task!
-
2、使用装饰器调度任务:
from schedule import every, repeat, run_pending
import time
# 使用@repeat装饰器来调度函数
@repeat(every(10).seconds)
def job():
print("Hello, this is a scheduled task!")
while True:
run_pending()
time.sleep(1)
# 持续输出:Hello, this is a scheduled task!
-
3、获取所有任务:
要从调度器中检索所有任务,可以使用schedule.get_jobs()方法,也可以使用schedule.jobs属性获取。
schedule.jobs是一个内部属性,虽然可以直接操作,但建议尽量使用schedule.get_jobs()方法来管理任务,以避免潜在的兼容性问题。
import schedule
def hello():
print('你好,世界')
schedule.every().second.do(hello)
schedule.every().day.do(hello)
# 获取所有任务
all_jobs = schedule.get_jobs()
# all_jobs = schedule.jobs
all_jobs
# 输出:[Every 1 second do hello() (last run: [never], next run: 2025-02-15 11:59:07),
# Every 1 day do hello() (last run: [never], next run: 2025-02-16 11:59:06)]
-
4、取消任务:
import schedule
import time
def job():
print("I'm working...")
# 注册任务并保存任务对象
job_to_cancel = schedule.every(5).seconds.do(job)
# 记录开始时间
start_time = time.time()
while True:
schedule.run_pending()
time.sleep(1)
# 检查是否已经运行了20秒
if time.time() - start_time >= 20:
schedule.cancel_job(job_to_cancel) # 取消任务
print("任务已取消")
break# 退出循环
print("程序结束")
# 输出:I'm working...
# 输出:I'm working...
# 输出:I'm working...
# 输出:任务已取消
# 输出:程序结束
-
5、取消所有任务:
import schedule
def hello():
print('你好,世界')
schedule.every().second.do(hello)
schedule.every().day.do(hello)
# 取消所有任务
schedule.clear()
all_jobs = schedule.get_jobs()
print(all_jobs)
# 输出:[]
-
6、只运行一次任务:
import schedule
import time
def one_time_job():
print("只执行一次!")
return schedule.CancelJob
schedule.every(5).seconds.do(one_time_job)
while True:
schedule.run_pending()
time.sleep(1)
# 检查任务列表是否为空。如果任务被取消(通过 schedule.CancelJob),schedule.jobs会变成空列表
if not schedule.jobs:
print("所有任务已完成,退出程序。")
break
# 输出:Hello, World!
# 所有任务已完成,退出程序。
-
7、在随机间隔运行任务:
可以通过every(A).to(B).<time_unit>的方式来设置任务在随机间隔内运行。这种方式允许任务在指定的时间范围内随机执行,而不是固定的时间间隔。
def my_job():
print('Foo')
# 每隔 5 到 10 秒运行一次
schedule.every(5).to(10).seconds.do(my_job)
-
8、在特定时间之前运行任务:
使用until()方法设置任务的截止时间,任务在截止时间之后将不再运行。
import schedule
from datetime import datetime, timedelta, time
def job():
print('在特定时间之前运行任务')
# 在今天 18:30 之前运行任务
schedule.every(1).hours.until("18:30").do(job)
# 在 2030-01-01 18:33 之前运行任务
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# 在接下来的 8 小时内运行任务
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# 在今天 18:33:42 之前运行任务
schedule.every(1).hours.until(time(18, 33, 42)).do(job)
# 在特定的 datetime 之前运行任务
schedule.every(1).hours.until(datetime(2025, 5, 17, 11, 36, 20)).do(job)
-
9、立即运行所有任务,忽略其调度计划:
import schedule
def job_1():
print('任务一')
def job_2():
print('任务二')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# 使用 delay_seconds 参数在任务之间添加延迟秒数
schedule.run_all(delay_seconds=10)
更多内容可以前往官方文档查看:
https://schedule.readthedocs.io/en/stable/


本篇文章来源于微信公众号: 码农设计师