Configuring the Simulator¶
YAQS draws a sharp line between what you simulate and how it runs:
Layer |
Role |
|---|---|
|
The physics: initial state, operator, time grid, observables, trajectory count, truncation, noise. |
The execution: parallel vs. serial trajectories, worker count, progress reporting, multiprocessing start method, and retry policy for transient worker errors. |
This page walks through every option on the Simulator class so you can tune execution without touching the physics.
1from mqt.yaqs import Result, Simulator
2from mqt.yaqs.core.data_structures.hamiltonian import Hamiltonian
3from mqt.yaqs.core.data_structures.simulation_parameters import AnalogSimParams, Observable
4from mqt.yaqs.core.data_structures.state import State
5from mqt.yaqs.core.libraries.gate_library import Z
A small reusable analog problem we will simulate throughout:
1L = 4
2H = Hamiltonian.ising(L, J=1.0, g=0.5)
3
4
5def make_params(num_traj: int = 8) -> AnalogSimParams:
6 """A short Ising evolution measuring `<Z>` on every site."""
7 return AnalogSimParams(
8 observables=[Observable(Z(), site) for site in range(L)],
9 elapsed_time=0.2,
10 dt=0.05,
11 num_traj=num_traj,
12 max_bond_dim=8,
13 svd_threshold=1e-9,
14 sample_timesteps=False,
15 random_seed=0,
16 )
17
18
19state = State(L, initial="zeros")
Quick start: defaults¶
Calling Simulator() with no arguments gives you parallel execution across most of your CPU cores, a tqdm progress bar, an "auto" multiprocessing context, and a generous retry policy.
1sim = Simulator()
2print("parallel: ", sim.parallel)
3print("max_workers: ", sim.max_workers)
4print("show_progress: ", sim.show_progress)
5print("mp_context: ", sim.mp_context)
6print("max_retries: ", sim.max_retries)
7print("retry_exceptions:", sim.retry_exceptions)
parallel: True
max_workers: 1
show_progress: True
mp_context: auto
max_retries: 10
retry_exceptions: (<class 'concurrent.futures._base.CancelledError'>, <class 'TimeoutError'>, <class 'OSError'>)
Every option is keyword-only, so you can override one without specifying the others:
1quiet_sim = Simulator(show_progress=False)
Reusing one Simulator across runs¶
A Simulator instance is stateless with respect to the physics; the same instance can drive arbitrarily many run() calls. This is the recommended pattern in scripts and notebooks because it keeps execution configuration in one place.
1sim = Simulator(show_progress=False)
2
3for noise_strength in (0.0, 0.05, 0.1):
4 params = make_params()
5 # ... build a NoiseModel from `noise_strength` here in real code ...
6 result = sim.run(state, H, params, noise_model=None)
7 print(f"gamma={noise_strength}: <Z_0>={float(result.expectation_values[0][0]):+.4f}")
gamma=0.0: <Z_0>=+0.9803
gamma=0.05: <Z_0>=+0.9803
gamma=0.1: <Z_0>=+0.9803
/tmp/ipykernel_2687/2485894359.py:7: ComplexWarning: Casting complex values to real discards the imaginary part
print(f"gamma={noise_strength}: <Z_0>={float(result.expectation_values[0][0]):+.4f}")
Each call constructs a short-lived ProcessPoolExecutor when parallel=True; pools are not persisted across run() calls, so you can safely change sim.max_workers (or replace sim entirely) between calls.
parallel: process-pool vs. in-process execution¶
parallel=True (the default) runs trajectories in worker processes via concurrent.futures.ProcessPoolExecutor. parallel=False runs every trajectory in the calling process, which is useful for:
Debugging (full tracebacks, no pickling, breakpoints work).
Very small jobs where the pool startup cost dominates.
Notebook cells where you want to share state with the caller.
Both modes produce identical results for a fixed random_seed:
1import numpy as np
2
3params_serial = make_params()
4sim_serial = Simulator(parallel=False, show_progress=False)
5result_serial = sim_serial.run(state, H, params_serial)
6
7params_parallel = make_params()
8sim_parallel = Simulator(parallel=True, max_workers=2, show_progress=False)
9result_parallel = sim_parallel.run(state, H, params_parallel)
10
11for vals_s, vals_p in zip(result_serial.expectation_values, result_parallel.expectation_values, strict=True):
12 np.testing.assert_allclose(vals_s, vals_p, atol=1e-10)
13print("Serial and parallel results match.")
Serial and parallel results match.
Note
For runs with num_traj == 1 (e.g. noise-free analog/circuit dynamics, Lindblad), the simulator automatically takes the in-process path even with parallel=True. The pool is only spun up when there is more than one trajectory to dispatch.
max_workers and how the default is chosen¶
When max_workers is left as None, the simulator picks max(1, available_cpus() - 1) to leave one core free for the parent process and the OS:
1from mqt.yaqs.simulator import available_cpus
2
3print("available CPUs YAQS would use:", available_cpus())
4print("default max_workers: ", Simulator().max_workers)
available CPUs YAQS would use: 2
default max_workers: 1
available_cpus() (re-exported as available_cpus()) is deliberately cgroup- and scheduler-aware. In priority order it honours:
YAQS_MAX_WORKERS(explicit user override; positive integer).PYTEST_XDIST_WORKER(returns1to avoid nested parallelism in tests).SLURM hints —
SLURM_CPUS_PER_TASKthenSLURM_CPUS_ON_NODE.Linux
os.sched_getaffinity(0)(respectstaskset, containers, cgroups).os.cpu_count()as a final fallback.
Override the resolution either by setting the environment variable…
# In a shell: export YAQS_MAX_WORKERS=4
…or by passing max_workers explicitly:
1sim_four = Simulator(max_workers=4, show_progress=False)
2print("fixed max_workers:", sim_four.max_workers)
fixed max_workers: 4
show_progress: tqdm bars¶
show_progress=True (default) shows a tqdm bar labelled “Running trajectories” (or “Running unitary ensemble” for the deterministic ensemble path). Set show_progress=False to silence it — useful in test suites, batch scripts, and CI logs:
1silent = Simulator(show_progress=False)
2silent.run(state, H, make_params(num_traj=4))
Result(sim_params=<mqt.yaqs.core.data_structures.simulation_parameters.AnalogSimParams object at 0x7cfee00319a0>, observables=[<mqt.yaqs.core.data_structures.simulation_parameters.Observable object at 0x7cfedcc4e420>, <mqt.yaqs.core.data_structures.simulation_parameters.Observable object at 0x7cfedcc4e660>, <mqt.yaqs.core.data_structures.simulation_parameters.Observable object at 0x7cfedcc4e720>, <mqt.yaqs.core.data_structures.simulation_parameters.Observable object at 0x7cfedcc4e7e0>], expectation_values=[array([0.98033079+0.j]), array([0.98110104+0.j]), array([0.98110104+0.j]), array([0.98033079+0.j])], trajectories=[array([[0.98033079+0.j]]), array([[0.98110104+0.j]]), array([[0.98110104+0.j]]), array([[0.98033079+0.j]])], times=array([0.2]), runtime_cost=array([24.]), max_bond=array([2.]), total_bond=array([6.]), noise_model=None, output_state=None, multi_time_times=None, multi_time_results=None, measurements=[], counts=None)
The bar is suppressed regardless of parallel, so the same flag also silences serial runs.
mp_context: multiprocessing start method¶
mp_context controls how worker processes are spawned. The default "auto" picks the best option per OS:
Value |
Behaviour |
|---|---|
|
|
|
Fastest worker startup; reuses Python state from the parent. Safe in YAQS because BLAS/OpenMP pools are capped. |
|
Fresh interpreter per worker. Required on Windows/macOS; slower startup but more isolated. |
1from mqt.yaqs.parallel_utils import get_parallel_context
2
3for choice in ("auto", "fork", "spawn"):
4 try:
5 ctx = get_parallel_context(choice)
6 except ValueError as exc:
7 print(f"{choice}: not available on this platform ({exc})")
8 else:
9 print(f"{choice}: start_method={ctx.get_start_method()}")
auto: start_method=fork
fork: start_method=fork
spawn: start_method=spawn
If you mix YAQS with GPU libraries or anything that does not survive fork(), force mp_context="spawn".
max_retries and retry_exceptions¶
Long parallel runs occasionally encounter transient worker failures: a worker is cancelled by the OS, a TimeoutError is raised, or a transient OSError (e.g. a temporary file system hiccup) propagates out of a backend. By default, the simulator retries each failing trajectory up to 10 times for the following exception types:
concurrent.futures.CancelledErrorTimeoutErrorOSError
1print("default max_retries: ", Simulator().max_retries)
2print("default retry_exceptions:", Simulator().retry_exceptions)
default max_retries: 10
default retry_exceptions: (<class 'concurrent.futures._base.CancelledError'>, <class 'TimeoutError'>, <class 'OSError'>)
Tighten the policy for fail-fast development (e.g. when bisecting an error):
1strict = Simulator(max_retries=0, show_progress=False)
Or broaden it for unreliable environments:
1import concurrent.futures
2
3resilient = Simulator(
4 max_retries=20,
5 retry_exceptions=(concurrent.futures.CancelledError, TimeoutError, OSError, ConnectionError),
6 show_progress=False,
7)
Permanent errors (e.g. ValueError from your physics setup, AssertionError from invariants) are not retried — they propagate after the first failure regardless of max_retries.
Inspecting the return value: Result¶
run() returns a Result that holds every simulation output through a small, stable surface. The AnalogSimParams you passed in is referenced unchanged at result.sim_params:
1sim = Simulator(show_progress=False)
2params = make_params()
3result = sim.run(state, H, params)
4
5print("type: ", type(result).__name__)
6print("len(observables): ", len(result.observables))
7print("len(expectation_values):", len(result.expectation_values))
8print("len(trajectories): ", len(result.trajectories))
9print("times: ", result.times)
10print("noise_model: ", result.noise_model)
11print("output_state: ", result.output_state)
12print("counts (weak only): ", result.counts)
13print("multi_time_times: ", result.multi_time_times)
14print("multi_time_results: ", result.multi_time_results)
15print("runtime_cost: ", result.runtime_cost)
16print("max_bond: ", result.max_bond)
17print("total_bond: ", result.total_bond)
type: Result
len(observables): 4
len(expectation_values): 4
len(trajectories): 4
times: [0.2]
noise_model: None
output_state: None
counts (weak only): None
multi_time_times: None
multi_time_results: None
runtime_cost: [24.]
max_bond: [2.]
total_bond: [6.]
The properties that don’t apply to your simulation kind return None (or an empty list for observables in weak simulations), so you can branch on them safely. The full set is:
Property |
Populated for |
|---|---|
|
Analog and strong digital runs. Empty list for weak digital. |
|
Aggregated expectation per observable (parallel to |
|
Per-trajectory data per observable (parallel to |
|
Shared analog time grid; |
|
MPS-backed analog and strong digital runs (contraction-cost heuristic over time). |
|
MPS-backed analog and strong digital runs (maximum bond dimension over time). |
|
MPS-backed analog and strong digital runs (sum of internal bond dimensions). |
|
Any run that was given a |
|
Runs with |
|
Analog deterministic ensembles with |
|
Weak digital simulations (the |
Result (and its wrapped sim_params) is pickleable, so you can checkpoint and resume analysis from disk:
1import pickle
2
3blob = pickle.dumps(result)
4restored: Result = pickle.loads(blob) # noqa: S301
5print("restored observable count:", len(restored.observables))
restored observable count: 4
Choosing settings for common scenarios¶
Scenario |
Recommended |
|---|---|
Quick local run, want to see a progress bar |
|
Notebook / docs build / CI logs |
|
Debugging a physics setup |
|
Single-process benchmark, all cores in the worker |
|
Fixed core budget (e.g. SLURM job step) |
|
Mixing with GPU / non-fork-safe code |
|
Long unattended run on a flaky cluster |
|
For physics-side settings (num_traj, max_bond_dim, svd_threshold, random_seed, sample_timesteps, observables, noise), see Noisy Analog Simulation, Representation Comparison, and Initializing quantum states.