跳到主要內容

# Pytorch Fsdp使用 PyTorch FSDP 進行完全分片數據並行訓練的專家指南 - 參數分片、混合精度、CPU 卸載、FSDP2## 技能元數據| | | |---|---| | 來源 | 可選 — 使用 hermes skills install official/mlops/pytorch-fsdp 安裝 | | 路徑 | optional-skills/mlops/pytorch-fsdp | | 版本 | 1.0.0 | | 作者 | Orchestra Research | | 許可證 | MIT | | 依賴項 | torch>=2.0, transformers | | 平臺 | linux, macos | | 標籤 | Distributed Training, PyTorch, FSDP, Data Parallel, Sharding, Mixed Precision, CPU Offloading, FSDP2, Large-Scale Training |## 參考:完整 SKILL.md:::info 以下是 Hermes 在觸發此技能時加載的完整技能定義。這是技能激活時代理看到的指令。 :::# Pytorch-Fsdp 技能提供源自官方文檔的 pytorch-fsdp 開發綜合協助。## 何時使用此技能當出現以下情況時,應觸發此技能:

  • 使用 pytorch-fsdp 時
  • 詢問 pytorch-fsdp 功能或 API 時
  • 實現 pytorch-fsdp 解決方案時
  • 調試 pytorch-fsdp 代碼時
  • 學習 pytorch-fsdp 最佳實踐時## 快速參考### 常見模式

模式 1: 通用 Join 上下文管理器

創建日期:2025 年 6 月 6 日 | 最後更新日期:2025 年 6 月 6 日

通用 join 上下文管理器有助於在輸入不均勻的情況下進行分佈式訓練。本頁概述了相關類的 API:JoinJoinableJoinHook。有關教程,請參閱[使用 Join 上下文管理器進行不均勻輸入的分佈式訓練](Distributed Training with Uneven Inputs Using the Join Context Manager)。

class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, **kwargs)[source]#

此類定義了通用 join 上下文管理器,它允許在進程加入後調用自定義鉤子。這些鉤子應掩蓋(shadow)未加入進程的集體通信,以防止掛起和出錯,並確保算法正確性。有關鉤子定義的詳細信息,請參閱 JoinHook

警告 上下文管理器要求每個參與的 Joinable 在其自身的每迭代集體通信之前調用方法 notify_join_context(),以確保正確性。

警告 上下文管理器要求 JoinHook 對象中的所有 process_group 屬性相同。如果有多個 JoinHook 對象,則使用第一個對象的設備。進程組和設備信息用於檢查未加入的進程,並在啟用 throw_on_early_termination 時通知進程拋出異常,這兩者均使用 all-reduce 操作。

參數

  • joinables (List[Joinable]) – 參與的 Joinable 列表;它們的鉤子按給定順序迭代。
  • enable (bool) – 啟用不均勻輸入檢測的標誌;設置為 False 將禁用上下文管理器的功能,僅應在用戶確定輸入不會不均勻時設置(默認值:True)。
  • throw_on_early_termination (bool) – 控制是否在檢測到不均勻輸入時拋出異常的標誌(默認值:False)。

示例:

>>> import os
>>> import torch
>>> import torch.distributed as dist
>>> import torch.multiprocessing as mp
>>> import torch.nn.parallel.DistributedDataParallel as DDP
>>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO
>>> from torch.distributed.algorithms.join import Join
>>>
>>> # 在每個生成的 worker 上
>>> def worker(rank):
>>> dist.init_process_group("nccl", rank=rank, world_size=2)
>>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank])
>>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01)
>>> # Rank 1 比 rank 0 多獲得一個輸入
>>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)]
>>> with Join([model, optim]):
>>> for input in inputs:
>>> loss = model(input).sum()
>>> loss.backward()
>>> optim.step()
>>> # 所有 rank 都到達此處,沒有掛起或出錯

static notify_join_context(joinable)[source]#

通知 join 上下文管理器調用進程尚未加入。然後,如果 throw_on_early_termination=True,則檢查是否檢測到不均勻輸入(即,是否有一個進程已經加入),如果是,則拋出異常。此方法應由 Joinable 對象在其每迭代集體通信之前調用。例如,這應該在 DistributedDataParallel 的前向傳播開始時調用。只有傳入上下文管理器的第一個 Joinable 對象在此方法中執行集體通信,對於其他對象,此方法為空操作。

參數

  • joinable (Joinable) – 調用此方法的 Joinable 對象。

返回

如果 joinable 是傳入上下文管理器的第一個對象,則返回一個異步工作句柄,用於 all-reduce 以通知上下文管理器該進程尚未加入;否則返回 None

class torch.distributed.algorithms.Joinable[source]#

這定義了可加入類(joinable classes)的抽象基類。可加入類(繼承自 Joinable)應實現 join_hook(),該方法返回一個 JoinHook 實例,此外還應實現 join_device()join_process_group(),它們分別返回設備和進程組信息。

abstract property join_device: device#

返回執行 join 上下文管理器所需的集體通信的設備。

abstract join_hook(**kwargs)[source]#

為給定的 Joinable 返回一個 JoinHook 實例。

參數

  • kwargs (dict) – 包含任何關鍵字參數的字典,用於在運行時修改 join 鉤子的行為;共享同一 join 上下文管理器的所有 Joinable 實例都會轉發相同的 kwargs 值。

返回類型

JoinHook

abstract property join_process_group: Any#

返回 join 上下文管理器本身所需的集體通信的進程組。

class torch.distributed.algorithms.JoinHook[source]#

這定義了一個 join 鉤子,它在 join 上下文管理器中提供兩個入口點。入口點:一個主鉤子(main hook),當存在未加入的進程時會重複調用;一個後鉤子(post-hook),當所有進程都加入後調用一次。要為通用 join 上下文管理器實現 join 鉤子,請定義一個繼承自 JoinHook 的類,並根據需要重寫 main_hook()post_hook()

main_hook()[source]#

當存在未加入的進程時調用此鉤子,以掩蓋訓練迭代中的集體通信。訓練迭代即,在

一次前向傳播、反向傳播和優化器步驟。post_hook(is_last_joiner)[source]# 在所有進程加入後調用鉤子。它傳遞一個額外的布爾參數 is_last_joiner,該參數指示當前 rank 是否是最後加入的之一。參數 is_last_joiner (bool) – 如果當前 rank 是最後加入的之一,則為 True;否則為 False。``` Join


**模式 2:** 分佈式通信包 - torch.distributed

# 創建日期:2017 年 7 月 12 日 | 最後更新日期:2025 年 9 月 4 日

> **注意**
> 請參閱 [PyTorch Distributed Overview](PyTorch Distributed Overview) 以簡要了解與分佈式訓練相關的所有功能。

## 後端 (Backends)

`torch.distributed` 支持四種內置後端,每種後端具有不同的功能。下表顯示了每個後端在 CPU 或 GPU 上可用的函數。對於 NCCL,GPU 指的是 CUDA GPU;對於 XCCL,GPU 指的是 XPU GPU。僅當用於構建 PyTorch 的 MPI 實現支持時,MPI 才支持 CUDA。

| Backend | gloo | | mpi | | nccl | | xccl | |
| :--- | :---: | :---: | :---: | :---: | :---: | :---: | :---: | :---: |
| **Device** | **CPU** | **GPU** | **CPU** | **GPU** | **CPU** | **GPU** | **CPU** | **GPU** |
| send | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| recv | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| broadcast | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| all_reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| reduce | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| all_gather | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| gather | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| scatter | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| reduce_scatter | ✓ | ✓ | ✘ | ✘ | ✘ | ✓ | ✘ | ✓ |
| all_to_all | ✓ | ✓ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |
| barrier | ✓ | ✘ | ✓ | ? | ✘ | ✓ | ✘ | ✓ |

## PyTorch 附帶的後端

PyTorch 分佈式包支持 Linux(穩定版)、MacOS(穩定版)和 Windows(原型版)。默認情況下,在 Linux 上,Gloo 和 NCCL 後端會被構建幷包含在 PyTorch 分佈式包中(僅在使用 CUDA 構建時才包含 NCCL)。MPI 是一個可選後端,只有當你從源代碼構建 PyTorch 時才能包含它(例如,在安裝了 MPI 的主機上構建 PyTorch)。

> **注意**
> 自 PyTorch v1.8 起,Windows 支持除 NCCL 以外的所有集體通信後端。如果 `init_process_group()` 的 `init_method` 參數指向一個文件,它必須遵循以下架構:
> * 本地文件系統:`init_method="file:///d:/tmp/some_file"`
> * 共享文件系統:`init_method="file://////{machine_name}/{share_folder_name}/some_file"`
>
> 與 Linux 平臺相同,你可以通過設置環境變量 `MASTER_ADDR` 和 `MASTER_PORT` 來啟用 TcpStore。

## 使用哪個後端?

過去,我們經常被問到:“我應該使用哪個後端?”。

**經驗法則**

* **使用 CUDA GPU 進行分佈式訓練:** 使用 NCCL 後端。
* **使用 XPU GPU 進行分佈式訓練:** 使用 XCCL 後端。
* **使用 CPU 進行分佈式訓練:** 使用 Gloo 後端。

**配備 InfiniBand 互連的 GPU 主機**

使用 NCCL,因為它是目前唯一支持 InfiniBand 和 GPUDirect 的後端。

**配備以太網互連的 GPU 主機**

使用 NCCL,因為它目前提供最佳的分佈式 GPU 訓練性能,特別是對於多進程單節點或多節點分佈式訓練。如果你遇到任何 NCCL 問題,請使用 Gloo 作為後備選項。(請注意,Gloo 在 GPU 上的運行速度目前比 NCCL 慢。)

**配備 InfiniBand 互連的 CPU 主機**

如果你的 InfiniBand 已啟用 IP over IB,請使用 Gloo,否則請使用 MPI。我們計劃在即將發佈的版本中為 Gloo 添加 InfiniBand 支持。

**配備以太網互連的 CPU 主機**

使用 Gloo,除非你有特定理由使用 MPI。

## 常見環境變量

### 選擇要使用的網絡接口

默認情況下,NCCL 和 Gloo 後端都會嘗試找到合適的網絡接口。如果自動檢測到的接口不正確,你可以使用以下環境變量覆蓋它(適用於相應的後端):

* `NCCL_SOCKET_IFNAME`,例如 `export NCCL_SOCKET_IFNAME=eth0`
* `GLOO_SOCKET_IFNAME`,例如 `export GLOO_SOCKET_IFNAME=eth0`

如果你使用的是 Gloo 後端,可以通過逗號分隔來指定多個接口,如下所示:`export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3`。後端將以輪詢方式在這些接口上分發操作。**務必**確保所有進程在此變量中指定相同數量的接口。

### 其他 NCCL 環境變量

**調試** - 如果 NCCL 失敗,你可以設置 `NCCL_DEBUG=INFO` 以打印明確的警告消息以及基本的 NCCL 初始化信息。你還可以使用 `NCCL_DEBUG_SUBSYS` 獲取有關 NCCL 特定方面的更多詳細信息。例如,`NCCL_DEBUG_SUBSYS=COLL` 將打印集體調用的日誌,這在調試掛起問題時可能很有幫助,尤其是那些由集體類型或消息大小不匹配引起的問題。如果出現拓撲檢測失敗,設置 `NCCL_DEBUG_SUBSYS=GRAPH` 將有助於檢查詳細的檢測結果,並在需要 NCCL 團隊進一步幫助時作為參考保存。

**性能調優** - NCCL 根據其拓撲檢測執行自動調優,以節省用戶的調優工作。在某些基於 socket 的系統上,用戶可能仍會嘗試調整 `NCCL_SOCKET_NTHREADS` 和 `NCCL_NSOCKS_PERTHREAD` 以增加 socket 網絡帶寬。NCCL 已為一些雲提供商(如 AWS 或 GCP)預先調優了這兩個環境變量。有關 NCCL 環境變量的完整列表,請參閱 [NVIDIA NCCL 的官方文檔](NVIDIA NCCL 的官方文檔)。

你可以使用 `torch.distributed.ProcessGroupNCCL.NCCLConfig` 和 `torch.distributed.ProcessGroupNCCL.Options` 進一步調優 NCCL 通信器。在解釋器中使用 `help`(例如 `help(torch.distributed.ProcessGroupNCCL.NCCLConfig)`)瞭解更多相關信息。

## 基礎知識

`torch.distributed` 包為在一臺或多臺機器上運行的多個計算節點間的多進程並行提供了 PyTorch 支持和通信原語。該類

`torch.nn.parallel.DistributedDataParallel()` 基於此功能構建,作為任何 PyTorch 模型的包裝器,提供同步分佈式訓練。這與 `Multiprocessing` 包(`torch.multiprocessing` 和 `torch.nn.DataParallel()`)提供的並行性不同,因為它支持多臺通過網絡連接的機器,並且用戶必須為每個進程顯式啟動主訓練腳本的單獨副本。在單機同步情況下,`torch.distributed` 或 `torch.nn.parallel.DistributedDataParallel()` 包裝器可能比其他數據並行方法(包括 `torch.nn.DataParallel()`)具有優勢:每個進程維護自己的優化器,並在每次迭代中執行完整的優化步驟。雖然這看起來是多餘的,因為梯度已經跨進程收集並平均,因此對於每個進程都是相同的,但這意味著不需要參數廣播步驟,從而減少了在節點之間傳輸張量所花費的時間。每個進程包含一個獨立的 Python 解釋器,消除了從單個 Python 進程驅動多個執行線程、模型副本或 GPU 所帶來的額外解釋器開銷和“GIL 爭用”。這對於大量使用 Python 運行時的模型尤其重要,包括具有循環層或許多小組件的模型。

## 初始化 #

在調用任何其他方法之前,需要使用 `torch.distributed.init_process_group()` 或 `torch.distributed.device_mesh.init_device_mesh()` 函數初始化該包。兩者都會阻塞,直到所有進程加入。

> **警告**
>
> 初始化不是線程安全的。應從單個線程執行進程組創建,以防止跨 rank 的 ‘UUID’ 分配不一致,並防止初始化期間的競爭條件導致掛起。

### `torch.distributed.is_available()`[source] #

如果分佈式包可用,則返回 `True`。否則,`torch.distributed` 不公開任何其他 API。目前,`torch.distributed` 在 Linux、MacOS 和 Windows 上可用。從源代碼構建 PyTorch 時,設置 `USE_DISTRIBUTED=1` 以啟用它。目前,Linux 和 Windows 的默認值為 `USE_DISTRIBUTED=1`,MacOS 的默認值為 `USE_DISTRIBUTED=0`。

**返回類型** `bool`

### `torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)`[source] #

初始化默認的分佈式進程組。這也將初始化分佈式包。

初始化進程組主要有兩種方式:
1. 顯式指定 `store`、`rank` 和 `world_size`。
2. 指定 `init_method`(一個 URL 字符串),指示在哪裡/如何發現對等節點。可以選擇指定 `rank` 和 `world_size`,或者將所有必需參數編碼到 URL 中並省略它們。

如果兩者均未指定,則假定 `init_method` 為 `"env://"`。

**參數**

