robfig cron是go开发者最常用的基于cron解析的定时任务管理器 cron介绍 (opens new window)
# 一、基本介绍:
# 1. 从Cron结构体查看整体设计:
type Cron struct {
entries []*Entry // 一个Entry即一个定时任务
chain Chain // 装饰原始Job的装饰器数组
stop chan struct{} // 用于已启动Cron的关闭
add chan *Entry // 用于已启动Cron 新增Entry
remove chan EntryID // 用于已启动Cron 删除Entry
snapshot chan chan []Entry // 用于获取已启动Cron 的entries
running bool // 控制只启动一次, 启动与关闭状态下, 很多函数的行为将有差异
runningMu sync.Mutex // 控制running、entries、nextID等并发安全修改或访问
location *time.Location // 定时任务运行的时区
parser ScheduleParser // cron解析器
nextID EntryID // 存放当前最大EntryID
jobWaiter sync.WaitGroup // 用于已启动Cron的优雅关闭(等待所有任务执行完毕)
}
type Entry struct {
ID EntryID // EntryID (int)
Schedule Schedule // 用于获取任务的下一次执行时间
Next time.Time // 任务下一次将要执行的时间
Prev time.Time // 任务上一次执行的时间
WrappedJob Job // 为Job字段经过Cron.chain包装后的Job,定时任务运行的即Job.Run函数
Job Job // 原始Job
}
type ScheduleParser interface {
// 解析spec(即我们传入的cron字符串)为Schedule
Parse(spec string) (Schedule, error)
}
type Schedule interface {
// 获取任务下一次执行的时间(晚于传入的time)
Next(time.Time) time.Time
}
// 将我们需要定时执行的逻辑存放在Job实现的Run函数中
type Job interface {
Run()
}
# 2. 从官方ScheduleParser实现来学习 robig/cron 的cron书写规则:
type Parser struct {
// ParseOption通过位运算设计简化了相关解析逻辑
options ParseOption
}
- 由本文开头的链接可以知道,cron表达式一般最多可以支持7个子表达式,但是robfig/cron/v3最多支持前6个,即不支持Year的配置,且默认配置为最小单位分钟即crontab式的配置(crontab cron (opens new window))
- 对于6个子表达式的配置,robig/cron 采取了位运算的方式:
type ParseOption int
const (
Second ParseOption = 1 << iota // Seconds field, default 0
SecondOptional // Optional seconds field, default 0
Minute // Minutes field, default 0
Hour // Hours field, default 0
Dom // Day of month field, default *
Month // Month field, default *
Dow // Day of week field, default *
DowOptional // Optional day of week field, default *
Descriptor // Allow descriptors such as @monthly, @weekly, etc.
)
对于Parser.options,即是我们选取若干需要的配置按位或的结果, 比如默认配置:
var standardParser = NewParser(
Minute | Hour | Dom | Month | Dow | Descriptor,
)
需要注意的是, 秒可选配置(SecondOptional,书写cron表达式的时候,可以加秒表达式也可以不加的意思,不加将添加默认配置)与星期可选配置(DowOptional)最多只能选取一个
func NewParser(options ParseOption) Parser {
optionals := 0
if options&DowOptional > 0 {
optionals++
}
if options&SecondOptional > 0 {
optionals++
}
if optionals > 1 {
panic("multiple optionals may not be configured")
}
return Parser{options}
}
- Parser核心函数Parse:由于函数过长,将省略部分分支逻辑和错误处理逻辑
func (p Parser) Parse(spec string) (Schedule, error) {
if len(spec) == 0 {return nil, fmt.Errorf("empty spec string")}
// 1. 处理时区, 注意这个时区不是Cron的时区, 设置的是Schedule的时区
var loc = time.Local
if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {
// 省略
}
// 2. 处理@monthly等简略写法的特殊情况并返回, 需要配置中有Descriptor
if strings.HasPrefix(spec, "@") {// 省略}
// 3. 分割为子表达式
fields := strings.Fields(spec)
// 4. 检查子表达式长度并补充可选项的默认值
var err error
fields, err = normalizeFields(fields, p.options)
if err != nil {return nil, err}
// 5. 分别处理各子表达式, 每个子表达式将处理为uint64, 其中每一位的0/1都代表当前单位
// 在此取值时执不执行任务。如second处理结果的后8位为00000010,
// 则代表在第一秒(秒的取值为0~59)将要执行函数
// 所有子表达式的uint64结合起来就可以完整表示在何时将要执行任务
field := func(field string, r bounds) uint64 {
if err != nil {
return 0
}
var bits uint64
// getField
bits, err = getField(field, r)
return bits
}
var (
// 第二个参数为各个子表达式为:
// type bounds struct {
// min, max uint // 最小最大取值
// names map[string]uint // 支持星期和月份等可以用英文表达的对应的值
// }
second = field(fields[0], seconds)
minute = field(fields[1], minutes)
hour = field(fields[2], hours)
dayofmonth = field(fields[3], dom)
month = field(fields[4], months)
dayofweek = field(fields[5], dow)
)
if err != nil {return nil, err}
return &SpecSchedule{Second: second, Minute: minute,Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc,}, nil
}
var (
seconds = bounds{0, 59, nil}
minutes = bounds{0, 59, nil}
hours = bounds{0, 23, nil}
dom = bounds{1, 31, nil}
months = bounds{1, 12, map[string]uint{
"jan": 1,
"feb": 2,
"mar": 3,
"apr": 4,
"may": 5,
"jun": 6,
"jul": 7,
"aug": 8,
"sep": 9,
"oct": 10,
"nov": 11,
"dec": 12,
}}
dow = bounds{0, 6, map[string]uint{
"sun": 0,
"mon": 1,
"tue": 2,
"wed": 3,
"thu": 4,
"fri": 5,
"sat": 6,
}}
)
getField函数:
- 每个子表达式通过 ”,“ 分割为若干range,如秒的表示可以为 0,1,20。代表第0 1 20秒
- 每个range中可以包含"-",其前后数字代表最大最小值,也可以包含”/“,其之后的数字代表在最大最小值范围中取值的step
- range中还可以包含"*", "?"代表 min-max
func getField(field string, r bounds) (uint64, error) {
var bits uint64
// 每一个子表达式由 "," 分割的若干range组成, range之间为叠加关系
ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
for _, expr := range ranges {
bit, err := getRange(expr, r)
if err != nil {
return bits, err
}
bits |= bit
}
return bits, nil
}
func getRange(expr string, r bounds) (uint64, error) {
var (
// 所有用,分割的range都将解析为开始-结尾-每次增长量
start, end, step uint
// ”/“ 符号后面的即为step, 前面为start与end(可选)
rangeAndStep = strings.Split(expr, "/")
// start与end用 ”-“ 分割
lowAndHigh = strings.Split(rangeAndStep[0], "-")
singleDigit = len(lowAndHigh) == 1
err error
)
var extra uint64
// 1. 处理start [-end]
if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { // 1.1 * 或者 ?
start = r.min
end = r.max
extra = 1 << 63 // 注意用第64位为特殊位来代表表达式中有 * 或 ? (step需为1, 不为1的后面重置extra了)
} else { // 1.2 start 或者 start-end
start, err = parseIntOrName(lowAndHigh[0], r.names)
if err != nil {return 0, err}
switch len(lowAndHigh) {
case 1:
end = start
case 2:
end, err = parseIntOrName(lowAndHigh[1], r.names)
if err != nil {return 0, err}
default:
return 0, fmt.Errorf("too many hyphens: %s", expr)
}
}
// 2. 处理step
switch len(rangeAndStep) {
case 1:
step = 1
case 2:
step, err = mustParseInt(rangeAndStep[1])
if err != nil {
return 0, err
}
// Special handling: "N/step" means "N-max/step".
// 注意有step, 没end才修复end为max
// eg: 1 为start=1, end=1, step=1
// eg: 1/1 为start=1, end=max, step=1
if singleDigit {
end = r.max
}
if step > 1 {
extra = 0
}
// 省略min, max, step取值规范错误处理
return getBits(start, end, step) | extra, nil
}
func getBits(min, max, step uint) uint64 {
var bits uint64
// 特殊情况处理降低时间复杂度
// eg: 秒的取值为4-7
// ^(11111111 11111111 11111111 11111111 11111111 11111111 11111111 00000000)
// & 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11110000
if step == 1 {
return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)
}
for i := min; i <= max; i += step {
bits |= 1 << i
}
return bits
}
我们可以发现getRange函数的逻辑让我们cron的书写可以有一个不算bug的bug 拿秒的配置举例,我们可以写成 *-5 这种表达而可以正常运行
# 3. 官方Schedule实现,SpecSchedule:
- 结构体如下
type SpecSchedule struct {
Second, Minute, Hour, Dom, Month, Dow uint64
Location *time.Location
}
- Next函数:从年开始位运算匹配,一直匹配到最小单位后即所有单位都匹配即下一次执行时间(大于传入时间)
func (s *SpecSchedule) Next(t time.Time) time.Time {
// 时区逻辑: 如果cron string没有指定时区, 则所有时区都按Cron的时区来(传入的t即为Cron时区时间)
// 如果cron string指定了非本地时区,则先把时间转换为SpecSchedule时区用于计算下一次执行时间,返回时再转换为Cron时区
origLocation := t.Location()
loc := s.Location
if loc == time.Local {
loc = t.Location()
}
if s.Location != time.Local {
t = t.In(s.Location)
}
// 向上取整秒
t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)
// 用于第一次调整时将小单位全部取 ”零值“
// 很好理解,比如现在我们有个数字是0123 我们要 找到 1001,我们发现最高位不匹配
// 我们不能直接变为1123,我们需要变为1000
added := false
// 找到5年后还未找到下一次执行时间就退出
yearLimit := t.Year() + 5
WRAP:
if t.Year() > yearLimit {
return time.Time{}
}
for 1<<uint(t.Month())&s.Month == 0 {
if !added {
added = true
t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)
}
t = t.AddDate(0, 1, 0)
// 溢出后高位会变动可能已经不满足了,要从循环开头重新匹配
if t.Month() == time.January {
goto WRAP
}
}
// 日、时、秒等处理也相似,省略
return t.In(origLocation)
感兴趣也可以看看对于day的处理,其中有夏令时 (opens new window)处理逻辑
# 二、Cron方法简析:
# 1. 启动及运行时分析:
func (c *Cron) Start() {
c.runningMu.Lock()
defer c.runningMu.Unlock()
// running参数保证只启动一次
if c.running {
return
}
c.running = true
// 非阻塞启动, Run方法为阻塞启动
go c.run()
}
- Cron核心函数为run:
func (c *Cron) run() {
// 获取当前时间,初始化各个任务的Next即下一次执行时间
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}
for {
// 按Next由低到高排序, 其中找不到Next(零值)为最大值
sort.Sort(byTime(c.entries))
var timer *time.Timer
if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
// 无任务或者任务均找不到执行时间就睡眠, 当有任务加入时会走加入分支唤醒
timer = time.NewTimer(100000 * time.Hour)
} else {
// 设置定时器为距离当前最近的任务
timer = time.NewTimer(c.entries[0].Next.Sub(now))
}
for {
select {
case now = <-timer.C:
now = now.In(c.location)
// 执行Next <= now的任务
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
// 执行chain包装后的任务,其中有sync.WaitGroup逻辑,为了Stop时优雅停止
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}
case newEntry := <-c.add: // 新增任务, 加入entries后重新执行排序逻辑
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
case <-c.stop: // 接收关闭任务, 返回
timer.Stop()
return
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
}
break
}
}
}
# 2. 停止:
func (c *Cron) Stop() context.Context {
// 保证运行中Cron只关闭一次, chan都是无缓存, 关闭多次会有影响
c.runningMu.Lock()
defer c.runningMu.Unlock()
if c.running {
c.stop <- struct{}{} // 给运行中Cron发信号关闭
c.running = false
}
ctx, cancel := context.WithCancel(context.Background())
// 优雅关闭, 调用后可根据ctx是否取消来判断是否完全关闭
go func() {
c.jobWaiter.Wait()
cancel()
}()
return ctx
}
# 3. 增加任务:
// AddJob(spec string, cmd Job) (EntryID, error) 调用 Schedule
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
// 保证并发安全
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
if !c.running { // 没有运行直接加入
c.entries = append(c.entries, entry)
} else { // 运行中需要发送add信号
c.add <- entry
}
return entry.ID
}
删除、获取entries也类似,不再粘贴描述