Go1.18 调度器-G队列

发表于 · 归类于 代码 · 阅读完需 26 分钟 · 报告错误 · 阅读:

队列

新建的并发任务(goroutine)被保存在本地队列。

// runtime2.go

type p struct {
    
    // Queue of runnable goroutines. 
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    //
    // Note that while other P's may atomically CAS this to zero,
    // only the owner P can CAS it to a valid G.
    runnext guintptr    
}

本地队列runq是一个环状队列,通过累加和取模定位。

其中runq是存储容器,runqhead和runqtail为开始和结束位置。

无需判断回头,两个计数器总是增长,然后 index % len 就可确定在runq上的实际索引。

       0   1   2   3   4   5   
     +---+---+---+---+---+---+
runq | 1 | 0 | 1 | 1 | 1 | 1 |
     +-------+---+---+---+---+

     tail = 13 --> 13 % 6 = 1 --> runq[1]
     head = 8  -->  8 % 6 = 2 --> runq[2]

本地队列容量有限,多余的会转移到全局队列。

// runtime2.go

type schedt struct {
    
    // Global runnable queue.
    runq     gQueue
    runqsize int32    
}

在以高并发为设计目标的前提下,任务应尽可能被“饥饿”MP获取执行,这无关它由谁创建。但考虑到竞争问题,任务需分散存放,以减少锁定。在调度核心schedule函数内,findrunnable会依次检查本地队列、全局队列,乃至去其他P的私有队列偷窃。总之,在有任务的时候,不应该有闲置的MP。

如此,按竞争压力从大到小排列,分别是global/locklocal/lock-freerunnext/cas。而就当前Prunnext是最快也是竞争最小的位置,有助于提升本地执行效率。

 

本地

将新任务放到runnext,原有的加入runq。如本地队列已满,则主动转移一半任务到全局队列。

// proc.go

func newproc(fn *funcval) {
    newg := newproc1(fn, gp, pc)
    runqput(_p_, newg, true)

    if mainStarted {
        wakep()
    }
}
// proc.go

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.

func runqput(_p_ *p, gp *g, next bool) {
    
    if randomizeScheduler && next && fastrandn(2) == 0 {
        next = false
    }

       // 放入 runnext,原有的拿出来转移到 runq。
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    
    // 加入本地队列。
    h := atomic.LoadAcq(&_p_.runqhead)
    t := _p_.runqtail    
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1)
        return
    }
    
       // 转移任务到全局队列。
    if runqputslow(_p_, gp, h, t) {
        return
    }
    
    // the queue is not full, now the put above must succeed
    goto retry
}
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    
    // +1 是当前这个引发超量检查的任务 gp。
    var batch [len(_p_.runq)/2 + 1]*g

    // 先从本地队列头部截取一半任务。
    n := t - h
    n = n / 2
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    if !atomic.CasRel(&_p_.runqhead, h, h+n) { 
        return false
    }
    
    // 存入引发超量检查的任务。
    batch[n] = gp

    // 转换为链表。
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }
    
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])

    // 加入全局队列。
    lock(&sched.lock)
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)
    
    return true
    }

获取任务时,优先从runnext提取,然后是本地队列。考虑到有任务被转移到全局队列,甚至于被其他P偷窃。所以,任务执行次序和创建顺序无关。

// Get g from local runnable queue.

// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.

func runqget(_p_ *p) (gp *g, inheritTime bool) {
    
    // 优先从 runnext 获取。(atomic.cas)
    next := _p_.runnext
    if next != 0 && _p_.runnext.cas(next, 0) {
        return next.ptr(), true
    }

       // 从本地队列获取。(lock-free/atomic.cas)
    for {
        h := atomic.LoadAcq(&_p_.runqhead)
        t := _p_.runqtail
        if t == h {
            return nil, false
        }

        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.CasRel(&_p_.runqhead, h, h+1) {
            return gp, false
        }
    }
}

 

全局

全局队列只是简单的链表。相比本地循环数组队列,性能稍差。