* **backend** (`str` 或 `Backend`, *可選*) – 要使用的後端。根據構建時配置,有效值包括 `mpi`、`gloo`、`nccl`、`ucc`、`xccl` 或由第三方插件註冊的後端。自 2.6 版本起,如果未提供 `backend`,c10d 將使用為 `device_id` 關鍵字參數(如果提供)指示的設備類型註冊的後端。目前已知的默認註冊包括:用於 cuda 的 `nccl`,用於 cpu 的 `gloo`,用於 xpu 的 `xccl`。如果既未提供 `backend` 也未提供 `device_id`,c10d 將檢測運行時機器上的加速器,並使用為該檢測到的加速器(或 cpu)註冊的後端。此字段可以作為小寫字符串給出(例如,`"gloo"`),也可以通過 `Backend` 屬性訪問(例如,`Backend.GLOO`)。如果在每臺機器上使用多個進程且後端為 `nccl`,則每個進程必須對其使用的每個 GPU 具有獨佔訪問權,因為進程間共享 GPU 可能導致死鎖或 NCCL 無效使用。`ucc` 後端處於實驗階段。可以使用 `get_default_backend_for_device()` 查詢設備的默認後端。
* **init_method** (`str`, *可選*) – 指定如何初始化進程組的 URL。如果未指定 `init_method` 或 `store`,則默認為 `"env://"`。與 `store` 互斥。
* **world_size** (`int`, *可選*) – 參與作業的進程數。如果指定了 `store`,則為必需項。
* **rank** (`int`, *可選*) – 當前進程的 rank(它應該是介於 0 和 `world_size-1` 之間的數字)。如果指定了 `store`,則為必需項。
* **store** (`Store`, *可選*) – 所有 worker 均可訪問的鍵/值存儲,用於交換連接/地址信息。與 `init_method` 互斥。
* **timeout** (`timedelta`, *可選*) – 針對進程組執行的操作的超時時間。NCCL 的默認值為 10 分鐘,其他後端的默認值為 30 分鐘。這是在此時間之後異步中止集體操作並導致進程崩潰的持續時間。這樣做是因為 CUDA 執行是異步的,並且由於失敗的異步 NCCL 操作可能導致後續 CUDA 操作在損壞的數據上運行,因此繼續執行用戶代碼不再安全。當設置 `TORCH_NCCL_BLOCKING_WAIT` 時,進程將阻塞並等待此超時。
* **group_name** (`str`, *可選*, *已棄用*) – 組名。此參數被忽略

pg_options (ProcessGroupOptions, 可選) – 進程組選項,指定在構建特定進程組期間需要傳入的其他選項。截至目前,我們支持的唯一選項是用於 nccl 後端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便當有計算內核等待時,nccl 後端可以使用高優先級的 CUDA 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, 可選) – 此進程將使用的單個特定設備,允許進行後端特定的優化。目前這有兩個影響,僅在 NCCL 下有效:通信器會立即形成(立即調用 ncclCommInit* 而不是正常的延遲調用),並且子組將在可能時使用 ncclCommSplit 以避免不必要的組創建開銷。如果您想盡早了解 NCCL 初始化錯誤,也可以使用此字段。如果提供的是 int,API 將假設使用編譯時的加速器類型。注意 要啟用 backend == Backend.MPI,需要在支持 MPI 的系統上從源代碼構建 PyTorch。注意 對多個後端的支持是實驗性的。目前,當未指定後端時,將同時創建 gloo 和 nccl 後端。gloo 後端將用於 CPU 張量的集合操作,而 nccl 後端將用於 CUDA 張量的集合操作。可以通過傳入格式為 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串來指定自定義後端,例如 “cpu:gloo,cuda:custom_backend”。 torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# 基於 device_type、mesh_shape 和 mesh_dim_names 參數初始化 DeviceMesh。這將創建一個具有 n 維數組佈局的 DeviceMesh,其中 n 是 mesh_shape 的長度。如果提供了 mesh_dim_names,則每個維度標記為 mesh_dim_names[i]。注意 init_device_mesh 遵循 SPMD 編程模型,意味著相同的 PyTorch Python 程序在集群中的所有進程/秩上運行。確保所有秩上的 mesh_shape(描述設備佈局的 nD 數組的維度)一致。不一致的 mesh_shape 可能導致掛起。注意 如果未找到進程組,init_device_mesh 將在後臺初始化分佈式通信所需的分佈式進程組。參數 device_type (str) – 網格的設備類型。目前支持:“cpu”、“cuda/cuda-like”、“xpu”。不允許傳入帶有 GPU 索引的設備類型,例如 “cuda:0”。 mesh_shape (Tuple[int]) – 定義描述設備佈局的多維數組維度的元組。 mesh_dim_names (Tuple[str], 可選) – 分配給描述設備佈局的多維數組每個維度的網格維度名稱元組。其長度必須與 mesh_shape 的長度匹配。mesh_dim_names 中的每個字符串必須是唯一的。 backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可選) – 對將為每個網格維度創建的某些或所有 ProcessGroups 的覆蓋。每個鍵可以是維度的索引或其名稱(如果提供了 mesh_dim_names)。每個值可以是包含後端名稱及其選項的元組,或者只是這兩個組件之一(在這種情況下,另一個將設置為其默認值)。返回 表示設備佈局的 DeviceMesh 對象。返回類型 DeviceMesh 示例: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# 檢查默認進程組是否已初始化。返回類型 bool torch.distributed.is_mpi_available()[source]# 檢查 MPI 後端是否可用。返回類型 bool torch.distributed.is_nccl_available()[source]# 檢查 NCCL 後端是否可用。返回類型 bool torch.distributed.is_gloo_available()[source]# 檢查 Gloo 後端是否可用。返回類型 bool torch.distributed.distributed_c10d.is_xccl_available()[source]# 檢查 XCCL 後端是否可用。返回類型 bool torch.distributed.is_torchelastic_launched()[source]# 檢查此進程是否使用 torch.distributed.elastic(即 torchelastic)啟動。TORCHELASTIC_RUN_ID 環境變量的存在用作確定當前進程是否使用 torchelastic 啟動的代理。這是一個合理的代理,因為 TORCHELASTIC_RUN_ID 映射到 rendezvous id,這始終是一個非空值,表示用於對等發現目的的作業 id。返回類型 bool torch.distributed.get_default_backend_for_device(device)[source]# 返回給定設備的默認後端。參數 device (Union[str, torch.device]) – 要獲取默認後端的設備。返回 給定設備的默認後端,以小寫字符串形式返回。返回類型 str 目前有三種初始化方法

支持:TCP 初始化#
有兩種使用 TCP 進行初始化的方法,兩者都需要一個所有進程均可訪問的網絡地址以及所需的 `world_size`。第一種方法需要指定屬於 rank 0 進程的地址。此初始化方法要求所有進程都手動指定了 ranks。請注意,最新版本的分佈式包中不再支持多播地址。`group_name` 也已棄用。

