16 Zig 中的线程和并行性介绍
在 Zig 中,线程可以通过Thread
Zig 标准库中的结构体使用。该结构体代表一个内核线程,它遵循 POSIX 线程模式,这意味着它的工作方式类似于pthread
C 库中的线程,该线程通常在 GNU C 编译器 ( gcc
) 的任何发行版中可用。如果您不熟悉线程,我先给您讲解一下它背后的理论,好吗?
16.1什么是线程?
线程本质上是一个独立的执行上下文。我们使用线程为程序引入并行性,在大多数情况下,这可以提高程序的运行速度,因为我们有多个任务同时并行执行。
程序通常默认是单线程的。这意味着每个程序通常运行在单个线程或单个执行上下文中。当只有一个线程运行时,就没有并行性。当没有并行性时,命令将按顺序执行,也就是说,一次只执行一个命令,一个接一个。通过在程序中创建多个线程,我们可以同时执行多个命令。
创建多线程的程序在现实世界中非常常见。因为许多不同类型的应用程序都非常适合并行执行。视频和照片编辑应用程序(例如 Adobe Photoshop 或 DaVinci Resolve)、游戏(例如《巫师 3》)以及 Web 浏览器(例如 Google Chrome、Firefox、Microsoft Edge 等)都是很好的例子。例如,在 Web 浏览器中,线程通常用于实现标签页。Web 浏览器中的标签页通常在 Web 浏览器的主进程中作为单独的线程运行。也就是说,您在 Web 浏览器中打开的每个新标签页通常在单独的执行线程中运行。
通过在单独的线程中运行每个标签页,我们允许浏览器中所有打开的标签页同时运行,并且彼此独立。例如,您可能当前在某个标签页中打开了 YouTube 或 Spotify,并且正在该标签页中收听播客,同时在另一个标签页中处理 Google 文档撰写论文。即使您没有查看 YouTube 标签页,您仍然可以收听播客,因为这个 YouTube 标签页与正在运行 Google 文档的另一个标签页并行运行。
如果没有线程,另一种选择是将每个标签页作为计算机上完全独立的进程运行。但这不是一个好选择,因为仅仅几个标签页就已经消耗了太多的计算机电量和资源。换句话说,与创建一个新的执行线程相比,创建一个全新的进程非常昂贵。此外,在使用浏览器时遇到延迟和开销的可能性也会很大。线程创建速度更快,而且它们消耗的计算机资源也少得多,尤其是因为它们与主进程共享一些资源。
因此,现代网络浏览器之所以能使用线程功能,是因为它允许你在 Google 文档上撰写文章的同时收听播客。如果没有线程功能,网络浏览器很可能只会显示一个标签页。
线程也非常适合处理任何涉及处理请求或订单的操作。因为处理请求需要时间,而且通常涉及大量的“等待时间”。换句话说,我们会花费大量时间处于空闲状态,等待某件事完成。例如,考虑一家餐厅。餐厅处理订单通常涉及以下步骤:
- 接收客户的订单。
- 将订单传递给厨房,然后等待食物烹制。
- 开始在厨房做饭。
- 当食物完全煮熟后,将食物交给客户。
如果你仔细思考以上几点,就会发现整个过程中有一个重要的等待时刻,那就是食物在厨房烹饪的时候。在准备食物的过程中,服务员和顾客都在等待食物准备好并送达。
如果我们编写一个程序来代表这家餐厅,更具体地说,一个单线程程序,那么这个程序的效率会非常低。因为程序会处于空闲状态,在“检查食物是否准备好”这一步骤上等待相当长的时间。请考虑下面展示的代码片段,它可能代表这样的程序。
这个程序的问题在于while循环。程序会花费大量时间等待while循环,除了检查食物是否准备好之外什么也不做。这太浪费时间了。与其等待某个事情发生,服务员可以直接把订单发送到厨房,然后继续处理其他顾客的订单,并继续向厨房发送订单,而不是什么也不做,等着食物准备好。
const order = Order.init("Pizza Margherita", n = 1);
const waiter = Waiter.init();
waiter.receive_order(order);
waiter.ask_kitchen_to_cook();
var food_not_ready = true;
while (food_not_ready) {
food_not_ready = waiter.is_food_ready();
}
const food = waiter.get_food_from_kitchen();
waiter.send_food_to_client(food);
这就是为什么线程非常适合这个程序。我们可以用线程把服务员从“等待”的职责中解放出来,让他们可以继续做其他事情,并接收更多订单。请看下一个示例,我将上面的程序重写成了一个不同的程序,使用线程来烹饪和配送订单。
您可以在这段程序中看到,当服务员收到顾客的新订单时,他会执行该send_order()
函数。该函数唯一要做的就是创建一个新线程并将其分离。由于创建线程是一个非常快的操作,该send_order()
函数几乎立即返回,因此服务员几乎不用花时间处理订单,而是继续尝试从顾客那里获取下一个订单。
在新创建的线程中,订单由厨师烹饪,食物准备好后,送到客户的餐桌上。
fn cook_and_deliver_order(order: *Order) void {
const chef = Chef.init();
const food = chef.cook(order.*);
chef.deliver_food(food);
}
fn send_order(order: Order) void {
const cook_thread = Thread.spawn(
.{}, cook_and_deliver_order, .{&order}
);
cook_thread.detach();
}
const waiter = Waiter.init();
while (true) {
const order = waiter.get_new_order();
if (order) {
send_order(order);
}
}
16.2线程与进程
_当我们运行一个程序时,该程序在操作系统中作为一个进程_执行。这是一种一对一的关系,您执行的每个程序或应用程序在操作系统中都是一个单独的进程。但是每个程序或每个进程都可以创建并包含多个线程。因此,进程和线程之间存在一对多的关系。
这也意味着我们创建的每个线程始终与计算机中的特定进程相关联。换句话说,线程始终是现有进程的子集(或子进程)。所有线程都共享与其创建源进程关联的部分资源。由于线程与进程共享资源,因此它们非常适合简化任务之间的通信。
例如,假设你正在开发一个庞大而复杂的应用程序,如果能将其拆分成两个部分,并使这两个独立的部分相互通信,开发起来就会简单得多。有些程序员选择将代码库的这两个部分有效地编写成两个完全独立的程序,然后使用 IPC(进程间通信)使这两个独立的程序/进程相互通信,并使它们协同工作。
然而,一些程序员发现 IPC 难以处理,因此,他们倾向于将代码库的一部分编写为“程序的主体部分”,或者作为在操作系统中以进程形式运行的代码部分,而将代码库的另一部分编写为在新线程中执行的任务。进程和线程可以通过控制流以及数据轻松地相互通信,因为它们共享并可以访问相同的标准文件描述符(stdout
、stdin
、stderr
),以及堆和全局数据段上的相同内存空间。
更详细地说,您创建的每个线程都有一个专门为该线程保留的单独堆栈帧,这实际上意味着您在此线程内创建的每个本地对象都只对该线程有效,即其他线程无法访问此本地对象。除非您创建的这个对象位于堆上。换句话说,如果与此对象关联的内存位于堆上,那么其他线程就有可能访问此对象。
因此,存储在栈中的对象对于创建它们的线程来说是本地的。但存储在堆中的对象可能被其他线程访问。所有这些意味着,每个线程都有自己独立的栈帧,但同时,所有线程共享同一个堆、相同的标准文件描述符(这意味着它们共享相同的stdout
、stdin
、stderr
)以及程序中的同一个全局数据段。
16.3创建线程
我们在 Zig 中创建新线程,首先将Thread
结构体导入到当前的 Zig 模块中,然后调用spawn()
该结构体的方法,该方法从当前进程创建(或“生成”)一个新的执行线程。此方法有三个参数,分别为:
- 一个
SpawnConfig
对象,其中包含生成过程的配置。 - 在这个新线程中将要执行(或将要“调用”)的函数的名称。
- 传递给第二个参数中提供的函数的参数(或输入)列表。
通过这三个参数,你可以控制线程的创建方式,并指定将在此新线程中执行哪些工作(或“任务”)。线程只是一个独立的执行上下文,我们通常在代码中创建新线程,因为我们想在这个新的执行上下文中执行一些工作。我们通过提供函数名称作为方法的第二个参数来指定在此上下文中要执行的具体工作或步骤spawn()
。
因此,当这个新线程创建时,您提供给该spawn()
方法作为输入的函数将被调用,或者说,在这个新线程内执行。您可以通过在方法的第三个参数中提供参数列表(或输入列表)来控制调用此函数时传递给它的参数或输入spawn()
。这些参数将按照提供给函数的顺序传递给函数spawn()
。
此外,它SpawnConfig
是一个结构体对象,只有两个可能的字段(或者说两个可能的成员),您可以设置它们来定制生成行为。这些字段是:
stack_size
:您可以提供一个usize
值来指定线程堆栈框架的大小(以字节为单位)。默认情况下,此值为:16×1024×1024。allocator
:您可以提供一个分配器对象,用于为线程分配内存时使用。
要使用这两个字段(或“配置”)之一,您只需创建一个新的 类型的对象SpawnConfig
,并将该对象作为spawn()
方法的输入。但是,如果您不想使用其中一个配置,而只想使用默认值,则可以提供一个匿名结构体字面量 ( .{}
) 来代替此SpawnConfig
参数。
作为我们的第一个非常简单的示例,请考虑下面公开的代码。在同一个程序中,您可以根据需要创建多个执行线程。但是,在第一个示例中,我们只创建了一个执行线程,因为我们spawn()
只调用了一次。
另外,请注意,在这个例子中,我们在新线程中执行该函数do_some_work()
。由于该函数没有参数,因此不接收任何输入,因此我们在此实例中传递了一个空列表,或者更准确地说,.{}
在 的第三个参数中传递了一个空的匿名结构体 ( ) spawn()
。
const std = @import("std");
const stdout = std.io.getStdOut().writer();
const Thread = std.Thread;
fn do_some_work() !void {
_ = try stdout.write("Starting the work.\n");
std.time.sleep(100 * std.time.ns_per_ms);
_ = try stdout.write("Finishing the work.\n");
}
pub fn main() !void {
const thread = try Thread.spawn(.{}, do_some_work, .{});
thread.join();
}
Starting the work.Finishing the work.
注意调用try
该方法时使用的spawn()
。这意味着该方法在某些情况下可能会返回错误。一种特殊情况是,当您尝试创建新线程时,如果已经创建了过多的线程(即超出了系统并发线程的配额)。
但是,如果新线程成功创建,该spawn()
方法会返回一个处理程序对象(类型为 的对象Thread
)。您可以使用这个处理程序对象有效地控制线程的各个方面。
当线程创建时,您提供的输入函数spawn()
将被调用(即,被调用),以启动此新线程的执行。换句话说,每次调用 时spawn()
,不仅会创建一个新线程,还会自动按下此线程的“启动工作按钮”。因此,此线程中正在执行的工作会在线程创建后立即开始。这类似于 C 语言库的工作方式pthread_create()
,pthreads
它也会在线程创建后立即开始执行。
16.4从线程返回
我们在上一节中了解到,线程一旦创建就开始执行。现在,我们将学习如何在 Zig 中“加入”或“分离”线程。“加入”和“分离”是控制线程如何返回主线程或程序中的主进程的操作。
我们使用线程处理程序对象中的方法join()
和执行这些操作。您创建的每个线程都可以标记为可_连接 (joinable)或_可分离 (detached) (Linux 手册页 2024) 。您可以通过调用线程处理程序对象中的方法将线程转换为_分离_线程。但是,如果您调用该方法,则该线程将变为_可连接_线程。detach()
detach()``join()
一个线程不能同时是_可连接 (joinable)和_可分离 (detached) 的。通常,这意味着你不能在同一个线程中同时调用join()
和detach()
。但一个线程必须是两者之一,这意味着你应该始终在线程上调用join()
或。如果你不在线程上调用这两个方法中的任何一个,则会在程序中引入未定义的行为,这将在16.9.2 节detach()
中描述。
现在,让我们描述一下这两种方法对您的线程的作用。
16.4.1加入线程
当你加入一个线程时,你实际上是在说:“嘿!你能等线程完成后再继续执行吗?” 例如,如果我们回到 Zig 中第一个也是最简单的线程示例,我们在main()
程序的函数内部创建了一个单线程,并join()
在最后调用了这个线程。代码示例的这一部分如下所示。
因为我们是在 的作用域内加入这个新线程的main()
,所以main()
函数的执行会暂时停止,等待线程执行完成。也就是说, 的执行会main()
在 被调用的那一行暂时停止join()
,只有在线程完成其任务后才会继续。
pub fn main() !void {
const thread = try Thread.spawn(.{}, do_some_work, .{});
thread.join();
}
因为我们已经将这个新线程加入到main()
作用域内,所以我们可以保证这个新线程将在执行结束之前完成main()
。因为它保证main()
会等待线程完成其任务。
在上面的例子中,调用之后没有其他表达式join()
。我们刚好到达了 的作用域的末尾main()
,因此,我们的程序的执行在线程完成其任务后就结束了,因为没有其他事情要做了。但是,如果我们在 join 调用之后还有更多事情要做呢?
为了演示这种可能性,请考虑下面展示的示例。在这里,我们创建一个print_id()
函数,它只接收一个 id 作为输入,并将其打印到stdout
。在这个例子中,我们依次创建了两个新线程。然后,我们加入第一个线程,然后等待整整两秒,最后加入第二个线程。
此示例背后的理念是,最后一个join()
调用仅在第一个线程完成其任务(即第一次join()
调用)并经过两秒延迟后执行。如果您编译并运行此示例,您会注意到大多数消息都快速打印到stdout
,即它们几乎立即出现在屏幕上。然而,最后一条消息(“加入线程 2”)大约需要 2 秒才能显示在屏幕上。
fn print_id(id: *const u8) !void {
try stdout.print("Thread ID: {d}\n", .{id.*});
}
pub fn main() !void {
const id1: u8 = 1;
const id2: u8 = 2;
const thread1 = try Thread.spawn(.{}, print_id, .{&id1});
const thread2 = try Thread.spawn(.{}, print_id, .{&id2});
_ = try stdout.write("Joining thread 1\n");
thread1.join();
std.time.sleep(2 * std.time.ns_per_s);
_ = try stdout.write("Joining thread 2\n");
thread2.join();
}
Thread ID: Joining thread 1
1
Thread ID: 2
Joining thread 2
这表明两个线程都非常快地完成了它们的工作(即打印 ID),在两秒的延迟结束之前。因此,最后一个join()
调用几乎立即返回。因为当最后一个join()
调用发生时,第二个线程已经完成了它的任务。
现在,如果你编译并运行此示例,你还会注意到,在某些情况下,消息会相互缠绕。换句话说,你可能会看到消息“加入线程 1”插入到消息“线程 1”的中间,反之亦然。发生这种情况的原因是:
main()
线程基本上和程序的主进程(即函数)同时执行。- 线程
stdout
与程序的主进程共享相同的消息,这意味着线程产生的消息被发送到与主进程产生的消息完全相同的地方。
这两点在之前的16.1 节中都有描述。因此,由于消息的生成和发送时间stdout
大致相同,它们可能会相互缠绕。总之,当你调用join()
一个线程时,当前进程会等待该线程完成后才能继续执行;并且,当该线程完成其任务后,与该线程关联的资源会自动释放,当前进程将继续执行。
16.4.2分离线程
当您分离一个线程时,与该线程关联的资源将自动释放回系统,而无需另一个线程加入该终止的线程。
换句话说,当你调用detach()
一个线程时,就像你的孩子长大成人,也就是说,他们不再依赖你。分离线程会释放自身,并且当该线程完成其任务时,它不会将结果报告给你。因此,当你不需要使用线程的返回值,或者你不关心线程何时完成其工作(即线程自行解决所有问题)时,通常将线程标记为_分离线程。_
以下面的代码示例为例。我们创建一个新线程,然后将其分离,最后在程序结束前打印一条消息。我们使用了print_id()
前面示例中用过的相同函数。
fn print_id(id: *const u8) !void {
try stdout.print("Thread ID: {d}\n", .{id.*});
}
pub fn main() !void {
const id1: u8 = 1;
const thread1 = try Thread.spawn(.{}, print_id, .{&id1});
thread1.detach();
_ = try stdout.write("Finish main\n");
}
Finish main
现在,如果你仔细观察这段代码示例的输出,你会发现只有 main 函数中的最后一条消息被打印到了控制台。原本应该打印的消息print_id()
并没有出现在控制台中。为什么?这是因为程序的主进程在线程能够发出任何指令之前就先完成了。
这完全没问题,因为线程已经分离,所以它可以自行释放,而无需等待主进程。如果你让主进程在结束前休眠(或“等待”)几纳秒,你很可能会看到打印的消息print_id()
,因为你在主进程结束前为线程提供了足够的时间来完成。
16.5线程池
线程池是一种非常流行的编程模式,尤其适用于服务器和守护进程。线程池只是一组线程,或者说是一个线程“池”。许多程序员喜欢使用这种模式,因为它可以更轻松地管理和使用程序中的多个线程,而无需在需要时手动创建线程。
此外,使用线程池也可能提升程序的性能,尤其是在程序需要不断创建线程执行短期任务的情况下。在这种情况下,线程池可能会提升性能,因为您不必一直不断地创建和销毁线程,从而避免了这种持续创建和销毁线程所带来的大量开销。
线程池的核心思想是预先创建一组线程,并随时准备执行任务。在程序启动时创建一组线程,并在程序运行时保持这些线程处于活动状态。每个线程要么正在执行任务,要么等待分配任务。每当程序中出现新任务时,该任务就会被添加到“任务队列”中。当某个线程可用并准备好执行新任务时,该线程就会从“任务队列”中获取下一个任务,并直接执行该任务。
Zig 标准库提供了基于结构体的线程池实现。您可以通过将对象作为输入提供给此结构体的方法,std.Thread.Pool
来创建对象的新实例。对象 是一个包含线程池配置的结构体对象。此结构体对象中最重要的设置是成员和。顾名思义,成员应该接收一个分配器对象,而成员指定要在此池中创建和维护的线程数。Pool``Pool.Options``init()``Pool.Options``n_jobs``allocator``allocator``n_jobs
考虑下面展示的示例,它演示了如何创建一个新的线程池对象。在这里,我们创建一个Pool.Options
包含通用分配器对象的对象,并且其n_jobs
成员设置为 4,这意味着线程池将创建并使用 4 个线程。
另请注意,pool
对象初始设置为undefined
。这允许我们初始声明线程池对象,但无法正确实例化对象的底层内存。您必须undefined
像这样使用 初始声明线程池对象,因为init()
的方法Pool
需要初始指针才能正确实例化对象。
因此,只需记住使用 创建线程池对象undefined
,然后在该对象上调用 方法即可。使用完毕后,init()
也不要忘记调用线程池对象的 方法来释放分配给线程池的资源。否则,程序将出现内存泄漏。deinit()
const std = @import("std");
const Pool = std.Thread.Pool;
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const allocator = gpa.allocator();
const opt = Pool.Options{
.n_jobs = 4,
.allocator = allocator,
};
var pool: Pool = undefined;
try pool.init(opt);
defer pool.deinit();
}
现在我们知道了如何创建Pool
对象,接下来我们必须了解如何分配任务给线程池对象中的线程执行。要分配一个任务给线程执行,我们需要调用spawn()
线程池对象中的方法。
此方法的工作原理与对象中的方法spawn()
相同。该方法的参数与前一个方法几乎相同,更确切地说,在这种情况下我们无需提供对象。但是,线程池对象中的此方法不会创建新线程,而是仅在内部“任务队列”中注册一个待执行的新任务,池中任何可用的线程都会获取此任务并直接执行。spawn()``Thread``SpawnConfig``spawn()
在下面的示例中,我们print_id()
再次使用了之前的函数。但您可能会注意到,print_id()
这次的函数略有不同,因为我们在调用时使用了catch
而不是。目前,该结构体仅支持不会将错误作为任务返回的函数。因此,在将任务分配给线程池中的线程时,必须使用不会返回错误的函数。这就是我们在这里使用 的原因,这样函数就不会返回错误。try``print()``Pool``catch``print_id()
fn print_id(id: *const u8) void {
_ = stdout.print("Thread ID: {d}\n", .{id.*})
catch void;
}
const id1: u8 = 1;
const id2: u8 = 2;
try pool.spawn(print_id, .{&id1});
try pool.spawn(print_id, .{&id2});
这种限制可能不应该存在,事实上,Zig 团队已经在考虑解决这个问题,并且正在一个未解决的问题1中进行跟踪。因此,如果您确实需要提供一个可能返回错误的函数作为线程池中线程要执行的任务,那么您要么只能这样做:
- 实现自己的没有此限制的线程池。
- 等待 Zig 团队真正解决这个问题。
16.6互斥锁
互斥锁是每个线程库的经典组件。本质上,互斥锁是一个_互斥标志_,它的作用类似于一种“锁”,或者说是代码特定部分的守门人。互斥锁与线程同步相关,更具体地说,它们可以防止程序中出现一些典型的竞争条件,从而避免通常难以跟踪和理解的重大错误和未定义行为。
互斥锁的核心思想是帮助我们控制特定代码段的执行,并防止两个或多个线程同时执行这一段代码。许多程序员喜欢将互斥锁比作浴室的门(通常带有锁)。当一个线程锁定自己的互斥锁对象时,就好像浴室的门被锁上了一样。因此,其他想要同时使用同一浴室的人(在本例中为其他线程)必须耐心等待,直到当前使用者(或线程)打开门并离开浴室。
一些程序员也喜欢用“每个人都有发言权”的比喻来解释互斥锁。这是Computerphile 项目2中的多线程代码视频中使用的比喻。想象一下,如果你在一个谈话圈中。圈子里有一位主持人,他决定谁有权在特定时刻发言。主持人给即将发言的人一张绿卡(或某种授权卡),因此,其他所有人都必须保持安静,听这位持有绿卡的人说话。当这个人讲完后,他们会把绿卡交还给主持人,然后主持人决定谁接下来发言,并将绿卡交给该人。循环往复。
在这个对话圈中,互斥锁就像一个调解员。它授权一个线程执行特定的代码段,同时阻止其他线程执行同一代码段。如果其他线程想要执行同一代码段,它们必须等待授权线程先执行完毕。当授权线程执行完这段代码后,互斥锁会授权下一个线程执行这段代码,而其余线程则保持阻塞状态。因此,互斥锁就像一个调解员,执行着“每个线程都轮流执行这段代码”的控制。
互斥锁专门用于防止数据争用问题。当两个或多个线程同时尝试读取或写入同一个共享对象时,就会发生数据争用问题。因此,当您有一个所有线程共享的对象,并且您想避免两个或多个线程同时访问同一个对象时,可以使用互斥锁来锁定访问该特定对象的代码部分。当一个线程尝试运行被互斥锁锁定的代码时,该线程会停止执行,并耐心等待代码库的这部分解锁后再继续执行。
请注意,互斥锁通常用于锁定代码库中访问/修改所有线程共享数据的区域,例如存储在全局数据段或程序堆空间中的对象。因此,互斥锁通常不用于访问/修改线程本地对象的代码库区域。
16.6.1临界区
临界区是一个通常与互斥锁和线程同步相关的概念。本质上,临界区是程序中线程访问/修改共享资源(例如对象、文件描述符等所有线程都可以访问的资源)的部分。换句话说,临界区是程序中可能发生竞争条件的部分,因此,程序中可能引入未定义的行为。
当我们在程序中使用互斥锁时,临界区定义了代码库中需要锁定的区域。因此,我们通常在临界区开头锁定互斥锁对象,然后在临界区结尾解锁。以下两点来自 GeekFromGeeks 的“临界区”文章,它们很好地概括了临界区在线程同步问题中的作用(Geeks for Geeks 2024)。
- 临界区必须以原子操作的形式执行,这意味着一旦一个线程或进程进入临界区,所有其他线程或进程都必须等待,直到正在执行的线程或进程退出临界区。同步机制的目的是确保每次只有一个线程或进程可以执行临界区。
- 临界区的概念是计算机系统同步的核心,因为它需要确保多个线程或进程能够并发执行且互不干扰。各种同步机制(例如信号量、互斥量、监视器和条件变量)都用于实现临界区,并确保以互斥的方式访问共享资源。
16.6.2原子操作
在阅读有关线程、竞争条件和互斥锁的文章时,你也会经常看到“原子操作”这个词。总而言之,如果操作过程中无法发生上下文切换,则该操作被归类为“原子操作”。换句话说,该操作始终从头到尾执行,而不会在其执行阶段被其他进程或操作打断。
如今,原子操作并不多。但为什么原子操作在这里如此重要呢?因为数据竞争(一种竞争条件)不可能发生在原子操作上。所以,如果代码中的某一行执行了原子操作,那么这一行就永远不会遭遇数据竞争问题。因此,程序员有时会使用原子操作来保护自己免受代码中数据竞争问题的影响。
如果某个操作被编译成一条汇编指令,那么该操作可能是原子的,因为它本身就是一条汇编指令。但这并不能保证。对于较旧的 CPU 架构(例如x86
),通常确实如此。但如今,现代 CPU 架构中的大多数汇编指令都被分解为多个微任务,这本质上使得该操作非原子,即使它由一条汇编指令组成。
Zig 标准库在std.atomic
模块中提供了一些原子功能。在此模块中,您将找到一个名为的公共通用函数Value()
。使用此函数,我们创建一个“原子对象”,它是一个包含一些本机原子操作的值,最值得注意的是load()
和fetchAdd()
操作。如果您有使用 C++ 多线程的经验,您可能已经认出了这种模式。所以,是的,Zig 中的这个通用“原子对象”本质上与 C++ 标准库中的模板结构相同std::atomic
。需要强调的是,Zig 中的这些原子操作仅支持原始数据类型(即1.5 节中介绍的类型)。
16.6.3数据竞争和竞争条件
要理解互斥锁的用途,我们需要更好地理解它们试图解决的问题,这个问题可以概括为数据竞争问题。数据竞争问题是一种竞争条件,当一个线程正在访问特定的内存位置(即特定的共享对象),而另一个线程同时尝试将新数据写入/保存到同一内存位置(即同一个共享对象)时,就会发生这种情况。
我们可以简单地将竞争条件定义为程序中任何基于“谁先到达”问题的错误。数据竞争问题就是一种竞争条件,因为它发生在两方或多方尝试同时读取和写入同一内存位置时,因此,此操作的最终结果完全取决于谁先到达该内存位置。因此,存在数据竞争问题的程序每次执行都可能产生不同的结果。
因此,竞争条件会导致行为不明确和不可预测,因为每次不同的人先于其他人到达目标位置时,程序都会产生不同的答案。而且,我们无法轻松预测或控制谁会先到达这个目标位置。换句话说,每次程序运行时,都可能得到不同的答案,因为不同的人、不同的函数或不同的代码部分先于其他人完成了任务。
举个例子,请考虑下面展示的代码片段。在这个例子中,我们创建了一个全局计数器变量,并且还创建了一个increment()
函数,该函数的作用是在 for 循环中递增这个全局计数器变量。
由于 for 循环迭代了 10 万次,并且我们在此代码示例中创建了两个单独的线程,您期望在最终打印的消息中看到什么数字stdout
?答案应该是 20 万。对吗?嗯,理论上,这个程序应该在最后打印 20 万,但实际上,每次执行这个程序时,我都会得到不同的答案。
在下面展示的例子中,你可以看到这次的最终结果是 117254,而不是预期的 200000。第二次执行这个程序时,我得到的结果是 108592。所以这个程序的最终结果是变化的,但它永远不会达到我们想要的 200000。
// Global counter variable
var counter: usize = 0;
// Function to increment the counter
fn increment() void {
for (0..100000) |_| {
counter += 1;
}
}
pub fn main() !void {
const thr1 = try Thread.spawn(.{}, increment, .{});
const thr2 = try Thread.spawn(.{}, increment, .{});
thr1.join();
thr2.join();
try stdout.print("Couter value: {d}\n", .{counter});
}
Couter value: 117254
为什么会发生这种情况?答案是:因为该程序存在数据争用问题。当且仅当第一个线程在第二个线程开始执行之前完成其任务时,该程序才会打印正确的数字 200000。但这不太可能发生。因为创建线程的过程太快,因此两个线程几乎同时开始执行。如果您修改此代码,在第一次和第二次调用 之间添加几纳秒的休眠时间spawn()
,则可以增加程序产生“正确结果”的几率。
数据争用问题的发生是因为两个线程几乎同时读写同一内存位置。在这个例子中,每个线程在 for 循环的每次迭代中实际上执行三个基本操作,分别是:
- 读取的当前值
count
。 - 将此值增加 1。
- 将结果写回
count
。
理想情况下,线程 B 应该count
仅在线程 A 将递增的值写回对象后才读取 的值。因此,在理想情况下(如表 16.1count
所示),线程应该彼此同步工作。但实际情况是,这些线程并不同步,因此存在数据争用问题,如表 16.2所示。
请注意,在数据争用场景中(表 16.2),线程 B 执行的读取操作发生在线程 A 的写入操作之前,这最终导致程序结束时出现错误的结果。因为当线程 B 读取变量的值时count
,线程 A 仍在处理 中的初始值count
,并且尚未将新的递增值写回count
。因此,线程 B 最终读取的是相同的初始值(或“旧”),count
而不是线程 A 本来要写入的更新后的递增值。
表 16.1:两个线程增加相同整数值的理想场景
线程 1 | 线程 2 | 整数值 |
---|---|---|
读取值 | 0 | |
增量 | 1 | |
写入值 | 1 | |
读取值 | 1 | |
增量 | 2 | |
写入值 | 2 |
表 16.2:两个线程增加相同整数值时的数据争用场景
线程 1 | 线程 2 | 整数值 |
---|---|---|
读取值 | 0 | |
读取值 | 0 | |
增量 | 1 | |
增量 | 1 | |
写入值 | 1 | |
写入值 | 1 |
如果你仔细思考这些以表格形式呈现的图表,就会发现它们与我们在16.6.2 节中讨论的原子操作相关。记住,原子操作是指 CPU 从头到尾执行的操作,不会被其他线程或进程打断。因此,表 16.1中展示的场景不会受到数据竞争的影响,因为线程 A 执行的操作不会被线程 B 的操作在中间打断。
如果我们再思考一下16.6.1 节中关于临界区的讨论,我们就能确定程序中代表临界区的部分,也就是容易受到数据竞争条件影响的部分。在这个例子中,程序的临界区就是我们递增counter
变量的那一行(counter += 1
)。因此,理想情况下,我们应该使用互斥锁,并在这一行之前加锁,然后在这一行之后解锁。
16.6.4在Zig中使用互斥锁
现在我们知道了互斥锁试图解决的问题,我们可以学习如何在 Zig 中使用它们。Zig 中的互斥锁可以通过std.Thread.Mutex
Zig 标准库中的结构体获取。如果我们采用上一个示例中的相同代码,并使用互斥锁对其进行改进,以解决数据竞争问题,我们将得到下面的代码示例。
注意,这次我们必须修改increment()
函数,使其接收指向对象的指针Mutex
作为输入。为了确保程序安全,避免数据争用问题,我们需要做的就是lock()
在临界区开头调用该方法,然后unlock()
在临界区结尾调用该方法。注意,该程序的输出现在是正确的数字 200000。
const std = @import("std");
const stdout = std.io.getStdOut().writer();
const Thread = std.Thread;
const Mutex = std.Thread.Mutex;
var counter: usize = 0;
fn increment(mutex: *Mutex) void {
for (0..100000) |_| {
mutex.lock();
counter += 1;
mutex.unlock();
}
}
pub fn main() !void {
var mutex: Mutex = .{};
const thr1 = try Thread.spawn(.{}, increment, .{&mutex});
const thr2 = try Thread.spawn(.{}, increment, .{&mutex});
thr1.join();
thr2.join();
try stdout.print("Couter value: {d}\n", .{counter});
}
Couter value: 200000
16.7读/写锁
互斥锁通常用于两个或多个线程同时运行同一段代码并不总是安全的情况下。相比之下,读写锁通常用于混合场景,即代码库中有些部分可以安全地并行运行,而其他部分则不安全。
例如,假设您有多个线程使用文件系统中的同一个共享文件来存储某些配置或统计信息。如果两个或多个线程同时尝试从同一个文件读取数据,则不会发生任何异常。因此,这部分代码库完全可以安全地并行执行,多个线程可以同时读取同一个文件。
但是,如果两个或多个线程同时尝试将数据写入同一个文件,就会引发一些竞争条件问题。因此,代码库的这部分内容并行执行并不安全。更具体地说,一个线程可能会在另一个线程写入数据的中间写入数据。两个或多个线程同时写入同一位置可能会导致数据损坏。这种特定情况通常被称为“撕裂写入”。
因此,从这个例子中我们可以得出这样的结论:有些类型的操作会导致竞争条件,但也有一些类型的操作不会导致竞争条件问题。你也可以说,有些类型的操作容易受到竞争条件问题的影响,而有些类型的操作则不会。
读/写锁是一种承认特定场景存在的锁,您可以使用这种类型的锁来控制代码库的哪些部分可以安全地并行运行,哪些部分则不能。
16.7.1排他锁与共享锁
因此,读/写锁与互斥锁略有不同。互斥锁始终是_排他锁_,这意味着始终只允许一个线程执行。使用排他锁时,其他线程始终被“排除”,即始终被阻止执行。但在读/写锁中,其他线程可能被授权同时运行,具体取决于它们获取的锁类型。
读/写锁有两种类型:独占锁和共享锁。独占锁的工作原理与互斥锁完全相同,而共享锁则不会阻止其他线程同时运行。在pthreads
C 语言库中,读/写锁可以通过 C 结构体获得pthread_rwlock_t
。使用此 C 结构体,您可以创建:
- “写锁”,对应于排他锁。
- “读锁”,对应共享锁。
与 Zig 相比,术语可能略有不同。但含义是一样的。因此,只需记住这个关系:写锁是独占锁,而读锁是共享锁。
当一个线程尝试获取读锁(即共享锁)时,当且仅当另一个线程当前未持有写锁(即独占锁),并且队列中没有其他线程正在等待获取写锁时,该线程才会获得共享锁。换句话说,队列中的线程之前曾尝试获取写锁,但由于另一个正在运行且已持有写锁的线程,该线程被阻塞。因此,该线程在等待获取写锁的队列中,并且当前正在等待另一个持有写锁的线程完成执行。
当一个线程尝试获取读锁,但未能获取到该读锁时(原因可能是已经有线程在运行并获取了写锁,或者队列中已经有线程等待获取写锁),该线程的执行将立即被阻塞,即暂停。该线程将无限期地尝试获取读锁,只有在成功获取读锁后,其执行才会解除阻塞(或恢复暂停)。
如果你深入思考读锁和写锁之间的动态关系,你可能会注意到读锁本质上是一种安全机制。更具体地说,它是一种允许特定线程仅在安全的情况下与其他线程一起运行的方法。换句话说,如果当前有一个线程正在运行并持有写锁,那么尝试获取读锁的线程现在运行很可能是不安全的。因此,读锁可以保护该线程免于陷入危险,并耐心等待“写锁”线程完成其任务后再继续运行。
另一方面,如果当前只有“读锁”(即“共享锁”)线程在运行(即当前不存在任何“写锁”线程),那么获取读锁的线程与其他线程并行运行是完全安全的。因此,读锁恰好允许该线程与其他线程一起运行。
因此,通过结合使用读锁(共享锁)和写锁(排他锁),我们可以控制多线程代码的哪些区域或部分可以安全地进行并行,哪些部分不能安全地进行并行。
16.7.2在 Zig 中使用读/写锁
Zig 标准库通过std.Thread.RwLock
模块支持读/写锁。如果您希望某个线程获取共享锁(即读锁),则应该lockShared()
从RwLock
对象调用该方法。但是,如果您希望该线程获取独占锁(即写锁),则应该lock()
从RwLock
对象调用该方法。
与互斥锁一样,到达“临界区”末尾后,我们也需要解锁通过读/写锁对象获取的共享锁或排他锁。如果获取的是排他锁,则可以通过调用unlock()
读/写锁对象中的方法来解锁。相反,如果获取的是共享锁,则调用unlockShared()
来解锁共享锁。
举一个简单的例子,下面的代码片段创建了三个独立的线程,负责读取counter
对象中的当前值,并且还创建了另一个线程,负责将新数据写入counter
对象(更具体地说是增加它)。
var counter: u32 = 0;
fn reader(lock: *RwLock) !void {
while (true) {
lock.lockShared();
const v: u32 = counter;
try stdout.print("{d}", .{v});
lock.unlockShared();
std.time.sleep(2 * std.time.ns_per_s);
}
}
fn writer(lock: *RwLock) void {
while (true) {
lock.lock();
counter += 1;
lock.unlock();
std.time.sleep(2 * std.time.ns_per_s);
}
}
pub fn main() !void {
var lock: RwLock = .{};
const thr1 = try Thread.spawn(.{}, reader, .{&lock});
const thr2 = try Thread.spawn(.{}, reader, .{&lock});
const thr3 = try Thread.spawn(.{}, reader, .{&lock});
const wthread = try Thread.spawn(.{}, writer, .{&lock});
thr1.join();
thr2.join();
thr3.join();
wthread.join();
}
16.8放弃线程
该Thread
结构体支持通过yield()
方法进行让步。让步意味着线程的执行将暂时停止,并移至由操作系统调度程序管理的优先级队列的末尾。
也就是说,当你放弃一个线程时,你实际上是在对操作系统说:“嘿!你能暂时停止执行这个线程,稍后再回来继续吗?”。你也可以将这个放弃操作理解为:“你能降低这个线程的优先级,让你专注于其他任务吗?”。所以,这个放弃操作也是一种停止特定线程的方法,这样你就可以继续工作并优先执行其他线程。
需要强调的是,如今,yield 线程操作并不常见。换句话说,很少有程序员在生产环境中使用 yield 操作,原因很简单:这个操作很难实现,而且也存在更好的替代方案。大多数程序员更喜欢使用 yield 操作join()
。事实上,大多数情况下,当你在代码示例中看到有人使用这个“yield”操作时,他们通常是为了调试应用程序中的竞争条件。也就是说,yield 操作现在主要用作调试工具。
无论如何,如果您想产生一个线程,只需yield()
从中调用该方法,如下所示:
thread.yield();
16.9线程中的常见问题
16.9.1死锁
当两个或多个线程永远阻塞,等待对方释放资源时,就会发生死锁。这种情况通常发生在涉及多个锁,并且获取锁的顺序管理不善的情况下。
下面的代码示例演示了一种死锁情况。本例中,我们有两个不同的线程执行两个不同的函数(work1()
和work2()
)。此外,我们还使用了两个独立的互斥锁。编译并运行此代码示例,你会发现程序会无限期地运行,不会结束。
当我们查看执行该work1()
函数的第一个线程时,我们会注意到该函数mut1
首先获取了锁。因为这是在该线程(程序中创建的第一个线程)内执行的第一个操作。之后,该函数会休眠 1 秒,以模拟某种类型的工作,然后尝试获取mut2
锁。
另一方面,当我们查看执行该work2()
函数的第二个线程时,我们可以看到该函数mut2
首先获取了锁。因为当该线程创建并尝试获取mut2
锁时,第一个线程仍在“休眠 1 秒”那行代码处休眠。获取锁后mut2
,该work2()
函数也会休眠 1 秒,以模拟某种类型的工作,然后该函数才会尝试获取mut1
锁。
这就造成了死锁,因为在两个线程中执行完“休眠 1 秒”之后,线程 1 尝试获取mut2
锁,但该锁当前正被线程 2 使用。然而,此时线程 2 也在尝试获取锁mut1
,而该锁当前正被线程 1 使用。因此,两个线程最终都永远地等待着,等待对方释放它们想要获取的锁。
var mut1: Mutex = .{}; var mut2: Mutex = .{};
fn work1() !void {
mut1.lock();
std.time.sleep(1 * std.time.ns_per_s);
mut2.lock();
_ = try stdout.write("Doing some work 1\n");
mut2.unlock(); mut1.unlock();
}
fn work2() !void {
mut2.lock();
std.time.sleep(1 * std.time.ns_per_s);
mut1.lock();
_ = try stdout.write("Doing some work 1\n");
mut1.unlock(); mut2.unlock();
}
pub fn main() !void {
const thr1 = try Thread.spawn(.{}, work1, .{});
const thr2 = try Thread.spawn(.{}, work2, .{});
thr1.join();
thr2.join();
}
16.9.2不打电话join()
或detach()
如果线程没有调用join()
或detach()
,那么这个线程就会变成“僵尸线程”,因为它没有明确的“返回点”。你也可以将其理解为:“没有人负责管理这个线程”。如果我们没有确定一个线程是_可连接 (joinable)还是_可分离 (detached) 的,那么没有人负责处理这个线程的返回值,同样,也没有人负责清除(或释放)与该线程相关的资源。
您肯定不希望遇到这种情况,所以请记住始终在您创建的线程上使用join()
或detach()
。如果您不使用这些方法之一,我们将失去对线程的控制,并且其资源永远不会被释放(即,您在系统中泄漏了资源)。
16.9.3取消或终止特定线程
当我们思考pthreads
C 语言库时,有一种异步终止或取消线程的方法,即SIGTERM
通过函数向线程发送信号pthread_kill()
。但像这样取消线程是不好的,非常危险。因此,Zig 的线程实现没有类似的函数,或者说,没有类似的异步取消或终止线程的方法。
因此,如果您想在 Zig 中取消正在执行的线程,那么一个不错的策略是将控制流与 结合使用join()
。更具体地说,您可以围绕一个 while 循环设计线程,该循环不断检查线程是否应该继续运行。如果需要取消线程,我们可以中断 while 循环,并通过调用 将线程与主线程连接起来join()
。
下面的代码示例在一定程度上演示了这种策略。在这里,我们使用控制流来中断while循环,并比最初计划的更早退出线程。此示例还演示了如何在Zig中将原子对象与我们在16.6.2节Value()
中提到的泛型函数一起使用。
const std = @import("std");
const Thread = std.Thread;
const stdout = std.io.getStdOut().writer();
var running = std.atomic.Value(bool).init(true);
var counter: u64 = 0;
fn do_more_work() void {
std.time.sleep(2 * std.time.ns_per_s);
}
fn work() !void {
while (running.load(.monotonic)) {
for (0..10000) |_| { counter += 1; }
if (counter < 15000) {
_ = try stdout.write(
"Time to cancel the thread.\n"
);
running.store(false, .monotonic);
} else {
_ = try stdout.write("Time to do more work.\n");
do_more_work();
running.store(false, .monotonic);
}
}
}
pub fn main() !void {
const thread = try Thread.spawn(.{}, work, .{});
thread.join();
}
Time to cancel the thread.