Go1.18 调度器-G队列
发表于 · 归类于
代码 · 阅读完需 26 分钟 ·
报告错误 ·
阅读:
队列
新建的并发任务(goroutine
)被保存在本地队列。
// runtime2.go
type p struct {
// Queue of runnable goroutines.
runqhead uint32
runqtail uint32
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
//
// Note that while other P's may atomically CAS this to zero,
// only the owner P can CAS it to a valid G.
runnext guintptr
}
本地队列runq
是一个环状队列,通过累加和取模定位。
其中runq是存储容器,runqhead和runqtail为开始和结束位置。
无需判断回头,两个计数器总是增长,然后 index % len
就可确定在runq上的实际索引。
0 1 2 3 4 5
+---+---+---+---+---+---+
runq | 1 | 0 | 1 | 1 | 1 | 1 |
+-------+---+---+---+---+
tail = 13 --> 13 % 6 = 1 --> runq[1]
head = 8 --> 8 % 6 = 2 --> runq[2]
本地队列容量有限,多余的会转移到全局队列。
// runtime2.go
type schedt struct {
// Global runnable queue.
runq gQueue
runqsize int32
}
在以高并发为设计目标的前提下,任务应尽可能被“饥饿”MP获取执行,这无关它由谁创建。但考虑到竞争问题,任务需分散存放,以减少锁定。在调度核心schedule
函数内,findrunnable
会依次检查本地队列、全局队列,乃至去其他P
的私有队列偷窃。总之,在有任务的时候,不应该有闲置的MP。
如此,按竞争压力从大到小排列,分别是global/lock
、local/lock-free
、runnext/cas
。而就当前P
,runnext
是最快也是竞争最小的位置,有助于提升本地执行效率。
本地
将新任务放到runnext
,原有的加入runq
。如本地队列已满,则主动转移一半任务到全局队列。
// proc.go
func newproc(fn *funcval) {
newg := newproc1(fn, gp, pc)
runqput(_p_, newg, true)
if mainStarted {
wakep()
}
}
// proc.go
// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrandn(2) == 0 {
next = false
}
// 放入 runnext,原有的拿出来转移到 runq。
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
// 加入本地队列。
h := atomic.LoadAcq(&_p_.runqhead)
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1)
return
}
// 转移任务到全局队列。
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
// +1 是当前这个引发超量检查的任务 gp。
var batch [len(_p_.runq)/2 + 1]*g
// 先从本地队列头部截取一半任务。
n := t - h
n = n / 2
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.CasRel(&_p_.runqhead, h, h+n) {
return false
}
// 存入引发超量检查的任务。
batch[n] = gp
// 转换为链表。
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])
// 加入全局队列。
lock(&sched.lock)
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
}
获取任务时,优先从runnext
提取,然后是本地队列。考虑到有任务被转移到全局队列,甚至于被其他P
偷窃。所以,任务执行次序和创建顺序无关。
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// 优先从 runnext 获取。(atomic.cas)
next := _p_.runnext
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
// 从本地队列获取。(lock-free/atomic.cas)
for {
h := atomic.LoadAcq(&_p_.runqhead)
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) {
return gp, false
}
}
}
全局
全局队列只是简单的链表。相比本地循环数组队列,性能稍差。
// A gQueue is a dequeue of Gs linked through g.schedlink. A G can only
// be on one gQueue or gList at a time.
type gQueue struct {
Head guintptr
Tail guintptr
}
// proc.go
// Put a batch of runnable goroutines on the global runnable queue.
// This clears *batch.
func globrunqputbatch(batch *gQueue, n int32) {
assertLockHeld(&sched.lock)
sched.runq.pushBackAll(*batch)
sched.runqsize += n
*batch = gQueue{}
}
当然,本地队列为空时,也可以从全局队列倒腾一批过来。
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.
func globrunqget(_p_ *p, max int32) *g {
assertLockHeld(&sched.lock)
// 全局队列空。
if sched.runqsize == 0 {
return nil
}
// 确定数量。
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
// 转移到本地队列。
gp := sched.runq.pop()
n--
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
偷窃
从其他P
偷窃任务,涉及runq
和runnext
,还需考虑timer
因素。
// proc.go
// stealWork attempts to steal a runnable goroutine or timer from any P.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
// 当前 P,也就是小偷。
pp := getg().m.p.ptr()
// 尝试次数。
const stealTries = 4
for i := 0; i < stealTries; i++ {
// 如果前几次都没偷到 G,那么最后一次目标是 timer。
stealTimersOrRunNextG := i == stealTries-1
// 随机寻找目标。
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
p2 := allp[enum.position()]
// 当然不能偷自己。
if pp == p2 {
continue
}
// 偷定时器(直接执行,避免目标来不及处理到期的定时器)
if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
// 检查并执行到期的定时器。
tnow, w, ran := checkTimers(p2, now)
now = tnow
// 如果有定时器被执行。
if ran {
// 定时器函数可能会创建多个新 G。
// 注意,定时器虽然是从 p2 偷来的,但是却由当前 pp(小偷)执行,
// 如果生成新任务,那也是加入 pp 的本地队列。如此,就没必要再去
// p2 私有队列偷窃,改从自身拿一个就好。当然,如果定时器没有生成
// 新任务,还得从 p2 身上下手。
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, now, pollUntil, ranTimer
}
ranTimer = true
}
}
// 那些闲置的 P 没啥可偷的,得从正工作的目标下手。
if !idlepMask.read(enum.position()) {
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// No goroutines found to steal. Regardless, running a timer may have
// made some goroutine ready that we missed. Indicate the next timer to
// wait for.
return nil, false, now, pollUntil, ranTimer
}
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
// 从目标(p2)拿一批任务放到本地(p)。
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
// 从本地队列尾部提取一个任务,返回。
n--
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
atomic.StoreRel(&_p_.runqtail, t+n)
return gp
}
// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
h := atomic.LoadAcq(&_p_.runqhead)
t := atomic.LoadAcq(&_p_.runqtail)
// 确定数量。
n := t - h
n = n - n/2
// 如果目标本地队列为空,那么尝试偷 runnext 任务。
if n == 0 {
if stealRunNextG {
if next := _p_.runnext; next != 0 {
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) {
continue
}
// 批量转移。
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 调整被偷窃队列。
if atomic.CasRel(&_p_.runqhead, h, h+n) {
return n
}
}
}
函数里的n = n-n/2
是个很有意思的算法。
他确保n为奇数时,总是取“一半多”结果,例如(n=3) -> 2; (n=1) -> 1
。
这显然比写divmod + if
语句效率高。