[Review] PyTorch Distributed: Experiences on Accelerating Data Parallel Training

개발공주 2023. 5. 29. 13:26
PyTorch Distributed: Experiences on Accelerating Data Parallel Training (Shen Li et al.)

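This is the 2020 paper that introduced PyTorch's DistributedDataParallel module. I organized this review around four questions: (1) why the paper was written, (2) what problems it set out to solve, (3) how it solved them, and (4) how the solution was evaluated.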

1. Background

Deep neural network (DNN) training consists of three primary steps: the forward pass to compute the loss, the backward pass to compute gradients, and the optimizer step to update parameters. As models and datasets grow larger in pursuit of higher intelligence, applications increasingly spread this training across multiple devices or machines.

PyTorch, a popular deep learning framework, provides three tools for distributed training: DataParallel (single-process, multi-threaded data parallelism on one machine), DistributedDataParallel (multi-process data parallelism across GPUs and machines), and RPC (general distributed training such as model parallelism). This paper focuses on DistributedDataParallel (DDP). It discusses the challenges PyTorch engineers faced and the solutions they developed, and reports performance evaluations on two large networks. It also summarizes lessons learned and proposes directions for future work.

2. Challenges and Solutions

DDP must address three major challenges. First, data-parallel training must be mathematically equivalent to training a single local model. Second, the DDP API should be non-intrusive for application code while still letting the implementation intercept the signals it needs to trigger its optimizations. Third, the tool should deliver high-performance training.

(1) Mathematical Equivalence

To ensure mathematical equivalence, all model replicas start from the same initial parameter values and synchronize gradients so that they stay consistent across iterations. DDP guarantees the identical starting point by broadcasting the model state when the DDP wrapper is constructed. For keeping replicas in sync afterwards, the paper compares two approaches: parameter averaging, which it argues against, and gradient synchronization with the AllReduce API, which DDP adopts.
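
The equivalence claim can be checked on a toy problem. The sketch below is not from the paper: it assumes a one-dimensional linear model with a mean-squared-error loss, made-up data, and equal-sized shards, and it shows that averaging the per-replica gradients reproduces the full-batch gradient.

```python
# Toy check: averaging per-replica gradients over equal-sized shards
# reproduces the gradient of the full batch (mean-squared-error loss).

def mse_grad(w, b, batch):
    """Gradient of mean((w*x + b - y)^2) with respect to (w, b)."""
    n = len(batch)
    gw = sum(2 * (w * x + b - y) * x for x, y in batch) / n
    gb = sum(2 * (w * x + b - y) for x, y in batch) / n
    return gw, gb

# Hypothetical data: 8 samples split evenly across 4 "replicas".
data = [(float(x), 3.0 * x + 1.0) for x in range(8)]
world_size = 4
shard_size = len(data) // world_size
shards = [data[i * shard_size:(i + 1) * shard_size] for i in range(world_size)]

w, b = 0.5, -0.25  # identical initial parameters on every replica

full_grad = mse_grad(w, b, data)                       # single local model
local_grads = [mse_grad(w, b, s) for s in shards]      # one gradient per replica
avg_grad = tuple(sum(g[i] for g in local_grads) / world_size for i in range(2))

print(full_grad, avg_grad)
```

The two gradients agree up to floating-point rounding. The equal shard size matters here: with uneven shards, a plain average of per-replica means would no longer match the full-batch mean.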

AllReduce API

The AllReduce API is used to sum gradients across all processes. It is supported by several communication libraries, including NCCL (by NVIDIA), Gloo (by Meta), and MPI. AllReduce takes an equally-sized tensor from each process, applies a reduction such as sum, and returns the same result tensor to every participant. Because every process must take part, however, it acts as a synchronization point and can slow down distributed training. Optimized algorithms such as ring-based and tree-based AllReduce have been implemented to mitigate this cost.
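
To make the ring-based variant concrete, here is a textbook ring AllReduce simulated in plain Python. It is only a sketch of the general idea, not the NCCL or Gloo implementation; the function name `ring_allreduce` and the one-element-per-chunk layout are assumptions chosen to keep the example short.

```python
def ring_allreduce(per_rank):
    """Simulate a sum AllReduce over a ring of n ranks.

    Each rank holds a vector split into n chunks (one element per chunk
    here). Phase 1 (reduce-scatter) leaves each rank with one fully
    summed chunk; phase 2 (all-gather) circulates the summed chunks.
    """
    n = len(per_rank)
    bufs = [list(v) for v in per_rank]
    assert all(len(b) == n for b in bufs), "this sketch needs n chunks per rank"

    # Phase 1: reduce-scatter. All ranks send simultaneously, so the
    # outgoing messages are collected before any buffer is modified.
    for step in range(n - 1):
        msgs = [((r - step) % n, bufs[r][(r - step) % n]) for r in range(n)]
        for r, (idx, val) in enumerate(msgs):
            bufs[(r + 1) % n][idx] += val

    # Phase 2: all-gather. Rank r starts with the finished chunk (r + 1) % n.
    for step in range(n - 1):
        msgs = [((r + 1 - step) % n, bufs[r][(r + 1 - step) % n]) for r in range(n)]
        for r, (idx, val) in enumerate(msgs):
            bufs[(r + 1) % n][idx] = val

    return bufs

# Hypothetical gradients on 4 ranks.
grads = [
    [1, 2, 3, 4],
    [10, 20, 30, 40],
    [100, 200, 300, 400],
    [1000, 2000, 3000, 4000],
]
result = ring_allreduce(grads)
print(result[0])
```

Every rank ends up with the same summed vector after 2(n - 1) steps, and in each step a rank sends only one chunk to its neighbor rather than the whole tensor to everyone.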

Parameter Averaging

Parameter averaging synchronizes replicas by averaging the model parameters themselves rather than the gradients. It runs as an extra step after the optimizer step, so it does not need to interact with the local training steps. However, it has caveats. It is not mathematically equivalent to local training: because each replica's optimizer only sees its own local gradients, optimizer states can drift apart and push the replicas in conflicting directions. It also forces computation (the backward pass) and communication (the averaging) into separate, non-overlapping phases, so one of the two always sits idle, which defeats the purpose of parallelism.
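
A small numeric example shows where the equivalence breaks. The optimizer below is an Adagrad-style update that I picked purely for illustration (the paper does not use this example), and the two local gradients are made up. With plain SGD both schemes agree; with an optimizer whose step depends nonlinearly on the local gradient, they do not.

```python
import math

def sgd_step(param, grad, lr=0.1):
    return param - lr * grad

def adagrad_step(param, grad, state, lr=0.1, eps=1e-12):
    """Adagrad-style update: the step is scaled by accumulated squared gradients."""
    state += grad * grad
    return param - lr * grad / (math.sqrt(state) + eps), state

p0 = 1.0
local_grads = [3.0, -1.0]          # hypothetical gradients on two replicas
mean_grad = sum(local_grads) / len(local_grads)

# Plain SGD: averaging gradients and averaging parameters give the same result.
sgd_grad_avg = sgd_step(p0, mean_grad)
sgd_param_avg = sum(sgd_step(p0, g) for g in local_grads) / len(local_grads)

# Adagrad-style: each replica normalizes by its own local gradient history.
ada_grad_avg, _ = adagrad_step(p0, mean_grad, 0.0)
ada_param_avg = sum(adagrad_step(p0, g, 0.0)[0] for g in local_grads) / len(local_grads)

print(sgd_grad_avg, sgd_param_avg)
print(ada_grad_avg, ada_param_avg)
```

With gradient averaging the parameter moves from 1.0 to about 0.9, while with parameter averaging the two replicas step in opposite directions and the average stays at about 1.0.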

(2) High Performance

To achieve high-performance training, DDP offers several techniques: bucketing gradients, overlapping computation with communication, and skipping synchronization. The paper walks through how the developers arrived at each of these optimizations for the computation and communication involved in synchronization.

Gradient Bucketing

The naive approach to gradient synchronization is a collective communication step after the local backward pass and before the parameters are updated. Launching one AllReduce per gradient tensor, however, is inefficient for small tensors, because each call carries a fixed overhead. To address this, DDP introduces gradient bucketing. Instead of synchronizing gradients individually, DDP waits until several gradients have accumulated in a bucket and the bucket reaches a size threshold. It then reduces the whole bucket in a single AllReduce operation, which gives higher throughput and lower latency.
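
The grouping step can be sketched as a simple greedy packing. This is a simplified illustration rather than DDP's actual assignment logic; the function name `assign_buckets` and the byte sizes are hypothetical.

```python
def assign_buckets(param_sizes_bytes, bucket_cap_bytes):
    """Greedily pack parameter indices into buckets of at most bucket_cap_bytes.

    A parameter larger than the cap still gets a bucket of its own.
    """
    buckets, current, current_bytes = [], [], 0
    for index, size in enumerate(param_sizes_bytes):
        # Close the current bucket if adding this parameter would overflow it.
        if current and current_bytes + size > bucket_cap_bytes:
            buckets.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        buckets.append(current)
    return buckets

# Hypothetical gradient sizes in bytes for 7 parameters.
sizes = [4, 4, 16, 2, 2, 2, 30]
buckets = assign_buckets(sizes, bucket_cap_bytes=16)
print(buckets)
```

Here seven gradients are packed into four buckets, so the toy model would need four AllReduce calls per iteration instead of seven.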

Overlapping Computation with Communication

Even with gradient bucketing, an inefficiency remains if communication only starts after the entire backward pass has finished, because computation and communication then do not overlap. To address this, DDP registers an autograd hook on the gradient accumulator of every model parameter. The hook fires once that parameter's gradient is ready. When it fires, DDP checks whether every gradient in the same bucket has fired its hook. If so, DDP launches an asynchronous AllReduce on that bucket while the rest of the backward pass continues. Because gradients become ready roughly in the reverse order of the forward pass, DDP assigns parameters to buckets in the reverse order of model.parameters().
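
The hook-and-bucket bookkeeping can be sketched without PyTorch. Everything in this block is a hypothetical stand-in: `ToyReducer` is not the real reducer, `fake_allreduce` just returns its input, and a thread pool plays the role of the asynchronous communication backend.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_allreduce(payload):
    """Placeholder for the real collective; it only returns a copy."""
    return list(payload)

class ToyReducer:
    def __init__(self, num_params, bucket_size, pool):
        # Bucket in reverse parameter order, approximating backward order.
        order = list(range(num_params - 1, -1, -1))
        self.buckets = [order[i:i + bucket_size]
                        for i in range(0, num_params, bucket_size)]
        self.bucket_of = {p: b for b, ps in enumerate(self.buckets) for p in ps}
        self.pending = [len(ps) for ps in self.buckets]
        self.grads, self.futures, self.launch_log = {}, [], []
        self.pool = pool

    def grad_ready_hook(self, param_index, grad):
        """Called once per parameter when its gradient is ready."""
        self.grads[param_index] = grad
        b = self.bucket_of[param_index]
        self.pending[b] -= 1
        if self.pending[b] == 0:  # whole bucket ready: launch without blocking
            payload = [self.grads[p] for p in self.buckets[b]]
            self.launch_log.append((b, param_index))
            self.futures.append(self.pool.submit(fake_allreduce, payload))

    def finalize_backward(self):
        """Block until every launched reduction has finished."""
        return [f.result() for f in self.futures]

with ThreadPoolExecutor(max_workers=2) as pool:
    reducer = ToyReducer(num_params=6, bucket_size=2, pool=pool)
    for p in range(5, -1, -1):          # backward visits parameters in reverse
        reducer.grad_ready_hook(p, float(p * 10))
    results = reducer.finalize_backward()

print(reducer.launch_log)
print(results)
```

The launch log records which bucket was started after which parameter's hook: the first bucket is already in flight after only two of the six gradients have been computed, which is the overlap the section describes.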

Gradient Accumulation

When the AllReduce operation is expensive, one option is to run it less often. Instead of synchronizing every iteration, the application runs several local iterations, accumulating gradients, before a global synchronization step. This is particularly useful when an input batch is too large to fit on a single device: the batch can be split into micro-batches, with synchronization only at the batch boundary. Running the forward and backward passes of all but the last micro-batch inside the ddp.no_sync() context manager skips synchronization for those passes. The first backward pass outside the context then synchronizes the accumulated gradients, after which the optimizer step runs.
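
The control flow of no_sync() can be mimicked with a small context manager. The `ToyDDP` class below is a hypothetical stand-in written for this post, not PyTorch code; gradients are plain floats and the "AllReduce" is just a counter.

```python
from contextlib import contextmanager

class ToyDDP:
    """Hypothetical stand-in for a DDP-wrapped model."""

    def __init__(self):
        self.require_sync = True
        self.accumulated_grad = 0.0
        self.sync_calls = 0
        self.synced_grad = None

    @contextmanager
    def no_sync(self):
        """Disable synchronization for backward passes inside the block."""
        previous = self.require_sync
        self.require_sync = False
        try:
            yield
        finally:
            self.require_sync = previous

    def backward(self, micro_batch_grad):
        # Gradients always accumulate locally.
        self.accumulated_grad += micro_batch_grad
        if self.require_sync:
            # Stand-in for the AllReduce over the accumulated gradient.
            self.sync_calls += 1
            self.synced_grad = self.accumulated_grad

ddp = ToyDDP()
micro_batch_grads = [0.5, 1.5, 2.0, 4.0]   # hypothetical per-micro-batch gradients

with ddp.no_sync():
    for g in micro_batch_grads[:-1]:
        ddp.backward(g)                     # accumulate only, no communication
ddp.backward(micro_batch_grads[-1])         # last micro-batch triggers the sync

print(ddp.sync_calls, ddp.synced_grad)
```

Four backward passes result in a single synchronization that carries the sum of all four micro-batch gradients, which is the saving the technique is after.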

(3) Non-intrusive and Interceptive

To minimize intrusiveness, DDP exposes the same forward API as an ordinary local model, so developers can replace a regular model with a distributed data parallel model without changing the rest of the training code. The paper explains the implementation details of the DDP API, covering both the front-end Python code and the core C++ components.

Front-end

The distributed.py module covers three key pieces of DDP functionality. First, it exposes configurable knobs such as process_group, bucket_cap_mb, and find_unused_parameters. Second, it handles model device affinity. When a model is too large for one device and spans several, DDP treats it as a single unit, and the Tensor.to(device) API is used to move intermediate outputs between devices as needed. Lastly, the module manages model buffers (for example, the running statistics of a batch normalization layer), broadcasting buffer values from one process to the others so that they stay consistent across replicas.

Core Gradient Reduction

The reducer.cpp file contains the core implementation of four major pieces of functionality. First, it maps parameters to buckets, creating each bucket on the same device as its parameters to keep gradient copies cheap. Second, it installs the autograd hooks when DDP is constructed: it loops through all parameters, finds each parameter's gradient accumulator, and registers the same post-hook function on it, which then fires during the backward pass. Third, it performs the bucket AllReduce operation, which is the main source of communication overhead. The default bucket size is 25MB, chosen as a balance between communication efficiency and memory consumption. Lastly, it handles globally unused parameters: each process records which parameters it did not use in a bitmap kept on the CPU, and an additional AllReduce combines these local bitmaps into a global one. Because some backends cannot run AllReduce on CPU tensors, a second bitmap is kept on the device where the model parameters reside.
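
The globally-unused-parameter check boils down to reducing per-process bitmaps. The sketch below uses plain lists in place of tensors and a local helper in place of the real collective; the helper name `allreduce_sum` and the usage pattern are made up for illustration.

```python
def allreduce_sum(bitmaps):
    """Stand-in for a sum AllReduce: element-wise sum across ranks."""
    return [sum(bits) for bits in zip(*bitmaps)]

# Hypothetical usage bitmaps: 1 means the parameter took part in this
# iteration's forward pass on that rank, 0 means it did not.
local_used = [
    [1, 1, 0, 0],  # rank 0
    [1, 0, 1, 0],  # rank 1
]

global_used = allreduce_sum(local_used)
globally_unused = [i for i, count in enumerate(global_used) if count == 0]

print(global_used, globally_unused)
```

Parameter 1 is unused on rank 1 but used on rank 0, so it still needs its gradient reduced; only parameter 3, which no rank touched, counts as globally unused.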

3. Evaluation

The performance of DistributedDataParallel (DDP) is assessed using a setup consisting of 32 GPUs distributed across 4 servers, with each server equipped with 8 GPUs. The evaluation focuses on two popular models, ResNet50 and BERT, to measure DDP's per iteration latency and scalability. During the evaluation, the loss is computed using CrossEntropyLoss, and parameter updates are performed using the SGD optimizer. Several aspects are analyzed to gauge the effectiveness of DDP:

(1) Latency Breakdown

To demonstrate the effectiveness of overlapping computation with communication, the authors measured the per-iteration latency breakdown with the NCCL and Gloo backends. With overlapping enabled, ResNet and BERT sped up by 38% and 35% respectively on the NCCL backend, and by 26.8% and 21.5% respectively on the Gloo backend. The smaller gain with Gloo suggests that communication is the dominant cost on that backend, which leaves less computation to hide it behind.

(2) Bucket Size

To see how bucket size affects synchronization speed, the authors ran the same training while varying the bucket_cap_mb value. The fastest results were observed when the bucket size ranged from 10MB to 25MB. When using the NCCL backend, the bucket size had little correlation with the number of GPUs, suggesting that asynchronous and parallel processing compensates for overall delays.

(3) Scalability

To measure scalability, the authors scaled training up to 256 GPUs. With both the NCCL and Gloo backends, increasing the number of GPUs increased per-iteration latency, which the paper attributes to communication cost. The results also indicated that, when the skipping-synchronization technique is used appropriately, DDP achieves near-linear scalability with minimal accuracy penalties.

(4) Round-Robin Process Group

Another technique DDP uses to accelerate training is to spread collective communications across multiple process groups in round-robin fashion. The authors measured speed for several round-robin process group configurations. NCCL and Gloo responded differently to the number of process groups. With the NCCL backend, adding process groups made little difference for ResNet50 but sped up the BERT model. With Gloo, adding process groups generally sped up training.
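
Round-robin dispatch itself is simple to sketch. The group names and the bucket count below are hypothetical, and the snippet only shows how work is spread, not how the groups communicate.

```python
from itertools import cycle

def dispatch_round_robin(bucket_ids, process_groups):
    """Assign each bucket's collective to the next process group in turn."""
    assignment = {}
    groups = cycle(process_groups)
    for bucket in bucket_ids:
        assignment[bucket] = next(groups)
    return assignment

# Hypothetical setup: 6 buckets spread over 3 process groups.
assignment = dispatch_round_robin(range(6), ["pg0", "pg1", "pg2"])
print(assignment)
```

With three groups, consecutive buckets land on different groups, so their reductions can proceed concurrently instead of queuing behind one another on a single group.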

4. Contribution

Many of the concepts that are mentioned in the paper, such as AllReduce API, parameter averaging, gradient accumulation, gradient synchronization, and setting bucket size, are not specific to PyTorch or PyTorch Distributed. They are commonly used techniques in distributed training and have been around for some time before the release of PyTorch Distributed.

While these techniques were not introduced with the specific ‘PyTorch Distributed: Experiences on Accelerating Data Parallel Training’ paper, they have been widely used and explored in the field of distributed deep learning. The PyTorch Distributed framework provides an implementation of these techniques to simplify the process of distributed training within the PyTorch ecosystem.