// A gQueue is a dequeue of Gs linked through g.schedlink. A G can only
// be on one gQueue or gList at a time.

type gQueue struct {
    Head guintptr
    Tail guintptr
}
// proc.go

// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.

func globrunqputbatch(batch *gQueue, n int32) {
    assertLockHeld(&sched.lock)

    sched.runq.pushBackAll(*batch)
    sched.runqsize += n
    *batch = gQueue{}
}

当然,本地队列为空时,也可以从全局队列倒腾一批过来。

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
    
    assertLockHeld(&sched.lock)

       // 全局队列空。
    if sched.runqsize == 0 {
        return nil
    }

       // 确定数量。
    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max {
        n = max
    }
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n

       // 转移到本地队列。
    gp := sched.runq.pop()
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    
    return gp
}

 

偷窃

从其他P偷窃任务,涉及runqrunnext,还需考虑timer因素。

// proc.go

// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {

    // 当前 P,也就是小偷。
    pp := getg().m.p.ptr()

    // 尝试次数。
    const stealTries = 4
    for i := 0; i < stealTries; i++ {
        
        // 如果前几次都没偷到 G,那么最后一次目标是 timer。
        stealTimersOrRunNextG := i == stealTries-1

        // 随机寻找目标。
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            p2 := allp[enum.position()]
            
            // 当然不能偷自己。
            if pp == p2 {
                continue
            }
            
            // 偷定时器(直接执行,避免目标来不及处理到期的定时器)
            if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
                
                // 检查并执行到期的定时器。
                tnow, w, ran := checkTimers(p2, now)
                now = tnow
                
                // 如果有定时器被执行。
                if ran {
                    // 定时器函数可能会创建多个新 G。
                    // 注意,定时器虽然是从 p2 偷来的,但是却由当前 pp(小偷)执行,
                    // 如果生成新任务,那也是加入 pp 的本地队列。如此,就没必要再去
                    // p2 私有队列偷窃,改从自身拿一个就好。当然,如果定时器没有生成
                    // 新任务,还得从 p2 身上下手。
                    if gp, inheritTime := runqget(pp); gp != nil {
                        return gp, inheritTime, now, pollUntil, ranTimer
                    }
                    ranTimer = true
                }
            }

            // 那些闲置的 P 没啥可偷的,得从正工作的目标下手。
            if !idlepMask.read(enum.position()) {
                if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
                    return gp, false, now, pollUntil, ranTimer
                }
            }
        }
    }

    // No goroutines found to steal. Regardless, running a timer may have
    // made some goroutine ready that we missed. Indicate the next timer to
    // wait for.
    return nil, false, now, pollUntil, ranTimer
}
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).

func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    
    // 从目标(p2)拿一批任务放到本地(p)。
    t := _p_.runqtail
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    
       // 从本地队列尾部提取一个任务,返回。
    n--
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    
    atomic.StoreRel(&_p_.runqtail, t+n)
    
    return gp
    }
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.

func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        h := atomic.LoadAcq(&_p_.runqhead)
        t := atomic.LoadAcq(&_p_.runqtail)

        // 确定数量。
        n := t - h
        n = n - n/2

        // 如果目标本地队列为空,那么尝试偷 runnext 任务。
        if n == 0 {
            if stealRunNextG {
                if next := _p_.runnext; next != 0 {
                    if !_p_.runnext.cas(next, 0) {
                        continue
                    }
                    batch[batchHead%uint32(len(batch))] = next
                    return 1
                }
            }
            return 0
        }

        if n > uint32(len(_p_.runq)/2) {
            continue
        }

        // 批量转移。
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }

        // 调整被偷窃队列。
        if atomic.CasRel(&_p_.runqhead, h, h+n) {
            return n
        }
    }
}

函数里的n = n-n/2 是个很有意思的算法。

他确保n为奇数时,总是取“一半多”结果,例如(n=3) -> 2; (n=1) -> 1

这显然比写divmod + if 语句效率高。