Go robfig/cron/v3 简析

  robfig cron是go开发者最常用的基于cron解析的定时任务管理器   cron介绍 (opens new window)

# 一、基本介绍:

# 1. 从Cron结构体查看整体设计:

type Cron struct {
	entries   []*Entry // 一个Entry即一个定时任务
	chain     Chain // 装饰原始Job的装饰器数组
	stop      chan struct{} // 用于已启动Cron的关闭
	add       chan *Entry // 用于已启动Cron 新增Entry
	remove    chan EntryID // 用于已启动Cron 删除Entry
	snapshot  chan chan []Entry // 用于获取已启动Cron 的entries
	running   bool // 控制只启动一次, 启动与关闭状态下, 很多函数的行为将有差异
	runningMu sync.Mutex // 控制running、entries、nextID等并发安全修改或访问
	location  *time.Location // 定时任务运行的时区
	parser    ScheduleParser // cron解析器
	nextID    EntryID // 存放当前最大EntryID
	jobWaiter sync.WaitGroup // 用于已启动Cron的优雅关闭(等待所有任务执行完毕)
}

type Entry struct {
	ID EntryID // EntryID (int)
	Schedule Schedule // 用于获取任务的下一次执行时间
	Next time.Time // 任务下一次将要执行的时间
	Prev time.Time // 任务上一次执行的时间
	WrappedJob Job // 为Job字段经过Cron.chain包装后的Job,定时任务运行的即Job.Run函数
	Job Job // 原始Job
}

type ScheduleParser interface {
	// 解析spec(即我们传入的cron字符串)为Schedule
	Parse(spec string) (Schedule, error)
}

type Schedule interface {
    // 获取任务下一次执行的时间(晚于传入的time)
	Next(time.Time) time.Time
}

// 将我们需要定时执行的逻辑存放在Job实现的Run函数中
type Job interface {
	Run()
}

# 2. 从官方ScheduleParser实现来学习 robig/cron 的cron书写规则:

type Parser struct {
    // ParseOption通过位运算设计简化了相关解析逻辑
	options ParseOption
}
  • 由本文开头的链接可以知道,cron表达式一般最多可以支持7个子表达式,但是robfig/cron/v3最多支持前6个,即不支持Year的配置,且默认配置为最小单位分钟即crontab式的配置(crontab cron (opens new window)
  • 对于6个子表达式的配置,robig/cron 采取了位运算的方式:
type ParseOption int

const (
	Second         ParseOption = 1 << iota // Seconds field, default 0
	SecondOptional                         // Optional seconds field, default 0
	Minute                                 // Minutes field, default 0
	Hour                                   // Hours field, default 0
	Dom                                    // Day of month field, default *
	Month                                  // Month field, default *
	Dow                                    // Day of week field, default *
	DowOptional                            // Optional day of week field, default *
	Descriptor                             // Allow descriptors such as @monthly, @weekly, etc.
)

对于Parser.options,即是我们选取若干需要的配置按位或的结果, 比如默认配置:

var standardParser = NewParser(
	Minute | Hour | Dom | Month | Dow | Descriptor,
)

需要注意的是, 秒可选配置(SecondOptional,书写cron表达式的时候,可以加秒表达式也可以不加的意思,不加将添加默认配置)与星期可选配置(DowOptional)最多只能选取一个

func NewParser(options ParseOption) Parser {
	optionals := 0
	if options&DowOptional > 0 {
		optionals++
	}
	if options&SecondOptional > 0 {
		optionals++
	}
	if optionals > 1 {
		panic("multiple optionals may not be configured")
	}
	return Parser{options}
}
  • Parser核心函数Parse:由于函数过长,将省略部分分支逻辑和错误处理逻辑
func (p Parser) Parse(spec string) (Schedule, error) {
	if len(spec) == 0 {return nil, fmt.Errorf("empty spec string")}
	// 1. 处理时区, 注意这个时区不是Cron的时区, 设置的是Schedule的时区
	var loc = time.Local
	if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {
		// 省略
	}
	// 2. 处理@monthly等简略写法的特殊情况并返回, 需要配置中有Descriptor
	if strings.HasPrefix(spec, "@") {// 省略}

	// 3. 分割为子表达式
	fields := strings.Fields(spec)

	// 4. 检查子表达式长度并补充可选项的默认值
	var err error
	fields, err = normalizeFields(fields, p.options)
	if err != nil {return nil, err}
	// 5. 分别处理各子表达式, 每个子表达式将处理为uint64, 其中每一位的0/1都代表当前单位
	// 在此取值时执不执行任务。如second处理结果的后8位为00000010, 
	// 则代表在第一秒(秒的取值为0~59)将要执行函数
	// 所有子表达式的uint64结合起来就可以完整表示在何时将要执行任务
	field := func(field string, r bounds) uint64 {
		if err != nil {
			return 0
		}
		var bits uint64
		// getField
		bits, err = getField(field, r)
		return bits
	}

	var (
	    // 第二个参数为各个子表达式为:
	    // type bounds struct {
		// 	min, max uint // 最小最大取值
		//	names    map[string]uint // 支持星期和月份等可以用英文表达的对应的值
		// }
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		hour       = field(fields[2], hours)
		dayofmonth = field(fields[3], dom)
		month      = field(fields[4], months)
		dayofweek  = field(fields[5], dow)
	)
	if err != nil {return nil, err}

	return &SpecSchedule{Second: second, Minute: minute,Hour: hour, Dom: dayofmonth, Month: month, Dow: dayofweek, Location: loc,}, nil
}

var (
	seconds = bounds{0, 59, nil}
	minutes = bounds{0, 59, nil}
	hours   = bounds{0, 23, nil}
	dom     = bounds{1, 31, nil}
	months  = bounds{1, 12, map[string]uint{
		"jan": 1,
		"feb": 2,
		"mar": 3,
		"apr": 4,
		"may": 5,
		"jun": 6,
		"jul": 7,
		"aug": 8,
		"sep": 9,
		"oct": 10,
		"nov": 11,
		"dec": 12,
	}}
	dow = bounds{0, 6, map[string]uint{
		"sun": 0,
		"mon": 1,
		"tue": 2,
		"wed": 3,
		"thu": 4,
		"fri": 5,
		"sat": 6,
	}}
)

getField函数:

  • 每个子表达式通过 ”,“ 分割为若干range,如秒的表示可以为 0,1,20。代表第0 1 20秒
  • 每个range中可以包含"-",其前后数字代表最大最小值,也可以包含”/“,其之后的数字代表在最大最小值范围中取值的step
  • range中还可以包含"*", "?"代表 min-max
func getField(field string, r bounds) (uint64, error) {
	var bits uint64
	// 每一个子表达式由 "," 分割的若干range组成, range之间为叠加关系
	ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
	for _, expr := range ranges {
		bit, err := getRange(expr, r)
		if err != nil {
			return bits, err
		}
		bits |= bit
	}
	return bits, nil
}

func getRange(expr string, r bounds) (uint64, error) {
	var (
	    // 所有用,分割的range都将解析为开始-结尾-每次增长量
		start, end, step uint
		// ”/“ 符号后面的即为step, 前面为start与end(可选)
		rangeAndStep     = strings.Split(expr, "/")
		// start与end用 ”-“ 分割
		lowAndHigh       = strings.Split(rangeAndStep[0], "-")
		singleDigit      = len(lowAndHigh) == 1
		err              error
	)
	var extra uint64
	// 1. 处理start [-end]
	if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { // 1.1 * 或者 ?
		start = r.min
		end = r.max
		extra = 1 << 63 // 注意用第64位为特殊位来代表表达式中有 * 或 ? (step需为1, 不为1的后面重置extra了)
	} else { // 1.2 start 或者 start-end
		start, err = parseIntOrName(lowAndHigh[0], r.names)
		if err != nil {return 0, err}
		switch len(lowAndHigh) {
		case 1:
			end = start
		case 2:
			end, err = parseIntOrName(lowAndHigh[1], r.names)
			if err != nil {return 0, err}
		default:
			return 0, fmt.Errorf("too many hyphens: %s", expr)
		}
	}
	// 2. 处理step
	switch len(rangeAndStep) {
	case 1:
		step = 1
	case 2:
		step, err = mustParseInt(rangeAndStep[1])
		if err != nil {
			return 0, err
		}
		// Special handling: "N/step" means "N-max/step".
		// 注意有step, 没end才修复end为max
		// eg: 1 为start=1, end=1, step=1
		// eg: 1/1 为start=1, end=max, step=1
		if singleDigit {
			end = r.max
		}
		if step > 1 {
			extra = 0
		}
	// 省略min, max, step取值规范错误处理
	return getBits(start, end, step) | extra, nil
}

func getBits(min, max, step uint) uint64 {
	var bits uint64
	// 特殊情况处理降低时间复杂度
	// eg: 秒的取值为4-7
	//  ^(11111111 11111111 11111111 11111111 11111111 11111111 11111111 00000000)
	// & 11111111 11111111 11111111 11111111 11111111 11111111 11111111 11110000
	if step == 1 {
		return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)
	}
	for i := min; i <= max; i += step {
		bits |= 1 << i
	}
	return bits
}

我们可以发现getRange函数的逻辑让我们cron的书写可以有一个不算bug的bug 拿秒的配置举例,我们可以写成 *-5 这种表达而可以正常运行

# 3. 官方Schedule实现,SpecSchedule:

  • 结构体如下
type SpecSchedule struct {
	Second, Minute, Hour, Dom, Month, Dow uint64
	Location *time.Location
}
  • Next函数:从年开始位运算匹配,一直匹配到最小单位后即所有单位都匹配即下一次执行时间(大于传入时间)
func (s *SpecSchedule) Next(t time.Time) time.Time {
	// 时区逻辑: 如果cron string没有指定时区, 则所有时区都按Cron的时区来(传入的t即为Cron时区时间)
	// 如果cron string指定了非本地时区,则先把时间转换为SpecSchedule时区用于计算下一次执行时间,返回时再转换为Cron时区
	origLocation := t.Location()
	loc := s.Location
	if loc == time.Local {
		loc = t.Location()
	}
	if s.Location != time.Local {
		t = t.In(s.Location)
	}
	// 向上取整秒
	t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)
	// 用于第一次调整时将小单位全部取 ”零值“
	// 很好理解,比如现在我们有个数字是0123 我们要 找到 1001,我们发现最高位不匹配
	// 我们不能直接变为1123,我们需要变为1000
	added := false
	// 找到5年后还未找到下一次执行时间就退出
	yearLimit := t.Year() + 5
WRAP:
	if t.Year() > yearLimit {
		return time.Time{}
	}
	for 1<<uint(t.Month())&s.Month == 0 {
		if !added {
			added = true
			t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)
		}
		t = t.AddDate(0, 1, 0)
		// 溢出后高位会变动可能已经不满足了,要从循环开头重新匹配
		if t.Month() == time.January {
			goto WRAP
		}
	}
	// 日、时、秒等处理也相似,省略
	return t.In(origLocation)

感兴趣也可以看看对于day的处理,其中有夏令时 (opens new window)处理逻辑

# 二、Cron方法简析:

# 1. 启动及运行时分析:

func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	// running参数保证只启动一次
	if c.running {
		return
	}
	c.running = true
	// 非阻塞启动, Run方法为阻塞启动
	go c.run()
}
  • Cron核心函数为run:
func (c *Cron) run() {
	// 获取当前时间,初始化各个任务的Next即下一次执行时间
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
	}

	for {
		// 按Next由低到高排序, 其中找不到Next(零值)为最大值
		sort.Sort(byTime(c.entries))
		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
		    // 无任务或者任务均找不到执行时间就睡眠, 当有任务加入时会走加入分支唤醒
			timer = time.NewTimer(100000 * time.Hour)
		} else {
		    // 设置定时器为距离当前最近的任务
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

		for {
			select {
			case now = <-timer.C:
				now = now.In(c.location)
				// 执行Next <= now的任务
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					// 执行chain包装后的任务,其中有sync.WaitGroup逻辑,为了Stop时优雅停止
					c.startJob(e.WrappedJob)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
				}
			case newEntry := <-c.add: // 新增任务, 加入entries后重新执行排序逻辑
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
			case replyChan := <-c.snapshot:
				replyChan <- c.entrySnapshot()
				continue
			case <-c.stop: // 接收关闭任务, 返回
				timer.Stop()
				return
			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
			}
			break
		}
	}
}

# 2. 停止:

func (c *Cron) Stop() context.Context {
    // 保证运行中Cron只关闭一次, chan都是无缓存, 关闭多次会有影响
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.stop <- struct{}{} // 给运行中Cron发信号关闭
		c.running = false
	}
	ctx, cancel := context.WithCancel(context.Background())
	// 优雅关闭, 调用后可根据ctx是否取消来判断是否完全关闭
	go func() {
		c.jobWaiter.Wait()
		cancel()
	}()
	return ctx
}

# 3. 增加任务:

// AddJob(spec string, cmd Job) (EntryID, error) 调用 Schedule
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	// 保证并发安全
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
	if !c.running { // 没有运行直接加入
		c.entries = append(c.entries, entry)
	} else { // 运行中需要发送add信号
		c.add <- entry
	}
	return entry.ID
}

删除、获取entries也类似,不再粘贴描述