定时任务,顾名思义,是指在某个时刻运行的任务。
使用 Go 实现一个定时任务。规定定时任务的最小间隙为 1 s,则核心代码为
for {
select {
case <-time.NewTicker(1 * time.Second).C:
// 检查当前可运行的任务并运行
case <-stopped:
// 终止信号
break
}
}
代码其实很简单,重点是怎么组织任务。
关键实体
任务
Job 任务
type Job struct {
interval uint64 // 指定时间单位下的任务执行间隔
name string // 任务名称
unit timeUnit // 任务时间间隔
// err error // 任务关联的错误
// loc *time.Location
lastRun time.Time // 任务上一次的执行时刻
nextRun time.Time // 任务下一次的执行时刻
f interface{} // 任务的执行函数
fParams []interface{} // 任务执行函数的参数
// lock bool
// tags []string
}
Scheduler 调度器
type Scheduler struct {
jobs []*Job
loc *time.Location
}
1.初始化调度器
2.添加任务
每次定时触发:
-
根据任务的下次运行时间戳从小到大,将调度器中的任务排序。
-
从头开始遍历任务,判断任务是否应该运行(当前时间 >= 任务的下次运行时间),因为任务已排序过,所以一旦遇到不符合的任务,直接中断遍历。
-
将步骤 2 中得到的将要执行的任务,每个任务使用一个协程运行。记录 lastRun 为当前时间,并计算任务的下次运行时间 nextRun。
3.1 任务的运行。判断当前任务是否有锁,有则尝试区获取锁。运行任务,传入函数及其参数。
调度器的启动
func (s *Scheduler) Start() chan bool {
stopped := make(chan bool, 1)
ticker := time.NewTicker(1 * time.Second)
go func() {
for {
select {
case <-ticker.C:
s.RunPending()
case <-stopped:
ticker.Stop()
return
}
}
}()
return stopped
}
Start() 方法返回了一个 bool channel,stopped 作为一个容量为 1 的 channel, 用于控制调度器的终止。定时任务框架的使用着在决定将调度器停止后,可以通过向通道中的传入值来达到目的。
stopped := s.Start()
// decide to stop scheduel
stopeed <- true
执行任务
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) ([]reflect.Value, error) {
f := reflect.ValueOf(jobFunc)
if len(params) != f.Type().NumIn() {
return nil, ErrParamsNotAdapted
}
in := make([]reflect.Value, len(params))
for i, p := range params {
in[i] = reflect.ValueOf(p)
}
return f.Call(in), nil
}
计算任务下次执行的时间
func scheduleNextRun() error {
now := time.Now()
if j.lastRun == time.Unix(0, 0) {
j.lastRun = now
}
}