跳至内容

并发

并发 vs. 并行

“并发”和“并行”的定义有时会混淆,但它们并不相同。

并发系统是指可以负责许多任务的系统,但并不一定同时执行这些任务。你可以想象自己在厨房做饭:你切洋葱,把它放到锅里煎,然后在洋葱煎的时候,你切番茄,但你并不是同时做所有这些事情:你把时间分配到这些任务之间。并行就像用一只手翻炒洋葱,另一只手切番茄。

在撰写本文时,Crystal 支持并发,但不支持并行:可以执行多个任务,并且在每个任务上花费一些时间,但两个代码路径永远不会在完全相同的时间执行。

默认情况下,Crystal 程序在单个操作系统线程中执行,除了垃圾收集器(目前是 Boehm GC)。并行是支持的,但目前被认为是实验性的。查看 这篇 Crystal 博客文章,了解有关并行的更多信息

纤程

为了实现并发,Crystal 有纤程。纤程在某种程度上类似于操作系统线程,只是它更轻量级,并且它的执行由进程内部管理。因此,程序将生成多个纤程,Crystal 将确保在适当的时候执行它们。

事件循环

对于所有与 I/O 相关的操作,都有一个事件循环。一些耗时的操作被委托给它,当事件循环等待该操作完成时,程序可以继续执行其他纤程。一个简单的例子是等待数据通过套接字传入。

通道

Crystal 有受 CSP 启发的通道。它们允许在纤程之间通信数据,而无需共享内存,也无需担心锁、信号量或其他特殊结构。

程序的执行

当程序启动时,它会启动一个主纤程,该纤程将执行您的顶层代码。在那里,可以生成许多其他纤程。程序的组成部分是

  • 运行时调度程序,负责在适当的时候执行所有纤程。
  • 事件循环,它只是另一个纤程,负责异步任务,例如文件、套接字、管道、信号和计时器(例如执行 sleep)。
  • 通道,用于在纤程之间通信数据。运行时调度程序将协调纤程和通道以进行通信。
  • 垃圾收集器:用于清理“不再使用”的内存。

纤程

纤程是一个执行单元,比线程更轻量级。它是一个小的对象,具有一个关联的 堆栈,大小为 8MB,通常分配给操作系统线程。

纤程与线程不同,它们是协作的。线程是抢占式的:操作系统可能会在任何时候中断线程并开始执行另一个线程。纤程必须明确地告诉运行时调度程序切换到另一个纤程。例如,如果要等待 I/O,纤程将告诉调度程序“看,我必须等待此 I/O 可用,你可以继续执行其他纤程,并在 I/O 就绪时再回来”。

协作的优势在于消除了进行上下文切换(在线程之间切换)的大量开销。

纤程比线程轻量级得多:即使它被分配了 8MB,它也从 4KB 的小堆栈开始。

在 64 位机器上,它允许我们生成数百万个纤程。在 32 位机器上,我们只能生成 512 个纤程,这并不多。但由于 32 位机器开始过时,我们将押注未来,更多地关注 64 位机器。

运行时调度程序

调度程序有一个队列,其中包含

  • 准备执行的纤程:例如,当你生成一个纤程时,它就准备执行。
  • 事件循环:它是另一个纤程。当没有其他纤程准备执行时,事件循环会检查是否有任何异步操作已准备就绪,然后执行等待该操作的纤程。事件循环目前使用 libevent 实现,它是一种其他事件机制(如 epollkqueue)的抽象。
  • 自愿请求等待的纤程:这是通过 Fiber.yield 完成的,这意味着“我可以继续执行,但如果你愿意,我会给你一些时间来执行其他纤程”。

通信数据

因为现在只有一个线程执行你的代码,所以访问和修改不同纤程中的类变量可以正常工作。但是,一旦在语言中引入了多个线程(并行),它可能会失效。这就是为什么建议使用通道并在它们之间发送消息来通信数据。在内部,通道实现所有锁定机制以避免数据竞争,但从外部来看,你使用它们作为通信原语,因此你(用户)不需要使用锁。

示例代码

生成纤程

要生成一个纤程,可以使用带有代码块的 spawn

spawn do
  # ...
  socket.gets
  # ...
end

spawn do
  # ...
  sleep 5.seconds
  #  ...
end

