Distributed Training: 3D Parallelism and Scaling Efficiency

Discover why 1024 GPUs rarely deliver 1024× speedup — and how to minimize the gap.

Note - Background: Why distributed training?

Some models are too large to fit in a single GPU’s memory, and some training jobs would take months on one GPU. Distributed training splits the work across many GPUs. This tutorial explores the three main ways to split work and the overhead each one introduces. You should complete the Hello World and LLM Serving tutorials before this one.

Scaling a training job from 1 GPU to 1024 GPUs incurs overhead at every step. Communication, pipeline stalls, and coordination each chip away at theoretical speedup. Understanding where that efficiency goes, and how to recover it, is what separates a well-tuned distributed training job from an expensive waste of cluster time.

By the end of this tutorial you will understand:

  • how data, tensor, and pipeline parallelism each split a training job, and the overhead each introduces
  • why ring all-reduce makes network fabric bandwidth a first-order concern for data parallelism
  • how microbatch count controls the pipeline bubble
  • how to combine all three strategies into a configuration that maximizes scaling efficiency

Tip - 3D Parallelism at a Glance

Modern distributed training uses three orthogonal strategies simultaneously:

Strategy                    What it splits                        Main overhead
--------------------------  ------------------------------------  ----------------------------------------
Data Parallelism (DP)       Batch across GPUs                     All-reduce gradients after backward pass
Tensor Parallelism (TP)     Individual matrix ops within a layer  All-gather within each forward/backward
Pipeline Parallelism (PP)   Layer groups across nodes             Pipeline bubble at start/end of batch

The product \(\text{DP} \times \text{TP} \times \text{PP} = \text{total GPUs}\).
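To see the constraint concretely, here is a small library-independent sketch (the `valid_configs` helper is mine, not an mlsysim API) that enumerates every valid (DP, TP, PP) factorization of a 256-GPU cluster, keeping TP within the node's 8 NVLink-connected GPUs:

```python
def valid_configs(total_gpus=256, gpus_per_node=8):
    """Enumerate (dp, tp, pp) triples with dp * tp * pp == total_gpus.

    TP is restricted to divisors of gpus_per_node so that tensor
    parallelism stays inside a node (the NVLink domain).
    """
    configs = []
    for tp in (1, 2, 4, 8):
        if gpus_per_node % tp:
            continue
        remaining = total_gpus // tp
        for pp in range(1, remaining + 1):
            if remaining % pp == 0:
                configs.append((remaining // pp, tp, pp))
    return configs

cfgs = valid_configs()
print(f"{len(cfgs)} valid configurations, e.g. {cfgs[:3]}")
```

Even at this modest scale there are 30 valid configurations, which is why the rest of the tutorial builds intuition for pruning the search rather than sweeping blindly.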


1. Setup

import mlsysim
from mlsysim import DistributedModel

# Llama-3.1-70B: the model requires distributed training — too large for a single GPU
model = mlsysim.Models.Llama3_70B

# A research-scale cluster: 32 DGX H100 nodes × 8 GPUs = 256 H100s
# (DGX is NVIDIA's pre-built server containing 8 H100 GPUs connected via NVLink)
cluster = mlsysim.Systems.Clusters.Research_256

print(f"Model:   {model.name}  ({model.parameters.to('Gparam'):.0f} params)")
print(f"Cluster: {cluster.name}")
print(f"  Nodes: {cluster.count}  ×  {cluster.node.accelerators_per_node} GPUs/node")
print(f"  Total: {cluster.total_accelerators} accelerators")
print(f"  Fabric: {cluster.fabric.name} @ {cluster.fabric.bandwidth.to('GB/s'):.0f} GB/s/link")
Model:   Llama-3.1-70B  (71 gigaparam params)
Cluster: Research Cluster (256 GPUs)
  Nodes: 32  ×  8 GPUs/node
  Total: 256 accelerators
  Fabric: 100GbE @ 12 GB / second GB/s/link

2. Visualizing 3D Parallelism

Before working through the numbers, consider how 3D parallelism decomposes a training job across a cluster. Each dimension splits work differently and introduces a different type of overhead:

Data Parallelism (DP=4) — each GPU holds a full model copy and processes 1/4 of the batch. After the backward pass, gradients are synchronized via All-Reduce.

flowchart LR
    R1["Replica 1<br/>Batch 1/4"] <-.->|"All-Reduce"| R2["Replica 2<br/>Batch 2/4"]
    R2 <-.->|"All-Reduce"| R3["Replica 3<br/>Batch 3/4"]
    R3 <-.->|"All-Reduce"| R4["Replica 4<br/>Batch 4/4"]

Data Parallelism: replicate the model, split the batch, synchronize gradients.

Tensor Parallelism (TP=2) — each layer is split across GPUs. Requires fast interconnect (NVLink).

flowchart LR
    G1["GPU 0<br/>Left half of each layer"] <-->|"All-Gather<br/>(NVLink)"| G2["GPU 1<br/>Right half of each layer"]

Tensor Parallelism: split each layer across GPUs, communicate via NVLink.

Pipeline Parallelism (PP=4) — model layers are partitioned across stages. Activations flow forward; gradients flow backward.

flowchart LR
    S1["Stage 1<br/>Layers 1–20"] --> S2["Stage 2<br/>Layers 21–40"]
    S2 --> S3["Stage 3<br/>Layers 41–60"]
    S3 --> S4["Stage 4<br/>Layers 61–80"]

Pipeline Parallelism: partition layers across stages, activations flow forward.

The key insight: DP uses inter-node bandwidth (network fabric), TP uses intra-node bandwidth (NVLink), and PP introduces idle time (pipeline bubbles). The optimal configuration balances all three overheads.


3. Baseline: Pure Data Parallelism

Start with the simplest configuration — no model splitting, just replicate the full model on every GPU and split the batch. The per-GPU compute time is determined by the same roofline model you used in the Hello World tutorial. The new element here is communication overhead: after each training step, all GPUs must synchronize their gradients via the network before the next step can begin.

solver = DistributedModel()

result_dp = solver.solve(
    model=model,
    fleet=cluster,
    batch_size=256,
    precision="fp16",
    tp_size=1,   # no tensor parallelism
    pp_size=1,   # no pipeline parallelism
)

node_perf = result_dp.node_profile
print(f"Single-GPU compute time:     {node_perf.latency.to('ms'):~.1f}/step")
print(f"DP all-reduce overhead:      {result_dp.dp_communication_latency.to('ms'):~.2f}")
print(f"Pipeline bubble:             {result_dp.pipeline_bubble_latency.to('ms'):~.2f}")
print(f"")
print(f"Total step latency:          {result_dp.step_latency_total.to('ms'):~.1f}")
print(f"Scaling efficiency:          {result_dp.scaling_efficiency:.1%}")
print(f"Effective throughput:        {result_dp.effective_throughput.magnitude:.0f} samples/s")
print(f"Parallelism:                 DP={result_dp.parallelism['dp']}  TP={result_dp.parallelism['tp']}  PP={result_dp.parallelism['pp']}")
Single-GPU compute time:     2429.3 ms/step
DP all-reduce overhead:      3047.74 ms
Pipeline bubble:             0.00 ms

Total step latency:          5477.0 ms
Scaling efficiency:          44.4%
Effective throughput:        47 samples/s
Parallelism:                 DP=256  TP=1  PP=1
Note - What does scaling efficiency mean?

If scaling efficiency is 80%, then your 256-GPU cluster is delivering the equivalent of about 205 fully-utilized GPUs. The other ~51 GPUs worth of compute is being spent on communication overhead. This is the communication tax of distributed training.

The tax is paid in ring all-reduce: after the backward pass, every GPU must synchronize gradients with every other GPU. The time to do this grows with model size and shrinks with network bandwidth.
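The step-latency numbers printed in Section 3 illustrate this directly. A quick check, assuming (as the outputs above suggest) that scaling efficiency is simply compute time divided by total step time:

```python
# Numbers reported by the Section 3 run above (milliseconds)
compute_ms = 2429.3   # single-GPU compute time per step
comm_ms = 3047.74     # DP all-reduce overhead
bubble_ms = 0.0       # no pipeline parallelism, so no bubble

total_ms = compute_ms + comm_ms + bubble_ms
efficiency = compute_ms / total_ms
gpus_delivered = 256 * efficiency

print(f"Total step latency: {total_ms:.1f} ms")   # matches the 5477.0 ms reported
print(f"Scaling efficiency: {efficiency:.1%}")    # matches the 44.4% reported
print(f"Effective GPUs:     {gpus_delivered:.0f} of 256")
```

In other words, at 44.4% efficiency more than half the cluster's compute is being spent waiting on gradient synchronization.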


4. Ring All-Reduce: The Network Tax

The DP all-reduce overhead comes from the ring all-reduce algorithm, which is the standard method for gradient synchronization.

Note - 🧮 See the Math

For the full equation deriving All-Reduce overhead from model size, node count, and fabric bandwidth, see the Mathematical Foundations: Ring All-Reduce.
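As a rough stand-in for that derivation, the textbook bandwidth term of a ring all-reduce is \(2\frac{N-1}{N}\cdot\frac{S}{B}\): each participant sends and receives almost twice the buffer over its link. The sketch below applies it naively with a single 12 GB/s link per node; the simulator's hierarchical model accounts for more than this, so the absolute number differs from the 3047.74 ms reported above.

```python
def ring_allreduce_bw_term_s(grad_bytes, n, link_bw_bytes_per_s):
    """Bandwidth term of a ring all-reduce: each of the n participants
    sends and receives 2 * (n - 1) / n of the buffer over its link."""
    return 2 * (n - 1) / n * grad_bytes / link_bw_bytes_per_s

# 140 GB of fp16 gradients (Llama-70B), 32 nodes, one 12 GB/s link each
t = ring_allreduce_bw_term_s(140e9, n=32, link_bw_bytes_per_s=12e9)
print(f"{t:.1f} s over a single link per node")
```

If each node could drive eight such links in parallel (hypothetical here), the same formula gives about 2.8 s, in the same ballpark as the solver's 3047.74 ms; the remaining gap comes from terms this sketch omits.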

The following sweep shows how fabric bandwidth affects overhead:

from mlsysim import Fleet, NetworkFabric, Systems

fabrics = [
    ("100GbE",      Systems.Fabrics.Ethernet_100G),
    ("IB HDR",      Systems.Fabrics.InfiniBand_HDR),
    ("IB NDR",      Systems.Fabrics.InfiniBand_NDR),
]

print(f"{'Fabric':>10}  {'BW (GB/s)':>10}  {'Comm overhead':>14}  {'Efficiency':>11}")
print("-" * 52)

for fab_name, fabric in fabrics:
    custom_cluster = Fleet(
        name="Custom",
        node=Systems.Nodes.DGX_H100,
        count=32,
        fabric=fabric
    )
    r = solver.solve(
        model=model,
        fleet=custom_cluster,
        batch_size=256,
        precision="fp16"
    )
    print(
        f"{fab_name:>10}  "
        f"{fabric.bandwidth.to('GB/s'):>10.0f~}  "
        f"{r.dp_communication_latency.to('ms'):>14.2f~}  "
        f"{r.scaling_efficiency:>11.1%}"
    )
    Fabric   BW (GB/s)   Comm overhead   Efficiency
----------------------------------------------------
    100GbE          12 GB / s         3047.74 ms        44.4%
    IB HDR          25 GB / s         1677.20 ms        59.2%
    IB NDR          50 GB / s          993.14 ms        71.0%
Warning - Fabric choice determines scaling efficiency

Upgrading from 100GbE to InfiniBand NDR roughly quadruples the effective inter-node bandwidth (12 GB/s to 50 GB/s per link in the sweep above). On a model the size of Llama-70B (140 GB of gradients per step in fp16), that difference is significant. For smaller models, it matters less — compute time dominates.
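You can also run the logic in reverse: fix a target efficiency and back out a communication budget. A sketch assuming efficiency = compute / (compute + comm), which is consistent with the sweep numbers above (`comm_budget_ms` is my own helper, not an mlsysim API):

```python
def comm_budget_ms(compute_ms, target_eff):
    """Largest all-reduce time that still meets target_eff,
    assuming efficiency = compute / (compute + comm)."""
    return compute_ms * (1 - target_eff) / target_eff

compute_ms = 2429.3  # single-GPU compute time from Section 3
for eff in (0.5, 0.7, 0.8):
    budget = comm_budget_ms(compute_ms, eff)
    print(f"target {eff:.0%}: all-reduce must finish in {budget:.0f} ms")
```

Even IB NDR's ~993 ms overhead falls short of the ~607 ms needed for 80% under this assumption; past that point you have to shrink the gradient traffic itself, which is one reason the mixed configurations in Section 6 score higher.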


5. Pipeline Parallelism and the Bubble

Pipeline Parallelism splits the model’s layers across multiple nodes. Node 1 runs layers 1–20, node 2 runs layers 21–40, etc. This allows a much larger model to be trained than fits on a single node.

The downside: a pipeline bubble. The first microbatch must flow through all stages before the last stage can start processing the second microbatch. During that startup phase, most GPUs are idle.

Note - 🧮 See the Math

For the full equation governing pipeline bubbles and interleaved 1F1B schedules, see the Mathematical Foundations: Pipeline Parallelism Bubble.

print(f"{'PP stages':>10}  {'Microbatches':>13}  {'Bubble %':>9}  {'Bubble (ms)':>11}  {'Efficiency':>11}")
print("-" * 60)

for pp_size in [1, 2, 4, 8]:
    for m in [1, 4, 16]:
        # Only show interesting combinations
        if pp_size == 1 and m > 1:
            continue
        r = solver.solve(
            model=model,
            fleet=cluster,
            batch_size=256,
            precision="fp16",
            tp_size=1,
            pp_size=pp_size,
            microbatch_count=m
        )
        bubble_pct = r.bubble_fraction * 100
        print(
            f"{pp_size:>10}  "
            f"{m:>13}  "
            f"{bubble_pct:>9.1f}%  "
            f"{r.pipeline_bubble_latency.to('ms'):>10.1f~}  "
            f"{r.scaling_efficiency:>11.1%}"
        )
 PP stages   Microbatches   Bubble %   Bubble (ms)   Efficiency
------------------------------------------------------------
         1              1        0.0%         0.0 ms        44.4%
         2              1       50.0%      1325.0 ms        38.2%
         2              4       20.0%       530.0 ms        43.2%
         2             16        5.9%       155.9 ms        46.0%
         4              1       75.0%      2318.4 ms        37.7%
         4              4       42.9%      1324.8 ms        43.0%
         4             16       15.8%       488.1 ms        48.6%
         8              1       87.5%      3477.0 ms        40.2%
         8              4       63.6%      2528.7 ms        44.5%
         8             16       30.4%      1209.4 ms        52.2%
Tip - Recovering bubble efficiency

Increasing the number of microbatches (\(M\)) reduces the bubble fraction. With \(M = 16\) and \(P = 8\), the bubble is only \(7/(7+16) ≈ 30\%\) of the pipeline, down from \(88\%\) with \(M = 1\).

In practice, frameworks like Megatron-LM use interleaved pipeline schedules that further reduce the bubble. But even with the standard 1F1B schedule, choosing \(M \gg P\) is essential.
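The bubble percentages in the sweep above all follow from this one formula; a quick check against three rows of the table:

```python
def bubble_fraction(p, m):
    """Idle fraction of a 1F1B pipeline with P stages and M microbatches:
    (P - 1) / (P - 1 + M)."""
    return (p - 1) / (p - 1 + m)

# (PP stages, microbatches, bubble % from the table above)
for p, m, expected_pct in [(2, 16, 5.9), (4, 4, 42.9), (8, 16, 30.4)]:
    pct = bubble_fraction(p, m) * 100
    assert round(pct, 1) == expected_pct
    print(f"P={p}, M={m:>2}: {pct:.1f}% bubble")
```

Because the fraction depends only on P and M, you can size the microbatch count analytically before ever launching a job.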


6. Finding the Optimal Configuration

Now combine all three parallelism strategies and find the configuration that maximizes scaling efficiency for the Research_256 cluster. In practice, 70-80% scaling efficiency on hundreds of GPUs is considered excellent. Below 50% typically signals a suboptimal parallelism configuration or insufficient network bandwidth.

configs = [
    # (description,             tp, pp, m)
    ("DP only",                  1,  1,  1),
    ("DP + TP=2",                2,  1,  1),
    ("DP + PP=4, M=16",          1,  4, 16),
    ("DP + TP=2 + PP=4, M=16",   2,  4, 16),
    ("DP + TP=8 + PP=4, M=16",   8,  4, 16),
]

print(f"{'Config':<26}  {'DP':>4} {'TP':>4} {'PP':>4}  {'Efficiency':>11}  {'Throughput':>14}")
print("-" * 72)

for desc, tp, pp, m in configs:
    try:
        r = solver.solve(
            model=model,
            fleet=cluster,
            batch_size=256,
            precision="fp16",
            tp_size=tp,
            pp_size=pp,
            microbatch_count=m
        )
        print(
            f"{desc:<26}  "
            f"{r.parallelism['dp']:>4}  "
            f"{r.parallelism['tp']:>4}  "
            f"{r.parallelism['pp']:>4}  "
            f"{r.scaling_efficiency:>11.1%}  "
            f"{r.effective_throughput.magnitude:>14.1f}"
        )
    except ValueError as e:
        print(f"{desc:<26}  {'INFEASIBLE':>44}  ({e})")
Config                        DP   TP   PP   Efficiency      Throughput
------------------------------------------------------------------------
DP only                      256     1     1        44.4%            46.7
DP + TP=2                    128     2     1        64.0%           123.6
DP + PP=4, M=16               64     1     4        48.6%           161.0
DP + TP=2 + PP=4, M=16        32     2     4        67.8%           349.3
DP + TP=8 + PP=4, M=16         8     8     4        83.5%           737.9

Your Turn

Caution - Exercises

Exercise 1: Predict before you observe. For a 256-GPU cluster training Llama-3.1-70B, predict: will DP=256, TP=1, PP=1 have higher or lower scaling efficiency than DP=32, TP=4, PP=2? Write your prediction and reasoning, then run both configurations. Were you right?

Exercise 2: Find the optimal configuration. Sweep all valid 3D parallelism configurations for 256 GPUs (where DP × TP × PP = 256). Which configuration maximizes scaling efficiency? Is it the same for Ethernet 100G vs. InfiniBand NDR? (Hint: valid TP values are divisors of 8, the GPUs per node: 1, 2, 4, 8. For each TP, valid PP values are divisors of 256/TP.)

Exercise 3: The microbatch lever. With PP=8, sweep microbatch count M from 1 to 64. Plot the pipeline bubble fraction vs. M. At what value of M does the bubble fraction drop below 10%? (Use the formula from Section 5: bubble = (P-1)/(P-1+M). Predict the answer analytically before running the sweep.)

Self-check: Why must tensor parallelism (TP) stay within a single node on most clusters? What would happen to communication overhead if TP crossed node boundaries?


What You Learned

  • 3D Parallelism decomposes the training problem across \(\text{DP} \times \text{TP} \times \text{PP}\) GPUs, each with distinct communication costs.
  • Ring all-reduce is the network tax of data parallelism. It grows with model size and shrinks with fabric bandwidth. Switching from 100GbE to InfiniBand can recover 10-30% efficiency on large models.
  • Pipeline bubbles waste GPU cycles proportional to \(\frac{P-1}{P-1+M}\). Use large microbatch counts (\(M \gg P\)) to minimize waste.
  • Scaling efficiency below 100% is normal and unavoidable. A well-tuned job at 70-80% efficiency on hundreds of GPUs is excellent. Below 50% signals a configuration problem.

Advanced: Network Topologies

The same communication equations can be used to compare topology assumptions. A ring collective is bandwidth-optimal for large gradient buffers, while tree-style collectives reduce startup latency for smaller messages. In practice, topology choice matters because the fabric determines which term dominates: link bandwidth, oversubscription, or per-hop latency.

For this release, the built-in DistributedModel exposes the core ring and hierarchical all-reduce model. Use the lower-level formulas in mlsysim.core.formulas when you want to compare algorithmic assumptions directly.
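As a sketch of that comparison, here are the standard alpha-beta (latency-bandwidth) cost models from the collective-communication literature — not the exact formulas in mlsysim.core.formulas, and the link numbers below are hypothetical:

```python
import math

def ring_allreduce_s(n, msg_bytes, alpha_s, bw_bytes_s):
    """Ring all-reduce: 2(n-1) steps of S/n bytes each, so the
    bandwidth term is optimal but latency grows linearly in n."""
    return 2 * (n - 1) * alpha_s + 2 * (n - 1) / n * msg_bytes / bw_bytes_s

def recursive_doubling_s(n, msg_bytes, alpha_s, bw_bytes_s):
    """Tree-style (recursive doubling) all-reduce: log2(n) rounds,
    full message each round -- low latency, worse bandwidth term."""
    k = math.log2(n)
    return k * (alpha_s + msg_bytes / bw_bytes_s)

n, alpha, bw = 32, 5e-6, 50e9  # hypothetical: 5 us per hop, 50 GB/s links
for size in (1e4, 1e6, 1e9):   # 10 KB, 1 MB, 1 GB
    r = ring_allreduce_s(n, size, alpha, bw)
    t = recursive_doubling_s(n, size, alpha, bw)
    winner = "tree" if t < r else "ring"
    print(f"{size:>8.0e} B: ring {r*1e3:8.3f} ms, tree {t*1e3:8.3f} ms -> {winner}")
```

Under these assumptions the tree wins for small messages and the ring wins for gradient-sized buffers, which is exactly the trade-off described above.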


Next Steps

  • Two Phases of Inference: After training, learn how to model the serving cost of the same model
  • Math Foundations: Full derivations for ring all-reduce, pipeline bubble, and MFU
  • Fleet Zoo: Browse the available cluster configurations and their network specs