Lua Coroutine

C++20 更新了 coroutine 无栈协程的接口,westquote/SquidTasks 就已经使用它在商业产品中作为逻辑框架。它提供了一个近似 javascript 异步操作的写法来构建系统:

1
2
3
4
5
6
Task<int> LoadGame() {
co_await LoadMap("rc.umap");
auto net_handle = co_await CreateNetSession(conf.server);
co_await CreateInstance(net_handle, conf.user);
return 0;
}

这种写法的好处就在于,加载过程不阻塞启动流程,且大幅减少异步回调的函数链。本文会尝试利用 Lua 原生的 Coroutine 系统,来搭建优雅的异步框架。

原生用法

Lua 支持协程(协作式多线程)。Lua中的协程代表一个独立的执行线程。然而,与多线程系统中的线程不同,一个协程只能通过显式调用一个 yield 函数来暂停其执行。

通过调用 coroutine.create 来创建一个协程。它唯一的参数是 function,即协程的主函数。 create 函数只是创建一个 句柄(就是 thread),并没有开始运行。

每个 thread 对象需要通过 coroutine.resume 来执行,句柄作为第一个参数传入,后续的参数会作为主函数的参数传入。

1
2
3
4
5
6
7
local co = coroutine.create(
function(...)
print (...)
end
)

coroutine.resume(co, 233, "kfc") -- 233 kfc

主函数开始执行后,它会一直运行,直至 函数结束 或者 yield 才会停止。特别提醒,yield 只能出现在协程体内部!

Error Handleing

1
coroutine.resume(<thread>, ...) -> boolean, ... 

resume 函数会返回一个布尔值,来指示这一段程序中是否发生错误。仅当它为 true 时,会额外返回协程体内 yield 传入的参数

Example

官方文档中给出了一个简洁直白的用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function foo (a)
print(a)
return coroutine.yield(2*a)
end

co = coroutine.create(function (a, b)
print("co-body", a, b)
local r = foo(a+1)
print("co-body", r)
local r, s = coroutine.yield(a+b, a-b)
print("co-body", r, s)
return b, "end"
end)

print("main", coroutine.resume(co, 1, 10))
print("main", coroutine.resume(co, "r"))
print("main", coroutine.resume(co, "x", "y"))
print("main", coroutine.resume(co, "x", "y"))

执行后会输出如下内容:

1
2
3
4
5
6
7
8
co-body 1       10
foo 2
main true 4
co-body r
main true 11 -9
co-body x y
main true 10 end
main false cannot resume dead coroutine

在c侧也提供了接口用于创建协程:lua_newthreadlua_resumelua_yield

async

而活用 Lua 原生 Coroutine 系统,也能仿造出近似的效果。

顺便一提,接下来默认用 co 来作为 coroutine 的简写。

同步协程

不妨从一个同步对象入手,通过递归调用来实现阻塞性的同步协程。

1
2
3
4
5
6
7
8
9
local pong = function (thread, ...)
local nxt = nil
---@param co_state result of co.resume
nxt = function (co_state, ...)
if not co_state then return ...
else return nxt(co.resume(thread, ...)) end
end
return nxt(co.resume(thread, ...))
end

本例中 nxt 函数封装处理了 thread 的执行与终止情况,将 co.yield 的返回参数立即传入 co.resume 并继续执行。

测试用例

1
2
3
4
5
6
7
8
9
local result = pong(co.create(function (param)
param = param or 0
print(param)
local x = co.yield(param + 1)
print(x)
local y, z = co.yield(x + 1, x + 2)
print(y, z)
end), 1) -- as param
print(result)

我偷偷往函数入口塞了一个参数进去,这里的传入参数也是 pong 函数体末尾执行第一次 resume 的参数。如此一来就能复用这个函数体,减少闭包拷贝 upvalue 的频率。

输出如下

1
2
3
4
1
2
3 4
cannot resume dead coroutine

轮询与回调

我们如何判断一个异步任务是否结束呢?有两种方法,轮询回调

轮询是一个比较 handy 的做法,它会不停检测对象的合法性,看此时的状态是不是可以执行需要的操作,有如下面这段代码。

1
2
3
4
5
6
7
8
void OnTick(float DeltaTime) {
if (!GetWorld()) return;
if (!GetWorld()->IsValid()) return;
auto System = GetWorld()->AsyncEnsureSystem<System_t>()
if (System.IsLoading()) return;

// Do Your real work!
}

回调则是发起一个请求并传入函数作为回调入口,在任务结束后,主动调用它。

1
2
3
4
5
6
7
8
void EnsureSystem() {
GetWorld()->AsyncEnsureSystemWithHandler<System_t>(
&::OnSystemReady // callback
)
}
void OnSystemReady(System_t* System) {
// Do Your real work!
}

而回调的缺点也很明显:需要创建一大堆函数链,复杂度增长奇快。轮询则需要维护非常多的状态位,更是增加了不少维护难度。

其实轮询也是一种回调,回调的函数体等价于轮询成功时执行的函数内容~

而异步框架需要做的事便是将这些复杂的函数化简,用它们共同的特性来组织代码。

Thunk

我们可以将具有异步特征的 函数 统称为 Thunk,以 lua 为例我们要将它改造成一个 会执行回调的函数。“A Thunk is function whose purpose is to invoke a callback.”