这里有两个协程:一个从套接字读取,另一个执行 `sleep`。 当第一个协程到达 `socket.gets` 行时,它会被挂起,事件循环被告知在套接字中有数据时继续执行此协程,程序继续执行第二个协程。 此协程希望休眠 5 秒,因此事件循环被告知在 5 秒后继续执行此协程。 如果没有其他协程要执行,事件循环将等待这两个事件中的任何一个发生,而不会消耗 CPU 时间。

`socket.gets` 和 `sleep` 这样做的原因是,它们的实现直接与运行时调度程序和事件循环通信,这没有什么神奇之处。 通常,标准库已经负责完成所有这些工作,因此您不必这样做。

但是请注意,协程不会立即执行。 例如

spawn do
  loop do
    puts "Hello!"
  end
end

运行上面的代码不会产生任何输出并立即退出。

出现这种情况的原因是,协程不会在生成后立即执行。 因此,主协程(生成上述协程的协程)完成其执行,程序退出。

解决此问题的一种方法是进行 `sleep`

spawn do
  loop do
    puts "Hello!"
  end
end

sleep 1.second

该程序现在将打印“Hello!”一秒钟,然后退出。 这是因为 `sleep` 调用将安排主协程在一秒钟后执行,然后执行另一个“准备执行”的协程,在本例中是上面的那个协程。

另一种方法是

spawn do
  loop do
    puts "Hello!"
  end
end

Fiber.yield

这一次,`Fiber.yield` 将告诉调度程序执行另一个协程。 这将打印“Hello!”直到标准输出阻塞(系统调用将告诉我们必须等到输出准备好),然后执行继续进行主协程,程序退出。 这里标准输出可能永远不会阻塞,因此程序将继续永远执行。

如果我们希望永远执行生成的协程,我们可以使用不带参数的 `sleep`

spawn do
  loop do
    puts "Hello!"
  end
end

sleep

当然,上面的程序完全可以用循环编写,而无需使用 `spawn`。 `sleep` 在生成多个协程时更有用。

生成一个调用

您还可以通过传递方法调用而不是块来生成。 为了理解为什么这很有用,让我们看看这个例子

i = 0
while i < 10
  spawn do
    puts(i)
  end
  i += 1
end

Fiber.yield

上面的程序打印了十次“10”。 问题在于只有一个变量 `i`,所有生成的协程都引用它,当执行 `Fiber.yield` 时,它的值为 10。

要解决这个问题,我们可以这样做

i = 0
while i < 10
  proc = ->(x : Int32) do
    spawn do
      puts(x)
    end
  end
  proc.call(i)
  i += 1
end

Fiber.yield

现在它可以工作了,因为我们创建了一个 Proc,我们调用它传递 `i`,因此该值被复制,现在生成的协程接收了一个副本。

为了避免所有这些样板代码,标准库提供了一个 `spawn` 宏,它接受一个调用表达式,并基本上将其重写为执行上述操作。 使用它,我们最终得到

i = 0
while i < 10
  spawn puts(i)
  i += 1
end

Fiber.yield

这主要对在迭代中更改的局部变量有用。 这不会发生在块参数中。 例如,这按预期工作

10.times do |i|
  spawn do
    puts i
  end
end

Fiber.yield

生成一个协程并等待它完成

我们可以为此使用一个通道

channel = Channel(Nil).new

spawn do
  puts "Before send"
  channel.send(nil)
  puts "After send"
end

puts "Before receive"
channel.receive
puts "After receive"

这将打印

Before receive
Before send
After send
After receive

首先,程序生成一个协程,但尚未执行它。 当我们调用 `channel.receive` 时,主协程被阻塞,执行继续进行生成的协程。 然后调用 `channel.send(nil)`。 请注意,此 `send` 不会占用通道中的空间,因为在第一个 `send` 之前调用了一个 `receive`,`send` 不会被阻塞。 协程只有在被阻塞或执行到完成时才会切换。 因此,生成的协程将在 `send` 之后继续,并且一旦执行了 `puts "After send"`,执行将切换回主协程。

然后主协程恢复到 `channel.receive`,它一直在等待一个值。 然后主协程继续执行并完成。

在上面的示例中,我们使用了 `nil` 只是为了传达协程已结束。 我们还可以使用通道在协程之间传递值

channel = Channel(Int32).new

spawn do
  puts "Before first send"
  channel.send(1)
  puts "Before second send"
  channel.send(2)
