首页Python【Python计算生态】s...

【Python计算生态】schedule——定时任务调度工具

Python受欢迎的原因之一就是其计算生态丰富,据不完全统计,Python 目前为止有约13万+的第三方库。

本系列将会陆续整理分享一些有趣、有用的第三方库。

文章配套代码获取有以下两种途径:
  • 通过百度网盘获取:
链接:https://pan.baidu.com/s/1FSGLd7aI_UQlCQuovVHc_Q?pwd=mnsj 提取码:mnsj
  • 前往GitHub获取
https://github.com/returu/Python_Ecosystem





01
简介

schedule是一个轻量级的Python第三方库,用于在特定时间间隔或特定时间点执行任务。它提供了简单而直观的API,使得定时任务的调度变得非常容易。

schedule库被设计为解决简单调度问题的简单解决方案。如果需要以下功能,建议寻找其他工具:
  • 任务持久化(在重启后仍能记住调度计划);
  • 精确计时(毫秒级精度的执行);
  • 并发执行(多线程运行任务);
  • 本地化功能(工作日或节假日)。
直接使用pip安装:
pip install schedule
GitHub页面:
https://github.com/dbader/schedule

02
使用

  • 1、定时执行任务:

schedule使用友好的语法周期性地运行 Python 函数或任何其他可调用对象。

以下是一个简单的示例,展示如何使用schedule库创建一个每隔10秒执行一次的任务:

import schedule
import time

# 定义函数
def job():
    print("Hello, this is a scheduled task!")

# 设置定时任务,即每隔 10 秒执行一次 job 函数
schedule.every(10).seconds.do(job)

# 通过一个循环来保持调度器运行
while True:
    # schedule.run_pending() 会检查所有已安排的任务,如果有任务到了执行时间,就会执行该任务
    schedule.run_pending()
    # time.sleep(1) 是为了避免 CPU 过度占用
    time.sleep(1)

# 持续输出:Hello, this is a scheduled task!


  • 任务函数:
do()方法接受一个函数,将其注册为定时任务。
也支持向任务传递参数,此时do()函数会将额外的参数传递给任务函数。
def greet(name):
    print(f"Hello, {name}!")

# 将 name="World" 作为关键字参数传递给 greet 函数
schedule.every(5).seconds.do(greet, name="World")

while True:
    schedule.run_pending()
    time.sleep(1)

# 持续输出:Hello, World!

  • 时间单位:

schedule支持多种时间单位,包括秒、分钟、小时、天、周等。

schedule.every(10).minutes.do(job)  # 每隔10分钟运行一次
schedule.every().hour.do(job)       # 每小时运行一次
schedule.every().day.at("10:30").do(job)  # 每天上午10:30运行一次
schedule.every().monday.do(job)     # 每周一运行一次
schedule.every().wednesday.at("13:15").do(job)  # 每周三下午13:15运行一次
schedule.every().day.at("12:42""Europe/Amsterdam").do(job)  # 每天12:42(阿姆斯特丹时区)运行一次
schedule.every().minute.at(":17").do(job)  # 每分钟的第17秒运行一次

  • 获取下一次执行的时间:
使用schedule.idle_seconds()可以获取下一次任务计划运行前的秒数。如果下一个计划任务的运行时间在过去,则返回值为负数。如果没有计划任务,则返回 None
通过该方式可以精确的睡眠到任务运行时间。
import schedule
import time

def job():
    print("Hello, this is a scheduled task!")

schedule.every(5).seconds.do(job)

while True:
    # 获取下一次任务计划运行前的秒数
    n = schedule.idle_seconds()
    # 精确地睡眠到任务运行时间
    time.sleep(n)
    schedule.run_pending()
    

# 持续输出:Hello, this is a scheduled task!

  • 2、使用装饰器调度任务:
使用@repeat装饰器来调度函数。使用与上述相同的语法,但省略do()函数。
from schedule import every, repeat, run_pending
import time

# 使用@repeat装饰器来调度函数
@repeat(every(10).seconds)
def job():
    print("Hello, this is a scheduled task!")

while True:
    run_pending()
    time.sleep(1)

# 持续输出:Hello, this is a scheduled task!


  • 3、获取所有任务:

要从调度器中检索所有任务,可以使用schedule.get_jobs()方法,也可以使用schedule.jobs属性获取。

schedule.jobs是一个内部属性,虽然可以直接操作,但建议尽量使用schedule.get_jobs()方法来管理任务,以避免潜在的兼容性问题。

import schedule

def hello():
    print('你好,世界')

schedule.every().second.do(hello)
schedule.every().day.do(hello)

# 获取所有任务
all_jobs = schedule.get_jobs()
# all_jobs = schedule.jobs
all_jobs
# 输出:[Every 1 second do hello() (last run: [never], next run: 2025-02-15 11:59:07),
#       Every 1 day do hello() (last run: [never], next run: 2025-02-16 11:59:06)]


  • 4、取消任务:
如果需要取消已经安排的任务,可以使用schedule.cancel_job(job) 方法:
import schedule
import time

def job():
    print("I'm working...")

# 注册任务并保存任务对象
job_to_cancel = schedule.every(5).seconds.do(job)

# 记录开始时间
start_time = time.time()

while True:
    schedule.run_pending()
    time.sleep(1)
    
    # 检查是否已经运行了20秒
    if time.time() - start_time >= 20:
        schedule.cancel_job(job_to_cancel)  # 取消任务
        print("任务已取消")
        break# 退出循环

print("程序结束")
# 输出:I'm working...
# 输出:I'm working...
# 输出:I'm working...
# 输出:任务已取消
# 输出:程序结束


  • 5、取消所有任务:
使用schedule.clear()方法从调度器中移除所有任务
import schedule

def hello():
    print('你好,世界')

schedule.every().second.do(hello)
schedule.every().day.do(hello)

# 取消所有任务
schedule.clear()

all_jobs = schedule.get_jobs()
print(all_jobs)
# 输出:[]


  • 6、只运行一次任务:
任务函数返回 schedule.CancelJob 后,任务将从调度器中移除。
import schedule
import time

def one_time_job():
    print("只执行一次!")
    return schedule.CancelJob

schedule.every(5).seconds.do(one_time_job)

while True:
    schedule.run_pending()
    time.sleep(1)
    # 检查任务列表是否为空。如果任务被取消(通过 schedule.CancelJob),schedule.jobs会变成空列表
    if not schedule.jobs:
        print("所有任务已完成,退出程序。")
        break

# 输出:Hello, World!
#      所有任务已完成,退出程序。


  • 7、在随机间隔运行任务:

可以通过every(A).to(B).<time_unit>的方式来设置任务在随机间隔内运行。这种方式允许任务在指定的时间范围内随机执行,而不是固定的时间间隔。

def my_job():
    print('Foo')

# 每隔 5 到 10 秒运行一次
schedule.every(5).to(10).seconds.do(my_job)


  • 8、在特定时间之前运行任务:

使用until()方法设置任务的截止时间,任务在截止时间之后将不再运行。

import schedule
from datetime import datetime, timedelta, time

def job():
    print('在特定时间之前运行任务')

# 在今天 18:30 之前运行任务
schedule.every(1).hours.until("18:30").do(job)

# 在 2030-01-01 18:33 之前运行任务
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 在接下来的 8 小时内运行任务
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 在今天 18:33:42 之前运行任务
schedule.every(1).hours.until(time(18, 33, 42)).do(job)

# 在特定的 datetime 之前运行任务
schedule.every(1).hours.until(datetime(2025, 5, 17, 11, 36, 20)).do(job)


  • 9、立即运行所有任务,忽略其调度计划:
使用schedule.run_all()方法立即运行所有任务,无论它们是否计划运行。
任务完成后会重新调度,就像它们通过run_pending()执行一样。
import schedule

def job_1():
    print('任务一')

def job_2():
    print('任务二')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 使用 delay_seconds 参数在任务之间添加延迟秒数
schedule.run_all(delay_seconds=10)


更多内容可以前往官方文档查看:

https://schedule.readthedocs.io/en/stable/

本篇文章来源于微信公众号: 码农设计师

RELATED ARTICLES

欢迎留下您的宝贵建议

Please enter your comment!
Please enter your name here

- Advertisment -

Most Popular

Recent Comments