Go1.18调度器-任务调度

发表于 · 归类于 代码 · 阅读完需 13 分钟 · 报告错误 · 阅读:

调度

调度函数除在各级队列查找任务外,还要处理垃圾回收、锁定等情况。

调度器-schedule

// proc.go

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.

func schedule() {
    _g_ := getg()
    
    // 检查当前 M 是否被特定 G 锁定。
    // 交出 P,休眠 M,直到其他人发现锁定任务后唤醒执行。    
    if _g_.m.lockedg != 0 {
        // 将当前的 M 和 P 解绑
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }
    
top:
    
    pp := _g_.m.p.ptr()
    pp.preempt = false      // 设置抢占位;不抢占
    
    // GC等待被执行
    // STW 标记。
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }
    
    // 执行安全函数,比如 forEachP。
    if pp.runSafePointFn != 0 {
        runSafePointFn()
    }

    // 检查并执行定时器。
    checkTimers(pp, 0)
    
    var gp *g
    var inheritTime bool

    // 启动垃圾回收任务。
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
        if gp != nil {
            tryWakeP = true
        }
    }
    
    // 每隔 60 次,尝试从全局队列提取任务,以确保公平。
    if gp == nil {
        
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    
    // 从本地任务队列提取。
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }
    
    // 从其他可能的地方提取。
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    
    // 找到任务,解除自旋状态。
    if _g_.m.spinning {
        resetspinning()
    }

    // 找到锁定任务,移交 P 给锁定 M 去执行。
    // 自己休眠。一旦被唤醒,则重新开始调度循环。
    if gp.lockedm != 0 {
        
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        startlockedm(gp)
        
        goto top
    }
    
    // 执行任务。
    execute(gp, inheritTime)
}

如果返回的是runnext,则 inheritTime = True。表示继承上个任务时间片,不累加执行计数器(P.schedtick),这会减少对全局队列的检查。

另一方面,sysmon 会比对不同时段计数器,检查是否执行了新任务。以此判断任务执行时间是否超长(10ms),是否需要抢占调度。

GODEBUG=scheddetail=1,schedtrace=1000 输出里可以看到计数器信息

 

中断

单个G未必在一次调度内结束,因某些原因中断,被重新放回任务队列,等待断点恢复。

MP 切换任务前,会将执行状态(PC、SP)保存到G.sched,其他寄存器数据保存到G.stack。切换本身(G100->G0->G101),消耗并不大,关键在于如何查找下一个G任务。

 

锁定

某些G(syscallcgo)需要与特定M锁定。

一旦锁定,M会休眠直到G出现,并执行。

同理,G只能在锁定M上执行。

这么做是为了维持M的某些特殊状态(比如signal等)

// proc.go

// LockOSThread wires the calling goroutine to its current operating system thread.
// The calling goroutine will always execute in that thread,
// and no other goroutine will execute in it,
// until the calling goroutine has made as many calls to
// UnlockOSThread as to LockOSThread.
// If the calling goroutine exits without unlocking the thread,
// the thread will be terminated.
//
// All init functions are run on the startup thread. Calling LockOSThread
// from an init function will cause the main function to be invoked on
// that thread.
//
// A goroutine should call LockOSThread before calling OS services or
// non-Go library functions that depend on per-thread state.

func LockOSThread() {
    _g_ := getg()
    _g_.m.lockedExt++
    dolockOSThread()
}


func lockOSThread() {
    getg().m.lockedInt++
    dolockOSThread()
}
// dolockOSThread is called by LockOSThread and lockOSThread below
// after they modify m.locked. Do not allow preemption during this call,
// or else the m might be different in this function than in the caller.

func dolockOSThread() {
    _g_ := getg()
    _g_.m.lockedg.set(_g_)  // 存放位置!
    _g_.lockedm.set(_g_.m)
}

在调度函数(scheduler)头部,一旦检查到当前 M 被某个 G 锁定,那么会进入休眠状态,直到目标G出现。

// Stops execution of the current m that is locked to a g until the g is runnable again.
func stoplockedm() {
    _g_ := getg()

    // 交出 P 给其他 M 去执行任务。
    if _g_.m.p != 0 {
        // Schedule another M to run this p.
        _p_ := releasep()
        handoffp(_p_)
    }
    
    // Wait until another thread schedules lockedg again.
    mPark()

    // 被其他 MP 唤醒,关联它转交的 P。
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

其他MP找到锁定G,会交出自己的P,唤醒该任务关联M。

// Schedules the locked m to run the locked gp.
func startlockedm(gp *g) {
    _g_ := getg()

    // 该任务锁定的 M。
    mp := gp.lockedm.ptr()
    
    // 转交自己的 P 给锁定 M,并唤醒它。
    _p_ := releasep()
    mp.nextp.set(_p_)
    notewakeup(&mp.park)
    
    // 当前 M 休眠。
    stopm()
}