end

puts "Before first receive"
value = channel.receive
puts value # => 1

puts "Before second receive"
value = channel.receive
puts value # => 2

输出

Before first receive
Before first send
Before second send
1
Before second receive
2

请注意,当程序执行 `receive` 时,当前协程被阻塞,执行继续进行另一个协程。 当执行 `channel.send(1)` 时,执行将继续,因为如果通道尚未满,`send` 是非阻塞的。 但是,`channel.send(2)` 会导致协程被阻塞,因为通道(默认大小为 1)已满,因此执行继续进行等待该通道的协程。

这里我们正在发送字面值,但生成的协程可能会通过例如读取文件或从套接字获取值来计算此值。 当此协程必须等待 I/O 时,其他协程将能够继续执行代码,直到 I/O 就绪,最后当值就绪并通过通道发送时,主协程将接收它。 例如

require "socket"

channel = Channel(String).new

spawn do
  server = TCPServer.new("0.0.0.0", 8080)
  socket = server.accept
  while line = socket.gets
    channel.send(line)
  end
end

spawn do
  while line = gets
    channel.send(line)
  end
end

3.times do
  puts channel.receive
end

上面的程序生成两个协程。 第一个协程创建一个 TCPServer,接受一个连接并从其中读取行,将它们发送到通道。 有一个第二个协程从标准输入读取行。 主协程读取发送到通道的前 3 条消息,无论是来自套接字还是 stdin,然后程序退出。 `gets` 调用将阻塞协程,并告诉事件循环如果数据到来,则从那里继续执行。

同样,我们可以等待多个协程完成执行,并收集它们的值

channel = Channel(Int32).new

10.times do |i|
  spawn do
    channel.send(i * 2)
  end
end

sum = 0
10.times do
  sum += channel.receive
end
puts sum # => 90

当然,您可以在生成的协程中使用 `receive`

channel = Channel(Int32).new

spawn do
  puts "Before send"
  channel.send(1)
  puts "After send"
end

spawn do
  puts "Before receive"
  puts channel.receive
  puts "After receive"
end

puts "Before yield"
Fiber.yield
puts "After yield"

输出

Before yield
Before send
Before receive
1
After receive
After send
After yield

这里首先执行 `channel.send`,但由于还没有人在等待值(尚未),因此执行继续进行其他协程。 第二个协程被执行,通道上有一个值,它被获取,执行继续,首先进行第一个协程,然后进行主协程,因为 `Fiber.yield` 将协程置于执行队列的末尾。

缓冲通道

上面的示例使用非缓冲通道:在发送值时,如果一个协程正在等待该通道,那么执行将继续进行该协程。

使用缓冲通道,调用 `send` 不会切换到另一个协程,除非缓冲区已满

# A buffered channel of capacity 2
channel = Channel(Int32).new(2)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "After send"
end

3.times do |i|
  puts channel.receive
end

输出

Before send 1
Before send 2
Before send 3
After send
1
2
3

请注意,第一个 `send` 不会占用通道中的空间。 这是因为在第一个 `send` 之前调用了一个 `receive`,而其他 2 个 `send` 调用发生在各自的 `receive` 之前。 `send` 调用的数量没有超过缓冲区的界限,因此发送协程无阻碍地运行到完成。

这里有一个例子,其中缓冲区中的所有空间都被占用

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

输出

Before send 1
Before send 2
Before send 3
1
2
3

请注意,“End of send fiber”没有出现在输出中,因为我们接收了 3 个 `send` 调用,这意味着 `3.times` 运行到完成,进而解除了主协程的阻塞,主协程执行到完成。

这里与我们刚刚看到的代码片段相同 - 只是在最底部添加了一个 `Fiber.yield` 调用

# A buffered channel of capacity 1
channel = Channel(Int32).new(1)

spawn do
  puts "Before send 1"
  channel.send(1)
  puts "Before send 2"
  channel.send(2)
  puts "Before send 3"
  channel.send(3)
  puts "End of send fiber"
end

3.times do |i|
  puts channel.receive
end

Fiber.yield

输出

Before send 1
Before send 2
Before send 3
1
2
3
End of send fiber

通过在代码片段的末尾添加一个 `Fiber.yield` 调用,我们在输出中看到了“End of send fiber”消息,否则由于主协程执行到完成而会被遗漏。