# Pytorch Fsdp使用 PyTorch FSDP 进行完全分片数据并行训练的专家指南 - 参数分片、混合精度、CPU 卸载、FSDP2## 技能元数据| | |
|---|---|
| 来源 | 可选 — 使用 hermes skills install official/mlops/pytorch-fsdp 安装 |
| 路径 | optional-skills/mlops/pytorch-fsdp |
| 版本 | 1.0.0 |
| 作者 | Orchestra Research |
| 许可证 | MIT |
| 依赖项 | torch>=2.0, transformers |
| 平台 | linux, macos |
| 标签 | Distributed Training, PyTorch, FSDP, Data Parallel, Sharding, Mixed Precision, CPU Offloading, FSDP2, Large-Scale Training |## 参考:完整 SKILL.md:::info
以下是 Hermes 在触发此技能时加载的完整技能定义。这是技能激活时代理看到的指令。
:::# Pytorch-Fsdp 技能提供源自官方文档的 pytorch-fsdp 开发综合协助。## 何时使用此技能当出现以下情况时,应触发此技能:
- 使用 pytorch-fsdp 时
- 询问 pytorch-fsdp 功能或 API 时
- 实现 pytorch-fsdp 解决方案时
- 调试 pytorch-fsdp 代码时
- 学习 pytorch-fsdp 最佳实践时## 快速参考### 常见模式
模式 1: 通用 Join 上下文管理器
创建日期:2025 年 6 月 6 日 | 最后更新日期:2025 年 6 月 6 日
通用 join 上下文管理器有助于在输入不均匀的情况下进行分布式训练。本页概述了相关类的 API:Join、Joinable 和 JoinHook。有关教程,请参阅[使用 Join 上下文管理器进行不均匀输入的分布式训练](Distributed Training with Uneven Inputs Using the Join Context Manager)。
class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, **kwargs)[source]#
此类定义了通用 join 上下文管理器,它允许在进程加入后调用自定义钩子。这些钩子应掩盖(shadow)未加入进程的集体通信,以防止挂起和出错,并确保算法正确性。有关钩子定义的详细信息,请参阅 JoinHook。
警告 上下文管理器要求每个参与的
Joinable在其自身的每迭代集体通信之前调用方法notify_join_context(),以确保正确性。
警告 上下文管理器要求
JoinHook对象中的所有process_group属性相同。如果有多个JoinHook对象,则使用第一个对象的设备。进程组和设备信息用于检查未加入的进程,并在启用throw_on_early_termination时通知进程抛出异常,这两者均使用 all-reduce 操作。
参数
- joinables (
List[Joinable]) – 参与的Joinable列表;它们的钩子按给定顺序迭代。 - enable (
bool) – 启用不均匀输入检测的标志;设置为False将禁用上下文管理器的功能,仅应在用户确定输入不会不均匀时设置(默认值:True)。 - throw_on_early_termination (
bool) – 控制是否在检测到不均匀输入时抛出异常的标志(默认值:False)。
示例:
>>> import os
>>> import torch
>>> import torch.distributed as dist
>>> import torch.multiprocessing as mp
>>> import torch.nn.parallel.DistributedDataParallel as DDP
>>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
>>> from torch.distributed.algorithms.join import Join
>>>
>>> # 在每个生成的 worker 上
>>> def worker(rank):
>>> dist.init_process_group("nccl", rank=rank, world_size=2)
>>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
>>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
>>> # Rank 1 比 rank 0 多获得一个输入
>>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
>>> with Join([model, optim]):
>>> for input in inputs:
>>> loss = model(input).sum()
>>> loss.backward()
>>> optim.step()
>>> # 所有 rank 都到达此处,没有挂起或出错
static notify_join_context(joinable)[source]#
通知 join 上下文管理器调用进程尚未加入。然后,如果 throw_on_early_termination=True,则检查是否检测到不均匀输入(即,是否有一个进程已经加入),如果是,则抛出异常。此方法应由 Joinable 对象在其每迭代集体通信之前调用。例如,这应该在 DistributedDataParallel 的前向传播开始时调用。只有传入上下文管理器的第一个 Joinable 对象在此方法中执行集体通信,对于其他对象,此方法为空操作。
参数
- joinable (
Joinable) – 调用此方法的Joinable对象。
返回
如果 joinable 是传入上下文管理器的第一个对象,则返回一个异步工作句柄,用于 all-reduce 以通知上下文管理器该进程尚未加入;否则返回 None。
class torch.distributed.algorithms.Joinable[source]#
这定义了可加入类(joinable classes)的抽象基类。可加入类(继承自 Joinable)应实现 join_hook(),该方法返回一个 JoinHook 实例,此外还应实现 join_device() 和 join_process_group(),它们分别返回设备和进程组信息。
abstract property join_device: device#
返回执行 join 上下文管理器所需的集体通信的设备。
abstract join_hook(**kwargs)[source]#
为给定的 Joinable 返回一个 JoinHook 实例。
参数
- kwargs (
dict) – 包含任何关键字参数的字典,用于在运行时修改 join 钩子的行为;共享同一 join 上下文管理器的所有Joinable实例都会转发相同的kwargs值。
返回类型
JoinHook
abstract property join_process_group: Any#
返回 join 上下文管理器本身所需的集体通信的进程组。
class torch.distributed.algorithms.JoinHook[source]#
这定义了一个 join 钩子,它在 join 上下文管理器中提供两个入口点。入口点:一个主钩子(main hook),当存在未加入的进程时会重复调用;一个后钩子(post-hook),当所有进程都加入后调用一次。要为通用 join 上下文管理器实现 join 钩子,请定义一个继承自 JoinHook 的类,并根据需要重写 main_hook() 和 post_hook()。
main_hook()[source]#
当存在未加入的进程时调用此钩子,以掩盖训练迭代中的集体通信。训练迭代即,在
一次前向传播、反向传播和优化器步骤。post_hook(is_last_joiner)[source]# 在所有进程加入后调用钩子。它传递一个额外的布尔参数 is_last_joiner,该参数指示当前 rank 是否是最后加入的之一。参数 is_last_joiner (bool) – 如果当前 rank 是最后加入的之一,则为 True;否则为 False。``` Join
**模式 2:** 分布式通信包 - torch.distributed
# 创建日期:2017 年 7 月 12 日 | 最后更新日期:2025 年 9 月 4 日
> **注意**
> 请参阅 [PyTorch Distributed Overview](PyTorch Distributed Overview) 以简要了解与分布式训练相关的所有功能。
## 后端 (Backends)
`torch.distributed` 支持四种内置后端,每种后端具有不同的功能。下表显示了每个后端在 CPU 或 GPU 上可用的函数。对于 NCCL,GPU 指的是 CUDA GPU;对于 XCCL,GPU 指的是 XPU GPU。仅当用于构建 PyTorch 的 MPI 实现支持时,MPI 才支持 CUDA。
| Backend | gloo | | mpi | | nccl | | xccl | |
| :--- | :---: | :---: | :---: | :---: | :---: | :---: | :---: | :---: |
| **Device** | **CPU** | **GPU** | **CPU** | **GPU** | **CPU** | **GPU** | **CPU** | **GPU** |
| send | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| recv | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| broadcast | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| all_gather | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| gather | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| scatter | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| reduce_scatter | ✓ | ✓ | ✘ | ✘ | ✘ | ✓ | ✘ | ✓ |
| all_to_all | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| barrier | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
## PyTorch 附带的后端
PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型版)。默认情况下,在 Linux 上,Gloo 和 NCCL 后端会被构建并包含在 PyTorch 分布式包中(仅在使用 CUDA 构建时才包含 NCCL)。MPI 是一个可选后端,只有当你从源代码构建 PyTorch 时才能包含它(例如,在安装了 MPI 的主机上构建 PyTorch)。
> **注意**
> 自 PyTorch v1.8 起,Windows 支持除 NCCL 以外的所有集体通信后端。如果 `init_process_group()` 的 `init_method` 参数指向一个文件,它必须遵循以下架构:
> * 本地文件系统:`init_method="file:///d:/tmp/some_file"`
> * 共享文件系统:`init_method="file://////{machine_name}/{share_folder_name}/some_file"`
>
> 与 Linux 平台相同,你可以通过设置环境变量 `MASTER_ADDR` 和 `MASTER_PORT` 来启用 TcpStore。
## 使用哪个后端?
过去,我们经常被问到:“我应该使用哪个后端?”。
**经验法则**
* **使用 CUDA GPU 进行分布式训练:** 使用 NCCL 后端。
* **使用 XPU GPU 进行分布式训练:** 使用 XCCL 后端。
* **使用 CPU 进行分布式训练:** 使用 Gloo 后端。
**配备 InfiniBand 互连的 GPU 主机**
使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。
**配备以太网互连的 GPU 主机**
使用 NCCL,因为它目前提供最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果你遇到任何 NCCL 问题,请使用 Gloo 作为后备选项。(请注意,Gloo 在 GPU 上的运行速度目前比 NCCL 慢。)
**配备 InfiniBand 互连的 CPU 主机**
如果你的 InfiniBand 已启用 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。
**配备以太网互连的 CPU 主机**
使用 Gloo,除非你有特定理由使用 MPI。
## 常见环境变量
### 选择要使用的网络接口
默认情况下,NCCL 和 Gloo 后端都会尝试找到合适的网络接口。如果自动检测到的接口不正确,你可以使用以下环境变量覆盖它(适用于相应的后端):
* `NCCL_SOCKET_IFNAME`,例如 `export NCCL_SOCKET_IFNAME=eth0`
* `GLOO_SOCKET_IFNAME`,例如 `export GLOO_SOCKET_IFNAME=eth0`
如果你使用的是 Gloo 后端,可以通过逗号分隔来指定多个接口,如下所示:`export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3`。后端将以轮询方式在这些接口上分发操作。**务必**确保所有进程在此变量中指定相同数量的接口。
### 其他 NCCL 环境变量
**调试** - 如果 NCCL 失败,你可以设置 `NCCL_DEBUG=INFO` 以打印明确的警告消息以及基本的 NCCL 初始化信息。你还可以使用 `NCCL_DEBUG_SUBSYS` 获取有关 NCCL 特定方面的更多详细信息。例如,`NCCL_DEBUG_SUBSYS=COLL` 将打印集体调用的日志,这在调试挂起问题时可能很有帮助,尤其是那些由集体类型或消息大小不匹配引起的问题。如果出现拓扑检测失败,设置 `NCCL_DEBUG_SUBSYS=GRAPH` 将有助于检查详细的检测结果,并在需要 NCCL 团队进一步帮助时作为参考保存。
**性能调优** - NCCL 根据其拓扑检测执行自动调优,以节省用户的调优工作。在某些基于 socket 的系统上,用户可能仍会尝试调整 `NCCL_SOCKET_NTHREADS` 和 `NCCL_NSOCKS_PERTHREAD` 以增加 socket 网络带宽。NCCL 已为一些云提供商(如 AWS 或 GCP)预先调优了这两个环境变量。有关 NCCL 环境变量的完整列表,请参阅 [NVIDIA NCCL 的官方文档](NVIDIA NCCL 的官方文档)。
你可以使用 `torch.distributed.ProcessGroupNCCL.NCCLConfig` 和 `torch.distributed.ProcessGroupNCCL.Options` 进一步调优 NCCL 通信器。在解释器中使用 `help`(例如 `help(torch.distributed.ProcessGroupNCCL.NCCLConfig)`)了解更多相关信息。
## 基础知识
`torch.distributed` 包为在一台或多台机器上运行的多个计算节点间的多进程并行提供了 PyTorch 支持和通信原语。该类
`torch.nn.parallel.DistributedDataParallel()` 基于此功能构建,作为任何 PyTorch 模型的包装器,提供同步分布式训练。这与 `Multiprocessing` 包(`torch.multiprocessing` 和 `torch.nn.DataParallel()`)提供的并行性不同,因为它支持多台通过网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。在单机同步情况下,`torch.distributed` 或 `torch.nn.parallel.DistributedDataParallel()` 包装器可能比其他数据并行方法(包括 `torch.nn.DataParallel()`)具有优势:每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这看起来是多余的,因为梯度已经跨进程收集并平均,因此对于每个进程都是相同的,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。每个进程包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL 争用”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。
## 初始化 #
在调用任何其他方法之前,需要使用 `torch.distributed.init_process_group()` 或 `torch.distributed.device_mesh.init_device_mesh()` 函数初始化该包。两者都会阻塞,直到所有进程加入。
> **警告**
>
> 初始化不是线程安全的。应从单个线程执行进程组创建,以防止跨 rank 的 ‘UUID’ 分配不一致,并防止初始化期间的竞争条件导致挂起。
### `torch.distributed.is_available()`[source] #
如果分布式包可用,则返回 `True`。否则,`torch.distributed` 不公开任何其他 API。目前,`torch.distributed` 在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,设置 `USE_DISTRIBUTED=1` 以启用它。目前,Linux 和 Windows 的默认值为 `USE_DISTRIBUTED=1`,MacOS 的默认值为 `USE_DISTRIBUTED=0`。
**返回类型** `bool`
### `torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)`[source] #
初始化默认的分布式进程组。这也将初始化分布式包。
初始化进程组主要有两种方式:
1. 显式指定 `store`、`rank` 和 `world_size`。
2. 指定 `init_method`(一个 URL 字符串),指示在哪里/如何发现对等节点。可以选择指定 `rank` 和 `world_size`,或者将所有必需参数编码到 URL 中并省略它们。
如果两者均未指定,则假定 `init_method` 为 `"env://"`。
**参数**
* **backend** (`str` 或 `Backend`, *可选*) – 要使用的后端。根据构建时配置,有效值包括 `mpi`、`gloo`、`nccl`、`ucc`、`xccl` 或由第三方插件注册的后端。自 2.6 版本起,如果未提供 `backend`,c10d 将使用为 `device_id` 关键字参数(如果提供)指示的设备类型注册的后端。目前已知的默认注册包括:用于 cuda 的 `nccl`,用于 cpu 的 `gloo`,用于 xpu 的 `xccl`。如果既未提供 `backend` 也未提供 `device_id`,c10d 将检测运行时机器上的加速器,并使用为该检测到的加速器(或 cpu)注册的后端。此字段可以作为小写字符串给出(例如,`"gloo"`),也可以通过 `Backend` 属性访问(例如,`Backend.GLOO`)。如果在每台机器上使用多个进程且后端为 `nccl`,则每个进程必须对其使用的每个 GPU 具有独占访问权,因为进程间共享 GPU 可能导致死锁或 NCCL 无效使用。`ucc` 后端处于实验阶段。可以使用 `get_default_backend_for_device()` 查询设备的默认后端。
* **init_method** (`str`, *可选*) – 指定如何初始化进程组的 URL。如果未指定 `init_method` 或 `store`,则默认为 `"env://"`。与 `store` 互斥。
* **world_size** (`int`, *可选*) – 参与作业的进程数。如果指定了 `store`,则为必需项。
* **rank** (`int`, *可选*) – 当前进程的 rank(它应该是介于 0 和 `world_size-1` 之间的数字)。如果指定了 `store`,则为必需项。
* **store** (`Store`, *可选*) – 所有 worker 均可访问的键/值存储,用于交换连接/地址信息。与 `init_method` 互斥。
* **timeout** (`timedelta`, *可选*) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是在此时间之后异步中止集体操作并导致进程崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且由于失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行,因此继续执行用户代码不再安全。当设置 `TORCH_NCCL_BLOCKING_WAIT` 时,进程将阻塞并等待此超时。
* **group_name** (`str`, *可选*, *已弃用*) – 组名。此参数被忽略
pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组期间需要传入的其他选项。截至目前,我们支持的唯一选项是用于 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便当有计算内核等待时,nccl 后端可以使用高优先级的 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, 可选) – 此进程将使用的单个特定设备,允许进行后端特定的优化。目前这有两个影响,仅在 NCCL 下有效:通信器会立即形成(立即调用 ncclCommInit* 而不是正常的延迟调用),并且子组将在可能时使用 ncclCommSplit 以避免不必要的组创建开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。如果提供的是 int,API 将假设使用编译时的加速器类型。注意 要启用 backend == Backend.MPI,需要在支持 MPI 的系统上从源代码构建 PyTorch。注意 对多个后端的支持是实验性的。目前,当未指定后端时,将同时创建 gloo 和 nccl 后端。gloo 后端将用于 CPU 张量的集合操作,而 nccl 后端将用于 CUDA 张量的集合操作。可以通过传入格式为 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串来指定自定义后端,例如 “cpu:gloo,cuda:custom_backend”。 torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# 基于 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度标记为 mesh_dim_names[i]。注意 init_device_mesh 遵循 SPMD 编程模型,意味着相同的 PyTorch Python 程序在集群中的所有进程/秩上运行。确保所有秩上的 mesh_shape(描述设备布局的 nD 数组的维度)一致。不一致的 mesh_shape 可能导致挂起。注意 如果未找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组。参数 device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”、“xpu”。不允许传入带有 GPU 索引的设备类型,例如 “cuda:0”。 mesh_shape (Tuple[int]) – 定义描述设备布局的多维数组维度的元组。 mesh_dim_names (Tuple[str], 可选) – 分配给描述设备布局的多维数组每个维度的网格维度名称元组。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。 backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 对将为每个网格维度创建的某些或所有 ProcessGroups 的覆盖。每个键可以是维度的索引或其名称(如果提供了 mesh_dim_names)。每个值可以是包含后端名称及其选项的元组,或者只是这两个组件之一(在这种情况下,另一个将设置为其默认值)。返回 表示设备布局的 DeviceMesh 对象。返回类型 DeviceMesh 示例: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# 检查默认进程组是否已初始化。返回类型 bool torch.distributed.is_mpi_available()[source]# 检查 MPI 后端是否可用。返回类型 bool torch.distributed.is_nccl_available()[source]# 检查 NCCL 后端是否可用。返回类型 bool torch.distributed.is_gloo_available()[source]# 检查 Gloo 后端是否可用。返回类型 bool torch.distributed.distributed_c10d.is_xccl_available()[source]# 检查 XCCL 后端是否可用。返回类型 bool torch.distributed.is_torchelastic_launched()[source]# 检查此进程是否使用 torch.distributed.elastic(即 torchelastic)启动。TORCHELASTIC_RUN_ID 环境变量的存在用作确定当前进程是否使用 torchelastic 启动的代理。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,这始终是一个非空值,表示用于对等发现目的的作业 id。返回类型 bool torch.distributed.get_default_backend_for_device(device)[source]# 返回给定设备的默认后端。参数 device (Union[str, torch.device]) – 要获取默认后端的设备。返回 给定设备的默认后端,以小写字符串形式返回。返回类型 str 目前有三种初始化方法
支持:TCP 初始化#
有两种使用 TCP 进行初始化的方法,两者都需要一个所有进程均可访问的网络地址以及所需的 `world_size`。第一种方法需要指定属于 rank 0 进程的地址。此初始化方法要求所有进程都手动指定了 ranks。请注意,最新版本的分布式包中不再支持多播地址。`group_name` 也已弃用。
```python
import torch.distributed as dist
# 使用其中一台机器的地址 \{#skill-metadata}
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
共享文件系统初始化#
另一种初始化方法利用组内所有机器共享且可见的文件系统,以及所需的 world_size。URL 应以 file:// 开头,并包含共享文件系统上(现有目录中)一个不存在文件的路径。文件系统初始化会在文件不存在时自动创建该文件,但不会删除该文件。因此,你有责任确保在下一次对相同文件路径/名称调用 init_process_group() 之前清理该文件。请注意,最新版本的分布式包中不再支持自动 rank 分配,group_name 也已弃用。
警告 此方法假设文件系统支持使用
fcntl进行锁定——大多数本地系统和 NFS 都支持它。
警告 此方法始终会创建文件,并尽力在程序结束时清理和删除该文件。换句话说,每次使用文件初始化方法进行初始化都需要一个全新的空文件才能成功。如果再次使用上一次初始化使用的同一文件(碰巧未被清理),这将导致意外行为,并常常引起死锁和失败。因此,尽管此方法会尽力清理文件,但如果自动删除不成功,你有责任确保在训练结束时删除该文件,以防止下次重复使用同一文件。如果你计划对同一文件名多次调用
init_process_group(),这一点尤其重要。换言之,如果文件未被删除/清理,并且你再次对该文件调用init_process_group(),预计会发生失败。这里的经验法则是,确保每次调用init_process_group()时文件都不存在或为空。
import torch.distributed as dist
# 应始终指定 rank \{#reference-full-skillmd}
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank)
环境变量初始化# 此方法将从环境变量中读取配置,允许完全自定义信息的获取方式。需要设置的变量如下:
MASTER_PORT- 必需;必须是 rank 0 机器上的空闲端口MASTER_ADDR- 必需(rank 0 除外);rank 0 节点的地址WORLD_SIZE- 必需;可以在此处设置,也可以在调用 init 函数时设置RANK- 必需;可以在此处设置,也可以在调用 init 函数时设置
Rank 为 0 的机器将用于建立所有连接。这是默认方法,意味着无需指定 init_method(或者可以指定为 env://)。
优化初始化时间#
TORCH_GLOO_LAZY_INIT- 按需建立连接,而不是使用全网格连接,这可以大大改善非 all2all 操作的初始化时间。
初始化后#
一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()。
class torch.distributed.Backend(name)[source]#
一个类似枚举的后端类。可用后端包括:GLOO、NCCL、UCC、MPI、XCCL 以及其他注册的后端。此类的值为小写字符串,例如 "gloo"。它们可以作为属性访问,例如 Backend.NCCL。可以直接调用此类来解析字符串,例如 Backend(backend_str) 将检查 backend_str 是否有效,如果有效则返回解析后的小写字符串。它还接受大写字符串,例如 Backend("GLOO") 返回 "gloo"。
注意
Backend.UNDEFINED条目存在,但仅用作某些字段的初始值。用户既不应直接使用它,也不应假设其存在。
classmethod register_backend(name, func, extended_api=False, devices=None)[source]#
使用给定的名称和实例化函数注册一个新后端。此类方法由第三方 ProcessGroup 扩展用于注册新后端。
参数
- name (str) – ProcessGroup 扩展的后端名称。它应与
init_process_group()中的名称匹配。 - func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括
store、rank、world_size和timeout。 - extended_api (bool, optional) – 后端是否支持扩展参数结构。默认值:
False。如果设置为True,后端将获得一个c10d::DistributedBackendOptions实例和一个进程组
由后端实现定义的选项对象。
device(str 或 list of str,可选)– 此后端支持的设备类型,例如 “cpu”、“cuda” 等。如果为 None,则假定同时支持 “cpu” 和 “cuda”。
注意 对第三方后端的支持是实验性的,可能会发生变化。
torch.distributed.get_backend(group=None)[源代码]#
返回给定进程组的后端。
参数 group(ProcessGroup,可选)– 要操作的进程组。默认为通用的主进程组。如果指定了其他特定组,则调用进程必须是该组的一部分。
返回 给定进程组的后端,以小写字符串形式表示。
返回类型 Backend
torch.distributed.get_rank(group=None)[源代码]#
返回当前进程在所提供的组中的秩(rank),否则返回默认值。秩是分配给分布式进程组中每个进程的唯一标识符。它们始终是从 0 到 world_size 的连续整数。
参数 group(ProcessGroup,可选)– 要操作的进程组。如果为 None,将使用默认进程组。
返回 进程组的秩;如果不属于该组,则返回 -1
返回类型 int
torch.distributed.get_world_size(group=None)[源代码]#
返回当前进程组中的进程数量。
参数 group(ProcessGroup,可选)– 要操作的进程组。如果为 None,将使用默认进程组。
返回 进程组的世界大小(world size);如果不属于该组,则返回 -1
返回类型 int
关闭(Shutdown)
通过在退出时调用 destroy_process_group() 来清理资源非常重要。遵循的最简单模式是在训练脚本中不再需要通信的位置(通常在 main() 的末尾附近),通过将 group 参数设置为默认值 None 来调用 destroy_process_group(),从而销毁每个进程组和后端。该调用应在每个 trainer 进程中执行一次,而不是在外层进程启动器级别执行。
如果进程组(pg)中的所有秩未在超时持续时间內调用 destroy_process_group(),尤其是在应用程序中存在多个进程组时(例如用于 N-D 并行),可能会导致退出时挂起。这是因为 ProcessGroupNCCL 的析构函数会调用 ncclCommAbort,而该函数必须集体调用,但如果由 Python 的垃圾回收机制(GC)调用 ProcessGroupNCCL 的析构函数,其调用顺序是不确定的。调用 destroy_process_group() 有助于确保在所有秩上以一致的顺序调用 ncclCommAbort,并避免在 ProcessGroupNCCL 的析构过程中调用 ncclCommAbort。
重新初始化(Reinitialization)
destroy_process_group 也可用于销毁单独的进程组。一种用例可能是容错训练,其中进程组可能在运行时被销毁,然后初始化一个新的进程组。在这种情况下,关键是在调用 destroy 之后以及随后初始化之前,使用除 torch.distributed 原语以外的某种方式同步 trainer 进程。由于难以实现这种同步,目前不支持/未测试此行为,并将其视为已知问题。如果此用例阻碍了您的工作,请提交 github issue 或 RFC。
组(Groups)
默认情况下,集合通信操作在默认组(也称为 world)上进行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。new_group() 函数可用于创建新组,包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数提供给所有集合通信操作(集合通信操作是以某些众所周知的编程模式交换信息的分布式函数)。
torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[源代码]#
创建一个新的分布式组。
此函数要求主组中的所有进程(即所有属于分布式作业的进程)都进入此函数,即使它们不会成为该组的成员。此外,应在所有进程中以相同的顺序创建组。
警告 安全并发使用:当使用带有 NCCL 后端的多个进程组时,用户必须确保跨秩的集合通信执行顺序全局一致。如果进程内的多个线程发出集合通信操作,则必须进行显式同步以确保顺序一致。
当使用
torch.distributed通信 API 的异步变体时,会返回一个 work 对象,并且通信内核被入队到单独的 CUDA 流上,从而允许通信和计算重叠。一旦在一个进程组上发出了一个或多个异步操作,在使用另一个进程组之前,必须通过调用work.wait()与其他 cuda 流进行同步。有关更多详细信息,请参阅 同时使用多个 NCCL 通信器。
参数 ranks(list[int])– 组成员的秩列表。如果为 None,将设置为所有秩。
默认值为 None。timeout (timedelta, 可选) – 详见 init_process_group 以了解详细信息和默认值。backend (str 或 Backend, 可选) – 要使用的后端。根据构建时的配置,有效值为 gloo 和 nccl。默认情况下使用与全局组相同的后端。此字段应作为小写字符串提供(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果传入 None,将使用对应于默认进程组的后端。默认值为 None。pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构造特定进程组期间需要传入哪些附加选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以使用高优先级的 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, 可选):在进程组创建结束时执行组内局部屏障。不同之处在于,非成员 rank 无需调用 API,也不加入该屏障。group_desc (str, 可选) – 用于描述进程组的字符串。device_id (torch.device, 可选) – 要将此进程“绑定”到的单个特定设备。如果提供了此字段,new_group 调用将尝试立即为该设备初始化通信后端。返回 一个分布式组的句柄,可传递给集合调用;如果当前 rank 不属于 ranks,则返回 GroupMember.NON_GROUP_MEMBER。注意:use_local_synchronization 不适用于 MPI。注意:虽然 use_local_synchronization=True 在大型集群和小型进程组中可以显著加快运行速度,但必须小心,因为它会改变集群行为,因为非成员 rank 不会加入组屏障()。注意:当每个 rank 创建多个重叠的进程组时,use_local_synchronization=True 可能导致死锁。为避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。
torch.distributed.get_group_rank(group, global_rank)[source]# 将全局 rank 转换为组内 rank。global_rank 必须是 group 的一部分,否则会引发 RuntimeError。参数 group (ProcessGroup) – 用于查找相对 rank 的 ProcessGroup。global_rank (int) – 要查询的全局 rank。返回 global_rank 相对于 group 的组内 rank 返回类型 int 注意:在默认进程组上调用此函数将返回恒等映射。
torch.distributed.get_global_rank(group, group_rank)[source]# 将组内 rank 转换为全局 rank。group_rank 必须是 group 的一部分,否则会引发 RuntimeError。参数 group (ProcessGroup) – 用于从中查找全局 rank 的 ProcessGroup。group_rank (int) – 要查询的组内 rank。返回 group_rank 相对于 group 的全局 rank 返回类型 int 注意:在默认进程组上调用此函数将返回恒等映射。
torch.distributed.get_process_group_ranks(group)[source]# 获取与 group 关联的所有 rank。参数 group (Optional[ProcessGroup]) – 用于获取所有 rank 的 ProcessGroup。如果为 None,将使用默认进程组。返回 按组内 rank 排序的全局 rank 列表 返回类型 list[int]
DeviceMesh# DeviceMesh 是一种更高级别的抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置 rank,并有助于轻松管理这些分布式进程组。可以使用 init_device_mesh() 函数创建新的 DeviceMesh,其中网格形状描述了设备拓扑。
class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh 表示一个设备网格,其中设备的布局可以表示为一个 n 维数组,n 维数组中的每个值是默认进程组 rank 的全局 ID。DeviceMesh 可用于设置跨集群的 N 维设备连接,并管理用于 N 维并行化的 ProcessGroups。通信可以在 DeviceMesh 的每个维度上分别进行。DeviceMesh 尊重用户已选择的设备(即,如果在 DeviceMesh 初始化之前用户调用了 torch.cuda.set_device),并且如果用户事先未设置设备,它将为当前进程选择/设置设备。请注意,手动设备选择应在 DeviceMesh 初始化之前进行。当与 DTensor API 一起使用时,DeviceMesh 也可用作上下文管理器。注意:DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。因此,用户需要确保描述设备布局的网格数组在所有 rank 上保持一致。不一致的网格将导致静默挂起。参数 device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。mesh (ndarray) – 描述设备布局的多维数组或整数张量,其中的 ID
是默认进程组的全局 ID。返回表示设备布局的 DeviceMesh 对象。返回类型 DeviceMesh 以下程序以 SPMD 方式在每个进程/秩上运行。在此示例中,我们有 2 台主机,每台主机有 4 个 GPU。对 mesh 的第一维进行归约将在列 (0, 4)、.. 和 (3, 7) 之间进行归约,对 mesh 的第二维进行归约将在行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 之间进行归约。示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 将设备网格初始化为 (2, 4),以表示 >>> # 跨主机(第 0 维)和主机内(第 1 维)的拓扑结构。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]# 从现有的 ProcessGroup 或现有 ProcessGroup 列表构造具有指定 device_type 的 DeviceMesh。构造的设备网格的维度数等于传入的组数。例如,如果传入单个进程组,则生成的 DeviceMesh 是一维网格。如果传入包含 2 个进程组的列表,则生成的 DeviceMesh 是二维网格。如果传入多个组,则必须提供 mesh 和 mesh_dim_names 参数。传入的进程组的顺序决定了网格的拓扑结构。例如,第一个进程组将是 DeviceMesh 的第 0 维。传入的 mesh 张量的维度数必须与传入的进程组数量相同,并且 mesh 张量中维度的顺序必须与传入的进程组中的顺序匹配。参数 group (ProcessGroup 或 list[ProcessGroup]) – 现有的 ProcessGroup 或现有 ProcessGroups 列表。 device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。 mesh (torch.Tensor 或 ArrayLike, 可选) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。默认为 None。 mesh_dim_names (tuple[str], 可选) – 要分配给描述设备布局的多维数组每个维度的网格维度名称元组。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。默认为 None。返回 表示设备布局的 DeviceMesh 对象。返回类型 DeviceMesh get_all_groups()[source]# 返回所有网格维度的 ProcessGroups 列表。返回 ProcessGroup 对象列表。返回类型 list[torch.distributed.distributed_c10d.ProcessGroup] get_coordinate()[source]# 返回此秩相对于网格所有维度的相对索引。如果此秩不属于网格,则返回 None。返回类型 Optional[list[int]] get_group(mesh_dim=None)[source]# 返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是一维的,则返回网格中唯一的 ProcessGroup。参数 mesh_dim (str/python:int, 可选) – 可以是网格维度的名称或索引 None. (网格维度的。默认为) – 返回 ProcessGroup 对象。返回类型 ProcessGroup get_local_rank(mesh_dim=None)[source]# 返回 DeviceMesh 给定 mesh_dim 的本地秩。参数 mesh_dim (str/python:int, 可选) – 可以是网格维度的名称或索引 None. (网格维度的。默认为) – 返回 表示本地秩的整数。返回类型 int 以下程序以 SPMD 方式在每个进程/秩上运行。在此示例中,我们有 2 台主机,每台主机有 4 个 GPU。在秩 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 0。在秩 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 1。在秩 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 0。在秩 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 1。在秩 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 2。在秩 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 3。示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 将设备网格初始化为 (2, 4),以表示 >>> # 跨主机(第 0 维)和主机内(第 1 维)的拓扑结构。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) get_rank()[source]# 返回当前全局秩。返回类型 int 点对点通信# torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 同步发送张量。警告 NCCL 后端不支持 tag。参数 tensor (Tensor) – 要发送的张量。 dst (int) – 全局进程组上的目标秩(无论 group 参数如何)。目标秩不应与当前进程的秩相同。 group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于与远程 recv 匹配的标签 group_dst (int, 可选) – 组上的目标秩。同时指定 dst 和 group_dst 无效。 torch.distributed.recv(tensor, src=None, group=None, tag=0,
group_src=None)[source]# 同步接收张量。警告:NCCL 后端不支持 tag 参数。
参数
- tensor (Tensor) – 用于填充接收数据的张量。
- src (int, 可选) – 全局进程组中的源秩(不受
group参数影响)。如果未指定,将从任何进程接收。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。
- tag (int, 可选) – 用于与远程发送匹配的标签。
- group_src (int, 可选) – 组内的目标秩。同时指定
src和group_src是无效的。
返回
发送者秩。如果不属于该组,则返回 -1。
返回类型
int
isend() 和 irecv() 在使用时返回分布式请求对象。通常,这些对象的类型未指定,因为它们绝不应手动创建,但它们保证支持以下两种方法:
is_completed()- 如果操作已完成,则返回 Truewait()- 阻塞进程直到操作完成。
一旦 is_completed() 返回,它保证返回 True。
torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 异步发送张量。
警告 在请求完成之前修改 tensor 会导致未定义行为。
警告 NCCL 后端不支持 tag 参数。
与阻塞式的 send 不同,isend 允许 src == dst 秩,即向自身发送。
参数
- tensor (Tensor) – 要发送的张量。
- dst (int) – 全局进程组中的目标秩(不受
group参数影响)。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。
- tag (int, 可选) – 用于与远程接收匹配的标签。
- group_dst (int, 可选) – 组内的目标秩。同时指定
dst和group_dst是无效的。
返回
一个分布式请求对象。如果不属于该组,则为 None。
返回类型
Optional[Work]
torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]# 异步接收张量。
警告 NCCL 后端不支持 tag 参数。
与阻塞式的 recv 不同,irecv 允许 src == dst 秩,即从自身接收。
参数
- tensor (Tensor) – 用于填充接收数据的张量。
- src (int, 可选) – 全局进程组中的源秩(不受
group参数影响)。如果未指定,将从任何进程接收。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。
- tag (int, 可选) – 用于与远程发送匹配的标签。
- group_src (int, 可选) – 组内的目标秩。同时指定
src和group_src是无效的。
返回
一个分布式请求对象。如果不属于该组,则为 None。
返回类型
Optional[Work]
torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]# 同步发送 object_list 中的可 pickle 对象。
类似于 send(),但可以传入 Python 对象。请注意,object_list 中的所有对象必须是可 pickle 的才能被发送。
参数
- object_list (List[Any]) – 要发送的输入对象列表。每个对象必须是可 pickle 的。接收方必须提供大小相等的列表。
- dst (int) – 发送
object_list的目标秩。目标秩基于全局进程组(不受group参数影响)。 - group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要工作的进程组。如果为 None,将使用默认进程组。默认为 None。
- device (torch.device, 可选) – 如果不为 None,对象将被序列化并转换为张量,在发送前移动到该设备。默认为 None。
- group_dst (int, 可选) – 组内的目标秩。必须指定
dst和group_dst中的一个,但不能同时指定两者。 - use_batch (bool, 可选) – 如果为 True,使用批量点对点操作而不是常规发送操作。这避免了初始化 2 秩通信器,并使用现有的整个组通信器。有关用法和假设,请参阅
batch_isend_irecv。默认为 False。
返回
None。
注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()给出,用户有责任确保通过torch.cuda.set_device()设置此设备,以便每个秩拥有独立的 GPU。
警告 对象集合操作存在许多严重的性能和可扩展性限制。详情请参阅 [Object collectives](Object collectives)。
警告
send_object_list()隐式使用 pickle 模块,已知其不安全。有可能构造恶意的 pickle 数据,在反序列化期间执行任意代码。仅在使用受信任的数据时调用此函数。
警告 使用 GPU 张量调用
send_object_list()的支持不佳且效率低下,因为张量会被 pickle,从而导致 GPU -> CPU 传输。请考虑改用send()。
示例::
>>> # 注意:省略了每个秩上的进程组初始化。
>>> import torch.distributed as dist
>>> # 假设后端不是 NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>> # 假设 world_size 为 2。
>>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象
>>>
dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]# 同步接收 object_list 中的可序列化对象。与 recv() 类似,但可以接收 Python 对象。参数 object_list (List[Any]) – 用于接收对象的列表。必须提供一个大小等于待发送列表大小的列表。 src (int, optional) – 从中接收 object_list 的源秩。源秩基于全局进程组(无论 group 参数如何)。如果设置为 None,将从任意秩接收。默认为 None。 group (Optional[ProcessGroup]) – (ProcessGroup, optional):要使用的进程组。如果为 None,将使用默认进程组。默认为 None。 device (torch.device, optional) – 如果不为 None,则在此设备上接收。默认为 None。 group_src (int, optional) – 组内的目标秩。同时指定 src 和 group_src 是无效的。 use_batch (bool, optional) – 如果为 True,则使用批量点对点操作而不是常规发送操作。这避免了初始化双秩通信器,并使用现有的整个组通信器。有关用法和假设,请参阅 batch_isend_irecv。默认为 False。返回 发送者秩。如果该秩不属于该组,则返回 -1。如果该秩属于该组,object_list 将包含来自 src 秩的已发送对象。注意 对于基于 NCCL 的进程组,在通信发生之前,必须将对象的内部张量表示移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保每个秩都有一个独立的 GPU。警告 对象集合操作存在许多严重的性能和可扩展性限制。有关详细信息,请参阅对象集合操作。警告 recv_object_list() 隐式使用 pickle 模块,众所周知这是不安全的。可以构造恶意的 pickle 数据,这些数据在反序列化期间会执行任意代码。仅在使用受信任的数据时调用此函数。警告 使用 GPU 张量调用 recv_object_list() 的支持不佳且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑改用 recv()。示例::>>> # 注意:省略了每个秩上的进程组初始化。 >>> import torch.distributed as dist >>> # 假设后端不是 NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # 假设 world_size 为 2。 >>> objects = ["foo", 12, {1: 2}] # 任何可序列化的对象 >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.batch_isend_irecv(p2p_op_list)[source]# 异步发送或接收一批张量,并返回请求列表。处理 p2p_op_list 中的每个操作,并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。参数 p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 点对点操作列表(每个操作的类型为 torch.distributed.P2POp)。列表中 isend/irecv 的顺序很重要,并且需要与远程端相应的 isend/irecv 匹配。返回 通过调用 op_list 中相应操作返回的分布式请求对象列表。返回类型 list[torch.distributed.distributed_c10d.Work] 示例 >>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1 注意 请注意,当将此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则会导致意外的挂起问题。此外,如果此 API 是传递给 dist.P2POp 的组中的第一个集合调用,则该组的所有秩都必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是组中的第一个集合调用,则允许仅涉及组中部分秩的批量点对点操作。 class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source]# 一个用于构建 batch_isend_irecv 点对点操作的类。此类构建 P2P 操作的类型、通信缓冲区、对等秩、进程组和标签。此类的实例将传递给 batch_isend_irecv 以进行点对点通信。参数 op (Callable) – 向对等进程发送数据或从对等进程接收数据的函数。op 的类型为 torch.distributed.isend 或 torch.distributed.irecv。 tensor (Tensor) – 要发送或接收的张量。 peer (int,
optional) – 目标或源秩。group (ProcessGroup, optional) – 要使用的进程组。如果为 None,将使用默认进程组。tag (int, optional) – 用于匹配发送与接收的标签。group_peer (int, optional) – 目标或源秩。同步和异步集体操作# 每个集体操作函数都支持以下两种操作,具体取决于传入集体操作的 async_op 标志的设置:同步操作 - 默认模式,当 async_op 设置为 False 时。当函数返回时,保证集体操作已执行。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何利用集体调用输出的后续函数调用都将按预期行为。对于 CUDA 集体操作,在同一 CUDA 流上利用输出的函数调用将按预期行为。用户必须在不同流下运行的场景中注意同步。有关 CUDA 语义(如流同步)的详细信息,请参阅 CUDA 语义。请参阅下面的脚本,以查看 CPU 和 CUDA 操作在这些语义上的差异示例。异步操作 - 当 async_op 设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您不需要手动创建它,并且保证支持以下两个方法:is_completed() - 对于 CPU 集体操作,如果完成则返回 True。对于 CUDA 操作,如果操作已成功入队到 CUDA 流并且可以在默认流上使用输出而无需进一步同步,则返回 True。wait() - 对于 CPU 集体操作,将阻塞进程直到操作完成。对于 CUDA 集体操作,将阻塞当前活动的 CUDA 流直到操作完成(但不会阻塞 CPU)。get_future() - 返回 torch.C.Future 对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,点对点操作除外。注意:随着我们继续采用 Futures 并合并 API,get_future() 调用可能会变得多余。示例 以下代码可以作为使用分布式集体操作时 CUDA 操作语义的参考。它展示了在不同 CUDA 流上使用集体输出时显式同步的必要性: # 代码在每个秩上运行。 dist.init_process_group("nccl", rank=rank, world_size=2) output = torch.tensor([rank]).cuda(rank) s = torch.cuda.Stream() handle = dist.all_reduce(output, async_op=True) # Wait 确保操作已入队,但不一定已完成。 handle.wait() # 在非默认流上使用结果。 with torch.cuda.stream(s): s.wait_stream(torch.cuda.default_stream()) output.add(100) if rank == 0: # 如果省略了对 wait_stream 的显式调用,下面的输出将是 # 非确定性的 1 或 101,具体取决于 allreduce 是否在 add 完成后覆盖了该值。 print(output) 集体函数# torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]# 将张量广播到整个组。参与集体的所有进程中的 tensor 必须具有相同数量的元素。参数 tensor (Tensor) – 如果 src 是当前进程的秩,则为要发送的数据;否则为用于保存接收数据的张量。src (int) – 全局进程组上的源秩(无论 group 参数如何)。group (ProcessGroup, optional) – 要使用的进程组。如果为 None,将使用默认进程组。async_op (bool, optional) – 此操作是否应为异步操作 group_src (int) – 组上的源秩。必须指定 group_src 和 src 中的一个,但不能同时指定两者。返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]# 将 object_list 中可序列化的对象广播到整个组。类似于 broadcast(),但可以传入 Python 对象。请注意,object_list 中的所有对象必须是可序列化的才能被广播。参数 object_list (List[Any]) – 要广播的输入对象列表。每个对象必须是可序列化的。只有 src 秩上的对象会被广播,但每个秩必须提供大小相等的列表。src (int) – 广播 object_list 的源秩。源秩基于全局进程组(无论 group 参数如何) group (Optional[ProcessGroup]) – (ProcessGroup, optional):要使用的进程组。如果为 None,将使用默认进程组。默认为 None。device (torch.device, optional) – 如果不为 None,对象将被序列化并转换为张量,在广播之前移动到该设备。默认为 None。group_src (int) – 组上的源秩。不得同时指定 group_src 和 src 中的一个以上。返回 None。如果秩属于该组,object_list 将包含来自 src 秩的广播对象。注意 对于基于 NCCL 的进程组,内部张量
在通信发生之前,必须将对象的表示移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保此设置正确,以便每个 rank 拥有独立的 GPU。
注意,此 API 与 broadcast() 集体操作略有不同,因为它不提供 async_op 句柄,因此是一个阻塞调用。
对象集体操作存在许多严重的性能和可扩展性限制。详情请参阅 Object collectives。
broadcast_object_list() 隐式使用 pickle 模块,众所周知该模块是不安全的。可以构造恶意的 pickle 数据,在反序列化期间执行任意代码。请仅在使用受信任的数据时调用此函数。
使用 GPU 张量调用 broadcast_object_list() 的支持不佳且效率低下,因为张量会被 pickle 序列化,从而导致 GPU -> CPU 传输。请考虑改用 broadcast()。
示例:
>>> # 注意:省略了每个 rank 上的进程组初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>> # 假设 world_size 为 3。
>>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象
>>> else:
>>> objects = [None, None, None]
>>> # 假设后端不是 NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]
以所有机器都获得最终结果的方式归约张量数据。调用后,所有进程中的 tensor 将在位级别上完全相同。支持复数张量。
参数
- tensor (Tensor) – 集体操作的输入和输出。该函数就地操作。
- op (可选) –
torch.distributed.ReduceOp枚举中的一个值。指定用于逐元素归约的操作。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为
None,将使用默认进程组。 - async_op (bool, 可选) – 此操作是否应为异步操作
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
示例
>>> # 下面的所有张量都是 torch.int64 类型。
>>> # 我们有 2 个进程组,2 个 ranks。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # 下面的所有张量都是 torch.cfloat 类型。
>>> # 我们有 2 个进程组,2 个 ranks。
>>> tensor = torch.tensor(
... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]
跨所有机器归约张量数据。只有 rank 为 dst 的进程才会接收最终结果。
参数
- tensor (Tensor) – 集体操作的输入和输出。该函数就地操作。
- dst (int) – 全局进程组上的目标 rank(无论
group参数如何) - op (可选) –
torch.distributed.ReduceOp枚举中的一个值。指定用于逐元素归约的操作。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为
None,将使用默认进程组。 - async_op (bool, 可选) – 此操作是否应为异步操作
- group_dst (int) – 组内的目标 rank。必须指定
group_dst和dst中的一个,但不能同时指定两者。
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]
从整个组中收集张量到一个列表中。支持复数和大小不均的张量。
参数
- tensor_list (list[Tensor]) – 输出列表。它应包含正确大小的张量,用于集体操作的输出。支持大小不均的张量。
- tensor (Tensor) – 要从当前进程广播的张量。
- group (ProcessGroup, 可选) – 要工作的进程组。如果为
None,将使用默认进程组。 - async_op (bool, 可选) – 此操作是否应为异步操作
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
示例
>>> # 下面的所有张量都是 torch.int64 dtype。
>>> # 我们有 2 个进程组,2 个 ranks。
>>> device = torch.device(f"cuda:{rank}")
>>> tensor_list = [
... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0],
device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1 >>> # 以下所有张量均为 torch.cfloat 数据类型。 >>> # 我们有 2 个进程组,2 个秩。 >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1 torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]# 从所有秩收集张量并将它们放入单个输出张量中。此函数要求每个进程上的所有张量大小相同。 参数 output_tensor (Tensor) – 输出张量,用于容纳来自所有秩的张量元素。其大小必须正确,具有以下形式之一:(i) 沿主维度连接所有输入张量;关于“连接”的定义,请参阅 torch.cat();(ii) 沿主维度堆叠所有输入张量;关于“堆叠”的定义,请参阅 torch.stack()。下面的示例可以更好地解释支持的输出形式。 input_tensor (Tensor) – 要从当前秩收集的张量。与 all_gather API 不同,此 API 中的输入张量在所有秩上必须具有相同的大小。 group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。 示例 >>> # 以下所有张量均为 torch.int64 数据类型且位于 CUDA 设备上。 >>> # 我们有两个秩。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # 输出为连接形式 >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # 输出为堆叠形式 >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1 torch.distributed.all_gather_object(object_list, obj, group=None)[source]# 将整个组中的可 pickle 对象收集到一个列表中。类似于 all_gather(),但可以传入 Python 对象。请注意,对象必须是可 pickle 的才能被收集。 参数 object_list (list[Any]) – 输出列表。其大小应正确设置为该集体通信的组大小,并将包含输出。 obj (Any) – 要从当前进程广播的可 pickle Python 对象。 group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。默认为 None。 返回 None。如果调用秩属于此组,则集体通信的输出将填充到输入的 object_list 中。如果调用秩不属于该组,则传入的 object_list 将保持不变。 注意 请注意,此 API 与 all_gather() 集体通信略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保通过 torch.cuda.set_device() 设置此设备,以便每个秩拥有独立的 GPU。 警告 对象集体通信在性能和可扩展性方面存在许多严重限制。有关详细信息,请参阅对象集体通信。 警告 all_gather_object() 隐式使用 pickle 模块,已知该模块不安全。有可能构造恶意的 pickle 数据,这些数据在反序列化期间会执行任意代码。仅在使用可信数据时调用此函数。 警告 使用 GPU 张量调用 all_gather_object() 的支持不佳且效率低下,因为它会产生 GPU -> CPU
传输,因为张量会被 pickle 序列化。请考虑改用 all_gather()。示例::
>>> # 注意:每个 rank 上的进程组初始化已省略。
>>> import torch.distributed as dist
>>> # 假设 world_size 为 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可 picklable 的对象
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source]#
将张量列表收集到单个进程中。此函数要求每个进程上的所有张量大小相同。
参数
- tensor (Tensor) – 输入张量。
- gather_list (list[Tensor], 可选) – 用于存储收集数据的适当且大小相同的张量列表(默认为 None,必须在目标 rank 上指定)
- dst (int, 可选) – 全局进程组中的目标 rank(无论
group参数如何)。(如果dst和group_dst均为 None,则默认为全局 rank 0) - group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。
- async_op (bool, 可选) – 此操作是否应为异步操作
- group_dst (int, 可选) – 组内的目标 rank。同时指定
dst和group_dst是无效的
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
注意
请注意,gather_list 中的所有张量必须具有相同的大小。
示例::
>>> # 我们有 2 个进程组,2 个 ranks。
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> if dist.get_rank() == 0:
>>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>> gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 获取收集的数据。
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None # Rank 1
torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source]#
在单个进程中从整个组收集可 picklable 的对象。类似于 gather(),但可以传入 Python 对象。请注意,对象必须是可 picklable 的才能被收集。
参数
- obj (Any) – 输入对象。必须是可 picklable 的。
- object_gather_list (list[Any]) – 输出列表。在
dstrank 上,它的大小应正确设置为该集体操作的组大小,并将包含输出。在非dstranks 上必须为 None。(默认为 None) - dst (int, 可选) – 全局进程组中的目标 rank(无论
group参数如何)。(如果dst和group_dst均为 None,则默认为全局 rank 0) - group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要工作的进程组。如果为 None,将使用默认进程组。默认为 None。
- group_dst (int, 可选) – 组内的目标 rank。同时指定
dst和group_dst是无效的
返回
None。在 dst rank 上,object_gather_list 将包含集体操作的输出。
注意
请注意,此 API 与 gather 集体操作略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保通过 torch.cuda.set_device() 设置此设备,以便每个 rank 拥有独立的 GPU。
警告
对象集体操作存在许多严重的性能和可扩展性限制。详见 Object collectives。
警告
gather_object() 隐式使用 pickle 模块,已知其不安全。可以构造恶意的 pickle 数据,在反序列化期间执行任意代码。仅在使用可信数据时调用此函数。
警告
使用 GPU 张量调用 gather_object() 的支持不佳且效率低下,因为张量会被 pickle 序列化,从而导致 GPU -> CPU 传输。请考虑改用 gather()。
示例::
>>> # 注意:每个 rank 上的进程组初始化已省略。
>>> import torch.distributed as dist
>>> # 假设 world_size 为 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可 picklable 的对象
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
... gather_objects[dist.get_rank()],
... output if dist.get_rank() == 0 else None,
... dst=0
... )
>>> # 在 rank 0 上
>>> output
['foo', 12, {1: 2}]
torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source]#
将张量列表分散到组中的所有进程。每个进程将恰好接收一个张量,并将其数据存储在 tensor 参数中。支持复数张量。
参数
- tensor (Tensor) – 输出张量。
- scatter_list (list[Tensor]) – 要分散的张量列表(默认为 None,必须在源 rank 上指定)
- src (int) – 全局进程组中的源 rank(无论
group参数如何)。(如果src和group_src均为 None,则默认为全局 rank 0) - group (ProcessGroup, 可选) – The
要工作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 group_src (int, 可选) – 组内的源秩。同时指定 src 和 group_src 是无效的
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
注意
请注意,scatter_list 中的所有 Tensor 必须具有相同的大小。
示例::
>>> # 注意:省略了每个秩上的进程组初始化。
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> if dist.get_rank() == 0:
>>> # 假设 world_size 为 2。
>>> # 仅张量,且所有张量必须大小相同。
>>> t_ones = torch.ones(tensor_size, device=device)
>>> t_fives = torch.ones(tensor_size, device=device) * 5
>>> scatter_list = [t_ones, t_fives]
>>> else:
>>> scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # 秩 i 获取 scatter_list[i]。
>>> output_tensor
tensor([1., 1.], device='cuda:0') # 秩 0
tensor([5., 5.], device='cuda:1') # 秩 1
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[源代码]#
将 scatter_object_input_list 中可序列化的对象散射(scatter)到整个组。与 scatter() 类似,但可以传入 Python 对象。在每个秩上,散射后的对象将存储为 scatter_object_output_list 的第一个元素。请注意,scatter_object_input_list 中的所有对象必须是可序列化的(picklable),才能进行散射。
参数
- scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储散射到此秩的对象。
- scatter_object_input_list (List[Any], 可选) – 要散射的输入对象列表。每个对象必须是可序列化的。只有源秩(src rank)上的对象会被散射,对于非源秩,该参数可以为 None。
- src (int) – 从中散射
scatter_object_input_list的源秩。源秩基于全局进程组(无论group参数如何)。(如果src和group_src均为 None,则默认为全局秩 0) - group (Optional[ProcessGroup]) – (ProcessGroup, 可选):要工作的进程组。如果为 None,将使用默认进程组。默认为 None。
- group_src (int, 可选) – 组内的源秩。同时指定
src和group_src是无效的
返回
None。如果秩属于该组,scatter_object_output_list 的第一个元素将被设置为此秩散射得到的对象。
注意
请注意,此 API 与 scatter 集合通信略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。
警告
对象集合通信在性能和可扩展性方面存在许多严重限制。详见 [对象集合通信](Object collectives)。
警告
scatter_object_list() 隐式使用 pickle 模块,众所周知这是不安全的。有可能构造恶意的 pickle 数据,在反序列化期间执行任意代码。请仅在使用可信数据时调用此函数。
警告
使用 GPU 张量调用 scatter_object_list() 的支持不佳且效率低下,因为张量会被序列化,从而导致 GPU -> CPU 传输。请考虑改用 scatter()。
示例::
>>> # 注意:省略了每个秩上的进程组初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>> # 假设 world_size 为 3。
>>> objects = ["foo", 12, {1: 2}] # 任何可序列化的对象
>>> else:
>>> # 在非源秩上可以是任何列表,元素不会被使用。
>>> objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # 秩 i 获取 objects[i]。例如,在秩 2 上:
>>> output_list
[{1: 2}]
torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]#
归约(reduce),然后将张量列表散射(scatter)到组中的所有进程。
参数
- output (Tensor) – 输出张量。
- input_list (list[Tensor]) – 要归约和散射的张量列表。
- op (可选) –
torch.distributed.ReduceOp枚举中的值之一。指定用于逐元素归约的操作。 - group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。
- async_op (bool, 可选) – 此操作是否应为异步操作。
返回
如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。
torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]#
归约(reduce),然后将张量散射(scatter)到组中的所有秩。
参数
- output (Tensor) – 输出张量。它在所有秩上应具有相同的大小。
- input (Tensor) – 要归约和散射的输入张量。其大小应为输出张量大小乘以 world size。输入张量可以具有以下形状之一:(i) 沿主维度连接输出张量,或 (ii) 沿主维度堆叠输出张量。关于“连接”的定义,参见
torch.cat()。关于“堆叠”的定义,参见torch.stack()。 - group (ProcessGroup,
optional) – 要操作进程组。如果为 None,将使用默认进程组。 async_op (bool, optional) – 此操作是否应为异步操作。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。 示例 >>> # 以下所有张量均为 torch.int64 类型且位于 CUDA 设备上。 >>> # 我们有两个 rank。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # 输入为拼接形式 >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # 输入为堆叠形式 >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]# 分割输入张量,然后将分割后的列表散播(scatter)到组中的所有进程。随后,将来自组中所有进程的接收张量拼接起来,并作为单个输出张量返回。支持复数张量。 参数 output (Tensor) – 收集并拼接后的输出张量。 input (Tensor) – 要散播的输入张量。 output_split_sizes – (list[Int], optional):第 0 维的输出分割大小。如果指定为 None 或为空,则输出张量的第 0 维必须能被 world_size 整除。 input_split_sizes – (list[Int], optional):第 0 维的输入分割大小。如果指定为 None 或为空,则输入张量的第 0 维必须能被 world_size 整除。 group (ProcessGroup, optional) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, optional) – 此操作是否应为异步操作。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。 警告 all_to_all_single 是实验性的,可能会发生变化。 示例 >>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3 >>> # 本质上,它类似于以下操作: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) >>> # 另一个不均匀分割的示例 >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3 >>> # 另一个使用 torch.cfloat 类型张量的示例。 >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3 torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]# 将输入张量列表散播到组中的所有进程,并在输出列表中返回收集的张量列表。支持复数张量。 参数 output_tensor_list (list[Tensor]) – 要收集的张量列表,每个 rank 一个。 input_tensor_list (list[Tensor]) – 要散播的张量列表,每个 rank 一个。 group (ProcessGroup, optional) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, optional) – 此操作是否应为异步操作。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是异步操作或不属于该组,则返回 None。 警告 all_to_all 是实验性的,可能会发生变化。 示例 >>> input = torch.arange(4) + rank * 4 >>> input =
list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3 >>> # 本质上,它类似于以下操作: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i) >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3 >>> # 另一个使用 torch.cfloat 类型张量的示例。 >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3 torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]# 同步所有进程。如果 async_op 为 False,或者在 wait() 上调用了异步工作句柄,此集体操作将阻塞进程,直到整个组进入此函数。 参数 group (ProcessGroup, 可选) – 要工作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 device_ids ([int], 可选) – 设备/GPU id 列表。仅期望一个 id。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不属于该组,则返回 None 注意 ProcessGroupNCCL 现在会阻塞 CPU 线程,直到 barrier 集体操作完成。 注意 ProcessGroupNCCL 将 barrier 实现为单元素张量的 all_reduce。必须选择一个设备来分配此张量。设备选择按以下顺序检查确定:(1) 如果 barrier 的 device_ids 参数不为 None,则使用传入的第一个设备;(2) 如果 init_process_group 传入的设备不为 None,则使用该设备;(3) 如果已执行过其他带有张量输入的集体操作,则使用与此进程组首次一起使用的设备;(4) 由全局 rank 对本地设备数量取模指示的设备索引。 torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]# 类似于 torch.distributed.barrier 同步进程,但考虑可配置的超时。它能够报告在提供的超时时间内未通过此 barrier 的 rank。具体来说,对于非零 rank,将阻塞直到处理来自 rank 0 的发送/接收。Rank 0 将阻塞直到处理完来自其他所有 rank 的发送/接收,并将报告未能及时响应的 rank 的故障。请注意,如果一个 rank 未到达 monitored_barrier(例如由于挂起),所有其他 rank 将在 monitored_barrier 中失败。此集体操作将阻塞组中的所有进程/rank,直到整个组成功退出该函数,使其适用于调试和同步。但是,它可能会影响性能,应仅用于调试或需要在主机端进行完全同步点的场景。出于调试目的,可以在应用程序的集体操作之前插入此 barrier
调用以检查是否有任何 rank 不同步。注意:此集体通信仅支持 GLOO 后端。
参数
- group (
ProcessGroup, 可选) – 要操作的处理组。如果为None,将使用默认处理组。 - timeout (
datetime.timedelta, 可选) –monitored_barrier的超时时间。如果为None,将使用默认处理组超时时间。 - wait_all_ranks (
bool, 可选) – 是否收集所有失败的 rank。默认情况下,此值为False,rank 0 上的monitored_barrier会在遇到第一个失败的 rank 时抛出异常,以便快速失败。通过设置wait_all_ranks=True,monitored_barrier将收集所有失败的 rank,并抛出一个包含所有失败 rank 信息的错误。
返回
None。
示例
>>> # 注意:省略了每个 rank 上的进程组初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>> dist.monitored_barrier() # 抛出异常,表明
>>> # rank 1 未调用 monitored_barrier。
>>> # wait_all_ranks=True 的示例
>>> if dist.get_rank() == 0:
>>> dist.monitored_barrier(wait_all_ranks=True) # 抛出异常
>>> # 表明 rank 1, 2, ... world_size - 1 未调用
>>> # monitored_barrier。
class torch.distributed.Work
Work 对象表示 PyTorch 分布式包中挂起的异步操作的句柄。它由非阻塞集体通信操作返回,例如 dist.all_reduce(tensor, async_op=True)。
block_current_stream(self: torch._C._distributed_c10d.Work) → None
阻塞当前活跃的 GPU 流,直到操作完成。对于基于 GPU 的集体通信,这等同于同步。对于由 CPU 发起的集体通信(如使用 Gloo),这将阻塞 CUDA 流直到操作完成。在所有情况下,此方法都会立即返回。要检查操作是否成功,您应该异步检查 Work 对象的结果。
boxed(self: torch._C._distributed_c10d.Work) → object
exception(self: torch._C._distributed_c10d.Work) → std::__exception_ptr::exception_ptr
get_future(self: torch._C._distributed_c10d.Work) → torch.Future
返回
一个与 Work 完成相关联的 torch.futures.Future 对象。例如,可以通过 fut = process_group.allreduce(tensors).get_future() 获取 future 对象。
示例
下面是一个简单的 allreduce DDP 通信钩子示例,它使用 get_future API 检索与 allreduce 完成相关联的 Future。
>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future
>>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD
>>> tensor = bucket.buffer().div_(group_to_use.size())
>>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future()
>>> ddp_model.register_comm_hook(state=None, hook=allreduce)
警告
get_futureAPI 支持 NCCL,以及部分支持 GLOO 和 MPI 后端(不支持像 send/recv 这样的点对点操作),并将返回一个torch.futures.Future。在上面的示例中,allreduce 工作将使用 NCCL 后端在 GPU 上执行,fut.wait()将在将适当的 NCCL 流与 PyTorch 的当前设备流同步后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待 GPU 上的整个操作完成。请注意,CUDAFuture不支持TORCH_NCCL_BLOCKING_WAIT标志或 NCCL 的barrier()。此外,如果通过fut.then()添加了回调函数,它将等待WorkNCCL的 NCCL 流与ProcessGroupNCCL的专用回调流同步,并在回调流上运行回调后内联调用该回调。fut.then()将返回另一个CUDAFuture,其中包含回调的返回值和记录回调流的CUDAEvent。对于 CPU 工作,当工作完成且value()张量就绪时,fut.done()返回 true。对于 GPU 工作,仅当操作已入队时,fut.done()才返回 true。对于混合 CPU-GPU 工作(例如,使用 GLOO 发送 GPU 张量),当张量到达相应节点但尚未在相应 GPU 上同步时(类似于 GPU 工作),fut.done()返回 true。
get_future_result(self: torch._C._distributed_c10d.Work) → torch.Future
返回
一个 int 类型的 torch.futures.Future 对象,映射到 WorkResult 的枚举类型。例如,可以通过 fut = process_group.allreduce(tensor).get_future_result() 获取 future 对象。
示例
用户可以使用 fut.wait() 阻塞等待工作完成,并通过 fut.value() 获取 WorkResult。此外,用户可以使用 fut.then(call_back_func) 注册一个在工作完成时调用的回调函数,而无需阻塞当前线程。
警告
get_future_resultAPI 支持 NCCL
is_completed(self: torch._C._distributed_c10d.Work) → bool
is_success(self: torch._C._distributed_c10d.Work) → bool
result(self: torch._C._distributed_c10d.Work) → list[torch.Tensor]
source_rank(self: torch._C._distributed_c10d.Work) → int
synchronize(self: torch._C._distributed_c10d.Work) → None
static unbox(arg0: object) → torch._C._distributed_c10d.Work
wait(self:
torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) → bool# 返回 true/false。示例:: try:work.wait(timeout) except:# 一些处理 警告 在正常情况下,用户无需设置超时。调用 wait() 与调用 synchronize() 相同:让当前流阻塞直到 NCCL 工作完成。但是,如果设置了 timeout,它将阻塞 CPU 线程,直到 NCCL 工作完成或超时。如果超时,将抛出异常。 class torch.distributed.ReduceOp# 一个类似枚举的类,用于可用的归约操作:SUM、PRODUCT、MIN、MAX、BAND、BOR、BXOR 和 PREMUL_SUM。当使用 NCCL 后端时,BAND、BOR 和 BXOR 归约不可用。AVG 在跨秩求和之前将值除以 world size。AVG 仅适用于 NCCL 后端,且仅适用于 NCCL 2.10 或更高版本。PREMUL_SUM 在归约之前在本地将输入乘以给定的标量。PREMUL_SUM 仅适用于 NCCL 后端,且仅适用于 NCCL 2.11 或更高版本。用户应使用 torch.distributed._make_nccl_premul_sum。此外,MAX、MIN 和 PRODUCT 不支持复数张量。可以通过属性访问此类的值,例如 ReduceOp.SUM。它们用于指定归约集合操作的策略,例如 reduce()。此类不支持 members 属性。 class torch.distributed.reduce_op# 已弃用的类似枚举的归约操作类:SUM、PRODUCT、MIN 和 MAX。建议改用 ReduceOp。 分布式键值存储# 分布式包附带一个分布式键值存储,可用于在组内的进程之间共享信息,以及在 torch.distributed.init_process_group() 中初始化分布式包(通过显式创建存储作为指定 init_method 的替代方案)。键值存储有 3 种选择:TCPStore、FileStore 和 HashStore。 class torch.distributed.Store# 所有存储实现的基础类,例如 PyTorch 分布式提供的 3 种存储:(TCPStore、FileStore 和 HashStore)。 init(self: torch._C._distributed_c10d.Store) → None# add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: SupportsInt) → int# 对给定键的首次 add 调用会在存储中创建一个与该键关联的计数器,并初始化为 amount。随后使用相同键调用 add 会将计数器增加指定的 amount。如果对在存储中已通过 set() 设置的键调用 add(),将导致异常。 参数 key (str) – 存储中将递增其计数器的键。 amount (int) – 计数器递增的数量。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # 应返回 7 >>> store.get("first_key") append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 根据提供的键和值将键值对追加到存储中。如果存储中不存在该键,则会创建它。 参数 key (str) – 要追加到存储中的键。 value (str) – 要与键关联并添加到存储中的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # 应返回 "potato" >>> store.get("first_key") check(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) → bool# 调用此方法以检查给定的键列表是否在存储中存有值。在正常情况下,此调用会立即返回,但在某些边缘死锁情况下仍会受到影响,例如在 TCPStore 被销毁后调用 check()。调用 check() 时传入一个键列表,以检查这些键是否存储在存储中。 参数 keys (list[str]) – 要查询是否存储在存储中的键。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # 应返回 7 >>> store.check(["first_key"]) clone(self: torch._C._distributed_c10d.Store) → torch._C._distributed_c10d.Store# 克隆存储并返回一个指向相同底层存储的新对象。返回的存储可以与原始对象并发使用。这旨在提供一种安全的方式,通过为每个线程克隆一个存储来从多个线程使用存储。 compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) → bytes# 根据提供的键将键值对插入存储中,并在插入之前执行 expected_value 和 desired_value 之间的比较。仅当键的 expected_value 已存在于存储中或 expected_value 为空时,才会设置 desired_value
string。参数 key (str) – 要在存储中检查的键。 expected_value (str) – 在插入之前要与键关联进行检查的值。 desired_value (str) – 要与键关联并添加到存储中的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # 应返回 "second_value" >>> store.get("key") delete_key(self: torch._C._distributed_c10d.Store, arg0: str) → bool# 从存储中删除与键关联的键值对。如果成功删除键,则返回 true;否则返回 false。 警告 delete_key API 仅由 TCPStore 和 HashStore 支持。将此 API 与 FileStore 一起使用将导致异常。 参数 key (str) – 要从存储中删除的键 返回 如果删除了键,则返回 True,否则返回 False。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用 HashStore >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # 这应返回 true >>> store.delete_key("first_key") >>> # 这应返回 false >>> store.delete_key("bad_key") get(self: torch._C._distributed_c10d.Store, arg0: str) → bytes# 检索存储中与给定键关联的值。如果存储中不存在该键,函数将在抛出异常之前等待超时时间,该超时时间在初始化存储时定义。 参数 key (str) – 函数将返回与此键关联的值。 返回 如果键在存储中,则返回与键关联的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 应返回 "first_value" >>> store.get("first_key") has_extended_api(self: torch._C._distributed_c10d.Store) → bool# 如果存储支持扩展操作,则返回 true。 multi_get(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) → list[bytes]# 检索 keys 中的所有值。如果 keys 中的任何键不在存储中,函数将等待超时 参数 keys (List[str]) – 要从存储中检索的键。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # 应返回 [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"]) multi_set(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: collections.abc.Sequence[str]) → None# 根据提供的键和值将键值对列表插入到存储中 参数 keys (List[str]) – 要插入的键。 values (List[str]) – 要插入的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # 应返回 b"po" >>> store.get("first_key") num_keys(self: torch._C._distributed_c10d.Store) → int# 返回存储中设置的键的数量。请注意,此数字通常比通过 set() 和 add() 添加的键数多一,因为有一个键用于协调所有使用该存储的工作进程。 警告 当与 TCPStore 一起使用时,num_keys 返回写入底层文件的键的数量。如果存储被销毁并使用同一文件创建另一个存储,原始键将被保留。 返回 存储中存在的键的数量。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 这应返回 2 >>> store.num_keys() queue_len(self: torch._C._distributed_c10d.Store, arg0: str) → int# 返回指定队列的长度。如果队列不存在,则返回 0。有关更多详细信息,请参阅 queue_push。 参数 key (str) – 要获取长度的队列的键。 queue_pop(self: torch._C._distributed_c10d.Store, key: str, block: bool = True) → bytes# 从指定队列中弹出一个值,如果队列为空,则等待直到超时。有关更多详细信息,请参阅 queue_push。如果 block 为 False,且队列为空,则将引发 dist.QueueEmptyError。 参数 key (str) – 要从中弹出值的队列的键。 block (bool) – 是否阻塞等待键或立即返回。 queue_push(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 将一个值推送到指定队列中。对队列和 set/get 操作使用相同的键可能会导致意外行为。队列支持 wait/check 操作。使用队列的 wait 操作只会唤醒一个等待的工作进程,而不是全部。 参数 key
(str) – 要推送到的队列的键。 value (str) – 要推送到队列中的值。 set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 根据提供的键和值将键值对插入到存储中。如果该键已存在于存储中,它将用新提供的值覆盖旧值。 参数 key (str) – 要添加到存储中的键。 value (str) – 与要添加到存储中的键关联的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 应返回 "first_value" >>> store.get("first_key") set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) → None# 设置存储的默认超时时间。此超时时间在初始化以及 wait() 和 get() 中使用。 参数 timeout (timedelta) – 要在存储中设置的超时时间。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # 这将在 10 秒后抛出异常 >>> store.wait(["bad_key"]) property timeout# 获取存储的超时时间。 wait(*args, **kwargs)# 重载函数。 wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) -> None 等待 keys 中的每个键被添加到存储中。如果在超时(在存储初始化期间设置)之前未设置所有键,则 wait 将抛出异常。 参数 keys (list) – 等待直到它们在存储中被设置的键列表。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 这将在 30 秒后抛出异常 >>> store.wait(["bad_key"]) wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: datetime.timedelta) -> None 等待 keys 中的每个键被添加到存储中,如果键未在提供的超时时间内被设置,则抛出异常。 参数 keys (list) – 等待直到它们在存储中被设置的键列表。 timeout (timedelta) – 在抛出异常之前等待键被添加的时间。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 为例,也可以使用其他存储类型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 这将在 10 秒后抛出异常 >>> store.wait(["bad_key"], timedelta(seconds=10)) class torch.distributed.TCPStore# 基于 TCP 的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过 TCP 连接到服务器存储并执行操作,例如使用 set() 插入键值对,使用 get() 检索键值对等。应该始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。 参数 host_name (str) – 服务器存储应运行的主机名或 IP 地址。 port (int) – 服务器存储应监听传入请求的端口。 world_size (int, optional) – 存储用户的总数(客户端数量 + 1 个服务器)。默认为 None(None 表示非固定数量的存储用户)。 is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储时为 False。默认为 False。 timeout (timedelta, optional) – 存储在初始化期间以及用于 get() 和 wait() 等方法时使用的超时时间。默认为 timedelta(seconds=300) wait_for_workers (bool, optional) – 是否等待所有工作进程与服务器存储连接。仅当 world_size 为固定值时适用。默认为 True。 multi_tenant (bool, optional) – 如果为 True,则当前进程中具有相同主机/端口的所有 TCPStore 实例将使用相同的底层 TCPServer。默认为 False。 master_listen_fd (int, optional) – 如果指定,底层 TCPServer 将监听此文件描述符,该描述符必须是已绑定到端口的套接字。要绑定临时端口,我们建议将端口设置为 0 并读取 .port。默认为 None(意味着服务器创建一个新的套接字并尝试将其绑定到端口)。 use_libuv (bool, optional) – 如果为 True,则使用 libuv 作为 TCPServer 后端。默认为 True。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 在进程 1(服务器)上运行 >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # 在进程 2(客户端)上运行 >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # 初始化后,从客户端或服务器使用任何存储方法 >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key") init(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: SupportsInt, world_size:
SupportsInt | None = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: SupportsInt | None = None, use_libuv: bool = True) → None# 创建一个新的 TCPStore。 property host# 获取 Store 监听请求的主机名。 property libuvBackend# 如果使用的是 libuv 后端,则返回 True。 property port# 获取 Store 监听请求的端口号。 class torch.distributed.HashStore# 一种基于底层哈希映射的线程安全 Store 实现。此 Store 可在同一进程内使用(例如,由其他线程使用),但不能跨进程使用。示例::>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store 可以从其他线程使用 >>> # 初始化后使用任何 Store 方法 >>> store.set("first_key", "first_value") init(self: torch._C._distributed_c10d.HashStore) → None# 创建一个新的 HashStore。 class torch.distributed.FileStore# 一种使用文件存储底层键值对的 Store 实现。参数 file_name (str) – 用于存储键值对的文件路径 world_size (int, 可选) – 使用该 Store 的进程总数。默认值为 -1(负值表示 Store 用户数量不固定)。示例::>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # 初始化后,从客户端或服务器使用任何 Store 方法 >>> store1.set("first_key", "first_value") >>> store2.get("first_key") init(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: SupportsInt = -1) → None# 创建一个新的 FileStore。 property path# 获取 FileStore 用于存储键值对的文件路径。 class torch.distributed.PrefixStore# 一个包装器,可包装任意三种键值存储(TCPStore、FileStore 和 HashStore)之一,并为插入到 Store 中的每个键添加前缀。参数 prefix (str) – 在插入 Store 之前添加到每个键的前缀字符串。 store (torch.distributed.store) – 构成底层键值存储的 Store 对象。 init(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) → None# 创建一个新的 PrefixStore。 property underlying_store# 获取 PrefixStore 所包装的底层 Store 对象。 集体通信性能分析# 注意,你可以使用 torch.profiler(推荐,仅在 1.8.1 之后可用)或 torch.autograd.profiler 来分析此处提到的集体通信和点对点通信 API。所有开箱即用的后端(gloo、nccl、mpi)均受支持,集体通信的使用情况将在性能分析输出/追踪中按预期呈现。分析你的代码与分析任何常规 torch 算子相同: import torch import torch.distributed as dist with torch.profiler(): tensor = torch.randn(20, 10) dist.all_reduce(tensor) 请参阅 profiler 文档以全面了解 profiler 功能。 多 GPU 集体函数# 警告 多 GPU 函数(指每个 CPU 线程对应多个 GPU)已弃用。截至目前,PyTorch Distributed 首选的编程模型是每个线程对应一个设备,正如本文档中的 API 所示。如果你是后端开发人员并希望支持每个线程对应多个设备,请联系 PyTorch Distributed 的维护者。 对象集体操作# 警告 对象集体操作存在一些严重的局限性。请继续阅读以确定它们是否适合你的用例安全使用。对象集体操作是一组类似集体的操作,适用于任意 Python 对象,只要这些对象可以被 pickle 序列化。实现了各种集体模式(例如 broadcast、all_gather 等),但它们大致都遵循以下模式:将输入对象转换为 pickle(原始字节),然后将其放入字节张量中;将此字节张量的大小告知对等节点(第一次集体操作);分配适当大小的张量以执行真正的集体通信;传输对象数据(第二次集体操作);将原始数据转换回 Python 对象(反 pickle)。对象集体操作有时会出现令人意外的性能或内存特性,导致运行时间过长或发生内存溢出(OOM),因此应谨慎使用。以下是一些常见问题。不对称的 pickle/unpickle 时间 - Pickle 序列化对象的速度可能较慢,具体取决于对象的数量、类型和大小。当集体操作具有扇入特性时(例如 gather_object),接收秩必须反 pickle 的对象数量是发送秩需要 pickle 的对象数量的 N 倍,这可能导致其他秩在下一次集体操作中超时。低效的张量通信 - 张量应通过常规集体 API 发送,而非对象集体 API。虽然可以通过对象集体 API 发送张量,但它们会被序列化和反序列化(对于非 CPU 张量,还包括 CPU 同步和设备到主机的复制),并且除了调试或故障排除代码外,在几乎所有情况下都不建议这样做
值得花精力重构代码以改用非对象集合通信(non-object collectives)。
意外的张量设备 - 如果你仍希望通过对象集合通信发送张量,那么对于 CUDA(以及可能的其他加速器)张量,还有一个特定的方面需要注意。如果你对一个当前位于 cuda:3 上的张量进行 pickle 序列化,然后对其进行 unpickle 反序列化,无论你在哪个进程中,或者该进程的“默认” CUDA 设备是哪个,你都会得到另一个位于 cuda:3 上的张量。使用常规的张量集合通信 API 时,“输出张量”始终位于相同的本地设备上,这通常符合预期。如果这是进程首次使用 GPU,对张量进行 unpickle 操作会隐式激活 CUDA 上下文,这可能会浪费大量的 GPU 内存。通过在将张量作为输入传递给对象集合通信之前将其移动到 CPU,可以避免此问题。
第三方后端
除了内置的 GLOO/MPI/NCCL 后端外,PyTorch 分布式还通过运行时注册机制支持第三方后端。有关如何通过 C++ 扩展开发第三方后端的参考,请参阅 [教程 - 自定义 C++ 和 CUDA 扩展](Tutorials - Custom C++ and CUDA Extensions) 和 test/cpp_extensions/cpp_c10d_extension.cpp。第三方后端的功能取决于其各自的实现。新后端派生自 c10d::ProcessGroup,并在导入时通过 torch.distributed.Backend.register_backend() 注册后端名称和实例化接口。当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group() 时,torch.distributed 包将在新后端上运行。
警告 第三方后端的支持是实验性的,可能会发生变化。
启动工具
torch.distributed 包还在 torch.distributed.launch 中提供了启动工具。此辅助工具可用于为分布式训练在每个节点上启动多个进程。
模块 torch.distributed.launch。torch.distributed.launch 是一个在每个训练节点上生成多个分布式训练进程的模块。
警告 此模块将被弃用,推荐使用
torchrun。
该工具可用于单节点分布式训练,其中每个节点将生成一个或多个进程。该工具既可用于 CPU 训练,也可用于 GPU 训练。如果该工具用于 GPU 训练,每个分布式进程将在单个 GPU 上运行。这可以显著提高单节点训练性能。它也可用于多节点分布式训练,通过在每个节点上生成多个进程,从而显著提高多节点分布式训练性能。这对于具有直接 GPU 支持的多个 Infiniband 接口的系统尤其有益,因为所有接口都可用于聚合通信带宽。在单节点分布式训练或多节点分布式训练这两种情况下,此工具都将启动每个节点指定数量的进程 (--nproc-per-node)。如果用于 GPU 训练,此数字需要小于或等于当前系统上的 GPU 数量 (nproc_per_node),并且每个进程将在从 GPU 0 到 GPU (nproc_per_node - 1) 的单个 GPU 上运行。
如何使用此模块:
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)
多节点多进程分布式训练:(例如两个节点)
节点 1:(IP:192.168.1.1,且有一个空闲端口:1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE --nnodes=2 --node-rank=0 --master-addr="192.168.1.1" --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)
节点 2:
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE --nnodes=2 --node-rank=1 --master-addr="192.168.1.1" --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)
要查看此模块提供的可选参数:
python -m torch.distributed.launch --help
重要注意事项:
-
此工具和多进程分布式(单节点或多节点)GPU 训练目前仅在使用 NCCL 分布式后端时能达到最佳性能。因此,建议 GPU 训练使用 NCCL 后端。
-
在你的训练程序中,你必须解析命令行参数:
--local-rank=LOCAL_PROCESS_RANK,该参数将由本模块提供。如果你的训练程序使用 GPU,你应该确保你的代码仅在LOCAL_PROCESS_RANK对应的 GPU 设备上运行。可以通过以下方式实现:解析
local_rank参数>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()使用以下任一方式将你的设备设置为 local rank
>>> torch.cuda.set_device(args.local_rank) # 在你的代码运行之前或
>>> with torch.cuda.device(args.local_rank):
>>> # 你的运行代码
>>> ...
版本 2.0.0 变更: 启动器会将 --local-rank=<rank> 参数传递给
您的脚本。从 PyTorch 2.0.0 开始,推荐使用带连字符的 --local-rank,而非之前使用的带下划线的 --local_rank。为了向后兼容,用户可能需要在参数解析代码中同时处理这两种情况。这意味着在参数解析器中同时包含 "--local-rank" 和 "--local_rank"。如果仅提供 "--local_rank",启动器将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank" 就足够了。
- 在训练程序中,您应该在开头调用以下函数以启动分布式后端。强烈建议使用
init_method='env://'。其他初始化方法(例如tcp://)也可能有效,但env://是本模块官方支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
- 在训练程序中,您可以使用常规分布式函数,也可以使用
torch.nn.parallel.DistributedDataParallel()模块。如果您的训练程序使用 GPU 进行训练,并且希望使用torch.nn.parallel.DistributedDataParallel()模块,配置方法如下:
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids 参数设置为代码将要运行的唯一 GPU 设备 ID。这通常是进程的本地 rank。换句话说,为了使用此实用程序,device_ids 需要是 [args.local_rank],而 output_device 需要是 args.local_rank。
- 另一种通过环境变量
LOCAL_RANK将local_rank传递给子进程的方法。当您使用--use-env=True启动脚本时,此行为会被启用。您必须调整上面的子进程示例,将args.local_rank替换为os.environ['LOCAL_RANK'];当指定此标志时,启动器不会传递--local-rank。
警告
local_rank 不是全局唯一的:它仅在每台机器上的每个进程中是唯一的。因此,不要用它来决定是否(例如)写入网络文件系统。请参阅 pytorch/pytorch#12042 以了解如果不正确执行此操作可能会出错的示例。
Spawn 实用程序
Multiprocessing 包 - torch.multiprocessing 包还在 torch.multiprocessing.spawn() 中提供了 spawn 函数。此辅助函数可用于生成多个进程。它通过传入您想要运行的函数并生成 N 个进程来运行该函数。这也可用于多进程分布式训练。有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现。
请注意,此函数需要 Python 3.4 或更高版本。
调试 torch.distributed 应用程序
由于难以理解的挂起、崩溃或跨 rank 的不一致行为,调试分布式应用程序可能具有挑战性。torch.distributed 提供了一套工具,以帮助以自助方式调试训练应用程序:
Python Breakpoint
在分布式环境中使用 Python 调试器非常方便,但由于它不能开箱即用,许多人根本不使用它。PyTorch 提供了一个围绕 pdb 的自定义包装器,以简化该过程。torch.distributed.breakpoint 使此过程变得简单。在内部,它以两种方式自定义 pdb 的断点行为,否则其行为与正常 pdb 相同:
- 仅在一个 rank(由用户指定)上附加调试器。
- 通过使用
torch.distributed.barrier()确保所有其他 rank 停止,一旦被调试的 rank 发出 continue 命令,该屏障将释放。 - 重定向子进程的 stdin,使其连接到您的终端。
要使用它,只需在所有 rank 上发出 torch.distributed.breakpoint(rank),在每种情况下使用相同的 rank 值。
Monitored Barrier
从 v1.10 开始,torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代方案存在,当崩溃时(即并非所有 rank 都在提供的超时时间内调用 torch.distributed.monitored_barrier()),它会提供有关哪个 rank 可能出现故障的帮助信息。torch.distributed.monitored_barrier() 使用类似于确认过程中的 send/recv 通信原语实现主机侧屏障,允许 rank 0 报告哪些 rank 未能及时确认屏障。
例如,考虑以下函数,其中 rank 1 未能调用 torch.distributed.monitored_barrier()(在实践中,这可能是由于应用程序错误或之前的集体操作挂起所致):
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier 需要 gloo 进程组来执行主机侧同步。
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
在 rank 0 上会产生以下错误消息,允许
用户确定哪些 rank 可能存在故障并进一步调查: RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms Original exception: [gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
在设置 TORCH_CPP_LOG_LEVEL=INFO 的情况下,环境变量 TORCH_DISTRIBUTED_DEBUG 可用于触发额外的有用日志记录和集体同步检查,以确保所有 rank 正确同步。根据所需的调试级别,TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF(默认)、INFO 或 DETAIL。请注意,最详细的选项 DETAIL 可能会影响应用程序性能,因此应仅在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO 将在初始化使用 torch.nn.parallel.DistributedDataParallel() 训练的模型时产生额外的调试日志,而 TORCH_DISTRIBUTED_DEBUG=DETAIL 还将记录选定迭代次数内的运行时性能统计信息。这些运行时统计数据包括前向传播时间、反向传播时间、梯度通信时间等数据。
例如,给定以下应用程序:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
在初始化时将呈现以下日志:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
在运行时(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 时)将呈现以下日志:
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0 Avg forward compute time: 40838608 Avg backward compute time: 5983335 Avg backward comm. time: 4326421 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0 Avg forward compute time: 42850427 Avg backward compute time: 3885553 Avg backward comm. time: 2357981 Avg backward comm/comp overlap time: 2234674
此外,由于模型中存在未使用的参数,TORCH_DISTRIBUTED_DEBUG=INFO 增强了 torch.nn.parallel.DistributedDataParallel() 中的崩溃日志记录。目前,如果前向传播中可能存在未使用的参数,则必须在 torch.nn.parallel.DistributedDataParallel() 初始化时传入 find_unused_parameters=True,并且从 v1.10 开始,所有模型输出都必须用于损失计算,因为 torch.nn.parallel.DistributedDataParallel() 不支持反向传播中的未使用参数。这些约束对于大型模型尤其具有挑战性,因此当发生错误崩溃时,torch.nn.parallel.DistributedDataParallel() 将记录所有未使用参数的完全限定名称。
例如,在上述应用程序中,如果我们修改损失计算方式为 loss = output[1],那么 TwoLinLayerNet.a 在反向传播中不会接收梯度,从而导致 DDP 失败。在崩溃时,用户会收到关于未使用参数的信息,这对于大型模型而言手动查找可能具有挑战性:
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by making sure all `forward` function outputs participate in calculating loss. If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the
在报告此问题时,请提供模块的 forward 方法返回值的结构(例如,list、dict、iterable)。未收到 rank 0 梯度的参数:a.weight 未收到 rank 0 梯度的参数索引:0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将在用户直接或间接发出的每次集体通信调用(例如 DDP allreduce)上触发额外的一致性和同步检查。这是通过创建一个包装进程组来实现的,该包装进程组封装了由 torch.distributed.init_process_group() 和 torch.distributed.new_group() API 返回的所有进程组。因此,这些 API 将返回一个包装进程组,其用法与普通进程组完全相同,但在将集体通信分派到底层进程组之前会执行一致性检查。目前,这些检查包括 torch.distributed.monitored_barrier(),它确保所有 rank 完成其未完成的集体通信调用,并报告卡住的 rank。接下来,通过确保所有集体通信函数匹配并使用一致的张量形状调用,来检查集体通信本身的一致性。如果不是这种情况,当应用程序崩溃时,将包含详细的错误报告,而不是挂起或提供无信息的错误消息。
例如,考虑以下函数,其输入到 torch.distributed.all_reduce() 的形状不匹配:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
import os
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL 后端时,此类应用程序可能会导致挂起,这在非平凡场景中很难确定根本原因。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL 并重新运行应用程序,以下错误消息将揭示根本原因:
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10 [ torch.LongTensor{1} ]
注意 为了在运行时对调试级别进行细粒度控制,还可以使用函数
torch.distributed.set_debug_level()、torch.distributed.set_debug_level_from_env()和torch.distributed.get_debug_level()。此外,TORCH_DISTRIBUTED_DEBUG=DETAIL可以与TORCH_SHOW_CPP_STACKTRACES=1结合使用,以便在检测到集体通信不同步时记录整个调用堆栈。这些集体通信不同步检查适用于所有使用由torch.distributed.init_process_group()和torch.distributed.new_group()API 创建的进程组支持的 c10d 集体通信调用的应用程序。
日志记录
除了通过 torch.distributed.monitored_barrier() 和 TORCH_DISTRIBUTED_DEBUG 提供的显式调试支持外,torch.distributed 的底层 C++ 库还会输出各种级别的日志消息。这些消息有助于了解分布式训练任务的执行状态,并排查诸如网络连接失败等问题。
下表显示了如何通过组合 TORCH_CPP_LOG_LEVEL 和 TORCH_DISTRIBUTED_DEBUG 环境变量来调整日志级别。
| TORCH_CPP_LOG_LEVEL | TORCH_DISTRIBUTED_DEBUG | 有效日志级别 |
|---|---|---|
| ERROR | ignored | Error |
| WARNING | ignored | Warning |
| INFO | ignored | Info |
| INFO | INFO | Debug |
| INFO | DETAIL | Trace (即 All) |
分布式组件会引发自定义的异常类型,这些类型派生自 RuntimeError:
torch.distributed.DistError:这是所有分布式异常的基类型。torch.distributed.DistBackendError:当发生特定于后端的错误时抛出此异常。例如,如果使用 NCCL 后端,且用户尝试使用 NCCL 库不可用的 GPU。torch.distributed.DistNetworkError:当网络库遇到错误时抛出此异常(例如:Connection reset by peer)。torch.distributed.DistStoreError:当 Store 遇到错误时抛出此异常(例如:TCPStore timeout)。
class torch.distributed.DistError
当分布式库中发生错误时引发的异常
class torch.distributed.DistBackendError
当分布式中发生后端错误时引发的异常
class torch.distributed.DistNetworkError
当分布式中发生网络错误时引发的异常
class torch.distributed.DistStoreError
当分布式 store 中发生错误时引发的异常
如果您正在运行单节点训练,交互式地在脚本中设置断点可能会很方便。我们提供了一种方便地在单个 rank 上设置断点的方法:
torch.distributed.breakpoint(rank=0, skip=0, timeout_s=3600)[source] #
设置断点,但仅在单个 rank 上。所有其他 rank 将等待您完成断点操作后再继续。
参数
- rank
(int) – 在哪个 rank 上中断。默认值:0 skip (int) – 跳过对此断点的前 skip 次调用。默认值:0.``` torch.distributed
**模式 3:** 初始化# 在调用任何其他方法之前,需要使用 `torch.distributed.init_process_group()` 或 `torch.distributed.device_mesh.init_device_mesh()` 函数对包进行初始化。两者都会阻塞,直到所有进程都加入为止。 **警告** 初始化不是线程安全的。应从单个线程执行进程组创建,以防止跨 ranks 的 ‘UUID’ 分配不一致,并防止初始化期间的竞争条件导致挂起。 torch.distributed.is_available()[source]# 如果分布式包可用,则返回 True。否则,`torch.distributed` 不暴露任何其他 API。目前,`torch.distributed` 在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,设置 `USE_DISTRIBUTED=1` 以启用它。目前,Linux 和 Windows 的默认值为 `USE_DISTRIBUTED=1`,MacOS 为 `USE_DISTRIBUTED=0`。 返回类型 bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# 初始化默认的分布式进程组。这也将初始化分布式包。 初始化进程组主要有两种方式: * 显式指定 `store`、`rank` 和 `world_size`。 * 指定 `init_method`(一个 URL 字符串),指示在哪里/如何发现对等节点。可以选择指定 `rank` 和 `world_size`,或者将所有必需参数编码到 URL 中并省略它们。 如果两者均未指定,则假定 `init_method` 为 “env://”。 **参数** **backend** (*str* 或 *Backend*, *可选*) – 要使用的后端。根据构建时的配置,有效值包括 `mpi`、`gloo`、`nccl`、`ucc`、`xccl` 或由第三方插件注册的后端。自 2.6 版本起,如果未提供 `backend`,c10d 将使用为 `device_id` 关键字参数(如果提供)指示的设备类型注册的后端。目前已知的默认注册为:cuda 对应 `nccl`,cpu 对应 `gloo`,xpu 对应 `xccl`。如果既未提供 `backend` 也未提供 `device_id`,c10d 将检测运行时机器上的加速器,并使用为该检测到的加速器(或 cpu)注册的后端。此字段可以作为小写字符串给出(例如,`"gloo"`),也可以通过 Backend 属性访问(例如,`Backend.GLOO`)。如果在每台机器上使用多个进程且后端为 `nccl`,则每个进程必须独占访问其使用的每个 GPU,因为进程间共享 GPU 可能导致死锁或 NCCL 无效使用。`ucc` 后端处于实验阶段。可以使用 `get_default_backend_for_device()` 查询设备的默认后端。 **init_method** (*str*, *可选*) – 指定如何初始化进程组的 URL。如果未指定 `init_method` 或 `store`,则默认为 “env://”。与 `store` 互斥。 **world_size** (*int*, *可选*) – 参与作业的进程数。如果指定了 `store`,则为必需项。 **rank** (*int*, *可选*) – 当前进程的 rank(它应该是介于 0 和 `world_size-1` 之间的数字)。如果指定了 `store`,则为必需项。 **store** (*Store*, *可选*) – 所有 worker 均可访问的键/值存储,用于交换连接/地址信息。与 `init_method` 互斥。 **timeout** (*timedelta*, *可选*) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是在此时长之后集体操作将被异步中止并且进程将崩溃的时间。这样做是因为 CUDA 执行是异步的,继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行。当设置 `TORCH_NCCL_BLOCKING_WAIT` 时,进程将阻塞并等待此超时。 **group_name** (*str*, *可选*, *已弃用*) – 组名。此参数被忽略 **pg_options** (*ProcessGroupOptions*, *可选*) – 进程组选项,指定在构造特定进程组期间需要传入哪些附加选项。截至目前,我们支持的唯一选项是用于 `nccl` 后端的 `ProcessGroupNCCL.Options`,可以指定 `is_high_priority_stream`,以便当有计算内核等待时,`nccl` 后端可以选取高优先级的 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t **device_id** (*torch.device* | *int*, *可选*) – 此进程将工作的单个特定设备,允许进行特定于后端的优化。目前这有两个影响,仅在 NCCL 下:通信器立即形成(立即调用 `ncclCommInit*` 而不是正常的延迟调用),并且子组将在可能时使用 `ncclCommSplit` 以避免不必要的组创建开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。如果提供的是 int,API 假设将使用编译时的加速器类型。 **注意** 要启用 `backend == Backend.MPI`,需要在支持 MPI 的系统上从源代码构建 PyTorch。 **注意** 对多个后端的支持是实验性的。目前,当未指定后端时,将同时创建 `gloo` 和 `nccl` 后端。`gloo` 后端将用于集体操作
对于 CPU 张量,将使用 Gloo 后端;对于 CUDA 张量,集合通信将使用 NCCL 后端。可以通过传入格式为 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串来指定自定义后端,例如 “cpu:gloo,cuda:custom_backend”。
torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]#
根据 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度将被标记为 mesh_dim_names[i]。注意,init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序会在集群中的所有进程/秩上运行。确保所有秩上的 mesh_shape(描述设备布局的 n 维数组的维度)一致。不一致的 mesh_shape 可能导致挂起。注意,如果未找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组。
参数
* **device_type** (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”、“xpu”。不允许传入带有 GPU 索引的设备类型,例如 “cuda:0”。
* **mesh_shape** (Tuple[int]) – 一个元组,定义描述设备布局的多维数组的维度。
* **mesh_dim_names** (Tuple[str], 可选) – 一个元组,包含要分配给描述设备布局的多维数组每个维度的网格维度名称。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。
* **backend_override** (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 对将为每个网格维度创建的部分或全部 ProcessGroups 的重写。每个键可以是维度的索引或其名称(如果提供了 mesh_dim_names)。每个值可以是一个包含后端名称及其选项的元组,或者仅是这两个组件之一(在这种情况下,另一个将设置为其默认值)。
返回
表示设备布局的 DeviceMesh 对象。
返回类型
DeviceMesh
示例:
```python
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed.is_initialized()[source]# 检查默认进程组是否已初始化。
返回类型 bool
torch.distributed.is_mpi_available()[source]# 检查 MPI 后端是否可用。
返回类型 bool
torch.distributed.is_nccl_available()[source]# 检查 NCCL 后端是否可用。
返回类型 bool
torch.distributed.is_gloo_available()[source]# 检查 Gloo 后端是否可用。
返回类型 bool
torch.distributed.distributed_c10d.is_xccl_available()[source]# 检查 XCCL 后端是否可用。
返回类型 bool
torch.distributed.is_torchelastic_launched()[source]# 检查此进程是否使用 torch.distributed.elastic(即 torchelastic)启动。使用 TORCHELASTIC_RUN_ID 环境变量的存在作为代理,以确定当前进程是否使用 torchelastic 启动。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,它始终是一个非空值,表示用于对等发现的任务 id。
返回类型 bool
torch.distributed.get_default_backend_for_device(device)[source]# 返回给定设备的默认后端。
参数
- device (Union[str, torch.device]) – 要获取默认后端的设备。
返回 给定设备的默认后端,以小写字符串形式返回。
返回类型 str
目前支持三种初始化方法:
TCP 初始化 有两种使用 TCP 进行初始化的方法,都需要一个所有进程均可访问的网络地址和一个期望的 world_size。第一种方法需要指定属于秩 0 进程的地址。此初始化方法要求所有进程手动指定秩。请注意,最新版本的分布式包不再支持多播地址。group_name 也已弃用。
import torch.distributed as dist
# 使用其中一台机器的地址
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
共享文件系统初始化 另一种初始化方法利用组内所有机器共享且可见的文件系统,以及一个期望的 world_size。URL 应以 file:// 开头,并包含共享文件系统上(现有目录中)一个不存在文件的路径。文件系统初始化会自动创建该文件(如果不存在),但不会删除该文件。因此,你有责任确保在下一次在同一文件路径/名称上调用 init_process_group() 之前清理该文件。请注意,最新版本的分布式包不再支持自动秩分配,group_name 也已弃用。
.. warning:: 此方法假设文件系统支持使用 fcntl 进行锁定 -
大多数本地系统和 NFS 都支持此方法。
警告:此方法将始终创建文件,并尽力在程序结束时清理和删除该文件。换句话说,每次使用文件初始化方法时,都需要一个全新的空文件才能成功初始化。如果重复使用前一次初始化使用的同一文件(该文件碰巧未被清理),则属于意外行为,通常会导致死锁和失败。因此,尽管此方法会尽力清理文件,但如果自动删除失败,你有责任确保在训练结束时删除该文件,以防止下次运行时重用同一文件。如果你计划对同一文件名多次调用 init_process_group(),这一点尤为重要。换言之,如果文件未被删除/清理,并且你再次对该文件调用 init_process_group(),预计会发生失败。这里的经验法则是:确保每次调用 init_process_group() 时,该文件要么不存在,要么为空。
import torch.distributed as dist
# 必须始终指定 rank
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank)
环境变量初始化
此方法将从环境变量中读取配置,允许用户完全自定义信息的获取方式。需要设置的变量如下:
MASTER_PORT- 必需;必须是 rank 0 所在机器上的空闲端口MASTER_ADDR- 必需(rank 0 除外);rank 0 节点的地址WORLD_SIZE- 必需;可以在此处设置,也可以在调用初始化函数时设置RANK- 必需;可以在此处设置,也可以在调用初始化函数时设置
Rank 0 所在的机器将用于建立所有连接。这是默认方法,意味着无需指定 init_method(或者可以设置为 env://)。
优化初始化时间
TORCH_GLOO_LAZY_INIT- 按需建立连接,而不是使用全网格(full mesh),这可以显著改善非 all2all 操作的初始化时间。torch.distributed.init_process_group()模式 4: 示例:```
from torch.distributed.device_mesh import init_device_mesh
mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
**模式 5:** 组# 默认情况下,集体通信操作在默认组(也称为 world)上进行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。`new_group()` 函数可用于创建新组,包含所有进程的任意子集。它返回一个不透明的组句柄,该句柄可以作为 `group` 参数传递给所有集体通信函数(集体通信是以某些众所周知的编程模式交换信息的分布式函数)。
`torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]`# 创建一个新的分布式组。此函数要求主组中的所有进程(即属于分布式作业的所有进程)都进入此函数,即使它们不是该组的成员。此外,应在所有进程中以相同的顺序创建组。
**警告** 安全并发使用:当使用带有 NCCL 后端的多个进程组时,用户必须确保跨秩(ranks)的集体通信执行顺序在全局范围内保持一致。如果进程内的多个线程发出集体通信,则需要进行显式同步以确保顺序一致。当使用 `torch.distributed` 通信 API 的异步变体时,会返回一个 work 对象,并且通信内核被排入单独的 CUDA 流中,从而允许通信和计算重叠。一旦在一个进程组上发出了一个或多个异步操作,在使用另一个进程组之前,必须通过调用 `work.wait()` 与其他 cuda 流进行同步。有关更多详细信息,请参阅 [同时使用多个 NCCL 通信器](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently)。
**参数**
* **ranks** (`list[int]`) – 组成员的秩列表。如果为 `None`,将设置为所有秩。默认为 `None`。
* **timeout** (`timedelta`, 可选) – 参见 `init_process_group` 了解详细信息和默认值。
* **backend** (`str` 或 `Backend`, 可选) – 要使用的后端。根据构建时的配置,有效值为 `gloo` 和 `nccl`。默认使用与全局组相同的后端。此字段应作为小写字符串提供(例如,`"gloo"`),也可以通过 `Backend` 属性访问(例如,`Backend.GLOO`)。如果传入 `None`,将使用对应于默认进程组的后端。默认为 `None`。
* **pg_options** (`ProcessGroupOptions`, 可选) – 进程组选项,指定在构建特定进程组期间需要传入哪些附加选项。例如,对于 nccl 后端,可以指定 `is_high_priority_stream`,以便进程组可以使用高优先级的 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
* **use_local_synchronization** (`bool`, 可选):在进程组创建结束时执行组本地屏障。不同之处在于,非成员秩不需要调用 API 也不加入屏障。
* **group_desc** (`str`, 可选) – 用于描述进程组的字符串。
* **device_id** (`torch.device`, 可选) – 将此进程“绑定”到的单个特定设备,如果提供了此字段,`new_group` 调用将尝试立即为该设备初始化通信后端。
**返回**
分布式组的句柄,可以传递给集体通信调用;如果该秩不属于 `ranks`,则返回 `GroupMember.NON_GROUP_MEMBER`。
**注意** `use_local_synchronization` 不适用于 MPI。
**注意** 虽然 `use_local_synchronization=True` 在大型集群和小型进程组中可以显著更快,但必须小心,因为它会改变集群行为,因为非成员秩不会加入组屏障 `barrier()`。
**注意** 当每个秩创建多个重叠的进程组时,`use_local_synchronization=True` 可能导致死锁。为避免这种情况,请确保所有秩遵循相同的全局创建顺序。
`torch.distributed.get_group_rank(group, global_rank)[source]`# 将全局秩转换为组内秩。`global_rank` 必须是 `group` 的一部分,否则会引发 `RuntimeError`。
**参数**
* **group** (`ProcessGroup`) – 用于查找相对秩的 ProcessGroup。
* **global_rank** (`int`) – 要查询的全局秩。
**返回**
相对于 `group` 的 `global_rank` 的组内秩
**返回类型**
`int`
**注意** 在默认进程组上调用此函数返回恒等值。
`torch.distributed.get_global_rank(group, group_rank)[source]`# 将组内秩转换为全局秩。`group_rank` 必须是 `group` 的一部分,否则会引发 `RuntimeError`。
**参数**
* **group** (`ProcessGroup`) – 用于从中查找全局秩的 ProcessGroup。
* **group_rank** (`int`) – 要查询的组内秩。
**返回**
相对于 `group` 的 `group_rank` 的全局秩
**返回类型**
`int`
**注意** 在默认进程组上调用此函数返回恒等值。
`torch.distributed.get_process_group_ranks(group)[source]`# 获取与 `group` 关联的所有秩。
**参数**
* **group** (`Optional[ProcessGroup]`) – 从中获取所有秩的 ProcessGroup。如果为 `None`,将使用默认进程组。
返回按组排名排序的全局排名列表。返回类型 list[int]```
new_group()
```**模式 6:** 警告 安全的并发用法:当使用带有 NCCL 后端的多个进程组时,用户必须确保跨排名的集合操作具有全局一致的执行顺序。如果进程内的多个线程发出集合操作,则必须进行显式同步以确保顺序一致。当使用 `torch.distributed` 通信 API 的异步变体时,会返回一个 work 对象,并且通信内核被入队到单独的 CUDA 流上,从而允许通信和计算重叠。一旦在一个进程组上发出了一个或多个异步操作,在使用另一个进程组之前,必须通过调用 `work.wait()` 与其他 cuda 流进行同步。有关更多详细信息,请参阅 [同时使用多个 NCCL 通信器](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently)。```
NCCL
```**模式 7:** 注意 如果您将 DistributedDataParallel 与分布式 RPC 框架结合使用,则应始终使用 `torch.distributed.autograd.backward()` 来计算梯度,并使用 `torch.distributed.optim.DistributedOptimizer` 来优化参数。示例:
```python
>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # 设置优化器
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>> optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>> optim.SGD,
>>> optimizer_params,
>>> lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>> pred = ddp_model(rref.to_here())
>>> loss = loss_func(pred, target)
>>> dist_autograd.backward(context_id, [loss])
>>> dist_optim.step(context_id)
torch.distributed.autograd.backward()
```**模式 8:** static_graph (bool) – 当设置为 True 时,DDP 知道训练图是静态的。静态图意味着 1) 在整个训练循环中,已使用和未使用的参数集不会改变;在这种情况下,用户是否设置 `find_unused_parameters = True` 无关紧要。2) 图的训练方式在整个训练循环中不会改变(意味着没有依赖于迭代的控制流)。当 `static_graph` 设置为 True 时,DDP 将支持过去无法支持的情况:1) 重入反向传播。2) 多次激活检查点。3) 当模型有未使用的参数时的激活检查点。4) 存在位于 forward 函数之外的模型参数。5) 当存在未使用的参数时,可能会提高性能,因为当 `static_graph` 设置为 True 时,DDP 不会在每次迭代中搜索图以检测未使用的参数。要检查是否可以将 `static_graph` 设置为 True,一种方法是检查上一个模型训练结束时的 ddp 日志数据,如果 `ddp_logging_data.get("can_set_static_graph") == True`,通常您也可以设置 `static_graph = True`。示例:
```python
>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
>>> # 训练循环
>>> ...
>>> ddp_logging_data = model_DDP._get_ddp_logging_data()
>>> static_graph = ddp_logging_data.get("can_set_static_graph")
True
```## 参考文件此技能包含 `references/` 中的综合文档:- **other.md** - 其他文档在需要详细信息时,使用 `view` 读取特定的参考文件。## 使用此技能### 对于初学者
从 `getting_started` 或 `tutorials` 参考文件开始,了解基本概念。### 对于特定功能
使用适当的类别参考文件(api、guides 等)获取详细信息。### 对于代码示例
上面的快速参考部分包含从官方文档中提取的常见模式。## 资源### references/
从官方来源提取的组织化文档。这些文件包含:
- 详细解释
- 带有语言注释的代码示例
- 指向原始文档的链接
- 用于快速导航的目录### scripts/
在此处添加用于常见自动化任务的辅助脚本。### assets/
在此处添加模板、样板代码或示例项目。## 注意- 此技能是从官方文档自动生成的
- 参考文件保留了源文档的结构和示例
- 代码示例包括语言检测以实现更好的语法高亮显示
- 快速参考模式是从文档中的常见用法示例中提取的## 更新要使用更新的文档刷新此技能:
1. 使用相同的配置重新运行抓取工具
2. 将使用最新信息重建技能