第一版是针对 Lua 5.0 编写的。虽然在很大程度上仍然适用于更高版本,但还是有一些差异。
第四版针对 Lua 5.3,可在 亚马逊 和其他书店购买。
购买本书,您还可以帮助支持 Lua 项目


9.4 – 非抢占式多线程

如前所述,协程是一种协作式多线程。每个协程相当于一个线程。一对 yield-resume 将控制从一个线程切换到另一个线程。但是,与“真正的”多线程不同,协程是非抢占式的。当协程正在运行时,无法从外部停止它。它仅在显式请求时才挂起执行(通过调用 yield)。对于一些应用程序来说,这不是问题,恰恰相反。在没有抢占的情况下,编程要容易得多。您不必担心同步错误,因为线程之间的所有同步都在程序中显式指定。您只需要确保协程仅在其位于临界区外部时才产生收益。

但是,使用非抢占式多线程,每当任何线程调用阻塞操作时,整个程序都会阻塞,直到操作完成。对于大多数应用程序来说,这是不可接受的行为,这导致许多程序员将协程视为传统多线程的真正替代方案。正如我们在这里所看到的,这个问题有一个有趣的(并且事后看来很明显的)解决方案。

让我们假设一个典型的多线程情况:我们希望通过 HTTP 下载多个远程文件。当然,要下载多个远程文件,我们必须知道如何下载一个远程文件。在此示例中,我们将使用 Diego Nehab 开发的 LuaSocket 库。要下载文件,我们必须打开与其站点连接,向文件发送请求,接收文件(以块为单位),然后关闭连接。在 Lua 中,我们可以按如下方式编写此任务。首先,我们加载 LuaSocket 库

    require "luasocket"

然后,我们定义主机和我们要下载的文件。在此示例中,我们将从万维网联盟站点下载 HTML 3.2 参考规范

    host = "www.w3.org"
    file = "/TR/REC-html32.html"

然后,我们打开到该站点 80 端口(HTTP 连接的标准端口)的 TCP 连接

    c = assert(socket.connect(host, 80))
该操作返回一个连接对象,我们使用它来发送文件请求
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

receive 方法始终返回一个字符串,其中包含它读取的内容以及另一个包含操作状态的字符串。当主机关闭连接时,我们中断接收循环。

最后,我们关闭连接

    c:close()

现在我们知道了如何下载一个文件,让我们回到下载多个文件的问题。最简单的方法是一次下载一个文件。但是,这种顺序方法(我们仅在完成前一个文件后才开始读取文件)太慢了。在读取远程文件时,程序大部分时间都在等待数据到达。更具体地说,它大部分时间都在 receive 调用中被阻塞。因此,如果程序同时下载所有文件,则可以运行得更快。然后,当一个连接没有可用数据时,程序可以从另一个连接读取。显然,协程提供了一种方便的方法来组织这些同时下载。我们为每个下载任务创建一个新线程。当一个线程没有可用数据时,它将控制权让渡给一个简单的调度程序,该调度程序调用另一个线程。

要使用协程重写程序,让我们首先将先前的下载代码重写为一个函数

    function download (host, file)
      local c = assert(socket.connect(host, 80))
      local count = 0    -- counts number of bytes read
      c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
      while true do
        local s, status = receive(c)
        count = count + string.len(s)
        if status == "closed" then break end
      end
      c:close()
      print(file, count)
    end
由于我们对远程文件内容不感兴趣,因此此函数仅计算文件大小,而不是将文件写入标准输出。(使用多个线程读取多个文件时,输出会将所有文件混合在一起。)在此新代码中,我们使用辅助函数 (receive) 从连接接收数据。在顺序方法中,其代码如下所示
    function receive (connection)
      return connection:receive(2^10)
    end
对于并发实现,此函数必须在不阻塞的情况下接收数据。相反,如果没有足够的数据可用,它将让步。新代码如下所示
    function receive (connection)
      connection:timeout(0)   -- do not block
      local s, status = connection:receive(2^10)
      if status == "timeout" then
        coroutine.yield(connection)
      end
      return s, status
    end
timeout(0) 的调用使得连接上的任何操作都成为非阻塞操作。当操作状态为 "timeout" 时,这意味着操作在未完成的情况下返回。在这种情况下,线程让步。传递给 yield 的非假参数向调度程序发出信号,表明线程仍在执行其任务。(稍后我们将看到另一个版本,其中调度程序需要超时连接。)请注意,即使在超时的情况下,连接也会返回在超时之前读取的内容,因此 receive 始终向其调用者返回 s

下一个函数确保每个下载在单独的线程中运行

    threads = {}    -- list of all live threads
    function get (host, file)
      -- create coroutine
      local co = coroutine.create(function ()
        download(host, file)
      end)
      -- insert it in the list
      table.insert(threads, co)
    end
threads 为调度程序保留所有活动线程的列表。

调度程序很简单。它主要是一个循环,逐个遍历所有线程。它还必须从列表中删除完成其任务的线程。当没有更多线程要运行时,它会停止循环

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          end
        end
      end
    end

最后,主程序创建它需要的线程并调用调度程序。例如,要从 W3C 站点下载四个文档,主程序可以如下所示

    host = "www.w3.org"
    
    get(host, "/TR/html401/html40.txt")
    get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
    get(host,"/TR/REC-html32.html")
    get(host,
        "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
    
    dispatcher()   -- main loop
我的机器使用协程下载这四个文件需要六秒钟。使用顺序实现,需要两倍以上的时间(15 秒)。

尽管速度加快了,但这种最新实现还远未达到最佳状态。只要至少有一个线程有内容可读,一切都会很好。但是,当没有线程有数据可读时,调度程序会进行繁忙等待,逐个线程检查它们是否仍然没有数据。因此,此协程实现使用的 CPU 比顺序解决方案多近 30 倍。

为了避免这种行为,我们可以使用 LuaSocket 中的 select 函数。它允许程序在等待一组套接字中的状态更改时阻塞。我们实现中的更改很小。我们只需要更改调度程序。新版本如下

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        local connections = {}
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          else    -- timeout
            table.insert(connections, res)
          end
        end
        if table.getn(connections) == n then
          socket.select(connections)
        end
      end
    end
沿着内部循环,这个新的调度器在表 connections 中收集已超时的连接。记住,receive 会将此类连接传递给 yield;因此 resume 会返回它们。当所有连接都超时时,调度器会调用 select 来等待这些连接中的任何一个更改状态。这个最终实现的运行速度与使用协程的第一个实现一样快。此外,由于它不会进行忙等待,因此它使用的 CPU 仅比顺序实现多一点。