```python
import torch.distributed as dist

# 使用其中一臺機器的地址 \{#skill-metadata}
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)

共享文件系統初始化# 另一種初始化方法利用組內所有機器共享且可見的文件系統,以及所需的 world_size。URL 應以 file:// 開頭,幷包含共享文件系統上(現有目錄中)一個不存在文件的路徑。文件系統初始化會在文件不存在時自動創建該文件,但不會刪除該文件。因此,你有責任確保在下一次對相同文件路徑/名稱調用 init_process_group() 之前清理該文件。請注意,最新版本的分佈式包中不再支持自動 rank 分配,group_name 也已棄用。

警告 此方法假設文件系統支持使用 fcntl 進行鎖定——大多數本地系統和 NFS 都支持它。

警告 此方法始終會創建文件,並盡力在程序結束時清理和刪除該文件。換句話說,每次使用文件初始化方法進行初始化都需要一個全新的空文件才能成功。如果再次使用上一次初始化使用的同一文件(碰巧未被清理),這將導致意外行為,並常常引起死鎖和失敗。因此,儘管此方法會盡力清理文件,但如果自動刪除不成功,你有責任確保在訓練結束時刪除該文件,以防止下次重複使用同一文件。如果你計劃對同一文件名多次調用 init_process_group(),這一點尤其重要。換言之,如果文件未被刪除/清理,並且你再次對該文件調用 init_process_group(),預計會發生失敗。這裡的經驗法則是,確保每次調用 init_process_group() 時文件都不存在或為空。

import torch.distributed as dist

# 應始終指定 rank \{#reference-full-skillmd}
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank)

環境變量初始化# 此方法將從環境變量中讀取配置,允許完全自定義信息的獲取方式。需要設置的變量如下:

  • MASTER_PORT - 必需;必須是 rank 0 機器上的空閒端口
  • MASTER_ADDR - 必需(rank 0 除外);rank 0 節點的地址
  • WORLD_SIZE - 必需;可以在此處設置,也可以在調用 init 函數時設置
  • RANK - 必需;可以在此處設置,也可以在調用 init 函數時設置

Rank 為 0 的機器將用於建立所有連接。這是默認方法,意味著無需指定 init_method(或者可以指定為 env://)。

優化初始化時間#

  • TORCH_GLOO_LAZY_INIT - 按需建立連接,而不是使用全網格連接,這可以大大改善非 all2all 操作的初始化時間。

初始化後# 一旦運行了 torch.distributed.init_process_group(),就可以使用以下函數。要檢查進程組是否已初始化,請使用 torch.distributed.is_initialized()

class torch.distributed.Backend(name)[source]# 一個類似枚舉的後端類。可用後端包括:GLOO、NCCL、UCC、MPI、XCCL 以及其他註冊的後端。此類的值為小寫字符串,例如 "gloo"。它們可以作為屬性訪問,例如 Backend.NCCL。可以直接調用此類來解析字符串,例如 Backend(backend_str) 將檢查 backend_str 是否有效,如果有效則返回解析後的小寫字符串。它還接受大寫字符串,例如 Backend("GLOO") 返回 "gloo"

注意 Backend.UNDEFINED 條目存在,但僅用作某些字段的初始值。用戶既不應直接使用它,也不應假設其存在。

classmethod register_backend(name, func, extended_api=False, devices=None)[source]# 使用給定的名稱和實例化函數註冊一個新後端。此類方法由第三方 ProcessGroup 擴展用於註冊新後端。

參數

  • name (str) – ProcessGroup 擴展的後端名稱。它應與 init_process_group() 中的名稱匹配。
  • func (function) – 實例化後端的函數處理程序。該函數應在後端擴展中實現,並接受四個參數,包括 storerankworld_sizetimeout
  • extended_api (bool, optional) – 後端是否支持擴展參數結構。默認值:False。如果設置為 True,後端將獲得一個 c10d::DistributedBackendOptions 實例和一個進程組

由後端實現定義的選項對象。

device(str 或 list of str,可選)– 此後端支持的設備類型,例如 “cpu”、“cuda” 等。如果為 None,則假定同時支持 “cpu” 和 “cuda”。

注意 對第三方後端的支持是實驗性的,可能會發生變化。

torch.distributed.get_backend(group=None)[源代碼]#

返回給定進程組的後端。

參數 group(ProcessGroup,可選)– 要操作的進程組。默認為通用的主進程組。如果指定了其他特定組,則調用進程必須是該組的一部分。

返回 給定進程組的後端,以小寫字符串形式表示。

返回類型 Backend

torch.distributed.get_rank(group=None)[源代碼]#

返回當前進程在所提供的組中的秩(rank),否則返回默認值。秩是分配給分佈式進程組中每個進程的唯一標識符。它們始終是從 0 到 world_size 的連續整數。

參數 group(ProcessGroup,可選)– 要操作的進程組。如果為 None,將使用默認進程組。

返回 進程組的秩;如果不屬於該組,則返回 -1

返回類型 int

torch.distributed.get_world_size(group=None)[源代碼]#

返回當前進程組中的進程數量。

參數 group(ProcessGroup,可選)– 要操作的進程組。如果為 None,將使用默認進程組。

返回 進程組的世界大小(world size);如果不屬於該組,則返回 -1

返回類型 int

關閉(Shutdown)

通過在退出時調用 destroy_process_group() 來清理資源非常重要。遵循的最簡單模式是在訓練腳本中不再需要通信的位置(通常在 main() 的末尾附近),通過將 group 參數設置為默認值 None 來調用 destroy_process_group(),從而銷燬每個進程組和後端。該調用應在每個 trainer 進程中執行一次,而不是在外層進程啟動器級別執行。

如果進程組(pg)中的所有秩未在超時持續時間內調用 destroy_process_group(),尤其是在應用程序中存在多個進程組時(例如用於 N-D 並行),可能會導致退出時掛起。這是因為 ProcessGroupNCCL 的析構函數會調用 ncclCommAbort,而該函數必須集體調用,但如果由 Python 的垃圾回收機制(GC)調用 ProcessGroupNCCL 的析構函數,其調用順序是不確定的。調用 destroy_process_group() 有助於確保在所有秩上以一致的順序調用 ncclCommAbort,並避免在 ProcessGroupNCCL 的析構過程中調用 ncclCommAbort

重新初始化(Reinitialization)

destroy_process_group 也可用於銷燬單獨的進程組。一種用例可能是容錯訓練,其中進程組可能在運行時被銷燬,然後初始化一個新的進程組。在這種情況下,關鍵是在調用 destroy 之後以及隨後初始化之前,使用除 torch.distributed 原語以外的某種方式同步 trainer 進程。由於難以實現這種同步,目前不支持/未測試此行為,並將其視為已知問題。如果此用例阻礙了您的工作,請提交 github issue 或 RFC。

組(Groups)

默認情況下,集合通信操作在默認組(也稱為 world)上進行,並要求所有進程進入分佈式函數調用。然而,某些工作負載可以從更細粒度的通信中受益。這就是分佈式組發揮作用的地方。new_group() 函數可用於創建新組,包含所有進程的任意子集。它返回一個不透明的組句柄,可以作為 group 參數提供給所有集合通信操作(集合通信操作是以某些眾所周知的編程模式交換信息的分佈式函數)。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[源代碼]#

創建一個新的分佈式組。

此函數要求主組中的所有進程(即所有屬於分佈式作業的進程)都進入此函數,即使它們不會成為該組的成員。此外,應在所有進程中以相同的順序創建組。

警告 安全併發使用:當使用帶有 NCCL 後端的多個進程組時,用戶必須確保跨秩的集合通信執行順序全局一致。如果進程內的多個線程發出集合通信操作,則必須進行顯式同步以確保順序一致。

當使用 torch.distributed 通信 API 的異步變體時,會返回一個 work 對象,並且通信內核被入隊到單獨的 CUDA 流上,從而允許通信和計算重疊。一旦在一個進程組上發出了一個或多個異步操作,在使用另一個進程組之前,必須通過調用 work.wait() 與其他 cuda 流進行同步。有關更多詳細信息,請參閱 同時使用多個 NCCL 通信器

參數 ranks(list[int])– 組成員的秩列表。如果為 None,將設置為所有秩。

默認值為 None。timeout (timedelta, 可選) – 詳見 init_process_group 以瞭解詳細信息和默認值。backend (str 或 Backend, 可選) – 要使用的後端。根據構建時的配置,有效值為 gloo 和 nccl。默認情況下使用與全局組相同的後端。此字段應作為小寫字符串提供(例如,"gloo"),也可以通過 Backend 屬性訪問(例如,Backend.GLOO)。如果傳入 None,將使用對應於默認進程組的後端。默認值為 None。pg_options (ProcessGroupOptions, 可選) – 進程組選項,指定在構造特定進程組期間需要傳入哪些附加選項。例如,對於 nccl 後端,可以指定 is_high_priority_stream,以便進程組可以使用高優先級的 CUDA 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, 可選):在進程組創建結束時執行組內局部屏障。不同之處在於,非成員 rank 無需調用 API,也不加入該屏障。group_desc (str, 可選) – 用於描述進程組的字符串。device_id (torch.device, 可選) – 要將此進程“綁定”到的單個特定設備。如果提供了此字段,new_group 調用將嘗試立即為該設備初始化通信後端。返回 一個分佈式組的句柄,可傳遞給集合調用;如果當前 rank 不屬於 ranks,則返回 GroupMember.NON_GROUP_MEMBER。注意:use_local_synchronization 不適用於 MPI。注意:雖然 use_local_synchronization=True 在大型集群和小型進程組中可以顯著加快運行速度,但必須小心,因為它會改變集群行為,因為非成員 rank 不會加入組屏障()。注意:當每個 rank 創建多個重疊的進程組時,use_local_synchronization=True 可能導致死鎖。為避免這種情況,請確保所有 rank 遵循相同的全局創建順序。

torch.distributed.get_group_rank(group, global_rank)[source]# 將全局 rank 轉換為組內 rank。global_rank 必須是 group 的一部分,否則會引發 RuntimeError。參數 group (ProcessGroup) – 用於查找相對 rank 的 ProcessGroup。global_rank (int) – 要查詢的全局 rank。返回 global_rank 相對於 group 的組內 rank 返回類型 int 注意:在默認進程組上調用此函數將返回恆等映射。

torch.distributed.get_global_rank(group, group_rank)[source]# 將組內 rank 轉換為全局 rank。group_rank 必須是 group 的一部分,否則會引發 RuntimeError。參數 group (ProcessGroup) – 用於從中查找全局 rank 的 ProcessGroup。group_rank (int) – 要查詢的組內 rank。返回 group_rank 相對於 group 的全局 rank 返回類型 int 注意:在默認進程組上調用此函數將返回恆等映射。

torch.distributed.get_process_group_ranks(group)[source]# 獲取與 group 關聯的所有 rank。參數 group (Optional[ProcessGroup]) – 用於獲取所有 rank 的 ProcessGroup。如果為 None,將使用默認進程組。返回 按組內 rank 排序的全局 rank 列表 返回類型 list[int]

DeviceMesh# DeviceMesh 是一種更高級別的抽象,用於管理進程組(或 NCCL 通信器)。它允許用戶輕鬆創建節點間和節點內進程組,而無需擔心如何為不同的子進程組正確設置 rank,並有助於輕鬆管理這些分佈式進程組。可以使用 init_device_mesh() 函數創建新的 DeviceMesh,其中網格形狀描述了設備拓撲。

class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh 表示一個設備網格,其中設備的佈局可以表示為一個 n 維數組,n 維數組中的每個值是默認進程組 rank 的全局 ID。DeviceMesh 可用於設置跨集群的 N 維設備連接,並管理用於 N 維並行化的 ProcessGroups。通信可以在 DeviceMesh 的每個維度上分別進行。DeviceMesh 尊重用戶已選擇的設備(即,如果在 DeviceMesh 初始化之前用戶調用了 torch.cuda.set_device),並且如果用戶事先未設置設備,它將為當前進程選擇/設置設備。請注意,手動設備選擇應在 DeviceMesh 初始化之前進行。當與 DTensor API 一起使用時,DeviceMesh 也可用作上下文管理器。注意:DeviceMesh 遵循 SPMD 編程模型,這意味著相同的 PyTorch Python 程序在集群中的所有進程/rank 上運行。因此,用戶需要確保描述設備佈局的網格數組在所有 rank 上保持一致。不一致的網格將導致靜默掛起。參數 device_type (str) – 網格的設備類型。目前支持:“cpu”、“cuda/cuda-like”。mesh (ndarray) – 描述設備佈局的多維數組或整數張量,其中的 ID

是默認進程組的全局 ID。返回表示設備佈局的 DeviceMesh 對象。返回類型 DeviceMesh 以下程序以 SPMD 方式在每個進程/秩上運行。在此示例中,我們有 2 臺主機,每臺主機有 4 個 GPU。對 mesh 的第一維進行歸約將在列 (0, 4)、.. 和 (3, 7) 之間進行歸約,對 mesh 的第二維進行歸約將在行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 之間進行歸約。示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 將設備網格初始化為 (2, 4),以表示 >>> # 跨主機(第 0 維)和主機內(第 1 維)的拓撲結構。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]# 從現有的 ProcessGroup 或現有 ProcessGroup 列表構造具有指定 device_type 的 DeviceMesh。構造的設備網格的維度數等於傳入的組數。例如,如果傳入單個進程組,則生成的 DeviceMesh 是一維網格。如果傳入包含 2 個進程組的列表,則生成的 DeviceMesh 是二維網格。如果傳入多個組,則必須提供 mesh 和 mesh_dim_names 參數。傳入的進程組的順序決定了網格的拓撲結構。例如,第一個進程組將是 DeviceMesh 的第 0 維。傳入的 mesh 張量的維度數必須與傳入的進程組數量相同,並且 mesh 張量中維度的順序必須與傳入的進程組中的順序匹配。參數 group (ProcessGroup 或 list[ProcessGroup]) – 現有的 ProcessGroup 或現有 ProcessGroups 列表。 device_type (str) – 網格的設備類型。目前支持:“cpu”、“cuda/cuda-like”。不允許傳入帶有 GPU 索引的設備類型,例如“cuda:0”。 mesh (torch.Tensor 或 ArrayLike, 可選) – 描述設備佈局的多維數組或整數張量,其中 ID 是默認進程組的全局 ID。默認為 None。 mesh_dim_names (tuple[str], 可選) – 要分配給描述設備佈局的多維數組每個維度的網格維度名稱元組。其長度必須與 mesh_shape 的長度匹配。mesh_dim_names 中的每個字符串必須是唯一的。默認為 None。返回 表示設備佈局的 DeviceMesh 對象。返回類型 DeviceMesh get_all_groups()[source]# 返回所有網格維度的 ProcessGroups 列表。返回 ProcessGroup 對象列表。返回類型 list[torch.distributed.distributed_c10d.ProcessGroup] get_coordinate()[source]# 返回此秩相對於網格所有維度的相對索引。如果此秩不屬於網格,則返回 None。返回類型 Optional[list[int]] get_group(mesh_dim=None)[source]# 返回由 mesh_dim 指定的單個 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是一維的,則返回網格中唯一的 ProcessGroup。參數 mesh_dim (str/python:int, 可選) – 可以是網格維度的名稱或索引 None. (網格維度的。默認為) – 返回 ProcessGroup 對象。返回類型 ProcessGroup get_local_rank(mesh_dim=None)[source]# 返回 DeviceMesh 給定 mesh_dim 的本地秩。參數 mesh_dim (str/python:int, 可選) – 可以是網格維度的名稱或索引 None. (網格維度的。默認為) – 返回 表示本地秩的整數。返回類型 int 以下程序以 SPMD 方式在每個進程/秩上運行。在此示例中,我們有 2 臺主機,每臺主機有 4 個 GPU。在秩 0、1、2、3 上調用 mesh_2d.get_local_rank(mesh_dim=0) 將返回 0。在秩 4、5、6、7 上調用 mesh_2d.get_local_rank(mesh_dim=0) 將返回 1。在秩 0、4 上調用 mesh_2d.get_local_rank(mesh_dim=1) 將返回 0。在秩 1、5 上調用 mesh_2d.get_local_rank(mesh_dim=1) 將返回 1。在秩 2、6 上調用 mesh_2d.get_local_rank(mesh_dim=1) 將返回 2。在秩 3、7 上調用 mesh_2d.get_local_rank(mesh_dim=1) 將返回 3。示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 將設備網格初始化為 (2, 4),以表示 >>> # 跨主機(第 0 維)和主機內(第 1 維)的拓撲結構。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) get_rank()[source]# 返回當前全局秩。返回類型 int 點對點通信# torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 同步發送張量。警告 NCCL 後端不支持 tag。參數 tensor (Tensor) – 要發送的張量。 dst (int) – 全局進程組上的目標秩(無論 group 參數如何)。目標秩不應與當前進程的秩相同。 group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。 tag (int, 可選) – 用於與遠程 recv 匹配的標籤 group_dst (int, 可選) – 組上的目標秩。同時指定 dst 和 group_dst 無效。 torch.distributed.recv(tensor, src=None, group=None, tag=0,

group_src=None)[source]# 同步接收張量。警告:NCCL 後端不支持 tag 參數。

參數

  • tensor (Tensor) – 用於填充接收數據的張量。
  • src (int, 可選) – 全局進程組中的源秩(不受 group 參數影響)。如果未指定,將從任何進程接收。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • tag (int, 可選) – 用於與遠程發送匹配的標籤。
  • group_src (int, 可選) – 組內的目標秩。同時指定 srcgroup_src 是無效的。

返回

發送者秩。如果不屬於該組,則返回 -1。

返回類型

int

isend()irecv() 在使用時返回分佈式請求對象。通常,這些對象的類型未指定,因為它們絕不應手動創建,但它們保證支持以下兩種方法:

  • is_completed() - 如果操作已完成,則返回 True
  • wait() - 阻塞進程直到操作完成。

一旦 is_completed() 返回,它保證返回 True。

torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 異步發送張量。

警告 在請求完成之前修改 tensor 會導致未定義行為。

警告 NCCL 後端不支持 tag 參數。

與阻塞式的 send 不同,isend 允許 src == dst 秩,即向自身發送。

參數

  • tensor (Tensor) – 要發送的張量。
  • dst (int) – 全局進程組中的目標秩(不受 group 參數影響)。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • tag (int, 可選) – 用於與遠程接收匹配的標籤。
  • group_dst (int, 可選) – 組內的目標秩。同時指定 dstgroup_dst 是無效的。

返回

一個分佈式請求對象。如果不屬於該組,則為 None。

返回類型

Optional[Work]

torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]# 異步接收張量。

警告 NCCL 後端不支持 tag 參數。

與阻塞式的 recv 不同,irecv 允許 src == dst 秩,即從自身接收。

參數

  • tensor (Tensor) – 用於填充接收數據的張量。
  • src (int, 可選) – 全局進程組中的源秩(不受 group 參數影響)。如果未指定,將從任何進程接收。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • tag (int, 可選) – 用於與遠程發送匹配的標籤。
  • group_src (int, 可選) – 組內的目標秩。同時指定 srcgroup_src 是無效的。

返回

一個分佈式請求對象。如果不屬於該組,則為 None。

返回類型

Optional[Work]

torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]# 同步發送 object_list 中的可 pickle 對象。

類似於 send(),但可以傳入 Python 對象。請注意,object_list 中的所有對象必須是可 pickle 的才能被髮送。

參數

  • object_list (List[Any]) – 要發送的輸入對象列表。每個對象必須是可 pickle 的。接收方必須提供大小相等的列表。
  • dst (int) – 發送 object_list 的目標秩。目標秩基於全局進程組(不受 group 參數影響)。
  • group (Optional[ProcessGroup]) – (ProcessGroup, 可選): 要工作的進程組。如果為 None,將使用默認進程組。默認為 None。
  • device (torch.device, 可選) – 如果不為 None,對象將被序列化並轉換為張量,在發送前移動到該設備。默認為 None。
  • group_dst (int, 可選) – 組內的目標秩。必須指定 dstgroup_dst 中的一個,但不能同時指定兩者。
  • use_batch (bool, 可選) – 如果為 True,使用批量點對點操作而不是常規發送操作。這避免了初始化 2 秩通信器,並使用現有的整個組通信器。有關用法和假設,請參閱 batch_isend_irecv。默認為 False。

返回

None。

注意 對於基於 NCCL 的進程組,對象的內部張量表示必須在通信發生前移動到 GPU 設備。在這種情況下,使用的設備由 torch.cuda.current_device() 給出,用戶有責任確保通過 torch.cuda.set_device() 設置此設備,以便每個秩擁有獨立的 GPU。

警告 對象集合操作存在許多嚴重的性能和可擴展性限制。詳情請參閱 [Object collectives](Object collectives)。

警告 send_object_list() 隱式使用 pickle 模塊,已知其不安全。有可能構造惡意的 pickle 數據,在反序列化期間執行任意代碼。僅在使用受信任的數據時調用此函數。

警告 使用 GPU 張量調用 send_object_list() 的支持不佳且效率低下,因為張量會被 pickle,從而導致 GPU -> CPU 傳輸。請考慮改用 send()

示例::

>>> # 注意:省略了每個秩上的進程組初始化。
>>> import torch.distributed as dist
>>> # 假設後端不是 NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>> # 假設 world_size 為 2。
>>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的對象
>>>

dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]# 同步接收 object_list 中的可序列化對象。與 recv() 類似,但可以接收 Python 對象。參數 object_list (List[Any]) – 用於接收對象的列表。必須提供一個大小等於待發送列表大小的列表。 src (int, optional) – 從中接收 object_list 的源秩。源秩基於全局進程組(無論 group 參數如何)。如果設置為 None,將從任意秩接收。默認為 None。 group (Optional[ProcessGroup]) – (ProcessGroup, optional):要使用的進程組。如果為 None,將使用默認進程組。默認為 None。 device (torch.device, optional) – 如果不為 None,則在此設備上接收。默認為 None。 group_src (int, optional) – 組內的目標秩。同時指定 src 和 group_src 是無效的。 use_batch (bool, optional) – 如果為 True,則使用批量點對點操作而不是常規發送操作。這避免了初始化雙秩通信器,並使用現有的整個組通信器。有關用法和假設,請參閱 batch_isend_irecv。默認為 False。返回 發送者秩。如果該秩不屬於該組,則返回 -1。如果該秩屬於該組,object_list 將包含來自 src 秩的已發送對象。注意 對於基於 NCCL 的進程組,在通信發生之前,必須將對象的內部張量表示移動到 GPU 設備。在這種情況下,使用的設備由 torch.cuda.current_device() 給出,用戶有責任通過 torch.cuda.set_device() 確保每個秩都有一個獨立的 GPU。警告 對象集合操作存在許多嚴重的性能和可擴展性限制。有關詳細信息,請參閱對象集合操作。警告 recv_object_list() 隱式使用 pickle 模塊,眾所周知這是不安全的。可以構造惡意的 pickle 數據,這些數據在反序列化期間會執行任意代碼。僅在使用受信任的數據時調用此函數。警告 使用 GPU 張量調用 recv_object_list() 的支持不佳且效率低下,因為它會導致 GPU -> CPU 傳輸,因為張量會被序列化。請考慮改用 recv()。示例::>>> # 注意:省略了每個秩上的進程組初始化。 >>> import torch.distributed as dist >>> # 假設後端不是 NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # 假設 world_size 為 2。 >>> objects = ["foo", 12, {1: 2}] # 任何可序列化的對象 >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.batch_isend_irecv(p2p_op_list)[source]# 異步發送或接收一批張量,並返回請求列表。處理 p2p_op_list 中的每個操作,並返回相應的請求。目前支持 NCCL、Gloo 和 UCC 後端。參數 p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 點對點操作列表(每個操作的類型為 torch.distributed.P2POp)。列表中 isend/irecv 的順序很重要,並且需要與遠程端相應的 isend/irecv 匹配。返回 通過調用 op_list 中相應操作返回的分佈式請求對象列表。返回類型 list[torch.distributed.distributed_c10d.Work] 示例 >>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1 注意 請注意,當將此 API 與 NCCL PG 後端一起使用時,用戶必須使用 torch.cuda.set_device 設置當前 GPU 設備,否則會導致意外的掛起問題。此外,如果此 API 是傳遞給 dist.P2POp 的組中的第一個集合調用,則該組的所有秩都必須參與此 API 調用;否則,行為未定義。如果此 API 調用不是組中的第一個集合調用,則允許僅涉及組中部分秩的批量點對點操作。 class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source]# 一個用於構建 batch_isend_irecv 點對點操作的類。此類構建 P2P 操作的類型、通信緩衝區、對等秩、進程組和標籤。此類的實例將傳遞給 batch_isend_irecv 以進行點對點通信。參數 op (Callable) – 向對等進程發送數據或從對等進程接收數據的函數。op 的類型為 torch.distributed.isend 或 torch.distributed.irecv。 tensor (Tensor) – 要發送或接收的張量。 peer (int,

optional) – 目標或源秩。group (ProcessGroup, optional) – 要使用的進程組。如果為 None,將使用默認進程組。tag (int, optional) – 用於匹配發送與接收的標籤。group_peer (int, optional) – 目標或源秩。同步和異步集體操作# 每個集體操作函數都支持以下兩種操作,具體取決於傳入集體操作的 async_op 標誌的設置:同步操作 - 默認模式,當 async_op 設置為 False 時。當函數返回時,保證集體操作已執行。對於 CUDA 操作,不保證 CUDA 操作已完成,因為 CUDA 操作是異步的。對於 CPU 集體操作,任何利用集體調用輸出的後續函數調用都將按預期行為。對於 CUDA 集體操作,在同一 CUDA 流上利用輸出的函數調用將按預期行為。用戶必須在不同流下運行的場景中注意同步。有關 CUDA 語義(如流同步)的詳細信息,請參閱 CUDA 語義。請參閱下面的腳本,以查看 CPU 和 CUDA 操作在這些語義上的差異示例。異步操作 - 當 async_op 設置為 True 時。集體操作函數返回一個分佈式請求對象。通常,您不需要手動創建它,並且保證支持以下兩個方法:is_completed() - 對於 CPU 集體操作,如果完成則返回 True。對於 CUDA 操作,如果操作已成功入隊到 CUDA 流並且可以在默認流上使用輸出而無需進一步同步,則返回 True。wait() - 對於 CPU 集體操作,將阻塞進程直到操作完成。對於 CUDA 集體操作,將阻塞當前活動的 CUDA 流直到操作完成(但不會阻塞 CPU)。get_future() - 返回 torch.C.Future 對象。支持 NCCL,也支持 GLOO 和 MPI 上的大多數操作,點對點操作除外。注意:隨著我們繼續採用 Futures 併合並 API,get_future() 調用可能會變得多餘。示例 以下代碼可以作為使用分佈式集體操作時 CUDA 操作語義的參考。它展示了在不同 CUDA 流上使用集體輸出時顯式同步的必要性: # 代碼在每個秩上運行。 dist.init_process_group("nccl", rank=rank, world_size=2) output = torch.tensor([rank]).cuda(rank) s = torch.cuda.Stream() handle = dist.all_reduce(output, async_op=True) # Wait 確保操作已入隊,但不一定已完成。 handle.wait() # 在非默認流上使用結果。 with torch.cuda.stream(s): s.wait_stream(torch.cuda.default_stream()) output.add(100) if rank == 0: # 如果省略了對 wait_stream 的顯式調用,下面的輸出將是 # 非確定性的 1 或 101,具體取決於 allreduce 是否在 add 完成後覆蓋了該值。 print(output) 集體函數# torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]# 將張量廣播到整個組。參與集體的所有進程中的 tensor 必須具有相同數量的元素。參數 tensor (Tensor) – 如果 src 是當前進程的秩,則為要發送的數據;否則為用於保存接收數據的張量。src (int) – 全局進程組上的源秩(無論 group 參數如何)。group (ProcessGroup, optional) – 要使用的進程組。如果為 None,將使用默認進程組。async_op (bool, optional) – 此操作是否應為異步操作 group_src (int) – 組上的源秩。必須指定 group_src 和 src 中的一個,但不能同時指定兩者。返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]# 將 object_list 中可序列化的對象廣播到整個組。類似於 broadcast(),但可以傳入 Python 對象。請注意,object_list 中的所有對象必須是可序列化的才能被廣播。參數 object_list (List[Any]) – 要廣播的輸入對象列表。每個對象必須是可序列化的。只有 src 秩上的對象會被廣播,但每個秩必須提供大小相等的列表。src (int) – 廣播 object_list 的源秩。源秩基於全局進程組(無論 group 參數如何) group (Optional[ProcessGroup]) – (ProcessGroup, optional):要使用的進程組。如果為 None,將使用默認進程組。默認為 None。device (torch.device, optional) – 如果不為 None,對象將被序列化並轉換為張量,在廣播之前移動到該設備。默認為 None。group_src (int) – 組上的源秩。不得同時指定 group_src 和 src 中的一個以上。返回 None。如果秩屬於該組,object_list 將包含來自 src 秩的廣播對象。注意 對於基於 NCCL 的進程組,內部張量

在通信發生之前,必須將對象的表示移動到 GPU 設備。在這種情況下,使用的設備由 torch.cuda.current_device() 給出,用戶有責任通過 torch.cuda.set_device() 確保此設置正確,以便每個 rank 擁有獨立的 GPU。

備註

注意,此 API 與 broadcast() 集體操作略有不同,因為它不提供 async_op 句柄,因此是一個阻塞調用。

注意

對象集體操作存在許多嚴重的性能和可擴展性限制。詳情請參閱 Object collectives

注意

broadcast_object_list() 隱式使用 pickle 模塊,眾所周知該模塊是不安全的。可以構造惡意的 pickle 數據,在反序列化期間執行任意代碼。請僅在使用受信任的數據時調用此函數。

注意

使用 GPU 張量調用 broadcast_object_list() 的支持不佳且效率低下,因為張量會被 pickle 序列化,從而導致 GPU -> CPU 傳輸。請考慮改用 broadcast()

示例:

>>> # 注意:省略了每個 rank 上的進程組初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>> # 假設 world_size 為 3。
>>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的對象
>>> else:
>>> objects = [None, None, None]
>>> # 假設後端不是 NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]

torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]

以所有機器都獲得最終結果的方式歸約張量數據。調用後,所有進程中的 tensor 將在位級別上完全相同。支持複數張量。

參數

  • tensor (Tensor) – 集體操作的輸入和輸出。該函數就地操作。
  • op (可選) – torch.distributed.ReduceOp 枚舉中的一個值。指定用於逐元素歸約的操作。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • async_op (bool, 可選) – 此操作是否應為異步操作

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None

示例

>>> # 下面的所有張量都是 torch.int64 類型。
>>> # 我們有 2 個進程組,2 個 ranks。
>>> device = torch.device(f"cuda:&#123;rank&#125;")
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # 下面的所有張量都是 torch.cfloat 類型。
>>> # 我們有 2 個進程組,2 個 ranks。
>>> tensor = torch.tensor(
... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device
... ) + 2 * rank * (1 + 1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1

torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]

跨所有機器歸約張量數據。只有 rank 為 dst 的進程才會接收最終結果。

參數

  • tensor (Tensor) – 集體操作的輸入和輸出。該函數就地操作。
  • dst (int) – 全局進程組上的目標 rank(無論 group 參數如何)
  • op (可選) – torch.distributed.ReduceOp 枚舉中的一個值。指定用於逐元素歸約的操作。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • async_op (bool, 可選) – 此操作是否應為異步操作
  • group_dst (int) – 組內的目標 rank。必須指定 group_dstdst 中的一個,但不能同時指定兩者。

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None

torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]

從整個組中收集張量到一個列表中。支持複數和大小不均的張量。

參數

  • tensor_list (list[Tensor]) – 輸出列表。它應包含正確大小的張量,用於集體操作的輸出。支持大小不均的張量。
  • tensor (Tensor) – 要從當前進程廣播的張量。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • async_op (bool, 可選) – 此操作是否應為異步操作

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None

示例

>>> # 下面的所有張量都是 torch.int64  dtype。
>>> # 我們有 2 個進程組,2 個 ranks。
>>> device = torch.device(f"cuda:&#123;rank&#125;")
>>> tensor_list = [
... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)
... ]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0],

device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1 >>> # 以下所有張量均為 torch.cfloat 數據類型。 >>> # 我們有 2 個進程組,2 個秩。 >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1 torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]# 從所有秩收集張量並將它們放入單個輸出張量中。此函數要求每個進程上的所有張量大小相同。 參數 output_tensor (Tensor) – 輸出張量,用於容納來自所有秩的張量元素。其大小必須正確,具有以下形式之一:(i) 沿主維度連接所有輸入張量;關於“連接”的定義,請參閱 torch.cat();(ii) 沿主維度堆疊所有輸入張量;關於“堆疊”的定義,請參閱 torch.stack()。下面的示例可以更好地解釋支持的輸出形式。 input_tensor (Tensor) – 要從當前秩收集的張量。與 all_gather API 不同,此 API 中的輸入張量在所有秩上必須具有相同的大小。 group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。 async_op (bool, 可選) – 此操作是否應為異步操作 返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。 示例 >>> # 以下所有張量均為 torch.int64 數據類型且位於 CUDA 設備上。 >>> # 我們有兩個秩。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # 輸出為連接形式 >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # 輸出為堆疊形式 >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1 torch.distributed.all_gather_object(object_list, obj, group=None)[source]# 將整個組中的可 pickle 對象收集到一個列表中。類似於 all_gather(),但可以傳入 Python 對象。請注意,對象必須是可 pickle 的才能被收集。 參數 object_list (list[Any]) – 輸出列表。其大小應正確設置為該集體通信的組大小,並將包含輸出。 obj (Any) – 要從當前進程廣播的可 pickle Python 對象。 group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。默認為 None。 返回 None。如果調用秩屬於此組,則集體通信的輸出將填充到輸入的 object_list 中。如果調用秩不屬於該組,則傳入的 object_list 將保持不變。 注意 請注意,此 API 與 all_gather() 集體通信略有不同,因為它不提供 async_op 句柄,因此將是阻塞調用。 注意 對於基於 NCCL 的進程組,對象的內部張量表示必須在通信發生之前移動到 GPU 設備。在這種情況下,使用的設備由 torch.cuda.current_device() 給出,用戶有責任確保通過 torch.cuda.set_device() 設置此設備,以便每個秩擁有獨立的 GPU。 警告 對象集體通信在性能和可擴展性方面存在許多嚴重限制。有關詳細信息,請參閱對象集體通信。 警告 all_gather_object() 隱式使用 pickle 模塊,已知該模塊不安全。有可能構造惡意的 pickle 數據,這些數據在反序列化期間會執行任意代碼。僅在使用可信數據時調用此函數。 警告 使用 GPU 張量調用 all_gather_object() 的支持不佳且效率低下,因為它會產生 GPU -> CPU

傳輸,因為張量會被 pickle 序列化。請考慮改用 all_gather()。示例::

>>> # 注意:每個 rank 上的進程組初始化已省略。
>>> import torch.distributed as dist
>>> # 假設 world_size 為 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可 picklable 的對象
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]

torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source]#

將張量列表收集到單個進程中。此函數要求每個進程上的所有張量大小相同。

參數

  • tensor (Tensor) – 輸入張量。
  • gather_list (list[Tensor], 可選) – 用於存儲收集數據的適當且大小相同的張量列表(默認為 None,必須在目標 rank 上指定)
  • dst (int, 可選) – 全局進程組中的目標 rank(無論 group 參數如何)。(如果 dstgroup_dst 均為 None,則默認為全局 rank 0)
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • async_op (bool, 可選) – 此操作是否應為異步操作
  • group_dst (int, 可選) – 組內的目標 rank。同時指定 dstgroup_dst 是無效的

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。

注意

請注意,gather_list 中的所有張量必須具有相同的大小。

示例::

>>> # 我們有 2 個進程組,2 個 ranks。
>>> tensor_size = 2
>>> device = torch.device(f'cuda:&#123;rank&#125;')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> if dist.get_rank() == 0:
>>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>> gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 獲取收集的數據。
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None # Rank 1

torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source]#

在單個進程中從整個組收集可 picklable 的對象。類似於 gather(),但可以傳入 Python 對象。請注意,對象必須是可 picklable 的才能被收集。

參數

  • obj (Any) – 輸入對象。必須是可 picklable 的。
  • object_gather_list (list[Any]) – 輸出列表。在 dst rank 上,它的大小應正確設置為該集體操作的組大小,並將包含輸出。在非 dst ranks 上必須為 None。(默認為 None)
  • dst (int, 可選) – 全局進程組中的目標 rank(無論 group 參數如何)。(如果 dstgroup_dst 均為 None,則默認為全局 rank 0)
  • group (Optional[ProcessGroup]) – (ProcessGroup, 可選): 要工作的進程組。如果為 None,將使用默認進程組。默認為 None。
  • group_dst (int, 可選) – 組內的目標 rank。同時指定 dstgroup_dst 是無效的

返回

None。在 dst rank 上,object_gather_list 將包含集體操作的輸出。

注意

請注意,此 API 與 gather 集體操作略有不同,因為它不提供 async_op 句柄,因此將是阻塞調用。

注意

對於基於 NCCL 的進程組,對象的內部張量表示必須在通信發生之前移動到 GPU 設備。在這種情況下,使用的設備由 torch.cuda.current_device() 給出,用戶有責任確保通過 torch.cuda.set_device() 設置此設備,以便每個 rank 擁有獨立的 GPU。

警告

對象集體操作存在許多嚴重的性能和可擴展性限制。詳見 Object collectives

警告

gather_object() 隱式使用 pickle 模塊,已知其不安全。可以構造惡意的 pickle 數據,在反序列化期間執行任意代碼。僅在使用可信數據時調用此函數。

警告

使用 GPU 張量調用 gather_object() 的支持不佳且效率低下,因為張量會被 pickle 序列化,從而導致 GPU -> CPU 傳輸。請考慮改用 gather()

示例::

>>> # 注意:每個 rank 上的進程組初始化已省略。
>>> import torch.distributed as dist
>>> # 假設 world_size 為 3。
>>> gather_objects = ["foo", 12, {1: 2}] # 任何可 picklable 的對象
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
... gather_objects[dist.get_rank()],
... output if dist.get_rank() == 0 else None,
... dst=0
... )
>>> # 在 rank 0 上
>>> output
['foo', 12, {1: 2}]

torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source]#

將張量列表分散到組中的所有進程。每個進程將恰好接收一個張量,並將其數據存儲在 tensor 參數中。支持複數張量。

參數

  • tensor (Tensor) – 輸出張量。
  • scatter_list (list[Tensor]) – 要分散的張量列表(默認為 None,必須在源 rank 上指定)
  • src (int) – 全局進程組中的源 rank(無論 group 參數如何)。(如果 srcgroup_src 均為 None,則默認為全局 rank 0)
  • group (ProcessGroup, 可選) – The

要工作的進程組。如果為 None,將使用默認進程組。 async_op (bool, 可選) – 此操作是否應為異步操作 group_src (int, 可選) – 組內的源秩。同時指定 src 和 group_src 是無效的

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。

注意

請注意,scatter_list 中的所有 Tensor 必須具有相同的大小。

示例::

>>> # 注意:省略了每個秩上的進程組初始化。
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:&#123;rank&#125;')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> if dist.get_rank() == 0:
>>> # 假設 world_size 為 2。
>>> # 僅張量,且所有張量必須大小相同。
>>> t_ones = torch.ones(tensor_size, device=device)
>>> t_fives = torch.ones(tensor_size, device=device) * 5
>>> scatter_list = [t_ones, t_fives]
>>> else:
>>> scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # 秩 i 獲取 scatter_list[i]。
>>> output_tensor
tensor([1., 1.], device='cuda:0') # 秩 0
tensor([5., 5.], device='cuda:1') # 秩 1

torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[源代碼]#

scatter_object_input_list 中可序列化的對象散射(scatter)到整個組。與 scatter() 類似,但可以傳入 Python 對象。在每個秩上,散射後的對象將存儲為 scatter_object_output_list 的第一個元素。請注意,scatter_object_input_list 中的所有對象必須是可序列化的(picklable),才能進行散射。

參數

  • scatter_object_output_list (List[Any]) – 非空列表,其第一個元素將存儲散射到此秩的對象。
  • scatter_object_input_list (List[Any], 可選) – 要散射的輸入對象列表。每個對象必須是可序列化的。只有源秩(src rank)上的對象會被散射,對於非源秩,該參數可以為 None。
  • src (int) – 從中散射 scatter_object_input_list 的源秩。源秩基於全局進程組(無論 group 參數如何)。(如果 srcgroup_src 均為 None,則默認為全局秩 0)
  • group (Optional[ProcessGroup]) – (ProcessGroup, 可選):要工作的進程組。如果為 None,將使用默認進程組。默認為 None。
  • group_src (int, 可選) – 組內的源秩。同時指定 srcgroup_src 是無效的

返回

None。如果秩屬於該組,scatter_object_output_list 的第一個元素將被設置為此秩散射得到的對象。

注意

請注意,此 API 與 scatter 集合通信略有不同,因為它不提供 async_op 句柄,因此將是阻塞調用。

警告

對象集合通信在性能和可擴展性方面存在許多嚴重限制。詳見 [對象集合通信](Object collectives)。

警告

scatter_object_list() 隱式使用 pickle 模塊,眾所周知這是不安全的。有可能構造惡意的 pickle 數據,在反序列化期間執行任意代碼。請僅在使用可信數據時調用此函數。

警告

使用 GPU 張量調用 scatter_object_list() 的支持不佳且效率低下,因為張量會被序列化,從而導致 GPU -> CPU 傳輸。請考慮改用 scatter()

示例::

>>> # 注意:省略了每個秩上的進程組初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>> # 假設 world_size 為 3。
>>> objects = ["foo", 12, {1: 2}] # 任何可序列化的對象
>>> else:
>>> # 在非源秩上可以是任何列表,元素不會被使用。
>>> objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # 秩 i 獲取 objects[i]。例如,在秩 2 上:
>>> output_list
[{1: 2}]

torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代碼]#

歸約(reduce),然後將張量列表散射(scatter)到組中的所有進程。

參數

  • output (Tensor) – 輸出張量。
  • input_list (list[Tensor]) – 要歸約和散射的張量列表。
  • op (可選) – torch.distributed.ReduceOp 枚舉中的值之一。指定用於逐元素歸約的操作。
  • group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。
  • async_op (bool, 可選) – 此操作是否應為異步操作。

返回

如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。

torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代碼]#

歸約(reduce),然後將張量散射(scatter)到組中的所有秩。

參數

  • output (Tensor) – 輸出張量。它在所有秩上應具有相同的大小。
  • input (Tensor) – 要歸約和散射的輸入張量。其大小應為輸出張量大小乘以 world size。輸入張量可以具有以下形狀之一:(i) 沿主維度連接輸出張量,或 (ii) 沿主維度堆疊輸出張量。關於“連接”的定義,參見 torch.cat()。關於“堆疊”的定義,參見 torch.stack()
  • group (ProcessGroup,

optional) – 要操作進程組。如果為 None,將使用默認進程組。 async_op (bool, optional) – 此操作是否應為異步操作。 返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。 示例 >>> # 以下所有張量均為 torch.int64 類型且位於 CUDA 設備上。 >>> # 我們有兩個 rank。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # 輸入為拼接形式 >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # 輸入為堆疊形式 >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]# 分割輸入張量,然後將分割後的列表散播(scatter)到組中的所有進程。隨後,將來自組中所有進程的接收張量拼接起來,並作為單個輸出張量返回。支持複數張量。 參數 output (Tensor) – 收集並拼接後的輸出張量。 input (Tensor) – 要散播的輸入張量。 output_split_sizes – (list[Int], optional):第 0 維的輸出分割大小。如果指定為 None 或為空,則輸出張量的第 0 維必須能被 world_size 整除。 input_split_sizes – (list[Int], optional):第 0 維的輸入分割大小。如果指定為 None 或為空,則輸入張量的第 0 維必須能被 world_size 整除。 group (ProcessGroup, optional) – 要操作的進程組。如果為 None,將使用默認進程組。 async_op (bool, optional) – 此操作是否應為異步操作。 返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。 警告 all_to_all_single 是實驗性的,可能會發生變化。 示例 >>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3 >>> # 本質上,它類似於以下操作: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) >>> # 另一個不均勻分割的示例 >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3 >>> # 另一個使用 torch.cfloat 類型張量的示例。 >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3 torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]# 將輸入張量列表散播到組中的所有進程,並在輸出列表中返回收集的張量列表。支持複數張量。 參數 output_tensor_list (list[Tensor]) – 要收集的張量列表,每個 rank 一個。 input_tensor_list (list[Tensor]) – 要散播的張量列表,每個 rank 一個。 group (ProcessGroup, optional) – 要操作的進程組。如果為 None,將使用默認進程組。 async_op (bool, optional) – 此操作是否應為異步操作。 返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是異步操作或不屬於該組,則返回 None。 警告 all_to_all 是實驗性的,可能會發生變化。 示例 >>> input = torch.arange(4) + rank * 4 >>> input =

list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3 >>> # 本質上,它類似於以下操作: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i) >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3 >>> # 另一個使用 torch.cfloat 類型張量的示例。 >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3 torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]# 同步所有進程。如果 async_op 為 False,或者在 wait() 上調用了異步工作句柄,此集體操作將阻塞進程,直到整個組進入此函數。 參數 group (ProcessGroup, 可選) – 要工作的進程組。如果為 None,將使用默認進程組。 async_op (bool, 可選) – 此操作是否應為異步操作 device_ids ([int], 可選) – 設備/GPU id 列表。僅期望一個 id。 返回 如果 async_op 設置為 True,則返回異步工作句柄。如果不是 async_op 或不屬於該組,則返回 None 注意 ProcessGroupNCCL 現在會阻塞 CPU 線程,直到 barrier 集體操作完成。 注意 ProcessGroupNCCL 將 barrier 實現為單元素張量的 all_reduce。必須選擇一個設備來分配此張量。設備選擇按以下順序檢查確定:(1) 如果 barrier 的 device_ids 參數不為 None,則使用傳入的第一個設備;(2) 如果 init_process_group 傳入的設備不為 None,則使用該設備;(3) 如果已執行過其他帶有張量輸入的集體操作,則使用與此進程組首次一起使用的設備;(4) 由全局 rank 對本地設備數量取模指示的設備索引。 torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]# 類似於 torch.distributed.barrier 同步進程,但考慮可配置的超時。它能夠報告在提供的超時時間內未通過此 barrier 的 rank。具體來說,對於非零 rank,將阻塞直到處理來自 rank 0 的發送/接收。Rank 0 將阻塞直到處理完來自其他所有 rank 的發送/接收,並將報告未能及時響應的 rank 的故障。請注意,如果一個 rank 未到達 monitored_barrier(例如由於掛起),所有其他 rank 將在 monitored_barrier 中失敗。此集體操作將阻塞組中的所有進程/rank,直到整個組成功退出該函數,使其適用於調試和同步。但是,它可能會影響性能,應僅用於調試或需要在主機端進行完全同步點的場景。出於調試目的,可以在應用程序的集體操作之前插入此 barrier

調用以檢查是否有任何 rank 不同步。注意:此集體通信僅支持 GLOO 後端。

參數

  • group (ProcessGroup, 可選) – 要操作的處理組。如果為 None,將使用默認處理組。
  • timeout (datetime.timedelta, 可選) – monitored_barrier 的超時時間。如果為 None,將使用默認處理組超時時間。
  • wait_all_ranks (bool, 可選) – 是否收集所有失敗的 rank。默認情況下,此值為 False,rank 0 上的 monitored_barrier 會在遇到第一個失敗的 rank 時拋出異常,以便快速失敗。通過設置 wait_all_ranks=Truemonitored_barrier 將收集所有失敗的 rank,並拋出一個包含所有失敗 rank 信息的錯誤。

返回

None

示例

>>> # 注意:省略了每個 rank 上的進程組初始化。
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>> dist.monitored_barrier() # 拋出異常,表明
>>> # rank 1 未調用 monitored_barrier。
>>> # wait_all_ranks=True 的示例
>>> if dist.get_rank() == 0:
>>> dist.monitored_barrier(wait_all_ranks=True) # 拋出異常
>>> # 表明 rank 1, 2, ... world_size - 1 未調用
>>> # monitored_barrier。

class torch.distributed.Work

Work 對象表示 PyTorch 分佈式包中掛起的異步操作的句柄。它由非阻塞集體通信操作返回,例如 dist.all_reduce(tensor, async_op=True)

block_current_stream(self: torch._C._distributed_c10d.Work) → None

阻塞當前活躍的 GPU 流,直到操作完成。對於基於 GPU 的集體通信,這等同於同步。對於由 CPU 發起的集體通信(如使用 Gloo),這將阻塞 CUDA 流直到操作完成。在所有情況下,此方法都會立即返回。要檢查操作是否成功,您應該異步檢查 Work 對象的結果。

boxed(self: torch._C._distributed_c10d.Work) → object

exception(self: torch._C._distributed_c10d.Work) → std::__exception_ptr::exception_ptr

get_future(self: torch._C._distributed_c10d.Work) → torch.Future

返回

一個與 Work 完成相關聯的 torch.futures.Future 對象。例如,可以通過 fut = process_group.allreduce(tensors).get_future() 獲取 future 對象。

示例

下面是一個簡單的 allreduce DDP 通信鉤子示例,它使用 get_future API 檢索與 allreduce 完成相關聯的 Future。

>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future
>>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD
>>> tensor = bucket.buffer().div_(group_to_use.size())
>>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future()
>>> ddp_model.register_comm_hook(state=None, hook=allreduce)

警告

get_future API 支持 NCCL,以及部分支持 GLOO 和 MPI 後端(不支持像 send/recv 這樣的點對點操作),並將返回一個 torch.futures.Future。在上面的示例中,allreduce 工作將使用 NCCL 後端在 GPU 上執行,fut.wait() 將在將適當的 NCCL 流與 PyTorch 的當前設備流同步後返回,以確保我們可以進行異步 CUDA 執行,並且它不會等待 GPU 上的整個操作完成。請注意,CUDAFuture 不支持 TORCH_NCCL_BLOCKING_WAIT 標誌或 NCCL 的 barrier()。此外,如果通過 fut.then() 添加了回調函數,它將等待 WorkNCCL 的 NCCL 流與 ProcessGroupNCCL 的專用回調流同步,並在回調流上運行回調後內聯調用該回調。fut.then() 將返回另一個 CUDAFuture,其中包含回調的返回值和記錄回調流的 CUDAEvent。對於 CPU 工作,當工作完成且 value() 張量就緒時,fut.done() 返回 true。對於 GPU 工作,僅當操作已入隊時,fut.done() 才返回 true。對於混合 CPU-GPU 工作(例如,使用 GLOO 發送 GPU 張量),當張量到達相應節點但尚未在相應 GPU 上同步時(類似於 GPU 工作),fut.done() 返回 true。

get_future_result(self: torch._C._distributed_c10d.Work) → torch.Future

返回

一個 int 類型的 torch.futures.Future 對象,映射到 WorkResult 的枚舉類型。例如,可以通過 fut = process_group.allreduce(tensor).get_future_result() 獲取 future 對象。

示例

用戶可以使用 fut.wait() 阻塞等待工作完成,並通過 fut.value() 獲取 WorkResult。此外,用戶可以使用 fut.then(call_back_func) 註冊一個在工作完成時調用的回調函數,而無需阻塞當前線程。

警告

get_future_result API 支持 NCCL

is_completed(self: torch._C._distributed_c10d.Work) → bool

is_success(self: torch._C._distributed_c10d.Work) → bool

result(self: torch._C._distributed_c10d.Work) → list[torch.Tensor]

source_rank(self: torch._C._distributed_c10d.Work) → int

synchronize(self: torch._C._distributed_c10d.Work) → None

static unbox(arg0: object) → torch._C._distributed_c10d.Work

wait(self:

torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) → bool# 返回 true/false。示例:: try:work.wait(timeout) except:# 一些處理 警告 在正常情況下,用戶無需設置超時。調用 wait() 與調用 synchronize() 相同:讓當前流阻塞直到 NCCL 工作完成。但是,如果設置了 timeout,它將阻塞 CPU 線程,直到 NCCL 工作完成或超時。如果超時,將拋出異常。 class torch.distributed.ReduceOp# 一個類似枚舉的類,用於可用的歸約操作:SUM、PRODUCT、MIN、MAX、BAND、BOR、BXOR 和 PREMUL_SUM。當使用 NCCL 後端時,BAND、BOR 和 BXOR 歸約不可用。AVG 在跨秩求和之前將值除以 world size。AVG 僅適用於 NCCL 後端,且僅適用於 NCCL 2.10 或更高版本。PREMUL_SUM 在歸約之前在本地將輸入乘以給定的標量。PREMUL_SUM 僅適用於 NCCL 後端,且僅適用於 NCCL 2.11 或更高版本。用戶應使用 torch.distributed._make_nccl_premul_sum。此外,MAX、MIN 和 PRODUCT 不支持複數張量。可以通過屬性訪問此類的值,例如 ReduceOp.SUM。它們用於指定歸約集合操作的策略,例如 reduce()。此類不支持 members 屬性。 class torch.distributed.reduce_op# 已棄用的類似枚舉的歸約操作類:SUM、PRODUCT、MIN 和 MAX。建議改用 ReduceOp。 分佈式鍵值存儲# 分佈式包附帶一個分佈式鍵值存儲,可用於在組內的進程之間共享信息,以及在 torch.distributed.init_process_group() 中初始化分佈式包(通過顯式創建存儲作為指定 init_method 的替代方案)。鍵值存儲有 3 種選擇:TCPStore、FileStore 和 HashStore。 class torch.distributed.Store# 所有存儲實現的基礎類,例如 PyTorch 分佈式提供的 3 種存儲:(TCPStore、FileStore 和 HashStore)。 init(self: torch._C._distributed_c10d.Store) → None# add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: SupportsInt) → int# 對給定鍵的首次 add 調用會在存儲中創建一個與該鍵關聯的計數器,並初始化為 amount。隨後使用相同鍵調用 add 會將計數器增加指定的 amount。如果對在存儲中已通過 set() 設置的鍵調用 add(),將導致異常。 參數 key (str) – 存儲中將遞增其計數器的鍵。 amount (int) – 計數器遞增的數量。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # 應返回 7 >>> store.get("first_key") append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 根據提供的鍵和值將鍵值對追加到存儲中。如果存儲中不存在該鍵,則會創建它。 參數 key (str) – 要追加到存儲中的鍵。 value (str) – 要與鍵關聯並添加到存儲中的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # 應返回 "potato" >>> store.get("first_key") check(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) → bool# 調用此方法以檢查給定的鍵列表是否在存儲中存有值。在正常情況下,此調用會立即返回,但在某些邊緣死鎖情況下仍會受到影響,例如在 TCPStore 被銷燬後調用 check()。調用 check() 時傳入一個鍵列表,以檢查這些鍵是否存儲在存儲中。 參數 keys (list[str]) – 要查詢是否存儲在存儲中的鍵。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # 應返回 7 >>> store.check(["first_key"]) clone(self: torch._C._distributed_c10d.Store) → torch._C._distributed_c10d.Store# 克隆存儲並返回一個指向相同底層存儲的新對象。返回的存儲可以與原始對象併發使用。這旨在提供一種安全的方式,通過為每個線程克隆一個存儲來從多個線程使用存儲。 compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) → bytes# 根據提供的鍵將鍵值對插入存儲中,並在插入之前執行 expected_value 和 desired_value 之間的比較。僅當鍵的 expected_value 已存在於存儲中或 expected_value 為空時,才會設置 desired_value

string。參數 key (str) – 要在存儲中檢查的鍵。 expected_value (str) – 在插入之前要與鍵關聯進行檢查的值。 desired_value (str) – 要與鍵關聯並添加到存儲中的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # 應返回 "second_value" >>> store.get("key") delete_key(self: torch._C._distributed_c10d.Store, arg0: str) → bool# 從存儲中刪除與鍵關聯的鍵值對。如果成功刪除鍵,則返回 true;否則返回 false。 警告 delete_key API 僅由 TCPStore 和 HashStore 支持。將此 API 與 FileStore 一起使用將導致異常。 參數 key (str) – 要從存儲中刪除的鍵 返回 如果刪除了鍵,則返回 True,否則返回 False。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用 HashStore >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # 這應返回 true >>> store.delete_key("first_key") >>> # 這應返回 false >>> store.delete_key("bad_key") get(self: torch._C._distributed_c10d.Store, arg0: str) → bytes# 檢索存儲中與給定鍵關聯的值。如果存儲中不存在該鍵,函數將在拋出異常之前等待超時時間,該超時時間在初始化存儲時定義。 參數 key (str) – 函數將返回與此鍵關聯的值。 返回 如果鍵在存儲中,則返回與鍵關聯的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 應返回 "first_value" >>> store.get("first_key") has_extended_api(self: torch._C._distributed_c10d.Store) → bool# 如果存儲支持擴展操作,則返回 true。 multi_get(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) → list[bytes]# 檢索 keys 中的所有值。如果 keys 中的任何鍵不在存儲中,函數將等待超時 參數 keys (List[str]) – 要從存儲中檢索的鍵。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # 應返回 [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"]) multi_set(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: collections.abc.Sequence[str]) → None# 根據提供的鍵和值將鍵值對列表插入到存儲中 參數 keys (List[str]) – 要插入的鍵。 values (List[str]) – 要插入的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # 應返回 b"po" >>> store.get("first_key") num_keys(self: torch._C._distributed_c10d.Store) → int# 返回存儲中設置的鍵的數量。請注意,此數字通常比通過 set() 和 add() 添加的鍵數多一,因為有一個鍵用於協調所有使用該存儲的工作進程。 警告 當與 TCPStore 一起使用時,num_keys 返回寫入底層文件的鍵的數量。如果存儲被銷燬並使用同一文件創建另一個存儲,原始鍵將被保留。 返回 存儲中存在的鍵的數量。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 這應返回 2 >>> store.num_keys() queue_len(self: torch._C._distributed_c10d.Store, arg0: str) → int# 返回指定隊列的長度。如果隊列不存在,則返回 0。有關更多詳細信息,請參閱 queue_push。 參數 key (str) – 要獲取長度的隊列的鍵。 queue_pop(self: torch._C._distributed_c10d.Store, key: str, block: bool = True) → bytes# 從指定隊列中彈出一個值,如果隊列為空,則等待直到超時。有關更多詳細信息,請參閱 queue_push。如果 block 為 False,且隊列為空,則將引發 dist.QueueEmptyError。 參數 key (str) – 要從中彈出值的隊列的鍵。 block (bool) – 是否阻塞等待鍵或立即返回。 queue_push(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 將一個值推送到指定隊列中。對隊列和 set/get 操作使用相同的鍵可能會導致意外行為。隊列支持 wait/check 操作。使用隊列的 wait 操作只會喚醒一個等待的工作進程,而不是全部。 參數 key

(str) – 要推送到的隊列的鍵。 value (str) – 要推送到隊列中的值。 set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) → None# 根據提供的鍵和值將鍵值對插入到存儲中。如果該鍵已存在於存儲中,它將用新提供的值覆蓋舊值。 參數 key (str) – 要添加到存儲中的鍵。 value (str) – 與要添加到存儲中的鍵關聯的值。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 應返回 "first_value" >>> store.get("first_key") set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) → None# 設置存儲的默認超時時間。此超時時間在初始化以及 wait() 和 get() 中使用。 參數 timeout (timedelta) – 要在存儲中設置的超時時間。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # 這將在 10 秒後拋出異常 >>> store.wait(["bad_key"]) property timeout# 獲取存儲的超時時間。 wait(*args, **kwargs)# 重載函數。 wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str]) -> None 等待 keys 中的每個鍵被添加到存儲中。如果在超時(在存儲初始化期間設置)之前未設置所有鍵,則 wait 將拋出異常。 參數 keys (list) – 等待直到它們在存儲中被設置的鍵列表。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 這將在 30 秒後拋出異常 >>> store.wait(["bad_key"]) wait(self: torch._C._distributed_c10d.Store, arg0: collections.abc.Sequence[str], arg1: datetime.timedelta) -> None 等待 keys 中的每個鍵被添加到存儲中,如果鍵未在提供的超時時間內被設置,則拋出異常。 參數 keys (list) – 等待直到它們在存儲中被設置的鍵列表。 timeout (timedelta) – 在拋出異常之前等待鍵被添加的時間。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以 TCPStore 為例,也可以使用其他存儲類型 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 這將在 10 秒後拋出異常 >>> store.wait(["bad_key"], timedelta(seconds=10)) class torch.distributed.TCPStore# 基於 TCP 的分佈式鍵值存儲實現。服務器存儲保存數據,而客戶端存儲可以通過 TCP 連接到服務器存儲並執行操作,例如使用 set() 插入鍵值對,使用 get() 檢索鍵值對等。應該始終初始化一個服務器存儲,因為客戶端存儲將等待服務器建立連接。 參數 host_name (str) – 服務器存儲應運行的主機名或 IP 地址。 port (int) – 服務器存儲應監聽傳入請求的端口。 world_size (int, optional) – 存儲用戶的總數(客戶端數量 + 1 個服務器)。默認為 None(None 表示非固定數量的存儲用戶)。 is_master (bool, optional) – 初始化服務器存儲時為 True,客戶端存儲時為 False。默認為 False。 timeout (timedelta, optional) – 存儲在初始化期間以及用於 get() 和 wait() 等方法時使用的超時時間。默認為 timedelta(seconds=300) wait_for_workers (bool, optional) – 是否等待所有工作進程與服務器存儲連接。僅當 world_size 為固定值時適用。默認為 True。 multi_tenant (bool, optional) – 如果為 True,則當前進程中具有相同主機/端口的所有 TCPStore 實例將使用相同的底層 TCPServer。默認為 False。 master_listen_fd (int, optional) – 如果指定,底層 TCPServer 將監聽此文件描述符,該描述符必須是已綁定到端口的套接字。要綁定臨時端口,我們建議將端口設置為 0 並讀取 .port。默認為 None(意味著服務器創建一個新的套接字並嘗試將其綁定到端口)。 use_libuv (bool, optional) – 如果為 True,則使用 libuv 作為 TCPServer 後端。默認為 True。 示例::>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 在進程 1(服務器)上運行 >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # 在進程 2(客戶端)上運行 >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # 初始化後,從客戶端或服務器使用任何存儲方法 >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key") init(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: SupportsInt, world_size:

SupportsInt | None = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: SupportsInt | None = None, use_libuv: bool = True) → None# 創建一個新的 TCPStore。 property host# 獲取 Store 監聽請求的主機名。 property libuvBackend# 如果使用的是 libuv 後端,則返回 True。 property port# 獲取 Store 監聽請求的端口號。 class torch.distributed.HashStore# 一種基於底層哈希映射的線程安全 Store 實現。此 Store 可在同一進程內使用(例如,由其他線程使用),但不能跨進程使用。示例::>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store 可以從其他線程使用 >>> # 初始化後使用任何 Store 方法 >>> store.set("first_key", "first_value") init(self: torch._C._distributed_c10d.HashStore) → None# 創建一個新的 HashStore。 class torch.distributed.FileStore# 一種使用文件存儲底層鍵值對的 Store 實現。參數 file_name (str) – 用於存儲鍵值對的文件路徑 world_size (int, 可選) – 使用該 Store 的進程總數。默認值為 -1(負值表示 Store 用戶數量不固定)。示例::>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # 初始化後,從客戶端或服務器使用任何 Store 方法 >>> store1.set("first_key", "first_value") >>> store2.get("first_key") init(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: SupportsInt = -1) → None# 創建一個新的 FileStore。 property path# 獲取 FileStore 用於存儲鍵值對的文件路徑。 class torch.distributed.PrefixStore# 一個包裝器,可包裝任意三種鍵值存儲(TCPStore、FileStore 和 HashStore)之一,併為插入到 Store 中的每個鍵添加前綴。參數 prefix (str) – 在插入 Store 之前添加到每個鍵的前綴字符串。 store (torch.distributed.store) – 構成底層鍵值存儲的 Store 對象。 init(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) → None# 創建一個新的 PrefixStore。 property underlying_store# 獲取 PrefixStore 所包裝的底層 Store 對象。 集體通信性能分析# 注意,你可以使用 torch.profiler(推薦,僅在 1.8.1 之後可用)或 torch.autograd.profiler 來分析此處提到的集體通信和點對點通信 API。所有開箱即用的後端(gloo、nccl、mpi)均受支持,集體通信的使用情況將在性能分析輸出/追蹤中按預期呈現。分析你的代碼與分析任何常規 torch 算子相同: import torch import torch.distributed as dist with torch.profiler(): tensor = torch.randn(20, 10) dist.all_reduce(tensor) 請參閱 profiler 文檔以全面瞭解 profiler 功能。 多 GPU 集體函數# 警告 多 GPU 函數(指每個 CPU 線程對應多個 GPU)已棄用。截至目前,PyTorch Distributed 首選的編程模型是每個線程對應一個設備,正如本文檔中的 API 所示。如果你是後端開發人員並希望支持每個線程對應多個設備,請聯繫 PyTorch Distributed 的維護者。 對象集體操作# 警告 對象集體操作存在一些嚴重的侷限性。請繼續閱讀以確定它們是否適合你的用例安全使用。對象集體操作是一組類似集體的操作,適用於任意 Python 對象,只要這些對象可以被 pickle 序列化。實現了各種集體模式(例如 broadcast、all_gather 等),但它們大致都遵循以下模式:將輸入對象轉換為 pickle(原始字節),然後將其放入字節張量中;將此字節張量的大小告知對等節點(第一次集體操作);分配適當大小的張量以執行真正的集體通信;傳輸對象數據(第二次集體操作);將原始數據轉換回 Python 對象(反 pickle)。對象集體操作有時會出現令人意外的性能或內存特性,導致運行時間過長或發生內存溢出(OOM),因此應謹慎使用。以下是一些常見問題。不對稱的 pickle/unpickle 時間 - Pickle 序列化對象的速度可能較慢,具體取決於對象的數量、類型和大小。當集體操作具有扇入特性時(例如 gather_object),接收秩必須反 pickle 的對象數量是發送秩需要 pickle 的對象數量的 N 倍,這可能導致其他秩在下一次集體操作中超時。低效的張量通信 - 張量應通過常規集體 API 發送,而非對象集體 API。雖然可以通過對象集體 API 發送張量,但它們會被序列化和反序列化(對於非 CPU 張量,還包括 CPU 同步和設備到主機的複製),並且除了調試或故障排除代碼外,在幾乎所有情況下都不建議這樣做

值得花精力重構代碼以改用非對象集合通信(non-object collectives)。

意外的張量設備 - 如果你仍希望通過對象集合通信發送張量,那麼對於 CUDA(以及可能的其他加速器)張量,還有一個特定的方面需要注意。如果你對一個當前位於 cuda:3 上的張量進行 pickle 序列化,然後對其進行 unpickle 反序列化,無論你在哪個進程中,或者該進程的“默認” CUDA 設備是哪個,你都會得到另一個位於 cuda:3 上的張量。使用常規的張量集合通信 API 時,“輸出張量”始終位於相同的本地設備上,這通常符合預期。如果這是進程首次使用 GPU,對張量進行 unpickle 操作會隱式激活 CUDA 上下文,這可能會浪費大量的 GPU 內存。通過在將張量作為輸入傳遞給對象集合通信之前將其移動到 CPU,可以避免此問題。

第三方後端

除了內置的 GLOO/MPI/NCCL 後端外,PyTorch 分佈式還通過運行時註冊機制支持第三方後端。有關如何通過 C++ 擴展開發第三方後端的參考,請參閱 [教程 - 自定義 C++ 和 CUDA 擴展](Tutorials - Custom C++ and CUDA Extensions) 和 test/cpp_extensions/cpp_c10d_extension.cpp。第三方後端的功能取決於其各自的實現。新後端派生自 c10d::ProcessGroup,並在導入時通過 torch.distributed.Backend.register_backend() 註冊後端名稱和實例化接口。當手動導入此後端並使用相應的後端名稱調用 torch.distributed.init_process_group() 時,torch.distributed 包將在新後端上運行。

警告 第三方後端的支持是實驗性的,可能會發生變化。

啟動工具

torch.distributed 包還在 torch.distributed.launch 中提供了啟動工具。此輔助工具可用於為分佈式訓練在每個節點上啟動多個進程。

模塊 torch.distributed.launchtorch.distributed.launch 是一個在每個訓練節點上生成多個分佈式訓練進程的模塊。

警告 此模塊將被棄用,推薦使用 torchrun

該工具可用於單節點分佈式訓練,其中每個節點將生成一個或多個進程。該工具既可用於 CPU 訓練,也可用於 GPU 訓練。如果該工具用於 GPU 訓練,每個分佈式進程將在單個 GPU 上運行。這可以顯著提高單節點訓練性能。它也可用於多節點分佈式訓練,通過在每個節點上生成多個進程,從而顯著提高多節點分佈式訓練性能。這對於具有直接 GPU 支持的多個 Infiniband 接口的系統尤其有益,因為所有接口都可用於聚合通信帶寬。在單節點分佈式訓練或多節點分佈式訓練這兩種情況下,此工具都將啟動每個節點指定數量的進程 (--nproc-per-node)。如果用於 GPU 訓練,此數字需要小於或等於當前系統上的 GPU 數量 (nproc_per_node),並且每個進程將在從 GPU 0 到 GPU (nproc_per_node - 1) 的單個 GPU 上運行。

如何使用此模塊:

單節點多進程分佈式訓練

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)

多節點多進程分佈式訓練:(例如兩個節點)

節點 1:(IP:192.168.1.1,且有一個空閒端口:1234)

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE --nnodes=2 --node-rank=0 --master-addr="192.168.1.1" --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)

節點 2:

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE --nnodes=2 --node-rank=1 --master-addr="192.168.1.1" --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other arguments of your training script)

要查看此模塊提供的可選參數:

python -m torch.distributed.launch --help

重要注意事項:

  1. 此工具和多進程分佈式(單節點或多節點)GPU 訓練目前僅在使用 NCCL 分佈式後端時能達到最佳性能。因此,建議 GPU 訓練使用 NCCL 後端。

  2. 在你的訓練程序中,你必須解析命令行參數:--local-rank=LOCAL_PROCESS_RANK,該參數將由本模塊提供。如果你的訓練程序使用 GPU,你應該確保你的代碼僅在 LOCAL_PROCESS_RANK 對應的 GPU 設備上運行。可以通過以下方式實現:

    解析 local_rank 參數

    >>> import argparse
    >>> parser = argparse.ArgumentParser()
    >>> parser.add_argument("--local-rank", "--local_rank", type=int)
    >>> args = parser.parse_args()

    使用以下任一方式將你的設備設置為 local rank

    >>> torch.cuda.set_device(args.local_rank) # 在你的代碼運行之前

    >>> with torch.cuda.device(args.local_rank):
    >>> # 你的運行代碼
    >>> ...

版本 2.0.0 變更: 啟動器會將 --local-rank=<rank> 參數傳遞給

您的腳本。從 PyTorch 2.0.0 開始,推薦使用帶連字符的 --local-rank,而非之前使用的帶下劃線的 --local_rank。為了向後兼容,用戶可能需要在參數解析代碼中同時處理這兩種情況。這意味著在參數解析器中同時包含 "--local-rank""--local_rank"。如果僅提供 "--local_rank",啟動器將觸發錯誤:“error: unrecognized arguments: –local-rank=<rank>”。對於僅支持 PyTorch 2.0.0+ 的訓練代碼,包含 "--local-rank" 就足夠了。

  1. 在訓練程序中,您應該在開頭調用以下函數以啟動分佈式後端。強烈建議使用 init_method='env://'。其他初始化方法(例如 tcp://)也可能有效,但 env:// 是本模塊官方支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
  1. 在訓練程序中,您可以使用常規分佈式函數,也可以使用 torch.nn.parallel.DistributedDataParallel() 模塊。如果您的訓練程序使用 GPU 進行訓練,並且希望使用 torch.nn.parallel.DistributedDataParallel() 模塊,配置方法如下:
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)

請確保 device_ids 參數設置為代碼將要運行的唯一 GPU 設備 ID。這通常是進程的本地 rank。換句話說,為了使用此實用程序,device_ids 需要是 [args.local_rank],而 output_device 需要是 args.local_rank

  1. 另一種通過環境變量 LOCAL_RANKlocal_rank 傳遞給子進程的方法。當您使用 --use-env=True 啟動腳本時,此行為會被啟用。您必須調整上面的子進程示例,將 args.local_rank 替換為 os.environ['LOCAL_RANK'];當指定此標誌時,啟動器不會傳遞 --local-rank
注意

警告 local_rank 不是全局唯一的:它僅在每臺機器上的每個進程中是唯一的。因此,不要用它來決定是否(例如)寫入網絡文件系統。請參閱 pytorch/pytorch#12042 以瞭解如果不正確執行此操作可能會出錯的示例。

Spawn 實用程序

Multiprocessing 包 - torch.multiprocessing 包還在 torch.multiprocessing.spawn() 中提供了 spawn 函數。此輔助函數可用於生成多個進程。它通過傳入您想要運行的函數並生成 N 個進程來運行該函數。這也可用於多進程分佈式訓練。有關如何使用它的參考,請參閱 PyTorch 示例 - ImageNet 實現。

請注意,此函數需要 Python 3.4 或更高版本。

調試 torch.distributed 應用程序

由於難以理解的掛起、崩潰或跨 rank 的不一致行為,調試分佈式應用程序可能具有挑戰性。torch.distributed 提供了一套工具,以幫助以自助方式調試訓練應用程序:

Python Breakpoint

在分佈式環境中使用 Python 調試器非常方便,但由於它不能開箱即用,許多人根本不使用它。PyTorch 提供了一個圍繞 pdb 的自定義包裝器,以簡化該過程。torch.distributed.breakpoint 使此過程變得簡單。在內部,它以兩種方式自定義 pdb 的斷點行為,否則其行為與正常 pdb 相同:

  • 僅在一個 rank(由用戶指定)上附加調試器。
  • 通過使用 torch.distributed.barrier() 確保所有其他 rank 停止,一旦被調試的 rank 發出 continue 命令,該屏障將釋放。
  • 重定向子進程的 stdin,使其連接到您的終端。

要使用它,只需在所有 rank 上發出 torch.distributed.breakpoint(rank),在每種情況下使用相同的 rank 值。

Monitored Barrier

從 v1.10 開始,torch.distributed.monitored_barrier() 作為 torch.distributed.barrier() 的替代方案存在,當崩潰時(即並非所有 rank 都在提供的超時時間內調用 torch.distributed.monitored_barrier()),它會提供有關哪個 rank 可能出現故障的幫助信息。torch.distributed.monitored_barrier() 使用類似於確認過程中的 send/recv 通信原語實現主機側屏障,允許 rank 0 報告哪些 rank 未能及時確認屏障。

例如,考慮以下函數,其中 rank 1 未能調用 torch.distributed.monitored_barrier()(在實踐中,這可能是由於應用程序錯誤或之前的集體操作掛起所致):

import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier 需要 gloo 進程組來執行主機側同步。
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))

if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())

在 rank 0 上會產生以下錯誤消息,允許

用戶確定哪些 rank 可能存在故障並進一步調查: RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms Original exception: [gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG

在設置 TORCH_CPP_LOG_LEVEL=INFO 的情況下,環境變量 TORCH_DISTRIBUTED_DEBUG 可用於觸發額外的有用日誌記錄和集體同步檢查,以確保所有 rank 正確同步。根據所需的調試級別,TORCH_DISTRIBUTED_DEBUG 可以設置為 OFF(默認)、INFODETAIL。請注意,最詳細的選項 DETAIL 可能會影響應用程序性能,因此應僅在調試問題時使用。

設置 TORCH_DISTRIBUTED_DEBUG=INFO 將在初始化使用 torch.nn.parallel.DistributedDataParallel() 訓練的模型時產生額外的調試日誌,而 TORCH_DISTRIBUTED_DEBUG=DETAIL 還將記錄選定迭代次數內的運行時性能統計信息。這些運行時統計數據包括前向傳播時間、反向傳播時間、梯度通信時間等數據。

例如,給定以下應用程序:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)

def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)

def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()

if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())

在初始化時將呈現以下日誌:

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

在運行時(當設置 TORCH_DISTRIBUTED_DEBUG=DETAIL 時)將呈現以下日誌:

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0 Avg forward compute time: 40838608 Avg backward compute time: 5983335 Avg backward comm. time: 4326421 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0 Avg forward compute time: 42850427 Avg backward compute time: 3885553 Avg backward comm. time: 2357981 Avg backward comm/comp overlap time: 2234674

此外,由於模型中存在未使用的參數,TORCH_DISTRIBUTED_DEBUG=INFO 增強了 torch.nn.parallel.DistributedDataParallel() 中的崩潰日誌記錄。目前,如果前向傳播中可能存在未使用的參數,則必須在 torch.nn.parallel.DistributedDataParallel() 初始化時傳入 find_unused_parameters=True,並且從 v1.10 開始,所有模型輸出都必須用於損失計算,因為 torch.nn.parallel.DistributedDataParallel() 不支持反向傳播中的未使用參數。這些約束對於大型模型尤其具有挑戰性,因此當發生錯誤崩潰時,torch.nn.parallel.DistributedDataParallel() 將記錄所有未使用參數的完全限定名稱。

例如,在上述應用程序中,如果我們修改損失計算方式為 loss = output[1],那麼 TwoLinLayerNet.a 在反向傳播中不會接收梯度,從而導致 DDP 失敗。在崩潰時,用戶會收到關於未使用參數的信息,這對於大型模型而言手動查找可能具有挑戰性:

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by making sure all `forward` function outputs participate in calculating loss. If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the

在報告此問題時,請提供模塊的 forward 方法返回值的結構(例如,list、dict、iterable)。未收到 rank 0 梯度的參數:a.weight 未收到 rank 0 梯度的參數索引:0

設置 TORCH_DISTRIBUTED_DEBUG=DETAIL 將在用戶直接或間接發出的每次集體通信調用(例如 DDP allreduce)上觸發額外的一致性和同步檢查。這是通過創建一個包裝進程組來實現的,該包裝進程組封裝了由 torch.distributed.init_process_group()torch.distributed.new_group() API 返回的所有進程組。因此,這些 API 將返回一個包裝進程組,其用法與普通進程組完全相同,但在將集體通信分派到底層進程組之前會執行一致性檢查。目前,這些檢查包括 torch.distributed.monitored_barrier(),它確保所有 rank 完成其未完成的集體通信調用,並報告卡住的 rank。接下來,通過確保所有集體通信函數匹配並使用一致的張量形狀調用,來檢查集體通信本身的一致性。如果不是這種情況,當應用程序崩潰時,將包含詳細的錯誤報告,而不是掛起或提供無信息的錯誤消息。

例如,考慮以下函數,其輸入到 torch.distributed.all_reduce() 的形狀不匹配:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)

if __name__ == "__main__":
import os
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())

使用 NCCL 後端時,此類應用程序可能會導致掛起,這在非平凡場景中很難確定根本原因。如果用戶啟用 TORCH_DISTRIBUTED_DEBUG=DETAIL 並重新運行應用程序,以下錯誤消息將揭示根本原因:

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10 [ torch.LongTensor{1} ]

注意 為了在運行時對調試級別進行細粒度控制,還可以使用函數 torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()。此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以與 TORCH_SHOW_CPP_STACKTRACES=1 結合使用,以便在檢測到集體通信不同步時記錄整個調用堆棧。這些集體通信不同步檢查適用於所有使用由 torch.distributed.init_process_group()torch.distributed.new_group() API 創建的進程組支持的 c10d 集體通信調用的應用程序。

日誌記錄

除了通過 torch.distributed.monitored_barrier()TORCH_DISTRIBUTED_DEBUG 提供的顯式調試支持外,torch.distributed 的底層 C++ 庫還會輸出各種級別的日誌消息。這些消息有助於瞭解分佈式訓練任務的執行狀態,並排查諸如網絡連接失敗等問題。

下表顯示瞭如何通過組合 TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG 環境變量來調整日誌級別。

TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG有效日誌級別
ERRORignoredError
WARNINGignoredWarning
INFOignoredInfo
INFOINFODebug
INFODETAILTrace (即 All)

分佈式組件會引發自定義的異常類型,這些類型派生自 RuntimeError

  • torch.distributed.DistError:這是所有分佈式異常的基類型。
  • torch.distributed.DistBackendError:當發生特定於後端的錯誤時拋出此異常。例如,如果使用 NCCL 後端,且用戶嘗試使用 NCCL 庫不可用的 GPU。
  • torch.distributed.DistNetworkError:當網絡庫遇到錯誤時拋出此異常(例如:Connection reset by peer)。
  • torch.distributed.DistStoreError:當 Store 遇到錯誤時拋出此異常(例如:TCPStore timeout)。

class torch.distributed.DistError

當分佈式庫中發生錯誤時引發的異常

class torch.distributed.DistBackendError

當分佈式中發生後端錯誤時引發的異常

class torch.distributed.DistNetworkError

當分佈式中發生網絡錯誤時引發的異常

class torch.distributed.DistStoreError

當分佈式 store 中發生錯誤時引發的異常

如果您正在運行單節點訓練,交互式地在腳本中設置斷點可能會很方便。我們提供了一種方便地在單個 rank 上設置斷點的方法:

torch.distributed.breakpoint(rank=0, skip=0, timeout_s=3600)[source] #

設置斷點,但僅在單個 rank 上。所有其他 rank 將等待您完成斷點操作後再繼續。

參數

  • rank

(int) – 在哪個 rank 上中斷。默認值:0 skip (int) – 跳過對此斷點的前 skip 次調用。默認值:0.``` torch.distributed


