# Composable Pipelines & Callbacks

How to build custom execution graphs and log metrics to external platforms.
## The Question
The `Engine.evaluate()` function is great for standard roofline analysis, but what if you want to insert a proprietary power model into the middle of the evaluation? What if you need to log every simulation result to a database or an MLOps platform like Weights & Biases?
How do we extend the core framework without forking it?
In this guide, you will learn to:

- Compose custom execution graphs using the `Pipeline` class.
- Inject your own analytical solvers into the evaluation flow.
- Register callback middleware to log or intercept data.
## 1. Setup
First, import the necessary modules. We will use the `Pipeline` orchestrator directly instead of relying on the high-level `Engine`.
```python
import mlsysim
from mlsysim.core.pipeline import Pipeline
from mlsysim.core.solver import DistributedModel, EconomicsModel
```

## 2. The Composable Pipeline
The `Pipeline` object accepts a list of solvers. When you call `.run()`, it executes them in order. Critically, the output of an earlier solver is automatically made available as input to later solvers.
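The data-flow contract is easiest to see in a stripped-down sketch. `ToyPipeline` below is illustrative only (it is not the mlsysim implementation, and the `Perf`/`Cost` stages are invented for this example): each stage reads from a shared context, and its result is stored there under the stage's class name so later stages can consume it.

```python
# Illustrative sketch only -- NOT the actual mlsysim Pipeline.
class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def run(self, **inputs):
        context = dict(inputs)   # the initial Demand & Supply inputs
        results = {}
        for stage in self.stages:
            result = stage.run(context)        # stage sees all prior data
            name = type(stage).__name__
            results[name] = result
            context[name] = result             # expose output to later stages
        return results

class Perf:
    def run(self, ctx):
        return {"throughput": ctx["flops"] / ctx["seconds"]}

class Cost:
    def run(self, ctx):
        # Consumes the upstream Perf output automatically via the context.
        total_flops = ctx["Perf"]["throughput"] * ctx["seconds"]
        return {"usd_per_flop": ctx["price_usd"] / total_flops}

results = ToyPipeline([Perf(), Cost()]).run(flops=1e12, seconds=10, price_usd=5.0)
print(results["Perf"]["throughput"])   # 100000000000.0
```

The key design point is that stages never call each other directly; the pipeline mediates all data exchange, which is what makes stages freely composable and reorderable.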
Let’s build a standard evaluation pipeline:
```python
# 1. Define the execution graph
pipe = Pipeline([
    DistributedModel(),
    EconomicsModel()
])

# 2. Provide the initial inputs (Demand & Supply)
results = pipe.run(
    model=mlsysim.Models.Llama3_8B,
    fleet=mlsysim.Systems.Clusters.Frontier_8K,
    batch_size=32,
    precision="fp16",
    efficiency=0.45,
    duration_days=30
)

# 3. Access results by stage name
tco = results["EconomicsModel"].tco_usd
print(f"Total Cost of Ownership: ${tco:,.2f}")
```

## 3. Injecting Custom Solvers
Because the pipeline is dynamic, you can insert custom analytical solvers. Suppose you wrote a custom plugin (as described in the Extending the Engine guide) called `PowerEfficiencyModel`.
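The solver contract itself is small. As a hypothetical sketch (the real mlsysim base class, method names, and upstream field names may differ — see the Extending the Engine guide), a power model only needs to read upstream values from the shared context and return its own result; the constant and input keys below are invented for illustration:

```python
# Hypothetical sketch of a custom solver; the real mlsysim base class
# and signatures may differ.
class PowerEfficiencyModel:
    JOULES_PER_FLOP = 1.0e-11   # assumed hardware constant, for illustration only

    def run(self, context):
        # Read upstream (DistributedModel-style) outputs from the shared context.
        flops = context["total_flops"]
        seconds = context["runtime_seconds"]
        energy_j = flops * self.JOULES_PER_FLOP
        return {
            "energy_kwh": energy_j / 3.6e6,     # joules -> kilowatt-hours
            "avg_power_w": energy_j / seconds,  # average draw over the run
        }

result = PowerEfficiencyModel().run(
    {"total_flops": 3.6e18, "runtime_seconds": 3600}
)
print(result["energy_kwh"])   # ~10.0 kWh
```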
You can just snap it into the DAG:
```python
# Assuming you wrote this class
# class PowerEfficiencyModel(BaseModel): ...

custom_pipe = Pipeline([
    DistributedModel(),
    PowerEfficiencyModel(),  # Injected right into the middle
    EconomicsModel()
])

# Use it just like before
# custom_pipe.run(...)
```

## 4. Callbacks (Middleware)
MLSys·im supports middleware hooks. If you want to log metrics to an external platform (like a database, MLflow, or Weights & Biases), you can write a callback class with an `on_stage_end` method.
The pipeline will trigger this method every time a solver finishes.
```python
class WandbLogger:
    def on_stage_end(self, stage_name: str, result):
        print(f"[W&B Callback] Logging data for {stage_name}...")
        # If the result is a Pydantic model, we can dump it to a dict
        if hasattr(result, "model_dump"):
            metrics = result.model_dump(exclude_none=True)
            # Simulated: wandb.log({f"{stage_name}/{k}": v for k, v in metrics.items()})
            print(f" -> Logged {len(metrics)} keys")

# Attach the logger
pipe.register_callback(WandbLogger())

# Run the pipeline again to see the callback fire
print("\nRunning Pipeline with Callback:")
pipe.run(
    model=mlsysim.Models.Llama3_8B,
    fleet=mlsysim.Systems.Clusters.Frontier_8K,
    batch_size=32,
    precision="fp16",
    efficiency=0.45,
    duration_days=30
)
```

## What You Learned
- `Pipeline` allows you to build custom, sequential execution graphs.
- Data flows automatically: outputs from Stage N become inputs to Stage N+1.
- Callbacks enable seamless integration with MLOps tracking tools without altering the core simulation logic.
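As a closing illustration, the callback mechanism can be sketched end-to-end in plain Python. `MiniPipeline` and its stages below are a toy stand-in (not the mlsysim internals): after each stage finishes, the pipeline simply calls `on_stage_end` on every registered callback.

```python
# Toy stand-in for the callback dispatch loop; NOT the mlsysim implementation.
class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages
        self.callbacks = []

    def register_callback(self, cb):
        self.callbacks.append(cb)

    def run(self, **inputs):
        context, results = dict(inputs), {}
        for stage in self.stages:
            name = type(stage).__name__
            result = stage.run(context)
            results[name] = context[name] = result
            for cb in self.callbacks:          # middleware hook point
                cb.on_stage_end(name, result)
        return results

class Double:
    def run(self, ctx):
        return {"value": ctx["x"] * 2}

class Square:
    def run(self, ctx):
        return {"value": ctx["Double"]["value"] ** 2}

class Recorder:
    def __init__(self):
        self.seen = []

    def on_stage_end(self, stage_name, result):
        self.seen.append((stage_name, result["value"]))

pipe = MiniPipeline([Double(), Square()])
recorder = Recorder()
pipe.register_callback(recorder)
pipe.run(x=3)
print(recorder.seen)   # [('Double', 6), ('Square', 36)]
```

Because the hook fires once per stage with the stage's name and result, a single callback can observe every intermediate output without the solvers knowing it exists.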