Go1.18调度器-内核函数

发表于 · 归类于 代码 · 阅读完需 15 分钟 · 报告错误 · 阅读:

内核函数

某些内核专用的函数,比如休眠、唤醒等。

 

gopark, goready

休眠函数gopark解除当前GMP的绑定,让MP去执行其他任务。重点是G没有放回任务队列,除非被唤醒,否则再不会被调度执行。

// proc.go

// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    mp := acquirem()
       gp := mp.curg
    
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }

    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason

    releasem(mp)
    
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}
// park continuation on g0.

func park_m(gp *g) {
    _g_ := getg()

    // 修改状态,解除 MP 绑定。
    // 该任务让出执行绪,但未放回队列。
    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    // 调用 unlock 函数。
    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil

        // 失败!继续执行 G 任务。
        if !ok {
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    
    // 当前 MP 继续执行其他任务。
    schedule()
}
// dropg removes the association between m and the current goroutine m->curg (gp for short).
func dropg() {
    _g_ := getg()

    // 解除当前`G`和 `M`的绑定
    setMNoWB(&_g_.m.curg.m, nil)
    setGNoWB(&_g_.m.curg, nil)
}

与之配套的是goready函数。它负责将park G放回队列,可设为runnext任务。

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {

    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()
    mp := acquirem() // disable preemption because it can be holding p in a local var
    
    // 必须是 park G。
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // 修改状态,重新放回队列。
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    
    // 有新任务,唤醒闲置 MP 工作。
    wakep()
    
    releasem(mp)
}

 

notesleep

该休眠函数不解除MP关联,适合一些自旋(spanning)等待场合。

像 Linux、freebsd基于futex实现,其他操作系统使用信号量(semaphore)。

Futex通常称作“快速用户区互斥”,是一种在用户空间实现的锁(互斥)机制。

多执行单位(进程或线程)通过共享同一块内存(整数)未实现等待和唤醒操作。

因为Futex只在操作结果不一致时才进入内核仲裁,所以有非常高的执行效率。

更多内容请参考 man 2 futex.

基于futex实现的lock/unlock,参考《8.其他》

// runtime2.go

type m struct {
    park note
}

// sleep and wakeup on one-time events.
// before any calls to notesleep or notewakeup,
// must call noteclear to initialize the Note.
// then, exactly one thread can call notesleep
// and exactly one thread can call notewakeup (once).
// once notewakeup has been called, the notesleep
// will return.  future notesleep will return immediately.
// subsequent noteclear must be called only after
// previous notesleep has returned, e.g. it's disallowed
// to call noteclear straight after notewakeup.
//
// notetsleep is like notesleep but wakes up after
// a given number of nanoseconds even if the event
// has not yet happened.  if a goroutine uses notetsleep to
// wake up early, it must wait to call noteclear until it
// can be sure that no other goroutine is calling
// notewakeup.

type note struct {
    // Futex-based impl treats it as uint32 key,
    // while sema-based impl as M* waitm.
    // Used to be a union, but unions break precise GC.
    key uintptr
}

围绕 note.key 值进行休眠和唤醒操作。

// lock_futex.go
//
// 休眠(notesleep)和唤醒(notewakeup)调用之前,需要进行初始化(noteclear)。

// 休眠
func notesleep(n *note) {
    gp := getg()
    if gp != gp.m.g0 {
        throw("notesleep not on g0")
    }
    
    ns := int64(-1)
    
    for atomic.Load(key32(&n.key)) == 0 {
        gp.m.blocked = true

        // 如果 key == 0,休眠。(ns < 0,表示不超时)
        // 直到 key 改变,跳出循环。
        futexsleep(key32(&n.key), 0, ns)

        gp.m.blocked = false
    }
}
// 唤醒
func notewakeup(n *note) {
    
    // 修改 key,返回原有值。
    old := atomic.Xchg(key32(&n.key), 1)
    
    if old != 0 {
        print("notewakeup - double wakeup (", old, ")\n")
        throw("notewakeup - double wakeup")
    }
    
    // 唤醒 1 个等待。
    futexwakeup(key32(&n.key), 1)
}
// One-time notifications.
func noteclear(n *note) {
    n.key = 0
}

 

Futex

具体实现依赖操作系统提供。

// os_linux.go

// Atomically,
//	if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.

func futexsleep(addr *uint32, val uint32, ns int64) {
    
    // Some Linux kernels have a bug where futex of
    // FUTEX_WAIT returns an internal error code
    // as an errno. Libpthread ignores the return value
    // here, and so can we: as it says a few lines up,
    // spurious wakeups are allowed.
    if ns < 0 {
        futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
        return
    }

    var ts timespec
    ts.setNsec(ns)
    
    futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
// If any procs are sleeping on addr, wake up at most cnt.
func futexwakeup(addr *uint32, cnt uint32) {
    ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
    if ret >= 0 {
        return
    }

    // I don't know that futex wakeup can return
    // EAGAIN or EINTR, but if it does, it would be
    // safe to loop and call futex again.
    systemstack(func() {
        print("futexwakeup addr=", addr, " returned ", ret, "\n")
    })

    *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}