这样一来我们就可以用一个统一的规则来封装 Thunk,例如一个读文件的 Thunk 工厂函数:

1
2
3
4
5
6
7
8
9
local read_fs = function (file) -- thunk factory
local thunk = function (callback)
fs.read(file, callback)
end
return thunk
end
---
local thunk = read_fs("file") -- build a thunk
thunk(function () end) -- run and callback

真正的异步体是 fs.read ,它会释放 lua 栈,直到读文件结束,从它自己的栈来调用我们的 lua callback。类似的,我们可以把类似 fs.read 的函数作为工厂的一个组件,再封装一层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
local wrap = function (func)
local factory = function (...)
local param = {...}
local thunk = function (callback)
return func(table.unpack(params), callback)
end
return thunk
end
return factory
end
---
local read_fs_wrap = wrap(fs.read) -- fs.read(<filename>, <callback>)
local thunk = read_fs_factory("file")
thunk(function () end)

为了方便,我们默认待封装异步函数体的参数分为两部分:传入参数包回调函数,即 (..., callback) 形式。

wrap 可以看作一个工厂的工厂,需要 执行两次 进行组装:先组装函数体,再组装参数包。第一次执行,会绑定异步函数体;第二次执行,会绑定传入参数包。

最终会获得一个简单的函数 func(callback)->...,异步函数体及其参数都已经被闭包进这个小函数中,并只暴露一个 callback,返回值则对应异步函数体的返回值(可有可无,因为真正重要的信息应该藉由回调来传入)。

异步协程

回顾之前提到的 同步协程,我们可以用 异步回调 来推进同步协程,也就是将 pong::nxtthunk::callback 结合一下,称为 step。如此一来,在需要异步的时候我们可以 yield 出去,直到执行完毕后 resume 回我们的同步协程。

  1. yield 出去执行异步,这要求我们把异步函数体与参数一并传出,也就是在 co.resume 的输出处来获取。然而我们并不能得知有多少个参数,所以就利用前文提到的工厂,来将函数体、参数闭包。

  2. resume 回来后,就已经能拿到异步代码交给 callback 的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
local pong = function (func, callback)
local thread = co.create(func)
local step = nil
step = function (...)
local stat, ret = co.resume(thread, ...) -- ret = yield出来的Thunk(已闭包异步函数体)
if co.status(thread) == "dead" then -- 同步协程全部执行完毕
(callback or function () end)(ret) -- call main callback
else
ret(step) -- 执行异步函数,直至回调的时候回到 step 执行 resume,并把回调拿到的参数传入
end
end
step() -- run co.resume at beginning
end

在理解这段代码的时候要时刻辨别现在是否在协程内部。来一个使用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local echo = function (...)
local args = {...}
local thunk = function (step) -- step == callback
step(table.unpack(args)) -- 直接执行回调,并将 echo 的参数闭包传入
-- 前往 pong::step
end
return thunk
end

local thread = co.create(function ()
local x, y, z = co.yield(echo(1, 2, 3)) -- aka `co.yield(trunk)`
print(x, y, z)
local k, f, c = co.yield(echo(4, 5, 6))
print(k, f, c)
end)

pong(thread)
--- OUTPUT
-- 1 2 3
-- 4 5 6

语法糖

现在我们有创建 异步协程体 ,以及将异步函数化为 异步任务 置于其中运行的能力。

1. async

聪明的小朋友已经发现了,pong 的格式也满足 (..., callback),也就意味着一个异步协程体可以作为另一个异步协程的 子异步任务

1
local sync = wrap(pong)

wrap 首先组装了异步函数体 pong,然后等待后续组装一个协程体。

2. await

1
2
3
local wait = function (defer)
return co.yield(defer)
end

yield是异步协程体内执行 Thunk 的入口,我们只需要对它命个名。

3. build up

1
2
3
4
5
6
7
8
9
-- async.lua
local pong = ...
local wrap = ...
local wait = ...
return {
sync = wrap(pong),
wrap = wrap,
wait = wait
}

在引用这个文件时,可以用 a 来命名,这样使用起来就是 a.synca.wait……更为美观。

Example

来看看正经使用。

首先在cpp侧(游戏引擎等宿主)定义几个常用的异步函数。

1
2
3
4
5
6
7
8
9
-- sec 秒之后调用 callback
function Delay(sec, callback)
-- impl from cpp
end

-- 在下一帧执行 callback
function NextFrame(callback)
-- impl from cpp
end

使用异步框架来封装、执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local a = require("async")

-- 封装异步函数

local aDelay = a.wrap(Delay)
local aNextFrame = a.wrap(NextFrame)

-- 定义Task

local task1 = a.sync(function()
a.wait(aDelay(3)) -- 等待3s
print("after 3s")
end)

local task2 = a.sync(function()
a.wait(task1) -- 等待task1执行结束
end)

-- 执行Task

task1()
task2() -- 两个计时器将几乎同时执行

甚是优雅……

相关文献

https://www.lua.org/manual/5.4/manual.html#2.6
http://lua-users.org/wiki/CoroutinesTutorial
http://lua-users.org/wiki/ThreadsTutorial

Share