///
Search
Duplicate
📼

Resource Elasticity in Distributed Deep Learning

Created
2021/07/04 06:57
발표일자
2020/11/30
발표자
이혜민
Tags
System
Resource
Elasticity
✅main
포스팅 종류
논문리뷰

Resource Elasticity in Distributed Deep Learning

논문 요약 : 현재 Distributed Learning (분산학습) 방식이 매우 별로다 ... 즉 Resource를 Over-provisioninig 또는 Under-provisioning 하는 경우가 많다. 그래서 Resource를 효율적으로 사용하면서 학습에 필요한 Cost도 줄일 수 있는 분산학습 방법을 본 논문에서 제안한다.

Motivation

현재는 특정 Model 들에 대해서는 학습 방식에 대한 다양한 가이던스가 제공되지만, 여전히 가이던스가 없는 Dataset이나 Model에 대해서는 Trial-and-Error (노가다) 방식이 가장 보편적으로 사용되는 방식이다.
예를 들어, ImageNet으로 학습하는 ResNet의 경우 Per GPU당 64 또는 128 Mini-Batch Size를 사용하는 것이 보편적인 가이드
학습을 위한 적절한 리소스를 찾는 과정도 꽤 많은 Trial-and-Error가 필요하다.
그렇기 때문에 대부분 Suboptimal이긴 하지만 fixed set of resource를 사용해서 학습 job을 수행한다. 아래의 문제들이 쉽게 발생한는데
Under-provision : 실제 Resource를 더 Scale-up 할 수 있는데도 불구하고 느리게 학습하는 경우
Over-Provision : 많은 Resource를 쓰는데도 불구하고 그 Resource 들이 학습에 전혀 기여하고 있지 않는 경우
만약 Optimal 하게 설정을 했음에도 불구하고 Persistent Straggler들에 의해서 전체 성능이 크게 감소할 수 있다. (Figure 1. 은 Synchronous SGD 학습에서 Straggler가 있는 경우 Perf Degradation)

Resource elasticity

저자들은 위의 문제를 해결하기 위해서 근본적으로 분산 학습 시스템은 elastic 해야한다고 주장한다. 즉 상황에 따라서 Resource의 할당과 해지가 dynamic하게 학습 중간에 발생할 수 있어야 한다. (이미 대부분의 분산 cluster들은 이러한 elasticity를 제공하고 있다.)

Main Challenges

그럼 현재의 분산 학습 시스템에서 elasticity를 제공하기 어려운 이유는 현재 학습 Framework (Tensorflow, MXNet, Pytorch) 들이 학습에 필요한 여러 자원들을 static하게 관리하고 있기 때문이고, 또 다른 이유는 학습 방식이 기존 분산 시스템에서 도는 Application과의 특성이 현저히 다름에도 있다.
일반적인 분산 workload 와 분산 학습 workload의 차이점
대부분의 Workload가 Dynamic한 특징을 가지고 있고 resource utilization을 기준으로 scaling factor를 쉽게 결정할 수 있다. 하지만, 학습의 경우는 한번 resource가 할당 되면 전체 Lifetime 동안 static한 경우가 대부분이다. (그리고 한번 할당된 Resource가 Idle 상태로 가는 경우가 거의 없다.)
기존 분산 앱의 경우 Scaling이 매우 쉽다. 하지만 학습의 경우 Scaling을 잘못하는 경우 (예를 들어, Global Batch를 너무 크게 올리는 경우) 학습이 안되는 문제가 발생할 수 있다.

Autoscaling engine for distributed learning

그래서 본 논문에서는 위의 문제를 해결할 수 있는 분산학습을 위한 Autoscaling engine을 만들었다. (구현은 뒷 부분에서 더 자세하게 설명한다.)
Main Contribution
기존 학습 framework의 분산 학습시 한계에 대해서 설명
Troughput과 Cost를 고려하는 scaling heuristic을 제안
Persistent stragglers를 over-provisioning 없이 처리할 수 있는 기법을 제안
본 논문은 synchronous stochastic gradient descent (SGD) 환경만 focusing 한다.

Background

Architectural limitations in TensorFlow

