1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
| # 选择指定显卡
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0" # 设置默认使用的 GPU 设备为 0 号
## 使用 0,1 号两块显卡
CUDA_VISIBLE_DEVICES=0,1 python your_code.py
# os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"
# 将模型移动到 GPU
model = MyModel()
model.cuda() # 或者 model.to('cuda')
# 将张量移动到 GPU
tensor = tensor.cuda() # 或者 tensor.to('cuda')
for image,label in data_loader:
image = image.cuda()
label = label.cuda()
# 将图像和标签移动到 GPU 上进行训练
# 多卡训练
## 单机多卡 DP DataParallel
model = Net()
model.cuda() # 模型显示转移到CUDA上
if torch.cuda.device_count() > 1: # 含有多张GPU的卡
model = nn.DataParallel(model) # 单机多卡DP训练
# 指定使用的GPU卡号
model = nn.DataParallel(model, device_ids=[0,1]) # 使用第0和第1张卡进行并行训练
#通过DP进行分布式多卡训练的方式容易造成负载不均衡,有可能第一块GPU显存占用更多,因为输出默认都会被gather到第一块GPU上。
## 多机多卡 DDP DistributedDataParallel
## 针对每个GPU,启动一个进程,然后这些进程在最开始的时候会保持一致(模型的初始化参数也一致,每个进程拥有自己的优化器),同时在更新模型的时候,梯度传播也是完全一致的,这样就可以保证任何一个GPU上面的模型参数就是完全一致的,所以这样就不会出现DataParallel那样显存不均衡的问题。
## GROUP:进程组,默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。(当需要进行更加精细的通信时,可以通过 new_group 接口,使用 world 的子集,创建新组,用于集体通信等。)
## WORLD_SIZE:表示全局进程个数。如果是多机多卡就表示机器数量,如果是单机多卡就表示 GPU 数量。
## RANK:表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。 如果是多机多卡就表示对应第几台机器,如果是单机多卡,由于一个进程内就只有一个 GPU,所以 rank 也就表示第几块 GPU。
## LOCAL_RANK:表示进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。例如,多机多卡中 rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, distributed
from torch import nn, optim
import argparse
def setup(rank, world_size):
# 初始化进程组
dist.init_process_group(
backend='nccl', # GPU 通常用 nccl, CPU 可用 gloo
init_method='env://', # 使用环境变量初始化
world_size=world_size, # 进程总数
rank=rank # 当前进程编号
)
def cleanup():
dist.destroy_process_group()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
# 1️⃣ 设置 GPU 设备
local_rank = args.local_rank
torch.cuda.set_device(local_rank)
# 2️⃣ 初始化分布式环境
setup(local_rank, world_size=int(os.environ["WORLD_SIZE"]))
# 3️⃣ 创建模型并移动到对应 GPU
model = nn.Linear(10, 1).to(local_rank)
# 4️⃣ 使用 DDP 包装模型
model = DDP(model, device_ids=[local_rank], output_device=local_rank)
# 5️⃣ 数据加载(使用 DistributedSampler)
dataset = torch.utils.data.TensorDataset(torch.randn(100, 10), torch.randn(100, 1))
sampler = distributed.DistributedSampler(dataset)
dataloader = DataLoader(dataset, batch_size=8, sampler=sampler)
# 6️⃣ 定义优化器与损失
optimizer = optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()
# 7️⃣ 训练循环
for epoch in range(5):
sampler.set_epoch(epoch) # 保证每个 epoch 数据不同
for x, y in dataloader:
x, y = x.to(local_rank), y.to(local_rank)
pred = model(x)
loss = loss_fn(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if local_rank == 0: # 只在主进程打印日志
print(f"Epoch {epoch} loss: {loss.item():.4f}")
cleanup()
if __name__ == "__main__":
main()
|