您现在的位置是:网站首页> 编程资料编程资料

Golang 定时器(Timer 和 Ticker),这篇文章就够了_Golang_

2023-05-26 378人已围观

简介 Golang 定时器(Timer 和 Ticker),这篇文章就够了_Golang_

定时器是什么

Golang 原生 time 包下可以用来执行一些定时任务或者是周期性的任务的一个工具

本文基于 Go 1.14,如果以下文章有哪里不对或者问题的地方,欢迎讨论学习

定时器的日常使用

Timer 相关

 func NewTimer(d Duration) *Timer func (t *Timer) Reset(d Duration) bool func (t *Timer) Stop() bool func After(d Duration) <-chan Time func AfterFunc(d Duration, f func()) *Timer func main() { timer := time.NewTimer(3 * time.Second) select { case <-timer.C: fmt.Println("3秒执行任务") } timer.Stop() // 这里来提高 timer 的回收 } func main() { tChannel := time.After(3 * time.Second) // 其内部其实是生成了一个 timer select { case <-tChannel: fmt.Println("3秒执行任务") } } func main() { timer := time.NewTimer(3 * time.Second) for { timer.Reset(4 * time.Second) // 这样来复用 timer 和修改执行时间 select { case <-timer.C: fmt.Println("每隔4秒执行任务") } } } 

注意事项:

错误使用:time.After 这里会不断生成 timer,虽然最终会回收,但是会造成无意义的cpu资源消耗 

 func main() { for { select { case <-time.After(3 * time.Second): fmt.Println("每隔3秒执行一次") } } } 

正确使用:

 func main() { timer := time.NewTimer(3 * time.Second) for { timer.Reset(3 * time.Second) // 这里复用了 timer select { case <-timer.C: fmt.Println("每隔3秒执行一次") } } } 

Ticker 相关

 func NewTicker(d Duration) *Ticker func Tick(d Duration) <-chan Time func (t *Ticker) Stop() func main() { ticker := time.NewTicker(3 * time.Second) for range ticker.C { fmt.Print("每隔3秒执行任务") } ticker.Stop() } 

错误使用:

 func main() { for { select { case <-time.Tick(3 * time.Second): // 这里会不断生成 ticker,而且 ticker 会进行重新调度,造成泄漏(后面源码会有解析) fmt.Println("每隔3秒执行一次") } } } 

定时器源码分析

我先给出涉及到过程的相关结构体(!!!要注意 Timer 和 timer 的不同)

 type Timer struct { C <-chan Time r runtimeTimer } ​ // Ticker 的结构与 Timer 一致 type Ticker struct { C <-chan Time // 这里就是返回的 channel r runtimeTimer } ​ // If this struct changes, // adjust ../time/sleep.go:/runtimeTimer. // 这里是与 runtimeTimer 对应的 type timer struct { pp puintptr // 对应的当前 P 的指针 when int64 // 需要执行的时间 period int64 // 周期,Ticker 会使用 f func(interface{}, uintptr) // 给 channel 推送信息的方式 arg interface{} // 与 f 相关的第一个参数,可以看下面 Ticker 的例子 seq uintptr // 与 f 相关的第二个参数(后续我们可以看到) nextwhen int64 // 下次执行的时候 status uint32 // 当前状态 } ​ ​ // P 结构体中的相关 timer 的字段 type p struct { ... timersLock mutex // 一个 P 中保证 timers 同步锁 ​ timers []*timer // timers 是四叉小顶堆(后续代码会有说明) ​ numTimers uint32 // timer 的数量 ​ adjustTimers uint32 // 需要调整的 timer 的数量 ​ deletedTimers uint32 // 需要删除的 timer 的数量 ... } 

我们以 Ticker 为切入点

 func NewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d),//当前时间+d的时间,可看下面 period: int64(d),//执行周期 f: sendTime, arg: c, // 就是 f 中第一个参数 }, } startTimer(&t.r) return t } ​ func when(d Duration) int64 { if d <= 0 { return runtimeNano() } t := runtimeNano() + int64(d) //当前时间加上需要等待的时间 if t < 0 { t = 1<<63 - 1 // math.MaxInt64 } return t } ​ func sendTime(c interface{}, seq uintptr) { select { case c.(chan Time) <- Now(): default: } } 

从 NewTicker 中我们可以看到,开始执行是在 startTimer(),我们进去看下

addtimer

 // startTimer adds t to the timer heap. // 这里已经说明了 timers 是一种堆的数据结构,由于是定时器, // 最近的最先执行,所以猜测以 when 来判断的小顶堆 func startTimer(t *timer) { addtimer(t) } ​ func addtimer(t *timer) { if t.when < 0 { t.when = maxWhen //maxWhen 是 1<<63 - 1 } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaiting ​ when := t.when ​ pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) // 根据 timer 删除和修改状态进行操作,可以看下面源码相关 doaddtimer(pp, t)// 添加 timer 的到 timers 堆 unlock(&pp.timersLock) ​ wakeNetPoller(when) } // 清理 timers 的源码部分 func cleantimers(pp *p) { for { if len(pp.timers) == 0 { return } t := pp.timers[0]// 从 0 开始,即最小的堆顶开始 if t.pp.ptr() != pp { throw("cleantimers: bad p") } switch s := atomic.Load(&t.status); s { case timerDeleted: if !atomic.Cas(&t.status, s, timerRemoving) {// status 变更为 timerRemoving continue } dodeltimer0(pp) // 这里是删除 timer 的关键部分,删除堆顶的部分并调整 if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { // stauts 变更为 timerRemoved badTimer() // 这里就是 throw 一个异常 } atomic.Xadd(&pp.deletedTimers, -1) case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&t.status, s, timerMoving) { // stauts 变更为 timerMoving continue } t.when = t.nextwhen // 将执行时间设置为其下次执行的时候 // -----删除堆顶位置,并按照其新的执行时间加入到对应的位置 dodeltimer0(pp) doaddtimer(pp, t) // 添加 timer 的关键部分 // ------------ if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } default: return } } } ​ // timer 删除的源码部分 //(扩充:func dodeltimer(pp *p, i int) 意思就是删除指定所索引 // 的位置,然后恢复小顶堆的结构,可以看源码,就不解释了) func dodeltimer0(pp *p) { if t := pp.timers[0]; t.pp.ptr() != pp { throw("dodeltimer0: wrong P") } else { t.pp = 0 // 这里将指针情况 } // --- 将堆的最后一位 timer 放到堆顶,然后清空最后一位的空间,然后向下调整--- last := len(pp.timers) - 1 if last > 0 { pp.timers[0] = pp.timers[last] } pp.timers[last] = nil pp.timers = pp.timers[:last] if last > 0 { siftdownTimer(pp.timers, 0)//向下调整的核心部分 } // --------------------- updateTimer0When(pp) //更新当前 p 的最先执行 timer 的执行时间 atomic.Xadd(&pp.numTimers, -1) } ​ func updateTimer0When(pp *p) { if len(pp.timers) == 0 { atomic.Store64(&pp.timer0When, 0) } else { atomic.Store64(&pp.timer0When, uint64(pp.timers[0].when)) } } ​ // timer 增加的源码部分 func doaddtimer(pp *p, t *timer) { ... if t.pp != 0 { throw("doaddtimer: P already set in timer") } t.pp.set(pp) // --- 将 timer 放置到堆的最后一位,然后向上调整 --- i := len(pp.timers) pp.timers = append(pp.timers, t) siftupTimer(pp.timers, i)// 向上调整的核心部分 // --------------------------- if t == pp.timers[0] { atomic.Store64(&pp.timer0When, uint64(t.when)) } atomic.Xadd(&pp.numTimers, 1) } 

当我们已知 timers 是小顶堆的数据结构(满足“当前位置的值小于等于父位置的值“即可,实现方式使用数组,由下面代码可以知道是四叉小顶堆,结构如下图)的情况后,接下来看堆向上或者向下调整的细节部分

 // timers 堆的向上调整 func siftupTimer(t []*timer, i int) { ... when := t[i].when tmp := t[i] for i > 0 { p := (i - 1) / 4 // 由这里可以看出,堆的节点长度是4 if when >= t[p].when { break } // --- 向上进行调整,即父节点下移,当前节点上移 --- t[i] = t[p] i = p //向上进行调整 } if tmp != t[i] { t[i] = tmp } } ​ //timers 堆的向下调整 func siftdownTimer(t []*timer, i int) { n := len(t) if i >= n { badTimer() } when := t[i].when tmp := t[i] for { // --- 以下部分就是找到当前4个节点中最小的那个值和在数组的位置 ----- c := i*4 + 1 // 这里是子节点最左边的节点 c3 := c + 2 // 这里是子节点第三个节点 if c >= n { break } w := t[c].when if c+1 < n && t[c+1].when < w { w = t[c+1].when c++ } if c3 < n { w3 := t[c3].when if c3+1 < n && t[c3+1].when < w3 { w3 = t[c3+1].when c3++ } if w3 < w { w = w3 c = c3 } } //--------------------------------- if w >= when { break } // --- 向下进行调整,即子节点上移,当前节点下移 --- t[i] = t[c] i = c // --------------- } if tmp != t[i] { t[i] = tmp } } 

既然已经知道timer放到四叉小顶堆,那 timer 是怎么执行的呢?接下来就是定时器的核心部分入口 runtimer()

runtimer

 // 这里执行的前提是当前 P 的 timesLock 已经锁了,所以不用担心并发问题 func runtimer(pp *p, now int64) int64 { for { t := pp.timers[0] //找到 timers 堆的堆顶,为最先执行的 timer if t.pp.ptr() != pp { throw("runtimer: bad p") } switch s := atomic.Load(&t.status); s { case timerWaiting: if t.when > now { //如果还没到时间,则返回调用的时间 return t.when } ​ if !atomic.Cas(&t.status, s, timerRunning) { continue } runOneTimer(pp, t, now)// 这里是执行timer的核心 return 0 ​ case timerDeleted: if !atomic.Cas(&t.status, s, timerRemoving) { continue } dodeltimer0(pp) //删除 timers 堆顶的 timer if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) if len(pp.timers) == 0 { return -1 } ​ case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&t.status, s, timerMoving) { continue } //删除堆顶的位置,调整 timer 到最新的时间,以及进行重新调整 t.when = t.nextwhen dodeltimer0(pp) doaddtimer(pp, t) if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() } ​ case timerModifying: osyield() case timerNoStatus, timerRemoved: badTimer() case timerRunning, timerRemoving, timerMoving: badTimer() default: badTimer() } } }

因此我们知道了执行的核心流程是 runOneTimer()

runOneTimer

 // 由于是 runtimer 进行调用,因此也线程安全 func runOneTimer(pp *p, t *timer, now int64) { ... f := t.f arg := t.arg seq := t.seq ​ if t.period > 0 { //如果有周期,则算出下次 timer 执行的时间,并加入到对应的位置(这里就是 Ticker 和 Timer 的区别) delta := t.when - now t.when += t.period * (1 + -delta/t.period) siftdownTimer(pp.timers, 0)// 将四叉小顶堆向下调整 if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp)//更新当前 P 的最先的 timer 的执行时间 } else { // 从堆顶位置上删除 timer,并调整 dodeltimer0(pp) if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } } ... ​ unlock(&pp.timersLock) ​ f(arg, seq) // 执行对应的 f,这里就是我们 Timer.C 来的地方 ​ lock(&pp.timersLock) ​ ... } 

从 runtimer 的调用,我们知道执行的入口是 checkTimers(),我们详细看下

checkTimers

我们可以看下图,由下图可知,是通过 Go 里面的调度中去寻找可执行的 timer  

我们看下 checkTimers 做了什么

 func checkTimers(pp *p, now in
                
                

-六神源码网