분산 학습시 Model Parameter가 worker에 다 replicate되고. 학습시에는 각 worker들이 연산한 batch 결과를 종합해서 하나의 model을 학습 시키고 다시 그 model의 parameter를 replicating 하는 방식으로 학습이 되는데. (일반적인 Synchronous SGD 방식) 학습시 Operation들이 worker를 넘나 들면서 편하게 수행하기 위해서 sendreceive operation을 graph에 추가하는 방식이다.
이러한 sendreceive operation이 더 elasticity를 떨어뜨리는 결정적 요인을 하게 되는데, runtime resource가 변경이 되면 이러한 send와 receive로 연결된 Node간의 Topology가 변하면서 Network의 Reconnection이 발생하게 된다. (Costly)
그리고 더 나아가, Resource의 변경으로 인해서 새로운 Worker가 Initialization되서 실제 throughput에 도움이 되기 까지 오랜 시간이 걸리는데 왜냐하면 학습 graph에 대한 다양한 optimization을 적용하게 된다. (e.g. Rewriting the graph, JIT compilation, etc)
가장 결적적으로 Elasticity를 떨어뜨리는 요인은 TF는 전체 Lifetime 동안 항상 fixed number of process 를 사용한다.

How to set the batch size ?

Batch Size는 resource 측면에서도 매우 중요한 parameter이고 학습 convergence에도 큰 영향을 미치는 factor 이다. (따라서, 잘 못 조절하게 되면 학습이 아예 안되는 문제도 발생할 수 있다.)
저자들은 Batch Size에 따른 Convergence 문제는 본 논문을 넘는 주제라 이야기 하면서, 대신에 Worker당 Local Batch와 Worker 갯수를 어떻게 설정할 수 있는지에 대한 문제를 더 자세하게 살펴볼 것이라 이야기 한다. (아래의 Example 확인)
Example
Config 1. Global Batch (16384) = Batch Size (256) * GPU Worker (32)
Config 2. Global Batch (16384) = Batch Size (64) * GPU Worker (128)
위의 정보만 봐도 Global Batch는 동일하지만 다양한 Configuration 이 나올 수 있고 여기서 어떤 Config가 Throughput과 Cost 측면에서 더 효율적인지를 본 논문에서는 더 고민하겠다는 뜻이다.
그럼 항상 global batch는 fix할 것인가?
Tradeoff 발생 : Global batch를 fix하면 convergence의 문제는 없을 것이지만 Up-scaling 시 Local Batch의 사이즈가 작아지면서 각 worker당 효율이 떨어질 것이다. 반대로, local batch를 fix하면 worker의 효율을 늘어나지만 convergence의 문제가 생길 수 있다..
해결 방법 : 본 논문에서는 global batch size의 maximum batch size를 사용자로 부터 받고 global batch가 given threshold를 넘기기 전까지 local batch를 늘려서 worker 내 효율성을 높이도록 디자인했다. (만약 global batch를 넣지 않는다면, 해당 모델은 batch 사이즈에 대한 convergence 문제가 없다고 판단할 것이다.)

System Overview

Autoscaling Engine은 크게 두 가지 Part로 구성된다.
Scaling Heuristics
Straggler Detection
그럼 시스템은 위의 결정들을 수행하기 위해서는 statistics를 runtime 상황에서 모아야 하는데 ?
일반적인 분산 시스템들은 Batch 처리해야 하는 정보들을 Fine-grained 하게 쪼개서 그 사이마다 statistics를 갱신한다. (즉 Batch를 짧게 쪼개면 쪼갤 수록 더 많은 statistics를 얻을 수 있다.) 해당 분산 학습 시스템은 10개의 minibatch interval에 statistics를 모아 위의 결정들을 수행한다.
Autoscaling Engine Architecture
Autoscaling Engine Timeline

Scaling Heuristics

