类实现
import time
class Cron:
def __init__(self, time_range=""):
self.time_tange = time_range
def cycle_exec(self, func, arg, callback=None, cycle_time=10):
while True:
# 判断时间段
if self.time_range:
if not self.in_time_range(self.time_range):
time.sleep(cycle_time)
continue
start_time = time.time()
try:
if callback:
if arg:
callback(func(arg))
else:
callback(func())
else:
if arg:
func(arg)
else:
func()
except Exception as e:
raise
# 保证固定周期时间
end_time = time.time()
sleep_time = cycle_time - (end_time - start_time)
sleep_time = sleep_time if sleep_time >= 0 else 0
time.sleep(sleep_time)
# 当前时间是否在某个时间段
# 例如9:30-11:30,13:30-15:30
# in_time_range("093000-113000,133000-153000")
@staticmethod
def in_time_range(ranges):
now = time.strptime(time.strftime("%H%M%S"), "%H%M%S")
ranges = ranges.split(",")
for range in ranges:
r = range.split("-")
if time.strptime(r[0], "%H%M%S") <= now <= time.strptime(r[1], "%H%M%S") or time.strptime(r[0],"%H%M%S") >= now >= time.strptime(r[1], "%H%M%S"):
return True
return False
调用示例
在9:30至22:30之间每隔10秒执行task函数,没有回调函数
def task(msg):
time.sleep(1)
print(msg)
cron = Cron("093000-223000").cycle_exec(task, "haha", None, 10)
发表评论
抢沙发~