**模式 3:** 初始化# 在調用任何其他方法之前,需要使用 `torch.distributed.init_process_group()` 或 `torch.distributed.device_mesh.init_device_mesh()` 函數對包進行初始化。兩者都會阻塞,直到所有進程都加入為止。 **警告** 初始化不是線程安全的。應從單個線程執行進程組創建,以防止跨 ranks 的 ‘UUID’ 分配不一致,並防止初始化期間的競爭條件導致掛起。 torch.distributed.is_available()[source]# 如果分佈式包可用,則返回 True。否則,`torch.distributed` 不暴露任何其他 API。目前,`torch.distributed` 在 Linux、MacOS 和 Windows 上可用。從源代碼構建 PyTorch 時,設置 `USE_DISTRIBUTED=1` 以啟用它。目前,Linux 和 Windows 的默認值為 `USE_DISTRIBUTED=1`,MacOS 為 `USE_DISTRIBUTED=0`。 返回類型 bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# 初始化默認的分佈式進程組。這也將初始化分佈式包。 初始化進程組主要有兩種方式: * 顯式指定 `store`、`rank` 和 `world_size`。 * 指定 `init_method`(一個 URL 字符串),指示在哪裡/如何發現對等節點。可以選擇指定 `rank` 和 `world_size`,或者將所有必需參數編碼到 URL 中並省略它們。 如果兩者均未指定,則假定 `init_method` 為 “env://”。 **參數** **backend** (*str* 或 *Backend*, *可選*) – 要使用的後端。根據構建時的配置,有效值包括 `mpi`、`gloo`、`nccl`、`ucc`、`xccl` 或由第三方插件註冊的後端。自 2.6 版本起,如果未提供 `backend`,c10d 將使用為 `device_id` 關鍵字參數(如果提供)指示的設備類型註冊的後端。目前已知的默認註冊為:cuda 對應 `nccl`,cpu 對應 `gloo`,xpu 對應 `xccl`。如果既未提供 `backend` 也未提供 `device_id`,c10d 將檢測運行時機器上的加速器,並使用為該檢測到的加速器(或 cpu)註冊的後端。此字段可以作為小寫字符串給出(例如,`"gloo"`),也可以通過 Backend 屬性訪問(例如,`Backend.GLOO`)。如果在每臺機器上使用多個進程且後端為 `nccl`,則每個進程必須獨佔訪問其使用的每個 GPU,因為進程間共享 GPU 可能導致死鎖或 NCCL 無效使用。`ucc` 後端處於實驗階段。可以使用 `get_default_backend_for_device()` 查詢設備的默認後端。 **init_method** (*str*, *可選*) – 指定如何初始化進程組的 URL。如果未指定 `init_method` 或 `store`,則默認為 “env://”。與 `store` 互斥。 **world_size** (*int*, *可選*) – 參與作業的進程數。如果指定了 `store`,則為必需項。 **rank** (*int*, *可選*) – 當前進程的 rank(它應該是介於 0 和 `world_size-1` 之間的數字)。如果指定了 `store`,則為必需項。 **store** (*Store*, *可選*) – 所有 worker 均可訪問的鍵/值存儲,用於交換連接/地址信息。與 `init_method` 互斥。 **timeout** (*timedelta*, *可選*) – 針對進程組執行的操作的超時時間。NCCL 的默認值為 10 分鐘,其他後端的默認值為 30 分鐘。這是在此時長之後集體操作將被異步中止並且進程將崩潰的時間。這樣做是因為 CUDA 執行是異步的,繼續執行用戶代碼不再安全,因為失敗的異步 NCCL 操作可能導致後續 CUDA 操作在損壞的數據上運行。當設置 `TORCH_NCCL_BLOCKING_WAIT` 時,進程將阻塞並等待此超時。 **group_name** (*str*, *可選*, *已棄用*) – 組名。此參數被忽略 **pg_options** (*ProcessGroupOptions*, *可選*) – 進程組選項,指定在構造特定進程組期間需要傳入哪些附加選項。截至目前,我們支持的唯一選項是用於 `nccl` 後端的 `ProcessGroupNCCL.Options`,可以指定 `is_high_priority_stream`,以便當有計算內核等待時,`nccl` 後端可以選取高優先級的 cuda 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t **device_id** (*torch.device* | *int*, *可選*) – 此進程將工作的單個特定設備,允許進行特定於後端的優化。目前這有兩個影響,僅在 NCCL 下:通信器立即形成(立即調用 `ncclCommInit*` 而不是正常的延遲調用),並且子組將在可能時使用 `ncclCommSplit` 以避免不必要的組創建開銷。如果您想盡早了解 NCCL 初始化錯誤,也可以使用此字段。如果提供的是 int,API 假設將使用編譯時的加速器類型。 **注意** 要啟用 `backend == Backend.MPI`,需要在支持 MPI 的系統上從源代碼構建 PyTorch。 **注意** 對多個後端的支持是實驗性的。目前,當未指定後端時,將同時創建 `gloo` 和 `nccl` 後端。`gloo` 後端將用於集體操作