Scaling Heuristics은 언제, 얼마나 많은 worker를 할당할 지를 결정하는 heuristic이다.
스케쥴은 아주 단순하게 작동한다.
Adds K workers every N batches until the scaling conditions are violated
K : The unit of scaling K depends on the workload and system (e.g. K is the number of GPUs on a machine)
N : The scaling interval N controls how much information the system is given before making an autoscaling decision
아래 그림은 실제 schedule에 의해서 worker가 추가되거나 제거되는 방식을 보여준다.
처음에 하나의 Worker로 Scale up 시키는 것이 아니라 첫 스텝에 여러개의 worker를 두어 쓰는 것이 의미있는 schedule 결정을 위한 statistics를 제공한다.
(Left 그림) 만약 Scaling Conditions이 Pass하면 계속 Scale-up을 한다.
(Right 그림) 만약 현재 Scaling Condition에 Fail이 발생한다면, System은 Next Lowest Point로 뛰고 다시 다음 Scaling Point로 Scaling Condition을 적용해 뛰어보는 작업을 반복해서 돌린다.

Scaling Conditions

Throughput Scaling Efficiency - Incremental throughput scaling efficiency를 측정하는 metric
현재 대비 Worker의 숫자가 늘었을 때의 Throughput의 Gain이 어떻게 되는지 알려주는 metric
delta R = 0 → S_k,d = 0 (k 만큼 Worker를 추가했을 때 throughput에 gain이 전혀 없는 경우)
delta R = R_k → S_k,d = 1 (k 만큼 Worker를 추가했을 때 linear하게 gain이 있는 경우)
S_k, d < 0 (실제 Worker를 추가한 경우 전체 Throughput이 떨어지는 경우 (e.g. Communication Overhead))
즉 해당 metric은 worker를 추가 했을 때 전체 throughput에 gain이 추가 직전에 비해 상대적으로 얼마나 커지는 지를 측정한다.
Q. 원래는 하나의 worker마다의 throughput을 측정하면 되는데 이렇게 하는 이유는?,
A. 이렇게 하는 이유는 특정 worker를 넣었을 때 해당 worker의 batch가 할당되지 않아서 완전 노는 경우도 해당 metric을 통해서 모두 커버할 수 있다.
Utility vs Cost - 대부분의 User들은 throughput 보다는 Cost($)를 metric으로 볼 수 있는게 더 직관적일 수 있다. 따라서, 해당 section에서는 cost를 기반의 metric을 모델링하고 수행 시간에 dependent 한 utilization 함수를 모델링해서 Cost를 낮추면서 Utilization을 높힐 수 있는 Search Point를 찾아낸다.
여기서 의미하는 Cost는 Expected Cost이다. 현재 까지 소모된 Cost를 기준으로 k개의 worker가 투입될 때 미래의 cost를 expectation
여기서 중요한 metric은 t(k)이며, 이는 number of worker를 투입했을 때, batch를 수행하는데 있어 step time을 제공하는 함수이다.
위의 graph는 아래의 configuration에서 측정한 수치이다.
가장 적당한 configuration을 찾기 위한 하나의 방법으로는 Cost Function 으로는 부족하다. (단순히 싸게 하는게 중요한게 아니기 때문이다.) 따라서, Utilization Metric을 아래와 같이 modeling 하는데 (해당 값은 metric으로 표현하는게 아니라 실제 System이 운용되면서 Sampling되어야 정확한 curve를 얻을 수 있다.)
Utility Function U(T)는 T(Completion Time) 동안 얼마의 Cost가 발생했는지를 표현
U(T)의 함수 형태는 아래의 형태로 나올 수 있다.
U(T) is monotonically decrease function 이다. 그 형태는 크게 (a), (b), (c) 로 보여질 것이다.
즉 이성적인 사용자들은 가장 작은 T값을 선택할 것이다. 왜냐하면 학습 시간이 빠른게 중요하니깐
그럼 Cost function과 Utility Function을 통해 User가 선호할 수 있는 Search Space를 Modeling 할 수 있다.
예상되는 전체 수행시간으로 구한 Utilization function과 Cost Function의 차이를 minimize하는 k를 찾는 문제로 바꿔서 생각할 수 있다.
minimize U(T(k)) - C(k) 이기 때문에 Scaling Condition : U(T(k)) > C(K)
하지만 위의 metric을 만족하는 k 값을 학습 초기에 바로 찾아낼 수 없다? (왜냐하면 U(T)의 curve를 초반에 알 수 있는 방법이 없다.) 따라서, 큰 문제가 발생하는데 Sampling된 statistics가 없는 경우 U(T(k))는 Figure 5. (c) 처럼 Flat 하게 보일텐데, 이런 경우에는 C(K) 값보다 Utility 함수가 항상 큰 값을 가지기 때문에 Scaling Condition을 항상 만족시키면서 K값을 크게 Scale out 하는 결정을 수행한다. (뭐 돈은 계속 오르겠지만 실제 Utility는 증가하지 않는 문제가 발생한다.)
Scaling Condition Metric
위의 문제를 해결하기 위해 marginal utility와 marginal cost를 비교할 수 있도록 수식을 변경한다. 이렇게 되면 매번 k값이 scale될 때의 이전 값과의 상대치를 계산하기 때문에 다양한 U(T)의 변화를 판단할 수 있다.
위의 조건이 만족되는 경우 k값을 한 단계 scale-up 한다.