對於 CPU 張量,將使用 Gloo 後端;對於 CUDA 張量,集合通信將使用 NCCL 後端。可以通過傳入格式為 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串來指定自定義後端,例如 “cpu:gloo,cuda:custom_backend”。

torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]#
根據 device_type、mesh_shape 和 mesh_dim_names 參數初始化 DeviceMesh。這將創建一個具有 n 維數組佈局的 DeviceMesh,其中 n 是 mesh_shape 的長度。如果提供了 mesh_dim_names,則每個維度將被標記為 mesh_dim_names[i]。注意,init_device_mesh 遵循 SPMD 編程模型,這意味著相同的 PyTorch Python 程序會在集群中的所有進程/秩上運行。確保所有秩上的 mesh_shape(描述設備佈局的 n 維數組的維度)一致。不一致的 mesh_shape 可能導致掛起。注意,如果未找到進程組,init_device_mesh 將在後臺初始化分佈式通信所需的分佈式進程組。

參數
* **device_type** (str) – 網格的設備類型。目前支持:“cpu”、“cuda/cuda-like”、“xpu”。不允許傳入帶有 GPU 索引的設備類型,例如 “cuda:0”。
* **mesh_shape** (Tuple[int]) – 一個元組,定義描述設備佈局的多維數組的維度。
* **mesh_dim_names** (Tuple[str], 可選) – 一個元組,包含要分配給描述設備佈局的多維數組每個維度的網格維度名稱。其長度必須與 mesh_shape 的長度匹配。mesh_dim_names 中的每個字符串必須是唯一的。
* **backend_override** (Dict[int | str, tuple[str, Options] | str | Options], 可選) – 對將為每個網格維度創建的部分或全部 ProcessGroups 的重寫。每個鍵可以是維度的索引或其名稱(如果提供了 mesh_dim_names)。每個值可以是一個包含後端名稱及其選項的元組,或者僅是這兩個組件之一(在這種情況下,另一個將設置為其默認值)。

返回
表示設備佈局的 DeviceMesh 對象。

返回類型
DeviceMesh

示例:
```python
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))

torch.distributed.is_initialized()[source]# 檢查默認進程組是否已初始化。

返回類型 bool

torch.distributed.is_mpi_available()[source]# 檢查 MPI 後端是否可用。

返回類型 bool

torch.distributed.is_nccl_available()[source]# 檢查 NCCL 後端是否可用。

返回類型 bool

torch.distributed.is_gloo_available()[source]# 檢查 Gloo 後端是否可用。

返回類型 bool

torch.distributed.distributed_c10d.is_xccl_available()[source]# 檢查 XCCL 後端是否可用。

返回類型 bool

torch.distributed.is_torchelastic_launched()[source]# 檢查此進程是否使用 torch.distributed.elastic(即 torchelastic)啟動。使用 TORCHELASTIC_RUN_ID 環境變量的存在作為代理,以確定當前進程是否使用 torchelastic 啟動。這是一個合理的代理,因為 TORCHELASTIC_RUN_ID 映射到 rendezvous id,它始終是一個非空值,表示用於對等發現的任務 id。

返回類型 bool

torch.distributed.get_default_backend_for_device(device)[source]# 返回給定設備的默認後端。

參數

  • device (Union[str, torch.device]) – 要獲取默認後端的設備。

返回 給定設備的默認後端,以小寫字符串形式返回。

返回類型 str

目前支持三種初始化方法:

TCP 初始化 有兩種使用 TCP 進行初始化的方法,都需要一個所有進程均可訪問的網絡地址和一個期望的 world_size。第一種方法需要指定屬於秩 0 進程的地址。此初始化方法要求所有進程手動指定秩。請注意,最新版本的分佈式包不再支持多播地址。group_name 也已棄用。

import torch.distributed as dist
# 使用其中一臺機器的地址
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)

共享文件系統初始化 另一種初始化方法利用組內所有機器共享且可見的文件系統,以及一個期望的 world_size。URL 應以 file:// 開頭,幷包含共享文件系統上(現有目錄中)一個不存在文件的路徑。文件系統初始化會自動創建該文件(如果不存在),但不會刪除該文件。因此,你有責任確保在下一次在同一文件路徑/名稱上調用 init_process_group() 之前清理該文件。請注意,最新版本的分佈式包不再支持自動秩分配,group_name 也已棄用。

.. warning:: 此方法假設文件系統支持使用 fcntl 進行鎖定 -

大多數本地系統和 NFS 都支持此方法。

注意

警告:此方法將始終創建文件,並盡力在程序結束時清理和刪除該文件。換句話說,每次使用文件初始化方法時,都需要一個全新的空文件才能成功初始化。如果重複使用前一次初始化使用的同一文件(該文件碰巧未被清理),則屬於意外行為,通常會導致死鎖和失敗。因此,儘管此方法會盡力清理文件,但如果自動刪除失敗,你有責任確保在訓練結束時刪除該文件,以防止下次運行時重用同一文件。如果你計劃對同一文件名多次調用 init_process_group(),這一點尤為重要。換言之,如果文件未被刪除/清理,並且你再次對該文件調用 init_process_group(),預計會發生失敗。這裡的經驗法則是:確保每次調用 init_process_group() 時,該文件要麼不存在,要麼為空。

import torch.distributed as dist
# 必須始終指定 rank
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank)

環境變量初始化

此方法將從環境變量中讀取配置,允許用戶完全自定義信息的獲取方式。需要設置的變量如下:

  • MASTER_PORT - 必需;必須是 rank 0 所在機器上的空閒端口
  • MASTER_ADDR - 必需(rank 0 除外);rank 0 節點的地址
  • WORLD_SIZE - 必需;可以在此處設置,也可以在調用初始化函數時設置
  • RANK - 必需;可以在此處設置,也可以在調用初始化函數時設置

Rank 0 所在的機器將用於建立所有連接。這是默認方法,意味著無需指定 init_method(或者可以設置為 env://)。

優化初始化時間

  • TORCH_GLOO_LAZY_INIT - 按需建立連接,而不是使用全網格(full mesh),這可以顯著改善非 all2all 操作的初始化時間。torch.distributed.init_process_group()模式 4: 示例:```