Straggler Detection

synchronous SGD에서 straggler는 큰 성능 저하를 야기 시킨다. 따라서, straggler를 잘 detect 하는 것이 매우 중요하다.
Auto Scaling engine에서는 아래의 worker를 potential straggler로 간주하는데.
A worker is considered a potential straggler if its throughput as a fraction of the median throughput falls below a threshold.
만약 threshold가 1이라는 뜻은 → median throughput 보다 떨어지면 모두 potential straggler로 간주
만약 threshold가 0이라는 뜻은 → 어떠한 worker도 straggler가 되지 않는다는 것을 의미
해당 논문에서는 Threshold : 0.8 값을 사용한다.
최대 전체 성능의 1.25x의 성능 저하까지만 용인하겠다는 뜻으로 결정된다.
위의 로직으로 straggler를 발견하자 마자 제거하는 방식은 too aggressive 하다. 한 batch에서 느렸다고 그 친구를 straggler라고 여기는 것에 대해서는 무리가 있다.
본 논문에서는 persistent stragglers를 찾는 것이 목적이 되는데. 그냥 단순히 N만큼의 연속된 batch에서 성능이 느렸다고 이를 persistent straggler라고 단정하는게 어렵다. 왜냐하면, N-1 번째 마다 속도가 복구되는 경우에는 Detection을 피할 수 있다.
이러한 문제를 피하기 위해서 system은 throughput fraction에 대한 exponential weighted moving average (EWMA)를 유지하게 된다. 그리고 average 가 threshold보다 낮은 경우 persistent straggler라고 간주한다.
그럼 Persistent Straggler를 발견하면 바로 제거하는가?
전체 Throughput의 영향을 피하기 위해 발견된 persistent straggler를 바로 제거하는 방식이 아니라 Ready 된 worker를 replace한다. (구현 섹션에서 좀 더 설명)

Implementation

그럼 auto scaling engine을 어떻게 구현했는지 조금 더 살펴 보자

CheckPoint Restart

저자들은 맨 처음에 Checkpoint 기능을 이용해서 위의 구현을 고려했다. Scale-up 요청시 checkpoint를 save하고 restore 하는 방식으로 Tensorflow를 수정하지 않고 구현을 하려했지만, checkpoint를 save하고 restore하는 과정 자체가 너무 costly 했다 ...

Bypassing Tensorflow distribution logic

원래는 TF내의 Send와 Receive의 Node들이 Dynamically Modification이 가능하도록 수정을 해야 Auto-Scaling Engine을 구현해 낼 수 있다. (구현 Cost가 크다.)
본 논문에서는 구현 Cost를 낮추기 위해 아래의 그림 처럼 Compute Graph와 Reduce를 분리해서 구현을 단순화하였다.
위의 구현을 달성하기 위해서는 분산 Operation에서 흔히 사용되는 Reducer를 사용하면 되는데 이를 구현하기 위해서 Horovod - https://github.com/horovod/horovod 라는 분산 학습용 Framework을 사용했다. Horovod에서 제공하는 API인 Allreducer 를 사용하여 위의 구현을 달성. 아래의 그림은 Allreducer의 구현 예제를 보여준다. (AllReduce API는 Dynamic worker에 대한 기능들도 지원한다.)
(AllReduce 방식)

Minimizing transition time