from torch.distributed.device_mesh import init_device_mesh

mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))


**模式 5:** 組# 默認情況下,集體通信操作在默認組(也稱為 world)上進行,並要求所有進程進入分佈式函數調用。然而,某些工作負載可以從更細粒度的通信中受益。這就是分佈式組發揮作用的地方。`new_group()` 函數可用於創建新組,包含所有進程的任意子集。它返回一個不透明的組句柄,該句柄可以作為 `group` 參數傳遞給所有集體通信函數(集體通信是以某些眾所周知的編程模式交換信息的分佈式函數)。

`torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]`# 創建一個新的分佈式組。此函數要求主組中的所有進程(即屬於分佈式作業的所有進程)都進入此函數,即使它們不是該組的成員。此外,應在所有進程中以相同的順序創建組。

**警告** 安全併發使用:當使用帶有 NCCL 後端的多個進程組時,用戶必須確保跨秩(ranks)的集體通信執行順序在全局範圍內保持一致。如果進程內的多個線程發出集體通信,則需要進行顯式同步以確保順序一致。當使用 `torch.distributed` 通信 API 的異步變體時,會返回一個 work 對象,並且通信內核被排入單獨的 CUDA 流中,從而允許通信和計算重疊。一旦在一個進程組上發出了一個或多個異步操作,在使用另一個進程組之前,必須通過調用 `work.wait()` 與其他 cuda 流進行同步。有關更多詳細信息,請參閱 [同時使用多個 NCCL 通信器](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently)。