transition 시에 소모되는 시간이 굉장히 크다. 이를 줄일 수 있는 방법을 본 섹션에서 고민한다.
실제 학습이 시작될 때 아래의 time-consuming tasks가 수행 (약 수 분의 시간이 걸린다.)
Loading Library, Building the model graph, preprocessing the first batch of data, and warming up the GPUs
실제 위의 작업이 수행될 떄 synchronous SGD 의 특성상 새로 init되는 process가 straggler가 될 것이다. (모든 System을 init time 만큼 idle 상태로 빠진다.)
Auto-scaling engine에서는 새로운 resource가 할당될 때에는 미리 수 개의 Batch를 소비하면서 warm-up이 확실히 완료된 worker를 group에 join하는 방식

Bootstrapping new workers

새로운 worker가 편입되는 순간 worker에 대해서 gradient update를 위해 Parameter synchronization 과정이 필요한데, 이를 최소화 하기 위해 각 worker별 다른 model parameter의 slice를 병렬적으로 땡겨서 parameter를 update한다.
위의 구현을 적용했을 때의 성능 차이

Evaluation

실험 환경 - Cluster
Dataset - ImageNet, CIFAR-10
Baseline - Static Resource Allocation
Auto Scaling Engine Configuration
Scaling Efficiency Threshold 0.1
Scale step (4 GPU increment)

CIFAR-10

ResNet-56 Model을 CIFAR-10으로 Global Batch Size 1024, 200 epoch 학습 시켰다. GPU는 worker당 한 개, 그리고 4~24개의 GPU까지 Scale-up 하면서 실험
CIFAR-10에 경우 4개의 GPU를 사용했을 때 성능이 가장 좋았다. 그 이유는 Global Batch가 너무 작기 떄문에 parallelism이 충분히 올라가지 못하기 때문이다.
실제 AutoScaling Engine은 모두 4개의 GPU로 converge 헀다. (즉 최적의 GPU Worker를 찾았다.)
그리고 평균 Completion time이 8.23%~16.0% 까지 빠르게 학습을 끝낼 수 있었다. total gpu time도 58.6%~85.1% 만큼 줄일 수 있었다. Figure 6. (a), (b)
그리고 auto-scaling system이 misallocation된 값을 복구하는데 61.0 sec ~ 78.4 sec 이 소요되었는데 이는 매우 짧은 시간이고 이 시간의 대부분의 portion을 bootstrap phase에서 소요되었다.

ImageNet

ImageNet의 경우엔 ResNet-50를 사용했고 global batch size 4096으로 사용했다. 그리고 local batch size는 fix를 하고 4096을 limit으로 scaling되도록 설정하였다. 이것 때문에 Worker가 늘어날 수록 efficiency가 증가한다. (그런데 당연히 V100을 사용하는데 Fix Local Batch를 사용한다는 뜻은 Per GPU efficiency는 더 떨어질 것 같다.)
전체 수행시간은 64 GPUs 에서 최소가 된다. 보면 auto-scaling engine을 사용한 경우 total completion time이 19.4% ~ 45.0% 감소했다. 그리고 GPU Time은 실제 7.39% ~ 14.7% 늘었는데 performance의 증가가 있었음에도 불구하고 static mode에 비해서 성능이 감소한 이유는 suboptimal number of worker로 돌아간 시점이 중간중간 많았기 떄문에 이러한 Overhead가 추가적으로 야기되었다고 볼 수 있고 resource allocation을 하면서 발생되는 Idle 타임도 이러한 문제를 야기했다.
마지막으로 misallocation 문제가 해결되는 시점까지 걸린 시간도 CIFAR-10 대비해서 더 오래 걸렸는데, many trials 이 발생했고 scale out을 하면서 resource가 추가 할당되면서 initialization overhead가 크게 발생하기 때문에 CIFAR-10보다 큰 보정 시간이 걸린 것은 사실이다.
결국 이런 문제를 보여주지만 실제 학습 시간이 굉장히 길기 때문에 이러한 문제점들이 충분한 학습 시간을 통해 상쇄 될 수 있다.

Discussion

High performance GPU 보다는 Low-end GPU에 적용하면 더 좋을 것 같다. 왜냐하면 per gpu utilization을 크게 높일 수 있으니깐 ?
Per-gpu의 multi-tenancy 까지 고려될 수 있는지 ?
GPU Cluster를 운용하는 입장에서 꼭 고민이 되어야 하는 것 같다.