**參數**

* **ranks** (`list[int]`) – 組成員的秩列表。如果為 `None`,將設置為所有秩。默認為 `None`。
* **timeout** (`timedelta`, 可選) – 參見 `init_process_group` 瞭解詳細信息和默認值。
* **backend** (`str` 或 `Backend`, 可選) – 要使用的後端。根據構建時的配置,有效值為 `gloo` 和 `nccl`。默認使用與全局組相同的後端。此字段應作為小寫字符串提供(例如,`"gloo"`),也可以通過 `Backend` 屬性訪問(例如,`Backend.GLOO`)。如果傳入 `None`,將使用對應於默認進程組的後端。默認為 `None`。
* **pg_options** (`ProcessGroupOptions`, 可選) – 進程組選項,指定在構建特定進程組期間需要傳入哪些附加選項。例如,對於 nccl 後端,可以指定 `is_high_priority_stream`,以便進程組可以使用高優先級的 cuda 流。有關配置 nccl 的其他可用選項,請參閱 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
* **use_local_synchronization** (`bool`, 可選):在進程組創建結束時執行組本地屏障。不同之處在於,非成員秩不需要調用 API 也不加入屏障。
* **group_desc** (`str`, 可選) – 用於描述進程組的字符串。
* **device_id** (`torch.device`, 可選) – 將此進程“綁定”到的單個特定設備,如果提供了此字段,`new_group` 調用將嘗試立即為該設備初始化通信後端。

**返回**

分佈式組的句柄,可以傳遞給集體通信調用;如果該秩不屬於 `ranks`,則返回 `GroupMember.NON_GROUP_MEMBER`。

**注意** `use_local_synchronization` 不適用於 MPI。

**注意** 雖然 `use_local_synchronization=True` 在大型集群和小型進程組中可以顯著更快,但必須小心,因為它會改變集群行為,因為非成員秩不會加入組屏障 `barrier()`。

**注意** 當每個秩創建多個重疊的進程組時,`use_local_synchronization=True` 可能導致死鎖。為避免這種情況,請確保所有秩遵循相同的全局創建順序。

`torch.distributed.get_group_rank(group, global_rank)[source]`# 將全局秩轉換為組內秩。`global_rank` 必須是 `group` 的一部分,否則會引發 `RuntimeError`。

**參數**

* **group** (`ProcessGroup`) – 用於查找相對秩的 ProcessGroup。
* **global_rank** (`int`) – 要查詢的全局秩。

**返回**

相對於 `group` 的 `global_rank` 的組內秩

**返回類型**

`int`

**注意** 在默認進程組上調用此函數返回恆等值。

`torch.distributed.get_global_rank(group, group_rank)[source]`# 將組內秩轉換為全局秩。`group_rank` 必須是 `group` 的一部分,否則會引發 `RuntimeError`。

**參數**

* **group** (`ProcessGroup`) – 用於從中查找全局秩的 ProcessGroup。
* **group_rank** (`int`) – 要查詢的組內秩。

**返回**

相對於 `group` 的 `group_rank` 的全局秩

**返回類型**

`int`

**注意** 在默認進程組上調用此函數返回恆等值。

`torch.distributed.get_process_group_ranks(group)[source]`# 獲取與 `group` 關聯的所有秩。

**參數**

* **group** (`Optional[ProcessGroup]`) – 從中獲取所有秩的 ProcessGroup。如果為 `None`,將使用默認進程組。

返回按組排名排序的全局排名列表。返回類型 list[int]```
new_group()
```**模式 6:** 警告 安全的併發用法:當使用帶有 NCCL 後端的多個進程組時,用戶必須確保跨排名的集合操作具有全局一致的執行順序。如果進程內的多個線程發出集合操作,則必須進行顯式同步以確保順序一致。當使用 `torch.distributed` 通信 API 的異步變體時,會返回一個 work 對象,並且通信內核被入隊到單獨的 CUDA 流上,從而允許通信和計算重疊。一旦在一個進程組上發出了一個或多個異步操作,在使用另一個進程組之前,必須通過調用 `work.wait()` 與其他 cuda 流進行同步。有關更多詳細信息,請參閱 [同時使用多個 NCCL 通信器](https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently)。```
NCCL
```**模式 7:** 注意 如果您將 DistributedDataParallel 與分佈式 RPC 框架結合使用,則應始終使用 `torch.distributed.autograd.backward()` 來計算梯度,並使用 `torch.distributed.optim.DistributedOptimizer` 來優化參數。示例:
```python
>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # 設置優化器
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>> optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>> optim.SGD,
>>> optimizer_params,
>>> lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>> pred = ddp_model(rref.to_here())
>>> loss = loss_func(pred, target)
>>> dist_autograd.backward(context_id, [loss])
>>> dist_optim.step(context_id)
torch.distributed.autograd.backward()
```**模式 8:** static_graph (bool) – 當設置為 True 時,DDP 知道訓練圖是靜態的。靜態圖意味著 1) 在整個訓練循環中,已使用和未使用的參數集不會改變;在這種情況下,用戶是否設置 `find_unused_parameters = True` 無關緊要。2) 圖的訓練方式在整個訓練循環中不會改變(意味著沒有依賴於迭代的控制流)。當 `static_graph` 設置為 True 時,DDP 將支持過去無法支持的情況:1) 重入反向傳播。2) 多次激活檢查點。3) 當模型有未使用的參數時的激活檢查點。4) 存在位於 forward 函數之外的模型參數。5) 當存在未使用的參數時,可能會提高性能,因為當 `static_graph` 設置為 True 時,DDP 不會在每次迭代中搜索圖以檢測未使用的參數。要檢查是否可以將 `static_graph` 設置為 True,一種方法是檢查上一個模型訓練結束時的 ddp 日誌數據,如果 `ddp_logging_data.get("can_set_static_graph") == True`,通常您也可以設置 `static_graph = True`。示例:
```python
>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
>>> # 訓練循環
>>> ...
>>> ddp_logging_data = model_DDP._get_ddp_logging_data()
>>> static_graph = ddp_logging_data.get("can_set_static_graph")
True
```## 參考文件此技能包含 `references/` 中的綜合文檔:- **other.md** - 其他文檔在需要詳細信息時,使用 `view` 讀取特定的參考文件。## 使用此技能### 對於初學者
從 `getting_started` 或 `tutorials` 參考文件開始,瞭解基本概念。### 對於特定功能
使用適當的類別參考文件(api、guides 等)獲取詳細信息。### 對於代碼示例
上面的快速參考部分包含從官方文檔中提取的常見模式。## 資源### references/
從官方來源提取的組織化文檔。這些文件包含:
- 詳細解釋
- 帶有語言註釋的代碼示例
- 指向原始文檔的鏈接
- 用於快速導航的目錄### scripts/
在此處添加用於常見自動化任務的輔助腳本。### assets/
在此處添加模板、樣板代碼或示例項目。## 注意- 此技能是從官方文檔自動生成的
- 參考文件保留了源文檔的結構和示例
- 代碼示例包括語言檢測以實現更好的語法高亮顯示
- 快速參考模式是從文檔中的常見用法示例中提取的## 更新要使用更新的文檔刷新此技能:
1. 使用相同的配置重新運行抓取工具
2. 將使用最新信息重建技能