
API Reference

Core Modules

qgate.config

config.py — Pydantic v2 configuration models for qgate.

All configuration objects are JSON-serialisable, immutable (frozen=True), and carry field-level validation.

Patent pending (see LICENSE)

ThresholdMode = Literal['fixed', 'rolling_z', 'galton'] module-attribute

Threshold adaptation strategy.

  • "fixed" — Static threshold (no adaptation).
  • "rolling_z" — Legacy rolling z-score gating (existing behaviour).
  • "galton" — Distribution-aware adaptive gating inspired by diffusion / central-limit principles. Supports both empirical-quantile and z-score sub-modes.

ConditioningVariant

Bases: str, Enum

Supported conditioning strategies.

Source code in src/qgate/config.py
class ConditioningVariant(str, Enum):
    """Supported conditioning strategies."""

    GLOBAL = "global"
    HIERARCHICAL = "hierarchical"
    SCORE_FUSION = "score_fusion"

AdapterKind

Bases: str, Enum

Known adapter back-ends.

Source code in src/qgate/config.py
class AdapterKind(str, Enum):
    """Known adapter back-ends."""

    QISKIT = "qiskit"
    CIRQ = "cirq"
    PENNYLANE = "pennylane"
    GROVER_TSVF = "grover_tsvf"
    QAOA_TSVF = "qaoa_tsvf"
    VQE_TSVF = "vqe_tsvf"
    QPE_TSVF = "qpe_tsvf"
    MOCK = "mock"

FusionConfig

Bases: BaseModel

Parameters for α-weighted LF / HF score fusion.

Attributes:

Name Type Description
alpha float

Weight for the low-frequency component (0 ≤ α ≤ 1).

threshold float

Accept if combined score ≥ threshold.

hf_cycles Optional[List[int]]

Explicit list of cycle indices counted as HF (None → every cycle).

lf_cycles Optional[List[int]]

Explicit list of cycle indices counted as LF (None → every 2nd cycle: 0, 2, 4, …).

Source code in src/qgate/config.py
class FusionConfig(BaseModel):
    """Parameters for α-weighted LF / HF score fusion.

    Attributes:
        alpha:      Weight for the low-frequency component (0 ≤ α ≤ 1).
        threshold:  Accept if combined score ≥ threshold.
        hf_cycles:  Explicit list of cycle indices counted as HF
                    (``None`` → every cycle).
        lf_cycles:  Explicit list of cycle indices counted as LF
                    (``None`` → every 2nd cycle: 0, 2, 4, …).
    """

    alpha: float = Field(default=0.5, ge=0.0, le=1.0, description="LF weight (0–1)")
    threshold: float = Field(
        default=0.65, ge=0.0, le=1.0, description="Accept if combined ≥ threshold"
    )
    hf_cycles: Optional[List[int]] = Field(default=None, description="Override HF cycle indices")
    lf_cycles: Optional[List[int]] = Field(default=None, description="Override LF cycle indices")

    model_config = ConfigDict(frozen=True, extra="forbid")

DynamicThresholdConfig

Bases: BaseModel

Parameters for dynamic threshold gating.

Supports three modes:

"fixed" (default) No adaptation — uses baseline as a static threshold.

"rolling_z" Legacy rolling z-score gating:

.. math:: \theta_t = \text{clamp}(\mu_{\text{roll}} + z \cdot \sigma_{\text{roll}},\; \theta_{\min},\; \theta_{\max})

"galton" Distribution-aware adaptive gating inspired by diffusion / central-limit principles. The algorithm maintains a rolling window of per-shot combined scores and sets the threshold so that a target fraction of future scores is expected to be accepted.

Two sub-modes are available:

* **Quantile** (``use_quantile=True``, recommended) — sets
  :math:`\theta = Q_{1 - \text{target\_acceptance}}(\text{window})`.
* **Z-score** — estimates μ and σ from the window, then
  :math:`\theta = \mu + z_{\sigma} \cdot \sigma`.  When
  ``robust_stats=True`` the median and MAD-based σ are used.
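The update rule described above can be sketched in plain Python. This is a simplified illustration, not the library's implementation; the 1.4826 MAD-to-σ factor is an assumption that holds for normally distributed data.

```python
import statistics

def galton_threshold(window, baseline=0.65, min_window_size=100,
                     target_acceptance=0.05, use_quantile=True,
                     robust_stats=True, z_sigma=1.645,
                     min_threshold=0.3, max_threshold=0.95):
    """Simplified sketch of the galton threshold update."""
    if len(window) < min_window_size:
        return baseline                          # warmup: fall back to baseline
    if use_quantile:
        # Quantile sub-mode: gate at Q(1 - target_acceptance) of the window.
        s = sorted(window)
        idx = int(round((1.0 - target_acceptance) * (len(s) - 1)))
        theta = s[idx]
    elif robust_stats:
        # Robust z-score sub-mode: median centre, MAD-derived sigma.
        mu = statistics.median(window)
        mad = statistics.median([abs(x - mu) for x in window])
        theta = mu + z_sigma * 1.4826 * mad      # 1.4826 ≈ MAD→σ for normal data
    else:
        mu = statistics.mean(window)
        theta = mu + z_sigma * statistics.stdev(window)
    return min(max(theta, min_threshold), max_threshold)
```

With the default target_acceptance=0.05, the quantile sub-mode places the gate at the 95th percentile of recent scores, so roughly the top 5 % of shots are expected to pass.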

Attributes:

Name Type Description
enabled bool

Whether dynamic thresholding is active.

mode ThresholdMode

Threshold strategy ("fixed" | "rolling_z" | "galton").

baseline float

Starting / fallback threshold.

z_factor float

Std-dev multiplier for rolling_z mode.

window_size int

Rolling window capacity (batches for rolling_z, individual scores for galton).

min_threshold float

Floor — threshold never drops below this.

max_threshold float

Ceiling — threshold never exceeds this.

min_window_size int

Galton mode: minimum observations before adaptation kicks in (warmup).

target_acceptance float

Galton quantile mode: target acceptance fraction (one-sided tail).

robust_stats bool

Galton z-score mode: use median + MAD instead of mean + std.

use_quantile bool

Galton mode: prefer empirical quantile (True, default) over z-score.

z_sigma float

Galton z-score mode: number of σ above centre to place the gate.

Note: Setting mode="galton" automatically sets enabled=True during validation. You do not need to set both.
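For comparison, the rolling_z mode reduces to the clamp formula shown earlier. A minimal sketch over a window of batch means (illustrative only, not the library's code):

```python
import statistics

def rolling_z_threshold(batch_means, z_factor=1.5,
                        min_threshold=0.3, max_threshold=0.95):
    # theta = clamp(mu_roll + z * sigma_roll, theta_min, theta_max)
    mu = statistics.mean(batch_means)
    sigma = statistics.pstdev(batch_means)   # population sigma of the window
    return min(max(mu + z_factor * sigma, min_threshold), max_threshold)
```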

Source code in src/qgate/config.py
class DynamicThresholdConfig(BaseModel):
    """Parameters for dynamic threshold gating.

    Supports three modes:

    ``"fixed"`` (default)
        No adaptation — uses ``baseline`` as a static threshold.

    ``"rolling_z"``
        Legacy rolling z-score gating:

        .. math:: \\theta_t = \\text{clamp}(\\mu_{\\text{roll}} + z \\cdot \\sigma_{\\text{roll}},\\; \\theta_{\\min},\\; \\theta_{\\max})

    ``"galton"``
        Distribution-aware adaptive gating inspired by diffusion /
        central-limit principles.  The algorithm maintains a rolling window
        of **per-shot** combined scores and sets the threshold so that a
        target fraction of future scores is expected to be accepted.

        Two sub-modes are available:

        * **Quantile** (``use_quantile=True``, recommended) — sets
          :math:`\\theta = Q_{1 - \\text{target\\_acceptance}}(\\text{window})`.
        * **Z-score** — estimates μ and σ from the window, then
          :math:`\\theta = \\mu + z_{\\sigma} \\cdot \\sigma`.  When
          ``robust_stats=True`` the median and MAD-based σ are used.

    Attributes:
        enabled:            Whether dynamic thresholding is active.
        mode:               Threshold strategy (``"fixed"`` | ``"rolling_z"``
                            | ``"galton"``).
        baseline:           Starting / fallback threshold.
        z_factor:           Std-dev multiplier for ``rolling_z`` mode.
        window_size:        Rolling window capacity (batches for rolling_z,
                            individual scores for galton).
        min_threshold:      Floor — threshold never drops below this.
        max_threshold:      Ceiling — threshold never exceeds this.
        min_window_size:    Galton mode: minimum observations before
                            adaptation kicks in (warmup).
        target_acceptance:  Galton quantile mode: target acceptance
                            fraction (one-sided tail).
        robust_stats:       Galton z-score mode: use median + MAD
                            instead of mean + std.
        use_quantile:       Galton mode: prefer empirical quantile
                            (True, default) over z-score.
        z_sigma:            Galton z-score mode: number of σ above
                            centre to place the gate.

    .. note::
       Setting ``mode="galton"`` automatically sets ``enabled=True``
       during validation.  You do **not** need to set both.
    """

    enabled: bool = Field(default=False, description="Enable dynamic thresholding")
    mode: ThresholdMode = Field(
        default="fixed",
        description="Threshold strategy: fixed | rolling_z | galton",
    )
    baseline: float = Field(default=0.65, ge=0.0, le=1.0)
    z_factor: float = Field(default=1.5, ge=0.0, description="Std-dev multiplier (rolling_z)")
    window_size: int = Field(default=10, ge=1, description="Rolling window size")
    min_threshold: float = Field(default=0.3, ge=0.0, le=1.0)
    max_threshold: float = Field(default=0.95, ge=0.0, le=1.0)

    # --- Galton-specific fields (ignored when mode != "galton") -----------
    min_window_size: int = Field(
        default=100,
        ge=1,
        description="Galton: minimum samples before adaptation (warmup)",
    )
    target_acceptance: float = Field(
        default=0.05,
        gt=0.0,
        lt=1.0,
        description="Galton quantile: target acceptance fraction",
    )
    robust_stats: bool = Field(
        default=True,
        description="Galton z-score: use median + MAD instead of mean + std",
    )
    use_quantile: bool = Field(
        default=True,
        description="Galton: use empirical quantile (True) or z-score (False)",
    )
    z_sigma: float = Field(
        default=1.645,
        ge=0.0,
        description="Galton z-score: number of σ above centre (~5 % one-sided)",
    )

    @model_validator(mode="after")
    def _min_le_max(self) -> DynamicThresholdConfig:
        if self.min_threshold > self.max_threshold:
            raise ValueError(
                f"min_threshold ({self.min_threshold}) must be ≤ "
                f"max_threshold ({self.max_threshold})"
            )
        return self

    @model_validator(mode="after")
    def _auto_enable_galton(self) -> DynamicThresholdConfig:
        """Automatically set enabled=True when mode is not 'fixed'."""
        if self.mode != "fixed" and not self.enabled:
            object.__setattr__(self, "enabled", True)
        return self

    model_config = ConfigDict(frozen=True, extra="forbid")

ProbeConfig

Bases: BaseModel

Probe-based batch abort configuration.

Before running a full batch, a small probe batch is executed. If the probe pass-rate is below theta, the full batch is skipped.

Attributes:

Name Type Description
enabled bool

Whether probing is active.

probe_shots int

Number of probe shots.

theta float

Minimum probe pass-rate to proceed.

Source code in src/qgate/config.py
class ProbeConfig(BaseModel):
    """Probe-based batch abort configuration.

    Before running a full batch a small *probe* batch is executed.
    If the probe pass-rate is below ``theta`` the full batch is skipped.

    Attributes:
        enabled:     Whether probing is active.
        probe_shots: Number of probe shots.
        theta:       Minimum probe pass-rate to proceed.
    """

    enabled: bool = Field(default=False, description="Enable probe-based abort")
    probe_shots: int = Field(default=100, ge=1)
    theta: float = Field(default=0.65, ge=0.0, le=1.0)

    model_config = ConfigDict(frozen=True, extra="forbid")
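The abort decision itself is a one-line rate check. A sketch of the rule described above (illustrative; the actual check lives inside the filter pipeline):

```python
def should_run_full_batch(probe_passes: int, probe_shots: int = 100,
                          theta: float = 0.65) -> bool:
    # Proceed only when the probe pass-rate meets the theta floor.
    return (probe_passes / probe_shots) >= theta
```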

GateConfig

Bases: BaseModel

Top-level configuration for a qgate trajectory-filter run.

Compose this from the sub-configs above, or load from JSON / YAML:

config = GateConfig.model_validate_json(path.read_text())

Attributes:

Name Type Description
schema_version str

Configuration schema version (for forward compat).

n_subsystems int

Number of Bell-pair subsystems.

n_cycles int

Number of monitoring cycles per shot.

shots int

Total shots to execute per configuration.

variant ConditioningVariant

Conditioning strategy to apply.

k_fraction float

For hierarchical variant — required pass fraction.

fusion FusionConfig

Fusion scoring parameters.

dynamic_threshold DynamicThresholdConfig

Dynamic threshold adaptation (fixed, rolling_z, or galton).

probe ProbeConfig

Probe-based batch abort.

adapter AdapterKind

Which adapter back-end to use.

adapter_options Dict[str, Any]

Arbitrary adapter-specific options (e.g. backend name).

metadata Dict[str, Any]

Free-form metadata dict attached to run logs.

Source code in src/qgate/config.py
class GateConfig(BaseModel):
    """Top-level configuration for a qgate trajectory-filter run.

    Compose this from the sub-configs above, or load from JSON / YAML:

        config = GateConfig.model_validate_json(path.read_text())

    Attributes:
        schema_version:  Configuration schema version (for forward compat).
        n_subsystems:    Number of Bell-pair subsystems.
        n_cycles:        Number of monitoring cycles per shot.
        shots:           Total shots to execute per configuration.
        variant:         Conditioning strategy to apply.
        k_fraction:      For hierarchical variant — required pass fraction.
        fusion:          Fusion scoring parameters.
        dynamic_threshold: Rolling z-score threshold adaptation.
        probe:           Probe-based batch abort.
        adapter:         Which adapter back-end to use.
        adapter_options: Arbitrary adapter-specific options (e.g. backend name).
        metadata:        Free-form metadata dict attached to run logs.
    """

    schema_version: str = Field(default="1", description="Config schema version")
    n_subsystems: int = Field(default=4, ge=1, description="Number of Bell-pair subsystems")
    n_cycles: int = Field(default=2, ge=1, description="Monitoring cycles per shot")
    shots: int = Field(default=1024, ge=1, description="Shots per configuration")
    variant: ConditioningVariant = Field(
        default=ConditioningVariant.SCORE_FUSION,
        description="Conditioning strategy",
    )
    k_fraction: float = Field(
        default=0.9, gt=0.0, le=1.0, description="Hierarchical k-of-N fraction"
    )
    fusion: FusionConfig = Field(default_factory=FusionConfig)
    dynamic_threshold: DynamicThresholdConfig = Field(default_factory=DynamicThresholdConfig)
    probe: ProbeConfig = Field(default_factory=ProbeConfig)
    adapter: AdapterKind = Field(default=AdapterKind.MOCK, description="Adapter back-end")
    adapter_options: Dict[str, Any] = Field(
        default_factory=dict, description="Adapter-specific options"
    )
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Free-form run metadata")

    model_config = ConfigDict(extra="forbid", frozen=True)

qgate.filter

filter.py — TrajectoryFilter: the main qgate API entry-point.

Orchestrates adapter → execute → score → threshold → accept/reject → log.

Usage:

from qgate import TrajectoryFilter, GateConfig
from qgate.adapters import MockAdapter

config = GateConfig(n_subsystems=4, n_cycles=2, shots=1024)
adapter = MockAdapter(error_rate=0.05, seed=42)
tf = TrajectoryFilter(config, adapter)
result = tf.run()
print(result.acceptance_probability)

Patent pending (see LICENSE)

TrajectoryFilter

Main API class — build, run, and filter quantum trajectories.

Typical workflow:

tf = TrajectoryFilter(config, adapter)
result = tf.run()                  # build → execute → filter
result = tf.filter(outcomes)       # filter pre-existing data
result = tf.filter_counts(counts)  # filter from count dict

The adapter argument accepts:

  • A BaseAdapter instance (existing usage).
  • A BaseAdapter subclass (instantiated with no args).
  • An adapter name string (e.g. "mock", "qiskit") resolved via entry-point discovery.
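This resolution order can be mimicked with a tiny registry. A hypothetical sketch — qgate resolves name strings via entry-point discovery, and the classes below are stand-ins, not the real adapters:

```python
class BaseAdapter:
    """Stand-in for qgate's adapter base class."""

class MockAdapter(BaseAdapter):
    pass

_REGISTRY = {"mock": MockAdapter}  # qgate uses entry points instead of a dict

def resolve_adapter(adapter):
    if isinstance(adapter, str):
        adapter = _REGISTRY[adapter]     # name → class
    if isinstance(adapter, type):
        adapter = adapter()              # class → zero-arg instance
    return adapter                       # instance passes through unchanged
```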

Parameters:

Name Type Description Default
config GateConfig

qgate.config.GateConfig with all parameters.

required
adapter BaseAdapter | type | str

Backend adapter — instance, class, or registered name.

required
logger RunLogger | None

Optional RunLogger for structured output.

None
Source code in src/qgate/filter.py
class TrajectoryFilter:
    """Main API class — build, run, and filter quantum trajectories.

    Typical workflow::

        tf = TrajectoryFilter(config, adapter)
        result = tf.run()                  # build → execute → filter
        result = tf.filter(outcomes)       # filter pre-existing data
        result = tf.filter_counts(counts)  # filter from count dict

    The *adapter* argument accepts:
      - A :class:`BaseAdapter` **instance** (existing usage).
      - A :class:`BaseAdapter` **subclass** (instantiated with no args).
      - An adapter **name string** (e.g. ``"mock"``, ``"qiskit"``)
        resolved via entry-point discovery.

    Args:
        config:  :class:`~qgate.config.GateConfig` with all parameters.
        adapter: Backend adapter — instance, class, or registered name.
        logger:  Optional :class:`RunLogger` for structured output.
    """

    def __init__(
        self,
        config: GateConfig,
        adapter: BaseAdapter | type | str,
        logger: RunLogger | None = None,
    ) -> None:
        self.config = config
        self.adapter = _resolve_adapter(adapter)
        self.logger = logger
        self._dyn_threshold = DynamicThreshold(config.dynamic_threshold)
        self._galton_threshold: GaltonAdaptiveThreshold | None = None
        if config.dynamic_threshold.mode == "galton":
            self._galton_threshold = GaltonAdaptiveThreshold(config.dynamic_threshold)

    def __repr__(self) -> str:
        return (
            f"TrajectoryFilter(variant={self.config.variant.value!r}, "
            f"n_sub={self.config.n_subsystems}, n_cyc={self.config.n_cycles}, "
            f"shots={self.config.shots}, adapter={type(self.adapter).__name__})"
        )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def run(self) -> FilterResult:
        """Build circuit → execute → parse → filter → return result.

        This is the high-level "do everything" method.
        """
        outcomes = self.adapter.build_and_run(
            n_subsystems=self.config.n_subsystems,
            n_cycles=self.config.n_cycles,
            shots=self.config.shots,
        )
        return self.filter(outcomes)

    def filter(self, outcomes: Sequence[ParityOutcome]) -> FilterResult:
        """Apply the configured conditioning + thresholding to *outcomes*.

        Args:
            outcomes: List of ParityOutcome (one per shot).  May be empty.

        Returns:
            :class:`~qgate.run_logging.FilterResult`.
        """
        variant = self.config.variant
        total = len(outcomes)

        # Edge case: empty batch
        if total == 0:
            logger.warning("filter() called with zero outcomes")
            config_dump = self.config.model_dump_json(indent=2)
            adapter_name = type(self.adapter).__name__
            return FilterResult(
                run_id=compute_run_id(config_dump, adapter_name=adapter_name),
                variant=variant.value,
                total_shots=0,
                accepted_shots=0,
                acceptance_probability=0.0,
                tts=float("inf"),
                config_json=config_dump,
                metadata=dict(self.config.metadata),
            )

        logger.info(
            "Filtering %d outcomes — variant=%s, n_sub=%d, n_cyc=%d",
            total,
            variant.value,
            self.config.n_subsystems,
            self.config.n_cycles,
        )

        # Vectorised per-shot scoring
        scored = score_batch(
            outcomes,
            alpha=self.config.fusion.alpha,
            hf_cycles=self.config.fusion.hf_cycles,
            lf_cycles=self.config.fusion.lf_cycles,
        )
        combined_scores = [s[2] for s in scored]

        # --- Determine threshold to use --------------------------------
        dt_cfg = self.config.dynamic_threshold

        if dt_cfg.mode == "galton" and self._galton_threshold is not None:
            # Galton mode: feed per-shot scores into the adaptive window
            # and use the resulting threshold for score_fusion gating.
            self._galton_threshold.observe_batch(combined_scores)
            threshold = self._galton_threshold.current_threshold
            snap = self._galton_threshold.last_snapshot
            logger.debug(
                "Galton threshold → %.4f  (warmup=%s, window=%d)",
                threshold,
                snap.in_warmup,
                snap.window_size_current,
            )
        elif dt_cfg.enabled and dt_cfg.mode in ("rolling_z", "fixed") and combined_scores:
            batch_mean = float(np.mean(combined_scores))
            threshold = self._dyn_threshold.update(batch_mean)
            logger.debug(
                "Dynamic threshold updated → %.4f (batch mean=%.4f)", threshold, batch_mean
            )
        else:
            threshold = self.config.fusion.threshold

        # Apply conditioning rule
        accepted_count = 0
        for i, outcome in enumerate(outcomes):
            if variant == ConditioningVariant.GLOBAL:
                if decide_global(outcome):
                    accepted_count += 1
            elif variant == ConditioningVariant.HIERARCHICAL:
                if decide_hierarchical(outcome, self.config.k_fraction):
                    accepted_count += 1
            elif variant == ConditioningVariant.SCORE_FUSION and combined_scores[i] >= threshold:
                accepted_count += 1

        acc_prob = accepted_count / total if total > 0 else 0.0
        tts = 1.0 / acc_prob if acc_prob > 0 else float("inf")

        config_dump = self.config.model_dump_json(indent=2)
        adapter_name = type(self.adapter).__name__

        # --- Build galton telemetry metadata --------------------------
        galton_meta: dict[str, object] = {}
        if dt_cfg.mode == "galton" and self._galton_threshold is not None:
            snap = self._galton_threshold.last_snapshot
            galton_meta = {
                "galton_rolling_mean": snap.rolling_mean,
                "galton_rolling_sigma": snap.rolling_sigma,
                "galton_rolling_quantile": snap.rolling_quantile,
                "galton_effective_threshold": snap.effective_threshold,
                "galton_window_size_current": snap.window_size_current,
                "galton_acceptance_rate_rolling": snap.acceptance_rate_rolling,
                "galton_in_warmup": snap.in_warmup,
            }

        # Merge galton telemetry into metadata
        result_metadata = dict(self.config.metadata)
        if galton_meta:
            result_metadata["galton"] = galton_meta

        # --- Determine dynamic threshold final value ------------------
        dyn_final: float | None = None
        if dt_cfg.mode == "galton" and self._galton_threshold is not None:
            dyn_final = self._galton_threshold.current_threshold
        elif dt_cfg.enabled:
            dyn_final = self._dyn_threshold.current_threshold

        result = FilterResult(
            run_id=compute_run_id(config_dump, adapter_name=adapter_name),
            variant=variant.value,
            total_shots=total,
            accepted_shots=accepted_count,
            acceptance_probability=acc_prob,
            tts=tts,
            mean_combined_score=float(np.mean(combined_scores)) if combined_scores else None,
            threshold_used=threshold,
            dynamic_threshold_final=dyn_final,
            scores=combined_scores,
            config_json=config_dump,
            metadata=result_metadata,
        )

        logger.info(
            "Result: %d/%d accepted (P=%.4f, TTS=%.2f)",
            accepted_count,
            total,
            acc_prob,
            tts,
        )

        if self.logger is not None:
            self.logger.log(result)

        return result

    def filter_counts(
        self,
        counts: dict,
        n_subsystems: int | None = None,
        n_cycles: int | None = None,
    ) -> FilterResult:
        """Filter from a pre-existing count dictionary.

        This is a convenience for working with raw Qiskit-style count
        dictionaries when you already have results.

        Args:
            counts:       ``{bitstring: count}`` mapping.
            n_subsystems: Override (defaults to ``config.n_subsystems``).
            n_cycles:     Override (defaults to ``config.n_cycles``).
        """
        n_sub = n_subsystems or self.config.n_subsystems
        n_cyc = n_cycles or self.config.n_cycles
        outcomes = self.adapter.parse_results(counts, n_sub, n_cyc)
        return self.filter(outcomes)

    # ------------------------------------------------------------------
    # Threshold introspection
    # ------------------------------------------------------------------

    @property
    def current_threshold(self) -> float:
        """The current effective threshold (may be dynamic)."""
        if self.config.dynamic_threshold.mode == "galton" and self._galton_threshold is not None:
            return self._galton_threshold.current_threshold
        if self.config.dynamic_threshold.enabled:
            return self._dyn_threshold.current_threshold
        return self.config.fusion.threshold

    def reset_threshold(self) -> None:
        """Reset the dynamic threshold to baseline."""
        self._dyn_threshold.reset()
        if self._galton_threshold is not None:
            self._galton_threshold.reset()

    @property
    def galton_snapshot(self) -> object | None:
        """The latest :class:`GaltonSnapshot`, or ``None`` if not in galton mode."""
        if self._galton_threshold is not None:
            return self._galton_threshold.last_snapshot
        return None

current_threshold property

The current effective threshold (may be dynamic).

galton_snapshot property

The latest GaltonSnapshot, or None if not in galton mode.

run()

Build circuit → execute → parse → filter → return result.

This is the high-level "do everything" method.


filter(outcomes)

Apply the configured conditioning + thresholding to outcomes.

Parameters:

Name Type Description Default
outcomes Sequence[ParityOutcome]

List of ParityOutcome (one per shot). May be empty.

required

Returns:

Type Description
FilterResult

A qgate.run_logging.FilterResult.
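The headline numbers in the result follow directly from the accept count; illustrative arithmetic:

```python
accepted, total = 128, 1024
acc_prob = accepted / total if total > 0 else 0.0      # 0.125
tts = 1.0 / acc_prob if acc_prob > 0 else float("inf")  # 8.0 shots per accept
```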


filter_counts(counts, n_subsystems=None, n_cycles=None)

Filter from a pre-existing count dictionary.

This is a convenience for working with raw Qiskit-style count dictionaries when you already have results.

Parameters:

Name Type Description Default
counts dict

{bitstring: count} mapping.

required
n_subsystems int | None

Override (defaults to config.n_subsystems).

None
n_cycles int | None

Override (defaults to config.n_cycles).

None
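For intuition, a counts mapping is simply a compressed list of per-shot bitstrings; parse_results consumes the mapping directly, but the expansion is equivalent to this illustrative snippet (the bitstrings and counts are made up):

```python
counts = {"0101": 600, "1111": 424}  # {bitstring: count}, Qiskit-style
# One entry per shot, repeating each bitstring by its observed count.
shots = [bits for bits, n in counts.items() for _ in range(n)]
```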

reset_threshold()

Reset the dynamic threshold to baseline.

Source code in src/qgate/filter.py
def reset_threshold(self) -> None:
    """Reset the dynamic threshold to baseline."""
    self._dyn_threshold.reset()
    if self._galton_threshold is not None:
        self._galton_threshold.reset()

qgate.scoring

scoring.py — Score computation and fusion logic.

Extracted from the original monitors.py module to provide a clean, stateless scoring API alongside the stateful :class:MultiRateMonitor.

Scoring is vectorised with NumPy: :func:score_batch processes all shots in a single array operation, avoiding per-shot Python loops.

Patent pending (see LICENSE)

fuse_scores(lf_score, hf_score, alpha=0.5, threshold=0.65)

α-weighted fusion of LF and HF scores.

combined = α · lf_score + (1 − α) · hf_score

Parameters:

Name Type Description Default
lf_score float

Low-frequency score (0–1).

required
hf_score float

High-frequency score (0–1).

required
alpha float

LF weight (0 ≤ α ≤ 1).

0.5
threshold float

Accept if combined ≥ threshold.

0.65

Returns:

Type Description
tuple[bool, float]

(accepted, combined_score)

Source code in src/qgate/scoring.py
def fuse_scores(
    lf_score: float,
    hf_score: float,
    alpha: float = 0.5,
    threshold: float = 0.65,
) -> tuple[bool, float]:
    """α-weighted fusion of LF and HF scores.

    combined = α · lf_score + (1 − α) · hf_score

    Args:
        lf_score:  Low-frequency score (0–1).
        hf_score:  High-frequency score (0–1).
        alpha:     LF weight (0 ≤ α ≤ 1).
        threshold: Accept if combined ≥ threshold.

    Returns:
        (accepted, combined_score)
    """
    combined = alpha * lf_score + (1.0 - alpha) * hf_score
    return combined >= threshold, float(combined)
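The fusion arithmetic is easy to sanity-check standalone. The sketch below mirrors the formula (it re-implements it rather than importing qgate) with `alpha=0.5`, so both components are weighted equally:

```python
# Standalone mirror of the fuse_scores arithmetic (not an import of qgate).
def fuse(lf: float, hf: float, alpha: float = 0.5, threshold: float = 0.65):
    combined = alpha * lf + (1.0 - alpha) * hf
    return combined >= threshold, combined

# 0.5 * 0.8 + 0.5 * 0.6 = 0.7 ≥ 0.65 → accepted
accepted, combined = fuse(0.8, 0.6)
print(accepted, combined)   # True, ≈0.7
```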

score_outcome(outcome, alpha=0.5, hf_cycles=None, lf_cycles=None)

Compute LF, HF, and combined scores for a single shot outcome.

Parameters:

Name Type Description Default
outcome ParityOutcome

Parity outcome.

required
alpha float

LF weight in the combined score.

0.5
hf_cycles Sequence[int] | None

Explicit HF cycle indices (default: all).

None
lf_cycles Sequence[int] | None

Explicit LF cycle indices (default: even).

None

Returns:

Type Description
tuple[float, float, float]

(lf_score, hf_score, combined_score)

Source code in src/qgate/scoring.py
def score_outcome(
    outcome: ParityOutcome,
    alpha: float = 0.5,
    hf_cycles: Sequence[int] | None = None,
    lf_cycles: Sequence[int] | None = None,
) -> tuple[float, float, float]:
    """Compute LF, HF, and combined scores for a single shot outcome.

    Args:
        outcome:   Parity outcome.
        alpha:     LF weight in the combined score.
        hf_cycles: Explicit HF cycle indices (default: all).
        lf_cycles: Explicit LF cycle indices (default: even).

    Returns:
        (lf_score, hf_score, combined_score)
    """
    if hf_cycles is None:
        hf_cycles = list(range(outcome.n_cycles))
    if lf_cycles is None:
        lf_cycles = [w for w in range(outcome.n_cycles) if w % 2 == 0]

    rates = outcome.pass_rates  # (n_cycles,) — vectorised

    lf = float(np.mean(rates[list(lf_cycles)])) if lf_cycles else 0.0
    hf = float(np.mean(rates[list(hf_cycles)])) if hf_cycles else 0.0
    combined = alpha * lf + (1.0 - alpha) * hf
    return lf, hf, float(combined)

score_batch(outcomes, alpha=0.5, hf_cycles=None, lf_cycles=None)

Score every outcome in a batch (vectorised).

When all outcomes share the same shape the scoring is performed as a single NumPy operation on a stacked 3-D array. Falls back to per-shot scoring when shapes differ.

Returns:

Type Description
list[tuple[float, float, float]]

List of (lf_score, hf_score, combined_score) per shot.

Source code in src/qgate/scoring.py
def score_batch(
    outcomes: Sequence[ParityOutcome],
    alpha: float = 0.5,
    hf_cycles: Sequence[int] | None = None,
    lf_cycles: Sequence[int] | None = None,
) -> list[tuple[float, float, float]]:
    """Score every outcome in a batch (vectorised).

    When all outcomes share the same shape the scoring is performed as a
    single NumPy operation on a stacked 3-D array.  Falls back to
    per-shot scoring when shapes differ.

    Returns:
        List of ``(lf_score, hf_score, combined_score)`` per shot.
    """
    if not outcomes:
        return []

    n_cyc = outcomes[0].n_cycles
    hf_idx = np.arange(n_cyc) if hf_cycles is None else np.asarray(hf_cycles)
    lf_idx = np.arange(0, n_cyc, 2) if lf_cycles is None else np.asarray(lf_cycles)

    # Fast path: stack all matrices and compute in one shot
    try:
        all_matrices = np.stack([o.parity_matrix for o in outcomes])  # (N, cycles, subs)
        pass_rates = 1.0 - all_matrices.astype(np.float64).mean(axis=2)  # (N, cycles)
        lf_scores = pass_rates[:, lf_idx].mean(axis=1) if lf_idx.size else np.zeros(len(outcomes))
        hf_scores = pass_rates[:, hf_idx].mean(axis=1) if hf_idx.size else np.zeros(len(outcomes))
        combined = alpha * lf_scores + (1.0 - alpha) * hf_scores
        return list(zip(lf_scores.tolist(), hf_scores.tolist(), combined.tolist()))
    except (ValueError, IndexError):
        logger.debug("score_batch: shapes differ — falling back to per-shot scoring")
        return [
            score_outcome(o, alpha=alpha, hf_cycles=hf_cycles, lf_cycles=lf_cycles)
            for o in outcomes
        ]
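The fast path can be illustrated with synthetic parity matrices. The sketch below assumes (as the `pass_rates = 1.0 - …mean(axis=2)` line implies) that a matrix entry of 1 marks a parity violation, so the per-cycle pass rate is one minus the mean over subsystems:

```python
import numpy as np

# Three synthetic shots, 4 cycles x 2 subsystems; 1 = parity violation
# (an assumption inferred from the pass-rate formula in the source).
matrices = np.zeros((3, 4, 2))
matrices[1, 0, :] = 1          # shot 1 fails every check in cycle 0
matrices[2] = 1                # shot 2 fails everywhere

pass_rates = 1.0 - matrices.mean(axis=2)   # (N, cycles)
lf_idx = np.arange(0, 4, 2)                # default LF cycles: 0, 2
hf_idx = np.arange(4)                      # default HF cycles: all
alpha = 0.5
lf = pass_rates[:, lf_idx].mean(axis=1)
hf = pass_rates[:, hf_idx].mean(axis=1)
combined = alpha * lf + (1.0 - alpha) * hf
print(combined)   # shot 0 scores 1.0, shot 2 scores 0.0
```

Shot 1 passes cycles 1–3 but fails cycle 0 entirely, giving lf = 0.5, hf = 0.75, combined = 0.625.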

compute_window_metric(times, values, window=1.0, mode='max')

Compute a metric over a trailing time window.

Examines [t_final − window, t_final] and returns the max or mean of values within that interval.

Parameters:

Name Type Description Default
times ndarray

1-D monotonic time array.

required
values ndarray

1-D values array (same length).

required
window float

Width of the trailing window.

1.0
mode Literal['max', 'mean']

"max" or "mean".

'max'

Returns:

Type Description
tuple[float, float, float]

(metric, window_start, window_end)

Source code in src/qgate/scoring.py
def compute_window_metric(
    times: np.ndarray,
    values: np.ndarray,
    window: float = 1.0,
    mode: Literal["max", "mean"] = "max",
) -> tuple[float, float, float]:
    """Compute a metric over a trailing time window.

    Examines [t_final − window, t_final] and returns the max or mean
    of *values* within that interval.

    Args:
        times:  1-D monotonic time array.
        values: 1-D values array (same length).
        window: Width of the trailing window.
        mode:   ``"max"`` or ``"mean"``.

    Returns:
        (metric, window_start, window_end)
    """
    t_final = float(times[-1])
    window_start = max(0.0, t_final - window)
    mask = (times >= window_start) & (times <= t_final)
    window_values = values[mask]

    if len(window_values) == 0:
        metric = float(values[-1])
    elif mode == "max":
        metric = float(np.max(window_values))
    elif mode == "mean":
        metric = float(np.mean(window_values))
    else:
        raise ValueError(f"Unknown mode: {mode!r}")

    return metric, window_start, t_final
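The trailing-window selection boils down to a boolean mask over `[t_final − window, t_final]`. A minimal standalone walk-through with made-up numbers:

```python
import numpy as np

# Trailing window of width 1.0 over a monotonic time axis.
times = np.array([0.0, 0.5, 1.0, 1.5, 2.0])
values = np.array([0.1, 0.9, 0.3, 0.4, 0.2])
window = 1.0

t_final = float(times[-1])
start = max(0.0, t_final - window)          # 1.0
mask = (times >= start) & (times <= t_final)
metric = float(np.max(values[mask]))        # max over t in [1.0, 2.0]
print(metric, start, t_final)
```

The spike at t = 0.5 (value 0.9) falls outside the window, so the metric is the max of 0.3, 0.4, 0.2.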

qgate.threshold

threshold.py — Dynamic threshold gating strategies.

Provides two adaptive threshold classes:

:class:DynamicThreshold Rolling z-score gating (legacy rolling_z mode). Operates on batch-level mean scores.

:class:GaltonAdaptiveThreshold Distribution-aware gating (galton mode) inspired by diffusion / central-limit principles. Operates on per-shot combined scores and supports empirical-quantile and robust z-score sub-modes.

Both classes share the same :class:~qgate.config.DynamicThresholdConfig and are wired into :class:~qgate.filter.TrajectoryFilter transparently.

Patent pending (see LICENSE)

GaltonSnapshot dataclass

Telemetry snapshot emitted after every :meth:GaltonAdaptiveThreshold.update.

All fields are populated regardless of the active sub-mode; fields that do not apply in the current mode are set to None.

Attributes:

Name Type Description
rolling_mean float | None

Mean of the rolling window.

rolling_sigma float | None

Std-dev (or MAD-based σ) of the window.

rolling_quantile float | None

Empirical quantile at 1 − target_acceptance.

effective_threshold float

Threshold actually used for gating.

window_size_current int

Number of scores in the window right now.

acceptance_rate_rolling float | None

Fraction of window scores ≥ threshold.

in_warmup bool

True if window < min_window_size.

Source code in src/qgate/threshold.py
@dataclass(frozen=True)
class GaltonSnapshot:
    """Telemetry snapshot emitted after every :meth:`GaltonAdaptiveThreshold.update`.

    All fields are populated regardless of the active sub-mode; fields
    that do not apply in the current mode are set to ``None``.

    Attributes:
        rolling_mean:           Mean of the rolling window.
        rolling_sigma:          Std-dev (or MAD-based σ) of the window.
        rolling_quantile:       Empirical quantile at 1 − target_acceptance.
        effective_threshold:    Threshold actually used for gating.
        window_size_current:    Number of scores in the window right now.
        acceptance_rate_rolling: Fraction of window scores ≥ threshold.
        in_warmup:              True if window < min_window_size.
    """

    rolling_mean: float | None = None
    rolling_sigma: float | None = None
    rolling_quantile: float | None = None
    effective_threshold: float = 0.65
    window_size_current: int = 0
    acceptance_rate_rolling: float | None = None
    in_warmup: bool = True

DynamicThreshold

Rolling z-score threshold adjuster.

Maintains a sliding window of recent batch scores and computes an adaptive threshold each time :meth:update is called.

Parameters:

Name Type Description Default
config DynamicThresholdConfig

Threshold configuration parameters.

required

Example::

from qgate.config import DynamicThresholdConfig
cfg = DynamicThresholdConfig(enabled=True, baseline=0.65,
                              z_factor=1.5, window_size=10)
dt = DynamicThreshold(cfg)
dt.update(0.70)
dt.update(0.68)
print(dt.current_threshold)
Source code in src/qgate/threshold.py
class DynamicThreshold:
    """Rolling z-score threshold adjuster.

    Maintains a sliding window of recent batch scores and computes an
    adaptive threshold each time :meth:`update` is called.

    Args:
        config: Threshold configuration parameters.

    Example::

        from qgate.config import DynamicThresholdConfig
        cfg = DynamicThresholdConfig(enabled=True, baseline=0.65,
                                      z_factor=1.5, window_size=10)
        dt = DynamicThreshold(cfg)
        dt.update(0.70)
        dt.update(0.68)
        print(dt.current_threshold)
    """

    def __init__(self, config: DynamicThresholdConfig) -> None:
        self._config = config
        self._history: deque[float] = deque(maxlen=config.window_size)
        self._current: float = config.baseline

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def config(self) -> DynamicThresholdConfig:
        return self._config

    @property
    def current_threshold(self) -> float:
        """The most recent threshold value."""
        return self._current

    @property
    def history(self) -> list[float]:
        """Copy of the rolling score history."""
        return list(self._history)

    # ------------------------------------------------------------------
    # Core
    # ------------------------------------------------------------------

    def update(self, batch_score: float) -> float:
        """Record a new batch score and recompute the threshold.

        Args:
            batch_score: The mean combined score of the latest batch.

        Returns:
            The updated threshold value.
        """
        self._history.append(batch_score)

        if not self._config.enabled or len(self._history) < 2:
            self._current = self._config.baseline
            return self._current

        arr = np.array(self._history)
        rolling_mean = float(np.mean(arr))
        rolling_std = float(np.std(arr, ddof=1))

        raw = rolling_mean + self._config.z_factor * rolling_std
        clamped = max(self._config.min_threshold, min(self._config.max_threshold, raw))

        self._current = float(clamped)
        return self._current

    def reset(self) -> None:
        """Clear history and reset to baseline."""
        self._history.clear()
        self._current = self._config.baseline

current_threshold property

The most recent threshold value.

history property

Copy of the rolling score history.

update(batch_score)

Record a new batch score and recompute the threshold.

Parameters:

Name Type Description Default
batch_score float

The mean combined score of the latest batch.

required

Returns:

Type Description
float

The updated threshold value.

Source code in src/qgate/threshold.py
def update(self, batch_score: float) -> float:
    """Record a new batch score and recompute the threshold.

    Args:
        batch_score: The mean combined score of the latest batch.

    Returns:
        The updated threshold value.
    """
    self._history.append(batch_score)

    if not self._config.enabled or len(self._history) < 2:
        self._current = self._config.baseline
        return self._current

    arr = np.array(self._history)
    rolling_mean = float(np.mean(arr))
    rolling_std = float(np.std(arr, ddof=1))

    raw = rolling_mean + self._config.z_factor * rolling_std
    clamped = max(self._config.min_threshold, min(self._config.max_threshold, raw))

    self._current = float(clamped)
    return self._current

reset()

Clear history and reset to baseline.

Source code in src/qgate/threshold.py
def reset(self) -> None:
    """Clear history and reset to baseline."""
    self._history.clear()
    self._current = self._config.baseline

GaltonAdaptiveThreshold

Distribution-aware adaptive threshold (Galton / diffusion mode).

Maintains a per-shot rolling window of combined scores and computes a threshold that targets a stable acceptance fraction.

Two sub-modes are available (selected via config.use_quantile):

Quantile mode (default, recommended)
Uses the empirical quantile of the window:

    θ = Q_(1 − target_acceptance)(window)

This is the most robust option — it makes no distributional assumptions and naturally tracks hardware drift.

Z-score mode (use_quantile=False)
Estimates μ and σ from the window and sets:

    θ = μ + z_σ · σ

When robust_stats=True (default), the median and MAD-derived σ are used, making the estimate resilient to outliers. When robust_stats=False, the ordinary mean and sample std are used.

Warmup: While len(window) < min_window_size the threshold falls back to config.baseline. This avoids noisy estimates from too few observations.

All operations are O(1) amortised — the window is backed by a :class:collections.deque with bounded capacity.

Parameters:

Name Type Description Default
config DynamicThresholdConfig

:class:~qgate.config.DynamicThresholdConfig with mode="galton" (or "diffusion").

required

Example::

from qgate.config import DynamicThresholdConfig
cfg = DynamicThresholdConfig(
    mode="galton",
    window_size=500,
    target_acceptance=0.05,
    robust_stats=True,
    use_quantile=True,
)
gat = GaltonAdaptiveThreshold(cfg)
for score in batch_scores:
    gat.observe(score)
print(gat.current_threshold)
Source code in src/qgate/threshold.py
class GaltonAdaptiveThreshold:
    """Distribution-aware adaptive threshold (Galton / diffusion mode).

    Maintains a **per-shot** rolling window of combined scores and
    computes a threshold that targets a stable acceptance fraction.

    Two sub-modes are available (selected via ``config.use_quantile``):

    **Quantile mode** (default, recommended)
        Uses the empirical quantile of the window:

        .. math:: \\theta = Q_{1 - \\text{target\\_acceptance}}(\\text{window})

        This is the most robust option — it makes no distributional
        assumptions and naturally tracks hardware drift.

    **Z-score mode** (``use_quantile=False``)
        Estimates μ and σ from the window and sets:

        .. math:: \\theta = \\mu + z_{\\sigma} \\cdot \\sigma

        When ``robust_stats=True`` (default), the median and
        MAD-derived σ are used, making the estimate resilient to
        outliers.  When ``robust_stats=False``, ordinary mean and
        sample std are used.

    **Warmup:** While ``len(window) < min_window_size`` the threshold
    falls back to ``config.baseline``.  This avoids noisy estimates
    from too few observations.

    All operations are O(1) amortised — the window is backed by a
    :class:`collections.deque` with bounded capacity.

    Args:
        config: :class:`~qgate.config.DynamicThresholdConfig` with
                ``mode="galton"`` (or ``"diffusion"``).

    Example::

        from qgate.config import DynamicThresholdConfig
        cfg = DynamicThresholdConfig(
            mode="galton",
            window_size=500,
            target_acceptance=0.05,
            robust_stats=True,
            use_quantile=True,
        )
        gat = GaltonAdaptiveThreshold(cfg)
        for score in batch_scores:
            gat.observe(score)
        print(gat.current_threshold)
    """

    def __init__(self, config: DynamicThresholdConfig) -> None:
        self._config = config
        self._window: deque[float] = deque(maxlen=config.window_size)
        self._current: float = config.baseline
        self._last_snapshot: GaltonSnapshot = GaltonSnapshot(
            effective_threshold=config.baseline,
        )

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def config(self) -> DynamicThresholdConfig:
        return self._config

    @property
    def current_threshold(self) -> float:
        """The most recent effective threshold value."""
        return self._current

    @property
    def window(self) -> list[float]:
        """Copy of the rolling score window."""
        return list(self._window)

    @property
    def window_size_current(self) -> int:
        """Number of scores currently in the window."""
        return len(self._window)

    @property
    def in_warmup(self) -> bool:
        """True while the window is smaller than ``min_window_size``."""
        return len(self._window) < self._config.min_window_size

    @property
    def last_snapshot(self) -> GaltonSnapshot:
        """The most recent telemetry snapshot."""
        return self._last_snapshot

    # ------------------------------------------------------------------
    # Core — observe individual scores
    # ------------------------------------------------------------------

    def observe(self, score: float) -> float:
        """Add a single score to the window and recompute the threshold.

        Call this once per shot (or per combined score).  The new
        score is appended to the window *before* the threshold is
        recomputed, so the returned threshold already reflects it.

        Args:
            score: A per-shot combined score.

        Returns:
            The effective threshold after update.
        """
        # Append first, then recompute
        self._window.append(score)
        self._recompute()
        return self._current

    def observe_batch(self, scores: list[float] | np.ndarray) -> float:
        """Convenience: observe a whole batch of scores at once.

        Args:
            scores: Iterable of per-shot combined scores.

        Returns:
            The effective threshold after the last observation.
        """
        for s in scores:
            self._window.append(float(s))
        self._recompute()
        return self._current

    def reset(self) -> None:
        """Clear the window and reset to baseline."""
        self._window.clear()
        self._current = self._config.baseline
        self._last_snapshot = GaltonSnapshot(
            effective_threshold=self._config.baseline,
        )

    # ------------------------------------------------------------------
    # Private: threshold computation
    # ------------------------------------------------------------------

    def _recompute(self) -> None:
        """Recompute the threshold from the current window."""
        n = len(self._window)

        # Warmup: not enough data yet — fall back to baseline
        if n < self._config.min_window_size:
            self._current = self._config.baseline
            self._last_snapshot = GaltonSnapshot(
                effective_threshold=self._current,
                window_size_current=n,
                in_warmup=True,
            )
            logger.debug(
                "Galton warmup: %d / %d samples — using baseline %.4f",
                n,
                self._config.min_window_size,
                self._current,
            )
            return

        arr = np.asarray(self._window, dtype=np.float64)

        # --- Statistics (always computed for telemetry) ----------------
        if self._config.robust_stats:
            mu = float(np.median(arr))
            mad = float(np.median(np.abs(arr - mu)))
            sigma = mad * _MAD_TO_SIGMA
        else:
            mu = float(np.mean(arr))
            sigma = float(np.std(arr, ddof=1)) if n > 1 else 0.0

        quantile_val = float(np.quantile(arr, 1.0 - self._config.target_acceptance))

        # --- Threshold selection --------------------------------------
        raw = quantile_val if self._config.use_quantile else mu + self._config.z_sigma * sigma

        clamped = float(max(self._config.min_threshold, min(self._config.max_threshold, raw)))
        self._current = clamped

        # --- Acceptance rate ------------------------------------------
        accept_rate = float(np.mean(arr >= self._current))

        # --- Snapshot -------------------------------------------------
        self._last_snapshot = GaltonSnapshot(
            rolling_mean=mu,
            rolling_sigma=sigma,
            rolling_quantile=quantile_val,
            effective_threshold=self._current,
            window_size_current=n,
            acceptance_rate_rolling=accept_rate,
            in_warmup=False,
        )

        logger.debug(
            "Galton threshold: %.4f  (μ=%.4f, σ=%.4f, Q=%.4f, accept=%.3f, window=%d)",
            self._current,
            mu,
            sigma,
            quantile_val,
            accept_rate,
            n,
        )
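The quantile sub-mode's key property is that the acceptance fraction tracks `target_acceptance` regardless of the score distribution. A standalone NumPy sketch (synthetic Gaussian scores, not qgate itself):

```python
import numpy as np

# With target_acceptance = 0.05 the threshold sits at the empirical
# 95th percentile, so ~5% of the window clears it by construction.
rng = np.random.default_rng(0)
scores = rng.normal(0.6, 0.05, size=2000)   # synthetic combined scores

target_acceptance = 0.05
theta = np.quantile(scores, 1.0 - target_acceptance)
accept_rate = (scores >= theta).mean()
print(round(accept_rate, 3))   # ≈ 0.05
```

This holds whether the scores drift, skew, or grow heavy tails, which is why the docstring calls quantile mode the most robust option.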

current_threshold property

The most recent effective threshold value.

window property

Copy of the rolling score window.

window_size_current property

Number of scores currently in the window.

in_warmup property

True while the window is smaller than min_window_size.

last_snapshot property

The most recent telemetry snapshot.

observe(score)

Add a single score to the window and recompute the threshold.

Call this once per shot (or per combined score). The new score is appended to the window before the threshold is recomputed, so the returned threshold already reflects it.

Parameters:

Name Type Description Default
score float

A per-shot combined score.

required

Returns:

Type Description
float

The effective threshold after update.

Source code in src/qgate/threshold.py
def observe(self, score: float) -> float:
    """Add a single score to the window and recompute the threshold.

    Call this once per shot (or per combined score).  The new
    score is appended to the window *before* the threshold is
    recomputed, so the returned threshold already reflects it.

    Args:
        score: A per-shot combined score.

    Returns:
        The effective threshold after update.
    """
    # Append first, then recompute
    self._window.append(score)
    self._recompute()
    return self._current

observe_batch(scores)

Convenience: observe a whole batch of scores at once.

Parameters:

Name Type Description Default
scores list[float] | ndarray

Iterable of per-shot combined scores.

required

Returns:

Type Description
float

The effective threshold after the last observation.

Source code in src/qgate/threshold.py
def observe_batch(self, scores: list[float] | np.ndarray) -> float:
    """Convenience: observe a whole batch of scores at once.

    Args:
        scores: Iterable of per-shot combined scores.

    Returns:
        The effective threshold after the last observation.
    """
    for s in scores:
        self._window.append(float(s))
    self._recompute()
    return self._current

reset()

Clear the window and reset to baseline.

Source code in src/qgate/threshold.py
def reset(self) -> None:
    """Clear the window and reset to baseline."""
    self._window.clear()
    self._current = self._config.baseline
    self._last_snapshot = GaltonSnapshot(
        effective_threshold=self._config.baseline,
    )

estimate_diffusion_width(window, robust=True)

Estimate the variance (diffusion width) of a score window.

This is a simple dispersion estimator that can serve as a diagnostic for diffusion-scaling validation in future work.

When robust=True (default) the MAD-based σ² is returned; otherwise the ordinary sample variance is used.

Parameters:

Name Type Description Default
window list[float] | ndarray

1-D array-like of scores.

required
robust bool

Use MAD-derived variance estimate.

True

Returns:

Type Description
float

Estimated variance (σ²).

Raises:

Type Description
ValueError

If window has fewer than 2 elements.

Source code in src/qgate/threshold.py
def estimate_diffusion_width(
    window: list[float] | np.ndarray,
    robust: bool = True,
) -> float:
    """Estimate the variance (diffusion width) of a score window.

    This is a simple dispersion estimator that can serve as a diagnostic
    for diffusion-scaling validation in future work.

    When ``robust=True`` (default) the MAD-based σ² is returned; otherwise
    the ordinary sample variance is used.

    Args:
        window: 1-D array-like of scores.
        robust: Use MAD-derived variance estimate.

    Returns:
        Estimated variance (σ²).

    Raises:
        ValueError: If *window* has fewer than 2 elements.
    """
    arr = np.asarray(window, dtype=np.float64)
    if arr.size < 2:
        raise ValueError(f"estimate_diffusion_width requires ≥ 2 observations, got {arr.size}")
    if robust:
        med = float(np.median(arr))
        mad = float(np.median(np.abs(arr - med)))
        sigma = mad * _MAD_TO_SIGMA
        return sigma * sigma
    return float(np.var(arr, ddof=1))
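The point of the MAD route is outlier resistance. The sketch below mirrors the robust branch, assuming the conventional value 1.4826 for the module's `_MAD_TO_SIGMA` constant (the Gaussian consistency factor, which the source does not show):

```python
import numpy as np

MAD_TO_SIGMA = 1.4826   # assumption: conventional Gaussian consistency factor

def mad_var(a):
    # MAD-derived sigma squared, mirroring the robust branch above
    med = np.median(a)
    return (np.median(np.abs(a - med)) * MAD_TO_SIGMA) ** 2

rng = np.random.default_rng(1)
clean = rng.normal(0.0, 1.0, size=5000)
dirty = np.append(clean, 1000.0)   # one wild outlier

print(round(mad_var(clean), 2), round(mad_var(dirty), 2))  # both near 1
print(np.var(dirty, ddof=1) > 50)                          # plain variance blows up
```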

qgate.run_logging

run_logging.py — Structured run logging (JSON / CSV / Parquet).

Every :class:~qgate.filter.TrajectoryFilter run can be logged to disk for reproducibility and analysis.

Patent pending (see LICENSE)

FilterResult dataclass

Structured output of a single trajectory-filter run.

Attributes:

Name Type Description
run_id str

Deterministic 12-char hex digest for deduplication and reproducibility.

variant str

Conditioning strategy used.

total_shots int

Number of shots executed.

accepted_shots int

Number of accepted shots.

acceptance_probability float

accepted / total.

tts float

Time-to-solution (1 / acceptance_probability).

mean_combined_score float | None

Mean combined fusion score across shots.

threshold_used float

Threshold at the time of filtering.

dynamic_threshold_final float | None

Final dynamic threshold (if enabled).

scores list[float]

Per-shot combined scores.

config_json str

Serialised GateConfig as JSON string.

metadata dict[str, Any]

Free-form metadata.

timestamp str

ISO-8601 timestamp.

Source code in src/qgate/run_logging.py
@dataclass
class FilterResult:
    """Structured output of a single trajectory-filter run.

    Attributes:
        run_id:                 Deterministic 12-char hex digest for
                                deduplication and reproducibility.
        variant:                Conditioning strategy used.
        total_shots:            Number of shots executed.
        accepted_shots:         Number of accepted shots.
        acceptance_probability: accepted / total.
        tts:                    Time-to-solution (1 / acceptance_probability).
        mean_combined_score:    Mean combined fusion score across shots.
        threshold_used:         Threshold at the time of filtering.
        dynamic_threshold_final: Final dynamic threshold (if enabled).
        scores:                 Per-shot combined scores.
        config_json:            Serialised GateConfig as JSON string.
        metadata:               Free-form metadata.
        timestamp:              ISO-8601 timestamp.
    """

    run_id: str = ""
    variant: str = ""
    total_shots: int = 0
    accepted_shots: int = 0
    acceptance_probability: float = 0.0
    tts: float = float("inf")
    mean_combined_score: float | None = None
    threshold_used: float = 0.65
    dynamic_threshold_final: float | None = None
    scores: list[float] = field(default_factory=list)
    config_json: str = "{}"
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc).isoformat()
    )

    def as_dict(self) -> dict[str, Any]:
        d = asdict(self)
        # Drop large per-shot scores from the summary dict
        d.pop("scores", None)
        return d

RunLogger

Append-only logger that writes :class:FilterResult records.

Supports JSON-Lines, CSV, and (optionally) Parquet output.

Parameters:

Name Type Description Default
path str | Path

Output file path (suffix determines format: .jsonl, .csv, or .parquet).

required
fmt Literal['jsonl', 'csv', 'parquet'] | None

Explicit format override ("jsonl" | "csv" | "parquet"). If None the format is inferred from path.

None
Source code in src/qgate/run_logging.py
class RunLogger:
    """Append-only logger that writes :class:`FilterResult` records.

    Supports JSON-Lines, CSV, and (optionally) Parquet output.

    Args:
        path:   Output file path (suffix determines format:
                ``.jsonl``, ``.csv``, or ``.parquet``).
        fmt:    Explicit format override (``"jsonl"`` | ``"csv"`` |
                ``"parquet"``).  If *None* the format is inferred
                from *path*.
    """

    def __init__(
        self,
        path: str | Path,
        fmt: Literal["jsonl", "csv", "parquet"] | None = None,
    ) -> None:
        self.path = Path(path)
        self._fmt: Literal["jsonl", "csv", "parquet"]
        if fmt is not None:
            self._fmt = fmt
        else:
            suffix = self.path.suffix.lower()
            mapping: dict[str, Literal["jsonl", "csv", "parquet"]] = {
                ".jsonl": "jsonl",
                ".csv": "csv",
                ".parquet": "parquet",
            }
            if suffix not in mapping:
                logger.warning(
                    "Unknown file extension %r for %s — defaulting to JSONL format. "
                    "Supported extensions: .jsonl, .csv, .parquet",
                    suffix,
                    self.path,
                )
            self._fmt = mapping.get(suffix, "jsonl")
        self._records: list[dict[str, Any]] = []

    @property
    def format(self) -> str:
        return self._fmt

    def log(self, result: FilterResult) -> None:
        """Append a result to the in-memory buffer and flush to disk."""
        self._records.append(result.as_dict())
        self._flush_one(result)

    def flush_all(self) -> None:
        """Re-write the entire file from the in-memory buffer.

        Useful if you want to guarantee the file is in sync.
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if self._fmt == "jsonl":
            with open(self.path, "w") as f:
                for rec in self._records:
                    f.write(json.dumps(rec, default=str) + "\n")
        elif self._fmt == "csv":
            pd = _get_pandas()
            df = pd.DataFrame(self._records)
            df.to_csv(self.path, index=False)
        elif self._fmt == "parquet":
            self._write_parquet()
        else:
            raise ValueError(f"Unknown format: {self._fmt}")

    # ------------------------------------------------------------------
    # Context-manager support
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Flush remaining buffered records (especially Parquet) and release resources."""
        if self._fmt == "parquet" and self._records:
            self._write_parquet()
        logger.debug("RunLogger closed – %d records written to %s", len(self._records), self.path)

    def __enter__(self) -> RunLogger:
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Private
    # ------------------------------------------------------------------

    def _flush_one(self, result: FilterResult) -> None:
        """Incremental append (append-friendly for jsonl / csv)."""
        self.path.parent.mkdir(parents=True, exist_ok=True)
        if self._fmt == "jsonl":
            with open(self.path, "a") as f:
                f.write(json.dumps(result.as_dict(), default=str) + "\n")
        elif self._fmt == "csv":
            pd = _get_pandas()
            df = pd.DataFrame([result.as_dict()])
            header = not self.path.exists() or self.path.stat().st_size == 0
            df.to_csv(self.path, mode="a", index=False, header=header)
        elif self._fmt == "parquet":
            # Parquet: buffer only — flushed on close() or explicit flush_all()
            logger.debug("Parquet record buffered (total: %d)", len(self._records))
        else:
            raise ValueError(f"Unknown format: {self._fmt}")

    def _write_parquet(self) -> None:
        try:
            import pyarrow  # noqa: F401
        except ImportError:
            raise ImportError(
                "pyarrow is required for Parquet logging.  "
                "Install with:  pip install qgate[parquet]"
            ) from None
        pd = _get_pandas()
        df = pd.DataFrame(self._records)
        df.to_parquet(self.path, index=False)

log(result)

Append a result to the in-memory buffer and flush to disk.

Source code in src/qgate/run_logging.py
def log(self, result: FilterResult) -> None:
    """Append a result to the in-memory buffer and flush to disk."""
    self._records.append(result.as_dict())
    self._flush_one(result)

flush_all()

Re-write the entire file from the in-memory buffer.

Useful if you want to guarantee the file is in sync.

Source code in src/qgate/run_logging.py
def flush_all(self) -> None:
    """Re-write the entire file from the in-memory buffer.

    Useful if you want to guarantee the file is in sync.
    """
    self.path.parent.mkdir(parents=True, exist_ok=True)
    if self._fmt == "jsonl":
        with open(self.path, "w") as f:
            for rec in self._records:
                f.write(json.dumps(rec, default=str) + "\n")
    elif self._fmt == "csv":
        pd = _get_pandas()
        df = pd.DataFrame(self._records)
        df.to_csv(self.path, index=False)
    elif self._fmt == "parquet":
        self._write_parquet()
    else:
        raise ValueError(f"Unknown format: {self._fmt}")

close()

Flush remaining buffered records (especially Parquet) and release resources.

Source code in src/qgate/run_logging.py
def close(self) -> None:
    """Flush remaining buffered records (especially Parquet) and release resources."""
    if self._fmt == "parquet" and self._records:
        self._write_parquet()
    logger.debug("RunLogger closed – %d records written to %s", len(self._records), self.path)
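The JSONL path above can be illustrated without qgate itself. The following is a minimal standalone sketch of the append-one-record-per-line pattern `RunLogger` uses for `.jsonl` output (the `append_record` / `read_records` helpers are hypothetical names for illustration, not part of the qgate API):

```python
import json
import tempfile
from pathlib import Path

# Minimal sketch of RunLogger's JSONL path: each record is appended as one
# JSON object per line, so the file can be tailed while a run is in progress.
def append_record(path: Path, record: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a") as f:
        f.write(json.dumps(record, default=str) + "\n")

def read_records(path: Path) -> list[dict]:
    with open(path) as f:
        return [json.loads(line) for line in f]

with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "runs" / "demo.jsonl"
    append_record(log_path, {"run_id": "abc123", "accepted": True})
    append_record(log_path, {"run_id": "abc123", "accepted": False})
    records = read_records(log_path)
    print(len(records))            # 2
    print(records[0]["accepted"])  # True
```

Because each `log()` call flushes immediately, a crash mid-run loses at most the record being written — which is why Parquet (no cheap append) is instead buffered until `close()`.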

compute_run_id(config_json, adapter_name='', circuit_hash='')

Return a deterministic 12-char hex run ID (SHA-256 prefix).

The ID is computed from a canonical JSON blob combining config_json, adapter_name, and an optional circuit_hash. Two runs with identical inputs always produce the same ID, enabling deduplication and reproducibility checks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config_json` | `str` | Serialised `qgate.config.GateConfig` JSON. | *required* |
| `adapter_name` | `str` | Name/class of the adapter used. | `''` |
| `circuit_hash` | `str` | Optional hash of the circuit object for extra specificity. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `str` | 12-character lowercase hex string. |

Source code in src/qgate/run_logging.py
def compute_run_id(
    config_json: str,
    adapter_name: str = "",
    circuit_hash: str = "",
) -> str:
    """Return a deterministic 12-char hex run ID (SHA-256 prefix).

    The ID is computed from a canonical JSON blob combining
    *config_json*, *adapter_name*, and an optional *circuit_hash*.
    Two runs with identical inputs always produce the same ID, enabling
    deduplication and reproducibility checks.

    Args:
        config_json:  Serialised :class:`~qgate.config.GateConfig` JSON.
        adapter_name: Name/class of the adapter used.
        circuit_hash: Optional hash of the circuit object for extra
                      specificity.

    Returns:
        12-character lowercase hex string.
    """
    blob: dict[str, Any] = {
        "config": json.loads(config_json),
        "adapter": adapter_name,
    }
    if circuit_hash:
        blob["circuit_hash"] = circuit_hash
    # Canonical JSON: sorted keys, no extra whitespace, coerce numpy types
    canonical = json.dumps(blob, sort_keys=True, separators=(",", ":"), default=_json_default)
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]
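The canonicalisation step is what makes the ID deterministic: sorted keys and compact separators erase dict-ordering differences before hashing. A standalone sketch of that behaviour (the `run_id` helper is a simplified stand-in for `compute_run_id`, omitting the numpy-coercion default):

```python
import hashlib
import json

# Sketch of the canonical-JSON hashing behind compute_run_id: sort_keys plus
# compact separators make the hash independent of dict insertion order.
def run_id(config: dict, adapter: str = "") -> str:
    blob = {"config": config, "adapter": adapter}
    canonical = json.dumps(blob, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()[:12]

a = run_id({"shots": 100, "alpha": 0.5}, "mock")
b = run_id({"alpha": 0.5, "shots": 100}, "mock")  # same keys, different order
print(a == b)  # True: canonical JSON erases ordering differences
print(len(a))  # 12
```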

Framework Adapters

qgate.adapters.base

base.py — Adapter protocol and mock implementation.

Every adapter must implement the `BaseAdapter` interface so that `qgate.filter.TrajectoryFilter` can work with any quantum framework.

Patent pending (see LICENSE)

BaseAdapter

Bases: ABC

Abstract base class that all qgate adapters must implement.

The adapter is responsible for:
  1. Building circuits with Bell-pair subsystems and parity checks.
  2. Executing shots on a backend (simulator or hardware).
  3. Parsing raw results into ParityOutcome objects.
Source code in src/qgate/adapters/base.py
class BaseAdapter(ABC):
    """Abstract base class that all qgate adapters must implement.

    The adapter is responsible for:
      1. Building circuits with Bell-pair subsystems and parity checks.
      2. Executing shots on a backend (simulator or hardware).
      3. Parsing raw results into ``ParityOutcome`` objects.
    """

    # ------------------------------------------------------------------
    # Required interface
    # ------------------------------------------------------------------

    @abstractmethod
    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> Any:
        """Construct a circuit with *n_subsystems* Bell pairs and
        *n_cycles* mid-circuit parity checks.

        Returns a framework-native circuit object.
        """

    @abstractmethod
    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> Any:
        """Execute *circuit* for *shots* repetitions.

        Returns the framework-native result object.
        """

    @abstractmethod
    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse framework-native results into a list of
        ``ParityOutcome`` objects (one per shot).
        """

    # ------------------------------------------------------------------
    # Optional helpers
    # ------------------------------------------------------------------

    def build_and_run(
        self,
        n_subsystems: int,
        n_cycles: int,
        shots: int,
        circuit_kwargs: dict[str, Any] | None = None,
        run_kwargs: dict[str, Any] | None = None,
    ) -> list[ParityOutcome]:
        """Convenience: build → run → parse in one call."""
        circuit = self.build_circuit(n_subsystems, n_cycles, **(circuit_kwargs or {}))
        raw = self.run(circuit, shots, **(run_kwargs or {}))
        return self.parse_results(raw, n_subsystems, n_cycles)

build_circuit(n_subsystems, n_cycles, **kwargs) abstractmethod

Construct a circuit with n_subsystems Bell pairs and n_cycles mid-circuit parity checks.

Returns a framework-native circuit object.

Source code in src/qgate/adapters/base.py
@abstractmethod
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> Any:
    """Construct a circuit with *n_subsystems* Bell pairs and
    *n_cycles* mid-circuit parity checks.

    Returns a framework-native circuit object.
    """

run(circuit, shots, **kwargs) abstractmethod

Execute circuit for shots repetitions.

Returns the framework-native result object.

Source code in src/qgate/adapters/base.py
@abstractmethod
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> Any:
    """Execute *circuit* for *shots* repetitions.

    Returns the framework-native result object.
    """

parse_results(raw_results, n_subsystems, n_cycles) abstractmethod

Parse framework-native results into a list of ParityOutcome objects (one per shot).

Source code in src/qgate/adapters/base.py
@abstractmethod
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse framework-native results into a list of
    ``ParityOutcome`` objects (one per shot).
    """

build_and_run(n_subsystems, n_cycles, shots, circuit_kwargs=None, run_kwargs=None)

Convenience: build → run → parse in one call.

Source code in src/qgate/adapters/base.py
def build_and_run(
    self,
    n_subsystems: int,
    n_cycles: int,
    shots: int,
    circuit_kwargs: dict[str, Any] | None = None,
    run_kwargs: dict[str, Any] | None = None,
) -> list[ParityOutcome]:
    """Convenience: build → run → parse in one call."""
    circuit = self.build_circuit(n_subsystems, n_cycles, **(circuit_kwargs or {}))
    raw = self.run(circuit, shots, **(run_kwargs or {}))
    return self.parse_results(raw, n_subsystems, n_cycles)

MockAdapter

Bases: BaseAdapter

In-memory adapter that generates synthetic parity outcomes.

Useful for unit tests and demonstrations without a real backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `error_rate` | `float` | Per-subsystem per-cycle probability of a parity flip. | `0.05` (5 %) |
| `seed` | `int \| None` | Optional random seed for reproducibility. | `None` |
Source code in src/qgate/adapters/base.py
class MockAdapter(BaseAdapter):
    """In-memory adapter that generates synthetic parity outcomes.

    Useful for unit tests and demonstrations without a real backend.

    Args:
        error_rate: Per-subsystem per-cycle probability of a parity flip
                    (default 0.05 → 5 %).
        seed:       Optional random seed for reproducibility.
    """

    def __init__(self, error_rate: float = 0.05, seed: int | None = None) -> None:
        self.error_rate = error_rate
        self._rng = random.Random(seed)

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> dict[str, int]:
        """Return a lightweight descriptor (no real circuit)."""
        return {"n_subsystems": n_subsystems, "n_cycles": n_cycles}

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> list[list[list[int]]]:
        """Generate *shots* synthetic parity matrices."""
        n_sub = circuit["n_subsystems"]
        n_cyc = circuit["n_cycles"]
        results: list[list[list[int]]] = []
        for _ in range(shots):
            matrix = [
                [1 if self._rng.random() < self.error_rate else 0 for _ in range(n_sub)]
                for _ in range(n_cyc)
            ]
            results.append(matrix)
        return results

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Wrap raw matrices in ParityOutcome."""
        return [
            ParityOutcome(
                n_subsystems=n_subsystems,
                n_cycles=n_cycles,
                parity_matrix=matrix,
            )
            for matrix in raw_results
        ]

build_circuit(n_subsystems, n_cycles, **kwargs)

Return a lightweight descriptor (no real circuit).

Source code in src/qgate/adapters/base.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> dict[str, int]:
    """Return a lightweight descriptor (no real circuit)."""
    return {"n_subsystems": n_subsystems, "n_cycles": n_cycles}

run(circuit, shots, **kwargs)

Generate shots synthetic parity matrices.

Source code in src/qgate/adapters/base.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> list[list[list[int]]]:
    """Generate *shots* synthetic parity matrices."""
    n_sub = circuit["n_subsystems"]
    n_cyc = circuit["n_cycles"]
    results: list[list[list[int]]] = []
    for _ in range(shots):
        matrix = [
            [1 if self._rng.random() < self.error_rate else 0 for _ in range(n_sub)]
            for _ in range(n_cyc)
        ]
        results.append(matrix)
    return results

parse_results(raw_results, n_subsystems, n_cycles)

Wrap raw matrices in ParityOutcome.

Source code in src/qgate/adapters/base.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Wrap raw matrices in ParityOutcome."""
    return [
        ParityOutcome(
            n_subsystems=n_subsystems,
            n_cycles=n_cycles,
            parity_matrix=matrix,
        )
        for matrix in raw_results
    ]
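The synthetic generation in `MockAdapter.run` can be reproduced without any adapter machinery. A standalone sketch (the `generate_shots` helper is a hypothetical name mirroring the logic above, not part of the qgate API):

```python
import random

# Standalone sketch of MockAdapter's synthetic shot generation (no backend
# needed): each shot is an n_cycles x n_subsystems 0/1 matrix where a 1 marks
# a parity flip drawn independently with probability error_rate.
def generate_shots(n_subsystems, n_cycles, shots, error_rate=0.05, seed=None):
    rng = random.Random(seed)
    return [
        [
            [1 if rng.random() < error_rate else 0 for _ in range(n_subsystems)]
            for _ in range(n_cycles)
        ]
        for _ in range(shots)
    ]

batch = generate_shots(n_subsystems=4, n_cycles=3, shots=100, error_rate=0.05, seed=42)
print(len(batch), len(batch[0]), len(batch[0][0]))  # 100 3 4
# A fixed seed makes runs reproducible, mirroring MockAdapter(seed=...):
print(batch == generate_shots(4, 3, 100, 0.05, seed=42))  # True
```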

qgate.adapters.registry

registry.py — Entry-point based adapter discovery.

Discovers adapters registered under the `qgate.adapters` entry-point group and provides `list_adapters` / `load_adapter` for programmatic and CLI access.

Example::

from qgate.adapters.registry import list_adapters, load_adapter

print(list_adapters())          # {"mock": "qgate.adapters.base:MockAdapter", ...}
AdapterCls = load_adapter("mock")
adapter = AdapterCls(error_rate=0.05, seed=42)

Patent pending (see LICENSE)

list_adapters()

Return {name: "module:Class"} for all registered adapters.

Reads the qgate.adapters entry-point group.

Source code in src/qgate/adapters/registry.py
def list_adapters() -> dict[str, str]:
    """Return ``{name: "module:Class"}`` for all registered adapters.

    Reads the ``qgate.adapters`` entry-point group.
    """
    return {ep.name: ep.value for ep in _get_group()}

load_adapter(name, **kwargs)

Load and return the adapter class registered under name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Adapter name as registered in entry points (e.g. `"mock"`). | *required* |
| `**kwargs` | `Any` | Currently unused; reserved for future configuration. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `type` | The adapter class (not an instance). |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If `name` is not a registered adapter. |
| `ImportError` | If the adapter's optional dependency is missing. |

Source code in src/qgate/adapters/registry.py
def load_adapter(name: str, **kwargs: Any) -> type:
    """Load and return the adapter **class** registered under *name*.

    Args:
        name: Adapter name as registered in entry points (e.g. ``"mock"``).
        **kwargs: Currently unused; reserved for future configuration.

    Returns:
        The adapter class (not an instance).

    Raises:
        KeyError: If *name* is not a registered adapter.
        ImportError: If the adapter's optional dependency is missing.
    """
    for ep in _get_group():
        if ep.name == name:
            cls: type = ep.load()
            return cls

    available = sorted(list_adapters().keys())
    raise KeyError(
        f"Unknown adapter {name!r}. "
        f"Available adapters: {', '.join(available) or '(none)'}. "
        f"Install extras to register more (e.g. pip install qgate[qiskit])."
    )

qgate.adapters.qiskit_adapter

qiskit_adapter.py — Full Qiskit adapter for qgate.

Builds dynamic circuits with Bell-pair subsystems, scramble layers, and ancilla-based mid-circuit Z-parity measurements.

Requires the qiskit extra::

pip install qgate[qiskit]

Patent pending (see LICENSE)

QiskitAdapter

Bases: BaseAdapter

Adapter for IBM Qiskit circuits.

Builds dynamic circuits with:
  • N Bell pairs (2N data qubits)
  • W monitoring cycles, each containing:
    • Random single-qubit scramble rotations
    • Ancilla-based Z⊗Z parity measurement per pair
    • Ancilla reset & reuse

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `Any` | Qiskit backend, or `None` for the Aer simulator. | `None` |
| `scramble_depth` | `int` | Number of random-rotation layers per cycle. | `1` |
| `optimization_level` | `int` | Transpiler optimization level (0–3). | `1` |
Source code in src/qgate/adapters/qiskit_adapter.py
class QiskitAdapter(BaseAdapter):
    """Adapter for IBM Qiskit circuits.

    Builds dynamic circuits with:
      * N Bell pairs (2N data qubits)
      * W monitoring cycles each containing:
        - Random single-qubit scramble rotations
        - Ancilla-based Z⊗Z parity measurement per pair
        - Ancilla reset & reuse

    Args:
        backend:         Qiskit backend or ``None`` for Aer simulator.
        scramble_depth:  Number of random-rotation layers per cycle.
        optimization_level: Transpiler optimization level (0–3).
    """

    def __init__(
        self,
        backend: Any = None,
        scramble_depth: int = 1,
        optimization_level: int = 1,
    ) -> None:
        _require_qiskit()
        self._backend = backend
        self.scramble_depth = scramble_depth
        self.optimization_level = optimization_level

    # ------------------------------------------------------------------
    # BaseAdapter interface
    # ------------------------------------------------------------------

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Build a dynamic Qiskit circuit.

        Qubit layout:
          * data qubits   : 0 .. 2N-1  (pairs: [0,1], [2,3], …)
          * ancilla qubits: 2N .. 3N-1  (one per pair)

        Classical registers — one per cycle, each of width N
        (bit *i* records the parity of pair *i*).
        """
        import numpy as np

        n_data = 2 * n_subsystems
        n_anc = n_subsystems
        qc = QuantumCircuit(n_data + n_anc)

        # Classical registers — one per monitoring cycle
        cregs = []
        for w in range(n_cycles):
            cr = ClassicalRegister(n_subsystems, name=f"par_c{w}")
            qc.add_register(cr)
            cregs.append(cr)

        # Bell-pair preparation
        for i in range(n_subsystems):
            qc.h(2 * i)
            qc.cx(2 * i, 2 * i + 1)
        qc.barrier()

        rng = np.random.default_rng(kwargs.get("seed"))

        for w in range(n_cycles):
            # ── Scramble rotations ──
            for _ in range(self.scramble_depth):
                for q in range(n_data):
                    theta, phi, lam = rng.uniform(0, 0.3, size=3)
                    qc.u(theta, phi, lam, q)
            qc.barrier()

            # ── Z⊗Z parity measurement via ancilla ──
            for i in range(n_subsystems):
                anc = n_data + i
                qc.cx(2 * i, anc)
                qc.cx(2 * i + 1, anc)
                qc.measure(anc, cregs[w][i])
                qc.reset(anc)
            qc.barrier()

        return qc

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> Any:
        """Execute via the configured backend (Aer if none)."""
        backend = self._backend
        if backend is None:
            from qiskit_aer import AerSimulator  # type: ignore[import-untyped]

            backend = AerSimulator()

        from qiskit import transpile  # type: ignore[import-untyped]

        transpiled = transpile(
            circuit,
            backend=backend,
            optimization_level=self.optimization_level,
        )
        job = backend.run(transpiled, shots=shots, **kwargs)
        return job.result()

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse Qiskit ``Result`` into ``ParityOutcome`` objects."""
        counts: dict[str, int] = raw_results.get_counts()
        outcomes: list[ParityOutcome] = []

        for bitstring, count in counts.items():
            # Qiskit returns bits in reverse register order
            # Format: "par_cW-1 … par_c0" each of width n_subsystems
            segments = bitstring.strip().split(" ")
            # Reverse to get cycle order 0 → W-1
            segments = list(reversed(segments))

            matrix: list[list[int]] = []
            for seg in segments[:n_cycles]:
                # Reverse each segment so bit 0 = subsystem 0
                bits = [int(b) for b in reversed(seg)]
                # Pad or truncate to n_subsystems
                bits = (bits + [0] * n_subsystems)[:n_subsystems]
                matrix.append(bits)

            base_matrix = np.array(matrix, dtype=np.int8)
            # Each shot gets its own independent ndarray copy
            # to prevent aliasing mutations across shots.
            for _ in range(count):
                outcomes.append(
                    ParityOutcome(
                        n_subsystems=n_subsystems,
                        n_cycles=n_cycles,
                        parity_matrix=base_matrix.copy(),
                    )
                )

        logger.debug("Parsed %d outcomes from Qiskit result", len(outcomes))
        return outcomes

build_circuit(n_subsystems, n_cycles, **kwargs)

Build a dynamic Qiskit circuit.

Qubit layout:
  • data qubits : 0 .. 2N-1 (pairs: [0,1], [2,3], …)
  • ancilla qubits: 2N .. 3N-1 (one per pair)

Classical registers — one per cycle, each of width N (bit i records the parity of pair i).

Source code in src/qgate/adapters/qiskit_adapter.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> QuantumCircuit:
    """Build a dynamic Qiskit circuit.

    Qubit layout:
      * data qubits   : 0 .. 2N-1  (pairs: [0,1], [2,3], …)
      * ancilla qubits: 2N .. 3N-1  (one per pair)

    Classical registers — one per cycle, each of width N
    (bit *i* records the parity of pair *i*).
    """
    import numpy as np

    n_data = 2 * n_subsystems
    n_anc = n_subsystems
    qc = QuantumCircuit(n_data + n_anc)

    # Classical registers — one per monitoring cycle
    cregs = []
    for w in range(n_cycles):
        cr = ClassicalRegister(n_subsystems, name=f"par_c{w}")
        qc.add_register(cr)
        cregs.append(cr)

    # Bell-pair preparation
    for i in range(n_subsystems):
        qc.h(2 * i)
        qc.cx(2 * i, 2 * i + 1)
    qc.barrier()

    rng = np.random.default_rng(kwargs.get("seed"))

    for w in range(n_cycles):
        # ── Scramble rotations ──
        for _ in range(self.scramble_depth):
            for q in range(n_data):
                theta, phi, lam = rng.uniform(0, 0.3, size=3)
                qc.u(theta, phi, lam, q)
        qc.barrier()

        # ── Z⊗Z parity measurement via ancilla ──
        for i in range(n_subsystems):
            anc = n_data + i
            qc.cx(2 * i, anc)
            qc.cx(2 * i + 1, anc)
            qc.measure(anc, cregs[w][i])
            qc.reset(anc)
        qc.barrier()

    return qc

run(circuit, shots, **kwargs)

Execute via the configured backend (Aer if none).

Source code in src/qgate/adapters/qiskit_adapter.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> Any:
    """Execute via the configured backend (Aer if none)."""
    backend = self._backend
    if backend is None:
        from qiskit_aer import AerSimulator  # type: ignore[import-untyped]

        backend = AerSimulator()

    from qiskit import transpile  # type: ignore[import-untyped]

    transpiled = transpile(
        circuit,
        backend=backend,
        optimization_level=self.optimization_level,
    )
    job = backend.run(transpiled, shots=shots, **kwargs)
    return job.result()

parse_results(raw_results, n_subsystems, n_cycles)

Parse Qiskit Result into ParityOutcome objects.

Source code in src/qgate/adapters/qiskit_adapter.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse Qiskit ``Result`` into ``ParityOutcome`` objects."""
    counts: dict[str, int] = raw_results.get_counts()
    outcomes: list[ParityOutcome] = []

    for bitstring, count in counts.items():
        # Qiskit returns bits in reverse register order
        # Format: "par_cW-1 … par_c0" each of width n_subsystems
        segments = bitstring.strip().split(" ")
        # Reverse to get cycle order 0 → W-1
        segments = list(reversed(segments))

        matrix: list[list[int]] = []
        for seg in segments[:n_cycles]:
            # Reverse each segment so bit 0 = subsystem 0
            bits = [int(b) for b in reversed(seg)]
            # Pad or truncate to n_subsystems
            bits = (bits + [0] * n_subsystems)[:n_subsystems]
            matrix.append(bits)

        base_matrix = np.array(matrix, dtype=np.int8)
        # Each shot gets its own independent ndarray copy
        # to prevent aliasing mutations across shots.
        for _ in range(count):
            outcomes.append(
                ParityOutcome(
                    n_subsystems=n_subsystems,
                    n_cycles=n_cycles,
                    parity_matrix=base_matrix.copy(),
                )
            )

    logger.debug("Parsed %d outcomes from Qiskit result", len(outcomes))
    return outcomes
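The double reversal in `parse_results` is easy to get wrong, so here is the convention in isolation: Qiskit formats counts keys with the last-added classical register first and bits MSB-first, so both the space-separated register segments and the bits within each segment must be reversed. A standalone sketch with a hypothetical example bitstring (the `bitstring_to_matrix` helper mirrors the parsing logic above but is not part of the qgate API):

```python
# Qiskit prints classical registers last-added first and bits MSB-first, so
# both the register segments and the bits inside each segment are reversed
# to get cycle 0 first and subsystem 0 at index 0.
def bitstring_to_matrix(bitstring: str, n_subsystems: int, n_cycles: int):
    segments = list(reversed(bitstring.strip().split(" ")))
    matrix = []
    for seg in segments[:n_cycles]:
        bits = [int(b) for b in reversed(seg)]
        # Pad or truncate to n_subsystems, as parse_results does.
        matrix.append((bits + [0] * n_subsystems)[:n_subsystems])
    return matrix

# Hypothetical counts key for N=3 subsystems, W=2 cycles: "par_c1 par_c0".
# In cycle 0 ("011"), the rightmost bits mark subsystems 0 and 1 as flipped.
matrix = bitstring_to_matrix("100 011", n_subsystems=3, n_cycles=2)
print(matrix)  # [[1, 1, 0], [0, 0, 1]]
```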

Algorithm TSVF Adapters

qgate.adapters.grover_adapter

grover_adapter.py — Adapter for Grover / TSVF-Chaotic Grover experiments.

Maps Grover search circuits with an ancilla-based post-selection probe onto qgate's `ParityOutcome` model, enabling the full trajectory filtering pipeline (scoring → thresholding → conditioning) to work on search algorithms — not only Bell-pair parity monitoring.

Mapping to `ParityOutcome`:

- `n_subsystems` — number of search qubits (e.g. 3 for |101⟩).
- `n_cycles` — number of Grover iterations.
- `parity_matrix[cycle, sub]` — 0 if qubit *sub* was in the correct target state at iteration *cycle* (via the ancilla probe), 1 otherwise.

This lets qgate's `score_fusion`, thresholding, and hierarchical conditioning rules apply naturally.

The adapter supports two algorithm variants via `algorithm_mode`:

- `"standard"` — Oracle + diffusion per iteration.
- `"tsvf"` — Oracle + chaotic ansatz + weak-measurement ancilla per iteration (post-selection trajectory filter).

Patent pending (see LICENSE)

GroverTSVFAdapter

Bases: BaseAdapter

Adapter for Grover / TSVF-Chaotic Grover experiments.

This adapter builds Grover-search circuits, executes them on a Qiskit backend, and maps the raw results onto ParityOutcome objects that the rest of qgate can score and threshold.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `Any` | A Qiskit backend (Aer or IBM Runtime). | `None` |
| `algorithm_mode` | `str` | `"standard"` or `"tsvf"`. | `'tsvf'` |
| `target_state` | `str` | Target bitstring. | `'101'` |
| `seed` | `int` | RNG seed for the chaotic ansatz. | `42` |
| `weak_angle_base` | `float` | Base angle for the post-selection probe (radians). | `pi / 6` |
| `weak_angle_ramp` | `float` | Per-iteration angle increase (radians). | `pi / 12` |
| `optimization_level` | `int` | Transpilation optimisation level (0–3). | `1` |
Source code in src/qgate/adapters/grover_adapter.py
class GroverTSVFAdapter(BaseAdapter):
    """Adapter for Grover / TSVF-Chaotic Grover experiments.

    This adapter builds Grover-search circuits, executes them on a Qiskit
    backend, and maps the raw results onto ``ParityOutcome`` objects that
    the rest of qgate can score and threshold.

    Args:
        backend:         A Qiskit backend (Aer or IBM Runtime).
        algorithm_mode:  ``"standard"`` or ``"tsvf"`` (default ``"tsvf"``).
        target_state:    Target bitstring (default ``"101"``).
        seed:            RNG seed for the chaotic ansatz.
        weak_angle_base: Base angle for the post-selection probe (radians).
        weak_angle_ramp: Per-iteration angle increase (radians).
        optimization_level: Transpilation optimisation level (0-3).
    """

    def __init__(
        self,
        backend: Any = None,
        *,
        algorithm_mode: str = "tsvf",
        target_state: str = "101",
        seed: int = 42,
        weak_angle_base: float = math.pi / 6,
        weak_angle_ramp: float = math.pi / 12,
        optimization_level: int = 1,
    ) -> None:
        if not _HAS_QISKIT:  # pragma: no cover
            raise ImportError(
                "GroverTSVFAdapter requires Qiskit. Install with: pip install qgate[qiskit]"
            )
        self.backend = backend
        self.algorithm_mode = algorithm_mode
        self.target_state = target_state
        self.n_search_qubits = len(target_state)
        self.seed = seed
        self.weak_angle_base = weak_angle_base
        self.weak_angle_ramp = weak_angle_ramp
        self.optimization_level = optimization_level

    # ------------------------------------------------------------------
    # BaseAdapter interface
    # ------------------------------------------------------------------

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Build the Grover circuit.

        ``n_subsystems`` = number of search qubits (must match
        ``len(target_state)``).
        ``n_cycles`` = number of Grover iterations.

        Returns a :class:`QuantumCircuit`.
        """
        if n_subsystems != self.n_search_qubits:
            raise ValueError(
                f"n_subsystems ({n_subsystems}) must match target_state "
                f"length ({self.n_search_qubits})"
            )
        if self.algorithm_mode == "standard":
            return self._build_standard(n_subsystems, n_cycles)
        elif self.algorithm_mode == "tsvf":
            seed_offset = kwargs.get("seed_offset", 0)
            return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
        else:
            raise ValueError(
                f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
            )

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Execute the circuit and return a raw result dict.

        Tries SamplerV2 first, falls back to ``backend.run()``.
        """
        if self.backend is None:
            raise RuntimeError("No backend configured for GroverTSVFAdapter")

        try:
            from qiskit.transpiler.preset_passmanagers import (
                generate_preset_pass_manager,
            )
            from qiskit_ibm_runtime import SamplerV2 as Sampler

            pm = generate_preset_pass_manager(
                backend=self.backend,
                optimization_level=self.optimization_level,
            )
            isa = pm.run(circuit)
            job = Sampler(mode=self.backend).run([isa], shots=shots)
            result = job.result()
            pub = result[0]
            return {
                "pub_result": pub,
                "circuit": circuit,
                "shots": shots,
            }
        except Exception:
            # SamplerV2 path unavailable or failed; fall back to backend.run()
            pass

        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        job = self.backend.run(transpiled, shots=shots)
        result = job.result()
        return {
            "counts": result.get_counts(0),
            "circuit": circuit,
            "shots": shots,
        }

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse raw Qiskit results into ParityOutcome objects.

        Each shot → one ParityOutcome.  The parity matrix records per-
        iteration, per-qubit: 0 = qubit in target state, 1 = not.

        For the TSVF variant the ancilla measurement at each iteration
        provides the "parity probe".  For the standard variant we infer
        from the final measurement only (all cycles share the same row).
        """
        # Extract per-shot bitstrings
        counts = self._extract_counts(raw_results)

        outcomes: list[ParityOutcome] = []
        for bitstring, count in counts.items():
            row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
            for _ in range(count):
                outcomes.append(
                    ParityOutcome(
                        n_subsystems=n_subsystems,
                        n_cycles=n_cycles,
                        parity_matrix=row.copy(),
                    )
                )
        return outcomes

    # ------------------------------------------------------------------
    # Public helpers (beyond BaseAdapter)
    # ------------------------------------------------------------------

    def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
        """Return the depth of the transpiled circuit."""
        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        return int(transpiled.depth())

    def extract_target_probability(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[float, int]:
        """Extract P(target) from raw results, optionally post-selecting.

        Returns (probability, total_shots_used).
        """
        counts = self._extract_counts(raw_results)
        if not postselect or self.algorithm_mode == "standard":
            total = sum(counts.values())
            if total == 0:
                return 0.0, 0
            target_count = 0
            for key, val in counts.items():
                search = self._extract_search_bits(str(key))
                if search == self.target_state:
                    target_count += val
            return target_count / total, total

        # Post-select on ancilla = 1
        accepted_total = 0
        target_count = 0
        for key, val in counts.items():
            key_str = str(key)
            anc_bit, search_bits = self._split_ancilla_search(key_str)
            if anc_bit == "1":
                accepted_total += val
                if search_bits == self.target_state:
                    target_count += val
        if accepted_total == 0:
            return 0.0, 0
        return target_count / accepted_total, accepted_total

    # ------------------------------------------------------------------
    # Private circuit builders
    # ------------------------------------------------------------------

    def _build_standard(self, n_sub: int, n_iter: int) -> QuantumCircuit:
        """Standard Grover: oracle + diffusion, no ancilla."""
        qr = QuantumRegister(n_sub, "q")
        cr = ClassicalRegister(n_sub, "c")
        qc = QuantumCircuit(qr, cr)
        search_qubits = list(range(n_sub))
        for q in search_qubits:
            qc.h(q)
        for _ in range(n_iter):
            _oracle_101(qc, search_qubits)
            _grover_diffusion(qc, search_qubits)
        qc.measure(search_qubits, list(range(n_sub)))
        return qc

    def _build_tsvf(
        self,
        n_sub: int,
        n_iter: int,
        seed_offset: int = 0,
    ) -> QuantumCircuit:
        """TSVF chaotic Grover: oracle + chaotic ansatz + ancilla probe."""
        qr = QuantumRegister(n_sub, "q")
        anc_r = QuantumRegister(1, "anc")
        cr = ClassicalRegister(n_sub, "c_search")
        cr_anc = ClassicalRegister(1, "c_anc")
        qc = QuantumCircuit(qr, anc_r, cr, cr_anc)

        search_qubits = list(range(n_sub))
        anc_qubit = n_sub
        rng = np.random.default_rng(self.seed + seed_offset)

        for q in search_qubits:
            qc.h(q)

        for it in range(n_iter):
            _oracle_101(qc, search_qubits)
            qc.barrier()
            _chaotic_ansatz(qc, search_qubits, iteration=it, rng=rng)
            qc.barrier()
            if it > 0:
                qc.reset(anc_qubit)
            angle = self.weak_angle_base + self.weak_angle_ramp * min(it, 4)
            _add_postselection_ancilla(
                qc,
                search_qubits,
                anc_qubit,
                cr_anc[0],
                weak_angle=angle,
            )
            qc.barrier()

        qc.measure(search_qubits, list(range(n_sub)))
        return qc

    # ------------------------------------------------------------------
    # Private result parsing helpers
    # ------------------------------------------------------------------

    def _extract_counts(self, raw_results: Any) -> dict[str, int]:
        """Extract a counts dict from raw run() output."""
        if isinstance(raw_results, dict):
            if "counts" in raw_results:
                return self._normalise_counts(raw_results["counts"])
            if "pub_result" in raw_results:
                return self._counts_from_pub(
                    raw_results["pub_result"],
                    raw_results.get("circuit"),
                )
        # Already a counts dict
        return self._normalise_counts(raw_results)

    def _normalise_counts(self, counts: dict) -> dict[str, int]:
        """Ensure keys are bitstrings and values are ints."""
        out: dict[str, int] = {}
        for k, v in counts.items():
            out[str(k)] = int(v)
        return out

    def _counts_from_pub(self, pub, circuit: Any) -> dict[str, int]:
        """Extract per-shot combined bitstrings from a SamplerV2 PubResult."""
        creg_names = [cr.name for cr in circuit.cregs] if circuit else []
        if len(creg_names) <= 1:
            name = creg_names[0] if creg_names else "c"
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

        # Multi-register: reconstruct combined bitstrings
        try:
            reg_bitstrings = {}
            for name in creg_names:
                reg_bitstrings[name] = pub.data[name].get_bitstrings()
            num_shots = len(reg_bitstrings[creg_names[0]])
            combined: dict[str, int] = {}
            for i in range(num_shots):
                parts = []
                for name in reversed(creg_names):
                    parts.append(reg_bitstrings[name][i])
                full = " ".join(parts)
                combined[full] = combined.get(full, 0) + 1
            return combined
        except Exception:
            # Fallback: first register
            name = creg_names[0]
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

    def _extract_search_bits(self, bitstring: str) -> str:
        """Extract the search register bits from a bitstring."""
        key = bitstring.strip()
        if " " in key:
            # Space-separated: last part is first register (search)
            return key.split()[-1]
        # Concatenated: last n_search_qubits chars
        return key[-self.n_search_qubits :]

    def _split_ancilla_search(self, bitstring: str) -> tuple[str, str]:
        """Split bitstring into (ancilla_bit, search_bits)."""
        key = bitstring.strip()
        if " " in key:
            parts = key.split()
            return parts[0], parts[-1]
        return key[0], key[1:]

    def _bitstring_to_parity_row(
        self,
        bitstring: str,
        n_subsystems: int,
        n_cycles: int,
    ) -> np.ndarray:
        """Convert a measurement bitstring to a parity matrix.

        For the TSVF variant:
          - Ancilla=1 → per-qubit match row (0 = matches target, 1 = mismatch).
          - Ancilla=0 → row of 1s (all fail — no evidence of target).

        For the standard variant:
          - Compare each qubit to target — 0 if match, 1 if mismatch.
          - Replicate the same row across all cycles (no mid-circuit info).

        Shape: (n_cycles, n_subsystems).
        """
        if self.algorithm_mode == "tsvf":
            anc_bit, search_bits = self._split_ancilla_search(bitstring)
            # Per-qubit match check
            qubit_match = np.array(
                [
                    0 if (i < len(search_bits) and search_bits[i] == self.target_state[i]) else 1
                    for i in range(n_subsystems)
                ],
                dtype=np.int8,
            )
            # Ancilla probed once at the end; replicate across cycles
            if anc_bit == "1":
                # Post-selection probe fired → use qubit-level match
                matrix = np.tile(qubit_match, (n_cycles, 1))
            else:
                # Probe didn't fire → mark all as fail
                matrix = np.ones((n_cycles, n_subsystems), dtype=np.int8)
        else:
            # Standard Grover — compare final measurement to target
            search_bits = self._extract_search_bits(bitstring)
            qubit_match = np.array(
                [
                    0 if (i < len(search_bits) and search_bits[i] == self.target_state[i]) else 1
                    for i in range(n_subsystems)
                ],
                dtype=np.int8,
            )
            matrix = np.tile(qubit_match, (n_cycles, 1))

        return matrix
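The ancilla branching above can be reproduced standalone (pure NumPy, no Qiskit; `parity_row_tsvf` is an illustrative name, not part of the adapter's API):

```python
import numpy as np

def parity_row_tsvf(anc_bit: str, search_bits: str, target_state: str, n_cycles: int) -> np.ndarray:
    """Mirror of the TSVF branch: probe fired -> per-qubit match row, else all-fail."""
    n = len(target_state)
    if anc_bit == "1":
        # 0 where the measured bit matches the target, 1 otherwise
        row = np.array(
            [0 if i < len(search_bits) and search_bits[i] == target_state[i] else 1
             for i in range(n)],
            dtype=np.int8,
        )
        return np.tile(row, (n_cycles, 1))
    # Probe did not fire: mark every cycle/qubit as fail
    return np.ones((n_cycles, n), dtype=np.int8)

print(parity_row_tsvf("1", "101", "101", 2).tolist())  # [[0, 0, 0], [0, 0, 0]]
print(parity_row_tsvf("0", "101", "101", 2).tolist())  # [[1, 1, 1], [1, 1, 1]]
```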

build_circuit(n_subsystems, n_cycles, **kwargs)

Build the Grover circuit.

- n_subsystems = number of search qubits (must match len(target_state)).
- n_cycles = number of Grover iterations.

Returns a :class:QuantumCircuit.

Source code in src/qgate/adapters/grover_adapter.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> QuantumCircuit:
    """Build the Grover circuit.

    ``n_subsystems`` = number of search qubits (must match
    ``len(target_state)``).
    ``n_cycles`` = number of Grover iterations.

    Returns a :class:`QuantumCircuit`.
    """
    if n_subsystems != self.n_search_qubits:
        raise ValueError(
            f"n_subsystems ({n_subsystems}) must match target_state "
            f"length ({self.n_search_qubits})"
        )
    if self.algorithm_mode == "standard":
        return self._build_standard(n_subsystems, n_cycles)
    elif self.algorithm_mode == "tsvf":
        seed_offset = kwargs.get("seed_offset", 0)
        return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
    else:
        raise ValueError(
            f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
        )

run(circuit, shots, **kwargs)

Execute the circuit and return a raw result dict.

Tries SamplerV2 first, falls back to backend.run().

Source code in src/qgate/adapters/grover_adapter.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the circuit and return a raw result dict.

    Tries SamplerV2 first, falls back to ``backend.run()``.
    """
    if self.backend is None:
        raise RuntimeError("No backend configured for GroverTSVFAdapter")

    try:
        from qiskit.transpiler.preset_passmanagers import (
            generate_preset_pass_manager,
        )
        from qiskit_ibm_runtime import SamplerV2 as Sampler

        pm = generate_preset_pass_manager(
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        isa = pm.run(circuit)
        job = Sampler(mode=self.backend).run([isa], shots=shots)
        result = job.result()
        pub = result[0]
        return {
            "pub_result": pub,
            "circuit": circuit,
            "shots": shots,
        }
    except Exception:
        # SamplerV2 path unavailable or failed; fall back to backend.run()
        pass

    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    job = self.backend.run(transpiled, shots=shots)
    result = job.result()
    return {
        "counts": result.get_counts(0),
        "circuit": circuit,
        "shots": shots,
    }

parse_results(raw_results, n_subsystems, n_cycles)

Parse raw Qiskit results into ParityOutcome objects.

Each shot → one ParityOutcome. The parity matrix records per-iteration, per-qubit: 0 = qubit in target state, 1 = not.

For the TSVF variant the ancilla measurement at each iteration provides the "parity probe". For the standard variant we infer from the final measurement only (all cycles share the same row).

Source code in src/qgate/adapters/grover_adapter.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse raw Qiskit results into ParityOutcome objects.

    Each shot → one ParityOutcome.  The parity matrix records per-
    iteration, per-qubit: 0 = qubit in target state, 1 = not.

    For the TSVF variant the ancilla measurement at each iteration
    provides the "parity probe".  For the standard variant we infer
    from the final measurement only (all cycles share the same row).
    """
    # Extract per-shot bitstrings
    counts = self._extract_counts(raw_results)

    outcomes: list[ParityOutcome] = []
    for bitstring, count in counts.items():
        row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
        for _ in range(count):
            outcomes.append(
                ParityOutcome(
                    n_subsystems=n_subsystems,
                    n_cycles=n_cycles,
                    parity_matrix=row.copy(),
                )
            )
    return outcomes
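The per-shot expansion needs no Qiskit to demonstrate; the helper below is an illustrative stand-in for `_split_ancilla_search` applied to space-separated counts keys:

```python
def split_ancilla_search(key: str) -> tuple[str, str]:
    # Space-separated Qiskit-style key: ancilla register first, search register last
    parts = key.strip().split()
    if len(parts) > 1:
        return parts[0], parts[-1]
    return key[0], key[1:]

counts = {"1 101": 3, "0 010": 2, "1 001": 1}
# parse_results emits one outcome per shot, so counts expand multiplicatively
shots = [split_ancilla_search(k) for k, v in counts.items() for _ in range(v)]
print(len(shots))                 # 6
print(shots.count(("1", "101")))  # 3
```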

get_transpiled_depth(circuit)

Return the depth of the transpiled circuit.

Source code in src/qgate/adapters/grover_adapter.py
def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
    """Return the depth of the transpiled circuit."""
    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    return int(transpiled.depth())

extract_target_probability(raw_results, postselect=True)

Extract P(target) from raw results, optionally post-selecting.

Returns (probability, total_shots_used).

Source code in src/qgate/adapters/grover_adapter.py
def extract_target_probability(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[float, int]:
    """Extract P(target) from raw results, optionally post-selecting.

    Returns (probability, total_shots_used).
    """
    counts = self._extract_counts(raw_results)
    if not postselect or self.algorithm_mode == "standard":
        total = sum(counts.values())
        if total == 0:
            return 0.0, 0
        target_count = 0
        for key, val in counts.items():
            search = self._extract_search_bits(str(key))
            if search == self.target_state:
                target_count += val
        return target_count / total, total

    # Post-select on ancilla = 1
    accepted_total = 0
    target_count = 0
    for key, val in counts.items():
        key_str = str(key)
        anc_bit, search_bits = self._split_ancilla_search(key_str)
        if anc_bit == "1":
            accepted_total += val
            if search_bits == self.target_state:
                target_count += val
    if accepted_total == 0:
        return 0.0, 0
    return target_count / accepted_total, accepted_total
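The two estimates this method returns can be re-derived by hand on a toy counts dict (illustrative numbers, not real hardware data), with keys in "ancilla search" order:

```python
counts = {"1 101": 30, "0 101": 10, "1 011": 10, "0 000": 50}
target = "101"

# Raw estimate over all 100 shots
total = sum(counts.values())
p_raw = sum(v for k, v in counts.items() if k.split()[-1] == target) / total

# Post-selected estimate: keep only shots where the ancilla read "1"
accepted = {k: v for k, v in counts.items() if k.split()[0] == "1"}
n_accepted = sum(accepted.values())
p_post = sum(v for k, v in accepted.items() if k.split()[-1] == target) / n_accepted

print(p_raw, n_accepted, p_post)  # 0.4 40 0.75
```

Post-selection trades shots for signal: 60 of 100 shots are discarded, but the conditional target probability rises from 0.4 to 0.75.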

qgate.adapters.qaoa_adapter

qaoa_adapter.py — Adapter for QAOA / TSVF-QAOA experiments.

Maps QAOA (Quantum Approximate Optimisation Algorithm) circuits with an ancilla-based post-selection probe onto qgate's :class:ParityOutcome model, enabling the full trajectory filtering pipeline (scoring → thresholding → conditioning) to work on combinatorial optimisation — specifically the MaxCut problem on random graphs.

Mapping to ParityOutcome:

- n_subsystems = number of graph nodes (qubits).
- n_cycles = number of QAOA layers (p).
- parity_matrix[cycle, sub] = 0 if qubit sub contributes to a satisfying cut at layer cycle (via cost-function probe), 1 otherwise.

This lets qgate's score_fusion, thresholding, and hierarchical conditioning rules apply naturally.

The adapter supports two algorithm variants via algorithm_mode:

- "standard" — Canonical QAOA (cost + mixer layers).
- "tsvf" — QAOA + chaotic entangling ansatz + weak-measurement ancilla per layer (post-selection trajectory filter).

MaxCut problem: Given an undirected graph G = (V, E), find a partition of vertices into two sets that maximises the number of edges crossing the cut. The QAOA cost operator encodes C = Σ_{(i,j)∈E} ½(1 - Z_i·Z_j).
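This cost is classical per bitstring: each cut edge contributes 1 to C. The sketch below mirrors what a `maxcut_value`-style helper computes (the name and signature are illustrative; Qiskit's bit-ordering, which the adapter handles when extracting the search register, is ignored here):

```python
def cut_value(bitstring: str, edges: list[tuple[int, int]]) -> int:
    # Each edge whose endpoints land in different partitions contributes 1
    return sum(1 for i, j in edges if bitstring[i] != bitstring[j])

edges = [(0, 1), (1, 2), (2, 3), (3, 0)]  # 4-node ring
print(cut_value("0101", edges))  # 4: the alternating partition cuts every edge
print(cut_value("0011", edges))  # 2
```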

Patent pending (see LICENSE)

QAOATSVFAdapter

Bases: BaseAdapter

Adapter for QAOA / TSVF-QAOA MaxCut experiments.

This adapter builds QAOA circuits for the MaxCut problem, executes them on a Qiskit backend, and maps the raw results onto ParityOutcome objects that the rest of qgate can score and threshold.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| backend | Any | A Qiskit backend (Aer or IBM Runtime). | None |
| algorithm_mode | str | "standard" or "tsvf" (default "tsvf"). | 'tsvf' |
| edges | list[tuple[int, int]] \| None | Edge list for the MaxCut graph. | None |
| n_nodes | int | Number of graph nodes (qubits). Required. | 4 |
| gammas | list[float] \| float \| None | Cost layer angles (one per layer, or single float). | None |
| betas | list[float] \| float \| None | Mixer layer angles (one per layer, or single float). | None |
| seed | int | RNG seed for chaotic ansatz and graph generation. | 42 |
| weak_angle_base | float | Base angle for the post-selection probe (radians). | pi / 4 |
| weak_angle_ramp | float | Per-layer angle increase (radians). | pi / 8 |
| optimization_level | int | Transpilation optimisation level (0-3). | 1 |
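When gammas and betas are omitted, the adapter falls back to a linear ramp (see `_get_angles` in the source: gammas from π/8 up to π/4, betas from π/4 down to π/8). A standalone sketch of that heuristic:

```python
import math

def default_angles(n_layers: int) -> tuple[list[float], list[float]]:
    # gammas ramp linearly from pi/8 up to pi/4; betas from pi/4 down to pi/8
    denom = max(n_layers - 1, 1)
    gammas = [math.pi / 8 + (math.pi / 8) * i / denom for i in range(n_layers)]
    betas = [math.pi / 4 - (math.pi / 8) * i / denom for i in range(n_layers)]
    return gammas, betas

g, b = default_angles(3)
print([round(x, 3) for x in g])  # [0.393, 0.589, 0.785]
```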
Source code in src/qgate/adapters/qaoa_adapter.py
class QAOATSVFAdapter(BaseAdapter):
    """Adapter for QAOA / TSVF-QAOA MaxCut experiments.

    This adapter builds QAOA circuits for the MaxCut problem, executes
    them on a Qiskit backend, and maps the raw results onto
    ``ParityOutcome`` objects that the rest of qgate can score and
    threshold.

    Args:
        backend:           A Qiskit backend (Aer or IBM Runtime).
        algorithm_mode:    ``"standard"`` or ``"tsvf"`` (default ``"tsvf"``).
        edges:             Edge list for the MaxCut graph.
        n_nodes:           Number of graph nodes (qubits). Required.
        gammas:            Cost layer angles (one per layer, or single float).
        betas:             Mixer layer angles (one per layer, or single float).
        seed:              RNG seed for chaotic ansatz and graph generation.
        weak_angle_base:   Base angle for the post-selection probe (radians).
        weak_angle_ramp:   Per-layer angle increase (radians).
        optimization_level: Transpilation optimisation level (0-3).
    """

    def __init__(
        self,
        backend: Any = None,
        *,
        algorithm_mode: str = "tsvf",
        edges: list[tuple[int, int]] | None = None,
        n_nodes: int = 4,
        gammas: list[float] | float | None = None,
        betas: list[float] | float | None = None,
        seed: int = 42,
        weak_angle_base: float = math.pi / 4,
        weak_angle_ramp: float = math.pi / 8,
        optimization_level: int = 1,
    ) -> None:
        if not _HAS_QISKIT:  # pragma: no cover
            raise ImportError(
                "QAOATSVFAdapter requires Qiskit. Install with: pip install qgate[qiskit]"
            )
        self.backend = backend
        self.algorithm_mode = algorithm_mode
        self.n_nodes = n_nodes
        self.seed = seed
        self.weak_angle_base = weak_angle_base
        self.weak_angle_ramp = weak_angle_ramp
        self.optimization_level = optimization_level

        # Graph edges
        if edges is not None:
            self.edges = edges
        else:
            self.edges = random_regular_graph(n_nodes, degree=3, seed=seed)

        # QAOA angles — default heuristic if not provided
        self._gammas_raw = gammas
        self._betas_raw = betas

    def _get_angles(self, n_layers: int) -> tuple[list[float], list[float]]:
        """Resolve gamma/beta arrays for n_layers."""
        if self._gammas_raw is None:
            # Heuristic: linearly spaced from π/8 to π/4
            gammas = [
                math.pi / 8 + (math.pi / 8) * idx / max(n_layers - 1, 1) for idx in range(n_layers)
            ]
        elif isinstance(self._gammas_raw, (int, float)):
            gammas = [float(self._gammas_raw)] * n_layers
        else:
            gammas = list(self._gammas_raw)
            if len(gammas) < n_layers:
                gammas = gammas + [gammas[-1]] * (n_layers - len(gammas))

        if self._betas_raw is None:
            # Heuristic: linearly spaced from π/4 to π/8
            betas = [
                math.pi / 4 - (math.pi / 8) * idx / max(n_layers - 1, 1) for idx in range(n_layers)
            ]
        elif isinstance(self._betas_raw, (int, float)):
            betas = [float(self._betas_raw)] * n_layers
        else:
            betas = list(self._betas_raw)
            if len(betas) < n_layers:
                betas = betas + [betas[-1]] * (n_layers - len(betas))

        return gammas[:n_layers], betas[:n_layers]

    # ------------------------------------------------------------------
    # BaseAdapter interface
    # ------------------------------------------------------------------

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Build the QAOA circuit.

        ``n_subsystems`` = number of graph nodes (must match ``n_nodes``).
        ``n_cycles`` = number of QAOA layers (p).

        Returns a :class:`QuantumCircuit`.
        """
        if n_subsystems != self.n_nodes:
            raise ValueError(f"n_subsystems ({n_subsystems}) must match n_nodes ({self.n_nodes})")
        if self.algorithm_mode == "standard":
            return self._build_standard(n_subsystems, n_cycles)
        elif self.algorithm_mode == "tsvf":
            seed_offset = kwargs.get("seed_offset", 0)
            return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
        else:
            raise ValueError(
                f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
            )

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Execute the circuit and return a raw result dict.

        Tries SamplerV2 first, falls back to ``backend.run()``.
        """
        if self.backend is None:
            raise RuntimeError("No backend configured for QAOATSVFAdapter")

        try:
            from qiskit.transpiler.preset_passmanagers import (
                generate_preset_pass_manager,
            )
            from qiskit_ibm_runtime import SamplerV2 as Sampler

            pm = generate_preset_pass_manager(
                backend=self.backend,
                optimization_level=self.optimization_level,
            )
            isa = pm.run(circuit)
            job = Sampler(mode=self.backend).run([isa], shots=shots)
            result = job.result()
            pub = result[0]
            return {
                "pub_result": pub,
                "circuit": circuit,
                "shots": shots,
            }
        except Exception:
            # SamplerV2 path unavailable or failed; fall back to backend.run()
            pass

        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        job = self.backend.run(transpiled, shots=shots)
        result = job.result()
        return {
            "counts": result.get_counts(0),
            "circuit": circuit,
            "shots": shots,
        }

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse raw Qiskit results into ParityOutcome objects.

        Each shot → one ParityOutcome.  The parity matrix records per-
        layer, per-qubit: 0 if the qubit contributes to a "good" cut
        partition, 1 otherwise.

        For the TSVF variant the ancilla measurement at each layer
        provides the "cut quality probe".  For the standard variant we
        evaluate from the final measurement against the best-known cut.
        """
        counts = self._extract_counts(raw_results)

        outcomes: list[ParityOutcome] = []
        for bitstring, count in counts.items():
            row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
            for _ in range(count):
                outcomes.append(
                    ParityOutcome(
                        n_subsystems=n_subsystems,
                        n_cycles=n_cycles,
                        parity_matrix=row.copy(),
                    )
                )
        return outcomes

    # ------------------------------------------------------------------
    # Public helpers (beyond BaseAdapter)
    # ------------------------------------------------------------------

    def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
        """Return the depth of the transpiled circuit."""
        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        return int(transpiled.depth())

    def extract_cut_quality(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[float, float, int]:
        """Extract the mean cut ratio and approximation ratio.

        Returns (mean_cut_ratio, approx_ratio, total_shots_used).

        ``mean_cut_ratio`` = mean(cut_value) / max_possible_edges.
        ``approx_ratio``   = mean(cut_value) / best_known_cut.
        """
        counts = self._extract_counts(raw_results)
        _, best_cut = best_maxcut(self.n_nodes, self.edges)
        max_edges = len(self.edges)

        if not postselect or self.algorithm_mode == "standard":
            total = sum(counts.values())
            if total == 0:
                return 0.0, 0.0, 0
            total_cut = 0.0
            for key, val in counts.items():
                search = self._extract_search_bits(str(key))
                cv = maxcut_value(search, self.edges)
                total_cut += cv * val
            mean_cut = total_cut / total
            cut_ratio = mean_cut / max_edges if max_edges > 0 else 0.0
            approx_ratio = mean_cut / best_cut if best_cut > 0 else 0.0
            return cut_ratio, approx_ratio, total

        # Post-select on ancilla = 1
        accepted_total = 0
        total_cut = 0.0
        for key, val in counts.items():
            key_str = str(key)
            anc_bit, search_bits = self._split_ancilla_search(key_str)
            if anc_bit == "1":
                accepted_total += val
                cv = maxcut_value(search_bits, self.edges)
                total_cut += cv * val
        if accepted_total == 0:
            return 0.0, 0.0, 0
        mean_cut = total_cut / accepted_total
        cut_ratio = mean_cut / max_edges if max_edges > 0 else 0.0
        approx_ratio = mean_cut / best_cut if best_cut > 0 else 0.0
        return cut_ratio, approx_ratio, accepted_total

    def extract_best_bitstring(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[str, int, int]:
        """Find the most-sampled bitstring and its cut value.

        Returns (bitstring, cut_value, count).
        """
        counts = self._extract_counts(raw_results)
        best_bs = ""
        best_count = 0
        best_cv = 0

        for key, val in counts.items():
            key_str = str(key)
            if self.algorithm_mode == "tsvf" and postselect:
                anc_bit, search_bits = self._split_ancilla_search(key_str)
                if anc_bit != "1":
                    continue
            else:
                search_bits = self._extract_search_bits(key_str)

            cv = maxcut_value(search_bits, self.edges)
            if val > best_count or (val == best_count and cv > best_cv):
                best_bs = search_bits
                best_count = val
                best_cv = cv

        return best_bs, best_cv, best_count

    # ------------------------------------------------------------------
    # Private circuit builders
    # ------------------------------------------------------------------

    def _build_standard(self, n_sub: int, n_layers: int) -> QuantumCircuit:
        """Standard QAOA: cost + mixer layers, no ancilla."""
        qr = QuantumRegister(n_sub, "q")
        cr = ClassicalRegister(n_sub, "c")
        qc = QuantumCircuit(qr, cr)
        qubits = list(range(n_sub))

        # Initial superposition
        for q in qubits:
            qc.h(q)

        gammas, betas = self._get_angles(n_layers)

        for layer in range(n_layers):
            _qaoa_cost_layer(qc, qubits, self.edges, gammas[layer])
            qc.barrier()
            _qaoa_mixer_layer(qc, qubits, betas[layer])
            qc.barrier()

        qc.measure(qubits, list(range(n_sub)))
        return qc

    def _build_tsvf(
        self,
        n_sub: int,
        n_layers: int,
        seed_offset: int = 0,
    ) -> QuantumCircuit:
        """TSVF QAOA: cost + chaotic ansatz + ancilla probe per layer."""
        qr = QuantumRegister(n_sub, "q")
        anc_r = QuantumRegister(1, "anc")
        cr = ClassicalRegister(n_sub, "c_search")
        cr_anc = ClassicalRegister(1, "c_anc")
        qc = QuantumCircuit(qr, anc_r, cr, cr_anc)

        qubits = list(range(n_sub))
        anc_qubit = n_sub
        rng = np.random.default_rng(self.seed + seed_offset)

        # Initial superposition
        for q in qubits:
            qc.h(q)

        gammas, _betas = self._get_angles(n_layers)

        for layer in range(n_layers):
            # Cost layer (same as standard — problem encoding)
            _qaoa_cost_layer(qc, qubits, self.edges, gammas[layer])
            qc.barrier()

            # Chaotic ansatz instead of mixer
            _chaotic_qaoa_ansatz(qc, qubits, layer=layer, rng=rng)
            qc.barrier()

            # Reset ancilla for reuse (except first layer)
            if layer > 0:
                qc.reset(anc_qubit)

            # Weak-measurement probe
            angle = self.weak_angle_base + self.weak_angle_ramp * min(layer, 4)
            _add_cost_probe_ancilla(
                qc,
                qubits,
                self.edges,
                anc_qubit,
                cr_anc[0],
                weak_angle=angle,
            )
            qc.barrier()

        qc.measure(qubits, list(range(n_sub)))
        return qc

    # ------------------------------------------------------------------
    # Private result parsing helpers
    # ------------------------------------------------------------------

    def _extract_counts(self, raw_results: Any) -> dict[str, int]:
        """Extract a counts dict from raw run() output."""
        if isinstance(raw_results, dict):
            if "counts" in raw_results:
                return self._normalise_counts(raw_results["counts"])
            if "pub_result" in raw_results:
                return self._counts_from_pub(
                    raw_results["pub_result"],
                    raw_results.get("circuit"),
                )
        return self._normalise_counts(raw_results)

    def _normalise_counts(self, counts: dict) -> dict[str, int]:
        """Ensure keys are bitstrings and values are ints."""
        out: dict[str, int] = {}
        for k, v in counts.items():
            out[str(k)] = int(v)
        return out

    def _counts_from_pub(self, pub: Any, circuit: Any) -> dict[str, int]:
        """Extract per-shot combined bitstrings from a SamplerV2 PubResult."""
        creg_names = [cr.name for cr in circuit.cregs] if circuit else []
        if len(creg_names) <= 1:
            name = creg_names[0] if creg_names else "c"
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

        # Multi-register: reconstruct combined bitstrings
        try:
            reg_bitstrings: dict[str, Any] = {}
            for name in creg_names:
                reg_bitstrings[name] = pub.data[name].get_bitstrings()
            num_shots = len(reg_bitstrings[creg_names[0]])
            combined: dict[str, int] = {}
            for i in range(num_shots):
                parts = []
                for name in reversed(creg_names):
                    parts.append(reg_bitstrings[name][i])
                full = " ".join(parts)
                combined[full] = combined.get(full, 0) + 1
            return combined
        except Exception:
            name = creg_names[0]
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

    def _extract_search_bits(self, bitstring: str) -> str:
        """Extract the search register bits from a bitstring."""
        key = bitstring.strip()
        if " " in key:
            return key.split()[-1]
        return key[-self.n_nodes :]

    def _split_ancilla_search(self, bitstring: str) -> tuple[str, str]:
        """Split bitstring into (ancilla_bit, search_bits)."""
        key = bitstring.strip()
        if " " in key:
            parts = key.split()
            return parts[0], parts[-1]
        return key[0], key[1:]

    def _bitstring_to_parity_row(
        self,
        bitstring: str,
        n_subsystems: int,
        n_cycles: int,
    ) -> np.ndarray:
        """Convert a measurement bitstring to a parity matrix.

        For the TSVF variant:
          - Ancilla=1 → evaluate per-qubit cut contribution:
            0 if qubit is on the "cut side" of at least one edge, 1 otherwise.
          - Ancilla=0 → row of 1s (all fail — no evidence of good cut).

        For the standard variant:
          - Evaluate per-qubit cut contribution from final measurement.
          - Replicate across all cycles.

        Shape: (n_cycles, n_subsystems).
        """
        if self.algorithm_mode == "tsvf":
            anc_bit, search_bits = self._split_ancilla_search(bitstring)

            if anc_bit == "1":
                qubit_quality = self._compute_qubit_cut_quality(
                    search_bits,
                    n_subsystems,
                )
                matrix = np.tile(qubit_quality, (n_cycles, 1))
            else:
                matrix = np.ones((n_cycles, n_subsystems), dtype=np.int8)
        else:
            search_bits = self._extract_search_bits(bitstring)
            qubit_quality = self._compute_qubit_cut_quality(
                search_bits,
                n_subsystems,
            )
            matrix = np.tile(qubit_quality, (n_cycles, 1))

        return matrix

    def _compute_qubit_cut_quality(
        self,
        search_bits: str,
        n_subsystems: int,
    ) -> np.ndarray:
        """Compute per-qubit cut quality: 0 = contributes to cut, 1 = doesn't.

        A qubit contributes to the cut if it is on the opposite side of
        at least one of its neighbour edges.
        """
        quality = np.ones(n_subsystems, dtype=np.int8)  # default: fail
        for i in range(min(len(search_bits), n_subsystems)):
            # Check if this qubit participates in any cut edge
            for a, b in self.edges:
                other = b if a == i else (a if b == i else None)
                if (
                    other is not None
                    and other < len(search_bits)
                    and search_bits[i] != search_bits[other]
                ):
                    quality[i] = 0  # contributes to a cut
                    break
        return quality

build_circuit(n_subsystems, n_cycles, **kwargs)

Build the QAOA circuit.

n_subsystems = number of graph nodes (must match n_nodes). n_cycles = number of QAOA layers (p).

Returns a QuantumCircuit.

Source code in src/qgate/adapters/qaoa_adapter.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> QuantumCircuit:
    """Build the QAOA circuit.

    ``n_subsystems`` = number of graph nodes (must match ``n_nodes``).
    ``n_cycles`` = number of QAOA layers (p).

    Returns a :class:`QuantumCircuit`.
    """
    if n_subsystems != self.n_nodes:
        raise ValueError(f"n_subsystems ({n_subsystems}) must match n_nodes ({self.n_nodes})")
    if self.algorithm_mode == "standard":
        return self._build_standard(n_subsystems, n_cycles)
    elif self.algorithm_mode == "tsvf":
        seed_offset = kwargs.get("seed_offset", 0)
        return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
    else:
        raise ValueError(
            f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
        )

run(circuit, shots, **kwargs)

Execute the circuit and return a raw result dict.

Tries SamplerV2 first, falls back to backend.run().

Source code in src/qgate/adapters/qaoa_adapter.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the circuit and return a raw result dict.

    Tries SamplerV2 first, falls back to ``backend.run()``.
    """
    if self.backend is None:
        raise RuntimeError("No backend configured for QAOATSVFAdapter")

    try:
        from qiskit.transpiler.preset_passmanagers import (
            generate_preset_pass_manager,
        )
        from qiskit_ibm_runtime import SamplerV2 as Sampler

        pm = generate_preset_pass_manager(
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        isa = pm.run(circuit)
        job = Sampler(mode=self.backend).run([isa], shots=shots)
        result = job.result()
        pub = result[0]
        return {
            "pub_result": pub,
            "circuit": circuit,
            "shots": shots,
        }
    except Exception:
        # SamplerV2 path unavailable or failed; fall back to backend.run()
        pass

    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    job = self.backend.run(transpiled, shots=shots)
    result = job.result()
    return {
        "counts": result.get_counts(0),
        "circuit": circuit,
        "shots": shots,
    }
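The two-tier execution strategy can be sketched without a backend. The names below (`run_with_fallback`, `sampler_v2_path`, `legacy_run_path`) are illustrative stand-ins, not qgate or Qiskit API:

```python
from typing import Callable

def run_with_fallback(primary: Callable[[], dict], fallback: Callable[[], dict]) -> dict:
    # Try the preferred execution path first; on any failure fall back
    # to the legacy path, as run() does with SamplerV2 vs backend.run().
    try:
        return primary()
    except Exception:
        return fallback()

def sampler_v2_path() -> dict:
    # Stand-in for the SamplerV2 branch; here it "fails" to import.
    raise ImportError("qiskit_ibm_runtime not installed")

def legacy_run_path() -> dict:
    # Stand-in for transpile + backend.run(); returns a counts dict.
    return {"counts": {"00": 512, "11": 512}, "shots": 1024}

result = run_with_fallback(sampler_v2_path, legacy_run_path)
```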

parse_results(raw_results, n_subsystems, n_cycles)

Parse raw Qiskit results into ParityOutcome objects.

Each shot → one ParityOutcome. The parity matrix records per-layer, per-qubit: 0 if the qubit contributes to a "good" cut partition, 1 otherwise.

For the TSVF variant the ancilla measurement at each layer provides the "cut quality probe". For the standard variant we evaluate from the final measurement against the best-known cut.

Source code in src/qgate/adapters/qaoa_adapter.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse raw Qiskit results into ParityOutcome objects.

    Each shot → one ParityOutcome.  The parity matrix records per-
    layer, per-qubit: 0 if the qubit contributes to a "good" cut
    partition, 1 otherwise.

    For the TSVF variant the ancilla measurement at each layer
    provides the "cut quality probe".  For the standard variant we
    evaluate from the final measurement against the best-known cut.
    """
    counts = self._extract_counts(raw_results)

    outcomes: list[ParityOutcome] = []
    for bitstring, count in counts.items():
        row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
        for _ in range(count):
            outcomes.append(
                ParityOutcome(
                    n_subsystems=n_subsystems,
                    n_cycles=n_cycles,
                    parity_matrix=row.copy(),
                )
            )
    return outcomes
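The per-shot mapping can be illustrated with plain NumPy. The quality row below is a made-up example, not output of the adapter:

```python
import numpy as np

# Hypothetical per-qubit cut quality for a 4-node graph:
# 0 = the qubit sits on the cut side of at least one edge, 1 = it does not.
qubit_quality = np.array([0, 1, 0, 0], dtype=np.int8)

n_cycles = 3  # QAOA depth p

# Standard variant: the single final measurement is replicated across
# all cycles to form the (n_cycles, n_subsystems) parity matrix.
parity_matrix = np.tile(qubit_quality, (n_cycles, 1))
```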

get_transpiled_depth(circuit)

Return the depth of the transpiled circuit.

Source code in src/qgate/adapters/qaoa_adapter.py
def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
    """Return the depth of the transpiled circuit."""
    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    return int(transpiled.depth())

extract_cut_quality(raw_results, postselect=True)

Extract the mean cut ratio and approximation ratio.

Returns (mean_cut_ratio, approx_ratio, total_shots_used).

mean_cut_ratio = mean(cut_value) / max_possible_edges. approx_ratio = mean(cut_value) / best_known_cut.

Source code in src/qgate/adapters/qaoa_adapter.py
def extract_cut_quality(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[float, float, int]:
    """Extract the mean cut ratio and approximation ratio.

    Returns (mean_cut_ratio, approx_ratio, total_shots_used).

    ``mean_cut_ratio`` = mean(cut_value) / max_possible_edges.
    ``approx_ratio``   = mean(cut_value) / best_known_cut.
    """
    counts = self._extract_counts(raw_results)
    _, best_cut = best_maxcut(self.n_nodes, self.edges)
    max_edges = len(self.edges)

    if not postselect or self.algorithm_mode == "standard":
        total = sum(counts.values())
        if total == 0:
            return 0.0, 0.0, 0
        total_cut = 0.0
        for key, val in counts.items():
            search = self._extract_search_bits(str(key))
            cv = maxcut_value(search, self.edges)
            total_cut += cv * val
        mean_cut = total_cut / total
        cut_ratio = mean_cut / max_edges if max_edges > 0 else 0.0
        approx_ratio = mean_cut / best_cut if best_cut > 0 else 0.0
        return cut_ratio, approx_ratio, total

    # Post-select on ancilla = 1
    accepted_total = 0
    total_cut = 0.0
    for key, val in counts.items():
        key_str = str(key)
        anc_bit, search_bits = self._split_ancilla_search(key_str)
        if anc_bit == "1":
            accepted_total += val
            cv = maxcut_value(search_bits, self.edges)
            total_cut += cv * val
    if accepted_total == 0:
        return 0.0, 0.0, 0
    mean_cut = total_cut / accepted_total
    cut_ratio = mean_cut / max_edges if max_edges > 0 else 0.0
    approx_ratio = mean_cut / best_cut if best_cut > 0 else 0.0
    return cut_ratio, approx_ratio, accepted_total
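As a worked example of the non-postselected arithmetic, on a toy triangle graph with invented counts (`cut_value` is an inlined stand-in for `maxcut_value`):

```python
# Toy counts on a 3-node triangle graph (invented numbers).
edges = [(0, 1), (1, 2), (0, 2)]
counts = {"011": 60, "000": 20, "101": 20}  # bitstring -> shots

def cut_value(bs):
    # Inlined stand-in for maxcut_value(): count edges whose
    # endpoints land on opposite sides of the partition.
    return sum(1 for i, j in edges if bs[i] != bs[j])

total = sum(counts.values())
mean_cut = sum(cut_value(bs) * n for bs, n in counts.items()) / total  # 1.6

cut_ratio = mean_cut / len(edges)  # mean cut / number of edges
approx_ratio = mean_cut / 2        # the best triangle cut is 2
```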

extract_best_bitstring(raw_results, postselect=True)

Find the most-sampled bitstring and its cut value.

Returns (bitstring, cut_value, count).

Source code in src/qgate/adapters/qaoa_adapter.py
def extract_best_bitstring(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[str, int, int]:
    """Find the most-sampled bitstring and its cut value.

    Returns (bitstring, cut_value, count).
    """
    counts = self._extract_counts(raw_results)
    best_bs = ""
    best_count = 0
    best_cv = 0

    for key, val in counts.items():
        key_str = str(key)
        if self.algorithm_mode == "tsvf" and postselect:
            anc_bit, search_bits = self._split_ancilla_search(key_str)
            if anc_bit != "1":
                continue
        else:
            search_bits = self._extract_search_bits(key_str)

        cv = maxcut_value(search_bits, self.edges)
        if val > best_count or (val == best_count and cv > best_cv):
            best_bs = search_bits
            best_count = val
            best_cv = cv

    return best_bs, best_cv, best_count
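A minimal sketch of the post-selection step, assuming the space-separated "ancilla search" key convention handled by `_split_ancilla_search` (the counts are invented):

```python
# Hypothetical TSVF-mode counts: each key is an "ancilla search"
# register pair, with the ancilla register first.
counts = {"1 0110": 300, "0 0110": 500, "1 1001": 200}

def split_ancilla_search(key):
    # Mirror of _split_ancilla_search for space-separated keys.
    parts = key.split()
    return parts[0], parts[-1]

# Keep only ancilla == "1" shots, then pick the most-sampled survivor,
# mirroring extract_best_bitstring with postselect=True.
survivors = {}
for key, n in counts.items():
    anc, search = split_ancilla_search(key)
    if anc == "1":
        survivors[search] = survivors.get(search, 0) + n

best = max(survivors, key=survivors.get)
```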

random_regular_graph(n_nodes, degree=3, seed=None)

Generate a random regular-ish graph as an edge list.

Falls back to an Erdős–Rényi-like model when exact regular-graph construction isn't possible (a d-regular graph on n nodes requires n·d to be even, so e.g. odd degree with an odd node count is infeasible).

Returns:

Type Description
list[tuple[int, int]]

List of (i, j) edges with i < j.

Source code in src/qgate/adapters/qaoa_adapter.py
def random_regular_graph(
    n_nodes: int,
    degree: int = 3,
    seed: int | None = None,
) -> list[tuple[int, int]]:
    """Generate a random regular-ish graph as an edge list.

    Falls back to an Erdős–Rényi-like model when exact regular-graph
    construction isn't possible (n·d must be even, e.g. odd degree × odd nodes).

    Returns:
        List of (i, j) edges with i < j.
    """
    rng = np.random.default_rng(seed)
    edges: set[tuple[int, int]] = set()

    # Build a random graph with target average degree
    target_edges = (n_nodes * degree) // 2
    attempts = 0
    while len(edges) < target_edges and attempts < target_edges * 50:
        i = int(rng.integers(0, n_nodes))
        j = int(rng.integers(0, n_nodes))
        if i != j:
            edge = (min(i, j), max(i, j))
            edges.add(edge)
        attempts += 1

    # Ensure graph is connected — add a spanning path if needed
    visited = {0}
    queue = [0]
    while queue:
        node = queue.pop(0)
        for a, b in edges:
            other = b if a == node else (a if b == node else None)
            if other is not None and other not in visited:
                visited.add(other)
                queue.append(other)
    for node in range(n_nodes):
        if node not in visited:
            prev = node - 1
            edge = (min(prev, node), max(prev, node))
            edges.add(edge)
            visited.add(node)

    return sorted(edges)
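A stand-alone sketch of the rejection-sampling core (an illustrative copy of the strategy above; the connectivity patch is omitted for brevity):

```python
import numpy as np

def sample_edges(n_nodes, degree=3, seed=None):
    # Rejection-sample undirected edges until roughly n*d/2 distinct
    # edges are collected, with an attempt cap to guarantee termination.
    rng = np.random.default_rng(seed)
    target = (n_nodes * degree) // 2
    edges, attempts = set(), 0
    while len(edges) < target and attempts < target * 50:
        i = int(rng.integers(0, n_nodes))
        j = int(rng.integers(0, n_nodes))
        if i != j:
            edges.add((min(i, j), max(i, j)))
        attempts += 1
    return sorted(edges)

edges = sample_edges(8, degree=3, seed=0)
avg_degree = 2 * len(edges) / 8  # close to the requested degree
```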

maxcut_value(bitstring, edges)

Compute the MaxCut value for a bitstring partition.

Source code in src/qgate/adapters/qaoa_adapter.py
def maxcut_value(bitstring: str, edges: list[tuple[int, int]]) -> int:
    """Compute the MaxCut value for a bitstring partition."""
    cut = 0
    for i, j in edges:
        if i < len(bitstring) and j < len(bitstring) and bitstring[i] != bitstring[j]:
            cut += 1
    return cut

best_maxcut(n_nodes, edges)

Brute-force the best MaxCut solution (only for small graphs).

Source code in src/qgate/adapters/qaoa_adapter.py
def best_maxcut(n_nodes: int, edges: list[tuple[int, int]]) -> tuple[str, int]:
    """Brute-force the best MaxCut solution (only for small graphs)."""
    best_bs = "0" * n_nodes
    best_val = 0
    for x in range(2**n_nodes):
        bs = format(x, f"0{n_nodes}b")
        val = maxcut_value(bs, edges)
        if val > best_val:
            best_val = val
            best_bs = bs
    return best_bs, best_val
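A quick sanity check on a triangle graph, with both helpers inlined so the snippet runs stand-alone:

```python
def maxcut_value(bitstring, edges):
    # Copied from the helper above: an edge is "cut" when its
    # endpoints carry different bit values.
    return sum(
        1
        for i, j in edges
        if i < len(bitstring) and j < len(bitstring) and bitstring[i] != bitstring[j]
    )

def best_maxcut(n_nodes, edges):
    # Copied from the helper above: brute-force all 2^n partitions.
    best_bs, best_val = "0" * n_nodes, 0
    for x in range(2**n_nodes):
        bs = format(x, f"0{n_nodes}b")
        val = maxcut_value(bs, edges)
        if val > best_val:
            best_bs, best_val = bs, val
    return best_bs, best_val

triangle = [(0, 1), (1, 2), (0, 2)]
# "011" cuts edges (0,1) and (0,2) but not (1,2):
assert maxcut_value("011", triangle) == 2
# A triangle can never have all three edges cut, so the optimum is 2:
assert best_maxcut(3, triangle) == ("001", 2)
```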

qgate.adapters.vqe_adapter

vqe_adapter.py — Adapter for VQE / TSVF-VQE experiments.

Maps Variational Quantum Eigensolver (VQE) circuits with an ancilla-based post-selection probe onto qgate's ParityOutcome model, enabling the full trajectory filtering pipeline (scoring → thresholding → conditioning) to work on ground-state-energy estimation — specifically the Transverse-Field Ising Model (TFIM).

Problem — Transverse-Field Ising Model (TFIM): H = −J Σ_{⟨i,j⟩} Z_i Z_j − h Σ_i X_i

where J is the coupling strength and h is the transverse field. For a 1D chain of n qubits with open boundary conditions: H = −J Σ_{i=0}^{n-2} Z_i Z_{i+1} − h Σ_{i=0}^{n-1} X_i

The ground-state energy can be computed classically for benchmarking.

Mapping to ParityOutcome:

- n_subsystems = number of qubits in the system.
- n_cycles = number of ansatz layers (depth).
- parity_matrix[cycle, sub] = 0 if qubit sub contributes to a low-energy configuration at layer cycle (via energy probe), 1 otherwise.

This lets qgate's score_fusion, thresholding, and hierarchical conditioning rules apply naturally.

The adapter supports two algorithm variants via algorithm_mode:

- "standard" — Hardware-efficient ansatz (Ry+Rz + CNOT entangling).
- "tsvf" — Hardware-efficient ansatz + chaotic entangling layers + weak-measurement ancilla per layer (post-selection trajectory filter).
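For small n the benchmark ground energy can be obtained by dense diagonalisation. The sketch below is an illustrative stand-alone computation under the Hamiltonian convention above, not the library's `tfim_exact_ground_energy`:

```python
import numpy as np

I2 = np.eye(2)
X = np.array([[0.0, 1.0], [1.0, 0.0]])
Z = np.diag([1.0, -1.0])

def kron_chain(ops):
    # Tensor a list of single-qubit operators into a 2^n x 2^n matrix.
    out = np.array([[1.0]])
    for op in ops:
        out = np.kron(out, op)
    return out

def tfim_ground_energy(n, J=1.0, h=1.0):
    # H = -J sum Z_i Z_{i+1} - h sum X_i (1D open chain),
    # diagonalised densely; feasible only for small n.
    H = np.zeros((2**n, 2**n))
    for i in range(n - 1):
        ops = [I2] * n
        ops[i] = ops[i + 1] = Z
        H -= J * kron_chain(ops)
    for i in range(n):
        ops = [I2] * n
        ops[i] = X
        H -= h * kron_chain(ops)
    return float(np.linalg.eigvalsh(H)[0])

# For n=2, J=h=1 the exact ground energy is -sqrt(5).
e0 = tfim_ground_energy(2)
```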

Patent pending (see LICENSE)

VQETSVFAdapter

Bases: BaseAdapter

Adapter for VQE / TSVF-VQE ground-state energy experiments.

This adapter builds VQE circuits for the Transverse-Field Ising Model (TFIM), executes them on a Qiskit backend, and maps the raw results onto ParityOutcome objects that the rest of qgate can score and threshold.

The TFIM Hamiltonian

H = −J Σ Z_i Z_{i+1} − h Σ X_i

Parameters:

Name Type Description Default
backend Any

A Qiskit backend (Aer or IBM Runtime).

None
algorithm_mode str

"standard" or "tsvf" (default "tsvf").

'tsvf'
n_qubits int

Number of system qubits.

4
j_coupling float

ZZ coupling strength J (default 1.0).

1.0
h_field float

Transverse field strength h (default 1.0).

1.0
params ndarray | None

Variational parameters. If None, random init.

None
seed int

RNG seed for parameter init and chaotic ansatz.

42
weak_angle_base float

Base angle for the energy probe (radians).

pi / 4
weak_angle_ramp float

Per-layer angle increase (radians).

pi / 8
optimization_level int

Transpilation optimisation level (0-3).

1
Source code in src/qgate/adapters/vqe_adapter.py
class VQETSVFAdapter(BaseAdapter):
    """Adapter for VQE / TSVF-VQE ground-state energy experiments.

    This adapter builds VQE circuits for the Transverse-Field Ising
    Model (TFIM), executes them on a Qiskit backend, and maps the raw
    results onto ``ParityOutcome`` objects that the rest of qgate can
    score and threshold.

    The TFIM Hamiltonian:
      H = −J Σ Z_i Z_{i+1}  −  h Σ X_i

    Args:
        backend:           A Qiskit backend (Aer or IBM Runtime).
        algorithm_mode:    ``"standard"`` or ``"tsvf"`` (default ``"tsvf"``).
        n_qubits:          Number of system qubits.
        j_coupling:        ZZ coupling strength J (default 1.0).
        h_field:           Transverse field strength h (default 1.0).
        params:            Variational parameters. If None, random init.
        seed:              RNG seed for parameter init and chaotic ansatz.
        weak_angle_base:   Base angle for the energy probe (radians).
        weak_angle_ramp:   Per-layer angle increase (radians).
        optimization_level: Transpilation optimisation level (0-3).
    """

    def __init__(
        self,
        backend: Any = None,
        *,
        algorithm_mode: str = "tsvf",
        n_qubits: int = 4,
        j_coupling: float = 1.0,
        h_field: float = 1.0,
        params: np.ndarray | None = None,
        seed: int = 42,
        weak_angle_base: float = math.pi / 4,
        weak_angle_ramp: float = math.pi / 8,
        optimization_level: int = 1,
    ) -> None:
        if not _HAS_QISKIT:  # pragma: no cover
            raise ImportError(
                "VQETSVFAdapter requires Qiskit. Install with: pip install qgate[qiskit]"
            )
        self.backend = backend
        self.algorithm_mode = algorithm_mode
        self.n_qubits = n_qubits
        self.j_coupling = j_coupling
        self.h_field = h_field
        self.seed = seed
        self.weak_angle_base = weak_angle_base
        self.weak_angle_ramp = weak_angle_ramp
        self.optimization_level = optimization_level

        # Variational parameters: shape (n_layers, n_qubits, 2)
        # Will be set per build_circuit call if not pre-specified.
        self._params = params

    def _get_params(
        self,
        n_layers: int,
        rng: np.random.Generator,
    ) -> np.ndarray:
        """Resolve variational parameters for n_layers.

        Shape: (n_layers, n_qubits, 2) — [θ_ry, θ_rz] per qubit per layer.
        """
        if self._params is not None:
            p = np.array(self._params)
            if p.ndim == 2:
                # (n_qubits, 2) → replicate for all layers
                return np.tile(p, (n_layers, 1, 1))
            if p.ndim == 3 and p.shape[0] >= n_layers:
                return p[:n_layers]
            # Pad with random if not enough layers
            if p.ndim == 3:
                extra = rng.uniform(
                    -math.pi,
                    math.pi,
                    size=(n_layers - p.shape[0], self.n_qubits, 2),
                )
                return np.concatenate([p, extra], axis=0)

        # Random initialisation — identity-biased with layer-scaled
        # perturbation to avoid barren plateaus.  Early layers get larger
        # rotations (π/4 scale) to break symmetry, later layers get smaller
        # rotations (decay ∝ 1/√L) to preserve learned structure.
        params = np.zeros((n_layers, self.n_qubits, 2))
        for layer in range(n_layers):
            scale = (math.pi / 4) / math.sqrt(1 + layer)
            params[layer] = rng.uniform(-scale, scale, size=(self.n_qubits, 2))
        return params

    # ------------------------------------------------------------------
    # BaseAdapter interface
    # ------------------------------------------------------------------

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Build the VQE circuit.

        ``n_subsystems`` = number of qubits (must match ``n_qubits``).
        ``n_cycles`` = number of ansatz layers (depth).

        Returns a :class:`QuantumCircuit`.
        """
        if n_subsystems != self.n_qubits:
            raise ValueError(
                f"n_subsystems ({n_subsystems}) must match n_qubits ({self.n_qubits})"
            )
        if self.algorithm_mode == "standard":
            return self._build_standard(n_subsystems, n_cycles, **kwargs)
        elif self.algorithm_mode == "tsvf":
            seed_offset = kwargs.get("seed_offset", 0)
            return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
        else:
            raise ValueError(
                f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
            )

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Execute the circuit and return a raw result dict.

        Tries SamplerV2 first, falls back to ``backend.run()``.
        """
        if self.backend is None:
            raise RuntimeError("No backend configured for VQETSVFAdapter")

        try:
            from qiskit.transpiler.preset_passmanagers import (
                generate_preset_pass_manager,
            )
            from qiskit_ibm_runtime import SamplerV2 as Sampler

            pm = generate_preset_pass_manager(
                backend=self.backend,
                optimization_level=self.optimization_level,
            )
            isa = pm.run(circuit)
            job = Sampler(mode=self.backend).run([isa], shots=shots)
            result = job.result()
            pub = result[0]
            return {
                "pub_result": pub,
                "circuit": circuit,
                "shots": shots,
            }
        except Exception:
            # SamplerV2 path unavailable or failed; fall back to backend.run()
            pass

        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        job = self.backend.run(transpiled, shots=shots)
        result = job.result()
        return {
            "counts": result.get_counts(0),
            "circuit": circuit,
            "shots": shots,
        }

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse raw Qiskit results into ParityOutcome objects.

        Each shot → one ParityOutcome.  The parity matrix records per-
        layer, per-qubit: 0 if the qubit contributes to a low-energy
        configuration, 1 otherwise.

        For the TSVF variant the ancilla measurement at each layer
        provides the "energy quality probe".  For the standard variant
        we evaluate from the final measurement against the ground state.
        """
        counts = self._extract_counts(raw_results)

        outcomes: list[ParityOutcome] = []
        for bitstring, count in counts.items():
            row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
            for _ in range(count):
                outcomes.append(
                    ParityOutcome(
                        n_subsystems=n_subsystems,
                        n_cycles=n_cycles,
                        parity_matrix=row.copy(),
                    )
                )
        return outcomes

    # ------------------------------------------------------------------
    # Public helpers (beyond BaseAdapter)
    # ------------------------------------------------------------------

    def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
        """Return the depth of the transpiled circuit."""
        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        return int(transpiled.depth())

    def extract_energy(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[float, int]:
        """Extract the estimated ZZ energy from measurement results.

        Returns (estimated_energy, total_shots_used).

        For TSVF mode with postselect=True, only ancilla=1 shots are used.
        """
        counts = self._extract_counts(raw_results)

        if not postselect or self.algorithm_mode == "standard":
            search_counts = self._to_search_counts(counts, postselect=False)
            total = sum(search_counts.values())
            if total == 0:
                return 0.0, 0
            e = estimate_energy_from_counts(
                search_counts,
                self.n_qubits,
                self.j_coupling,
                self.h_field,
            )
            return e, total

        # Post-select on ancilla = 1
        search_counts = self._to_search_counts(counts, postselect=True)
        total = sum(search_counts.values())
        if total == 0:
            return 0.0, 0
        e = estimate_energy_from_counts(
            search_counts,
            self.n_qubits,
            self.j_coupling,
            self.h_field,
        )
        return e, total

    def extract_energy_ratio(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[float, float, int]:
        """Extract energy ratio relative to exact ground state.

        Returns (energy_ratio, energy_error, n_shots_used).
        energy_ratio = estimated / exact (closer to 1.0 is better).
        """
        est_energy, n_used = self.extract_energy(raw_results, postselect)
        exact = tfim_exact_ground_energy(
            self.n_qubits,
            self.j_coupling,
            self.h_field,
        )
        ratio = energy_ratio(est_energy, exact)
        err = energy_error(est_energy, exact)
        return ratio, err, n_used

    def extract_best_bitstring(
        self,
        raw_results: dict[str, Any],
        postselect: bool = True,
    ) -> tuple[str, float, int]:
        """Find the most-sampled bitstring and its energy.

        Returns (bitstring, energy, count).
        """
        counts = self._extract_counts(raw_results)
        best_bs = ""
        best_count = 0
        best_energy = 0.0

        for key, val in counts.items():
            key_str = str(key)
            if self.algorithm_mode == "tsvf" and postselect:
                anc_bit, search_bits = self._split_ancilla_search(key_str)
                if anc_bit != "1":
                    continue
            else:
                search_bits = self._extract_search_bits(key_str)

            e = compute_energy_from_bitstring(
                search_bits,
                self.n_qubits,
                self.j_coupling,
                self.h_field,
            )
            if val > best_count or (val == best_count and e < best_energy):
                best_bs = search_bits
                best_count = val
                best_energy = e

        return best_bs, best_energy, best_count

    def get_exact_ground_energy(self) -> float:
        """Return the exact ground-state energy for this TFIM instance."""
        return tfim_exact_ground_energy(
            self.n_qubits,
            self.j_coupling,
            self.h_field,
        )

    # ------------------------------------------------------------------
    # Private circuit builders
    # ------------------------------------------------------------------

    def _build_standard(
        self,
        n_sub: int,
        n_layers: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Standard VQE: hardware-efficient ansatz, no ancilla."""
        qr = QuantumRegister(n_sub, "q")
        cr = ClassicalRegister(n_sub, "c")
        qc = QuantumCircuit(qr, cr)
        qubits = list(range(n_sub))

        seed_offset = kwargs.get("seed_offset", 0)
        rng = np.random.default_rng(self.seed + seed_offset)
        params = self._get_params(n_layers, rng)

        # Initial state: |+⟩^n (good for TFIM with transverse field)
        for q in qubits:
            qc.h(q)

        for layer in range(n_layers):
            _hardware_efficient_layer(qc, qubits, params[layer])
            qc.barrier()

        qc.measure(qubits, list(range(n_sub)))
        return qc

    def _build_tsvf(
        self,
        n_sub: int,
        n_layers: int,
        seed_offset: int = 0,
    ) -> QuantumCircuit:
        """TSVF VQE: HW-efficient ansatz + chaotic layers + ancilla probe."""
        qr = QuantumRegister(n_sub, "q")
        anc_r = QuantumRegister(1, "anc")
        cr = ClassicalRegister(n_sub, "c_sys")
        cr_anc = ClassicalRegister(1, "c_anc")
        qc = QuantumCircuit(qr, anc_r, cr, cr_anc)

        qubits = list(range(n_sub))
        anc_qubit = n_sub
        rng = np.random.default_rng(self.seed + seed_offset)
        params = self._get_params(n_layers, rng)

        # Initial state: |+⟩^n
        for q in qubits:
            qc.h(q)

        for layer in range(n_layers):
            # Hardware-efficient ansatz layer
            _hardware_efficient_layer(qc, qubits, params[layer])
            qc.barrier()

            # Chaotic entangling ansatz
            _chaotic_vqe_ansatz(qc, qubits, layer=layer, rng=rng)
            qc.barrier()

            # Reset ancilla for reuse (except first layer)
            if layer > 0:
                qc.reset(anc_qubit)

            # Weak-measurement energy probe
            angle = self.weak_angle_base + self.weak_angle_ramp * min(layer, 4)
            _add_energy_probe_ancilla(
                qc,
                qubits,
                anc_qubit,
                cr_anc[0],
                n_qubits=self.n_qubits,
                weak_angle=angle,
            )
            qc.barrier()

        qc.measure(qubits, list(range(n_sub)))
        return qc

    # ------------------------------------------------------------------
    # Private result parsing helpers
    # ------------------------------------------------------------------

    def _extract_counts(self, raw_results: Any) -> dict[str, int]:
        """Extract a counts dict from raw run() output."""
        if isinstance(raw_results, dict):
            if "counts" in raw_results:
                return self._normalise_counts(raw_results["counts"])
            if "pub_result" in raw_results:
                return self._counts_from_pub(
                    raw_results["pub_result"],
                    raw_results.get("circuit"),
                )
        return self._normalise_counts(raw_results)

    def _normalise_counts(self, counts: dict) -> dict[str, int]:
        """Ensure keys are bitstrings and values are ints."""
        out: dict[str, int] = {}
        for k, v in counts.items():
            out[str(k)] = int(v)
        return out

    def _counts_from_pub(self, pub: Any, circuit: Any) -> dict[str, int]:
        """Extract per-shot combined bitstrings from a SamplerV2 PubResult."""
        creg_names = [cr.name for cr in circuit.cregs] if circuit else []
        if len(creg_names) <= 1:
            name = creg_names[0] if creg_names else "c"
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

        # Multi-register: reconstruct combined bitstrings
        try:
            reg_bitstrings: dict[str, Any] = {}
            for name in creg_names:
                reg_bitstrings[name] = pub.data[name].get_bitstrings()
            num_shots = len(reg_bitstrings[creg_names[0]])
            combined: dict[str, int] = {}
            for i in range(num_shots):
                parts = []
                for name in reversed(creg_names):
                    parts.append(reg_bitstrings[name][i])
                full = " ".join(parts)
                combined[full] = combined.get(full, 0) + 1
            return combined
        except Exception:
            name = creg_names[0]
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

    def _to_search_counts(
        self,
        counts: dict[str, int],
        postselect: bool,
    ) -> dict[str, int]:
        """Convert raw counts to search-register-only counts.

        If postselect=True and mode is tsvf, only keep ancilla=1 shots.
        """
        search_counts: dict[str, int] = {}
        for key, val in counts.items():
            key_str = str(key)
            if self.algorithm_mode == "tsvf" and postselect:
                anc_bit, search_bits = self._split_ancilla_search(key_str)
                if anc_bit != "1":
                    continue
            else:
                search_bits = self._extract_search_bits(key_str)
            search_counts[search_bits] = search_counts.get(search_bits, 0) + val
        return search_counts

    def _extract_search_bits(self, bitstring: str) -> str:
        """Extract the system register bits from a bitstring."""
        key = bitstring.strip()
        if " " in key:
            return key.split()[-1]
        return key[-self.n_qubits :]

    def _split_ancilla_search(self, bitstring: str) -> tuple[str, str]:
        """Split bitstring into (ancilla_bit, search_bits)."""
        key = bitstring.strip()
        if " " in key:
            parts = key.split()
            return parts[0], parts[-1]
        return key[0], key[1:]

    def _bitstring_to_parity_row(
        self,
        bitstring: str,
        n_subsystems: int,
        n_cycles: int,
    ) -> np.ndarray:
        """Convert a measurement bitstring to a parity matrix.

        For the TSVF variant:
          - Ancilla=1 → evaluate per-qubit energy contribution:
            0 if qubit is in an aligned pair (low energy), 1 otherwise.
          - Ancilla=0 → row of 1s (all fail — no evidence of low energy).

        For the standard variant:
          - Evaluate per-qubit alignment from final measurement.
          - Replicate across all cycles.

        Shape: (n_cycles, n_subsystems).
        """
        if self.algorithm_mode == "tsvf":
            anc_bit, search_bits = self._split_ancilla_search(bitstring)

            if anc_bit == "1":
                qubit_quality = self._compute_qubit_energy_quality(
                    search_bits,
                    n_subsystems,
                )
                matrix = np.tile(qubit_quality, (n_cycles, 1))
            else:
                matrix = np.ones((n_cycles, n_subsystems), dtype=np.int8)
        else:
            search_bits = self._extract_search_bits(bitstring)
            qubit_quality = self._compute_qubit_energy_quality(
                search_bits,
                n_subsystems,
            )
            matrix = np.tile(qubit_quality, (n_cycles, 1))

        return matrix

    def _compute_qubit_energy_quality(
        self,
        search_bits: str,
        n_subsystems: int,
    ) -> np.ndarray:
        """Compute per-qubit energy quality.

        0 = qubit is aligned with at least one neighbour (low ZZ energy),
        1 = qubit is anti-aligned with all neighbours (high ZZ energy).

        For the TFIM with nearest-neighbour coupling, qubit i is "good"
        if it matches at least one of its neighbours (i−1 or i+1).
        """
        quality = np.ones(n_subsystems, dtype=np.int8)  # default: fail
        bits = search_bits[-n_subsystems:]
        for i in range(min(len(bits), n_subsystems)):
            # Check left neighbour
            if i > 0 and i - 1 < len(bits) and bits[i] == bits[i - 1]:
                quality[i] = 0
                continue
            # Check right neighbour
            if i < n_subsystems - 1 and i + 1 < len(bits) and bits[i] == bits[i + 1]:
                quality[i] = 0
        return quality

build_circuit(n_subsystems, n_cycles, **kwargs)

Build the VQE circuit.

n_subsystems = number of qubits (must match n_qubits). n_cycles = number of ansatz layers (depth).

Returns a QuantumCircuit.

Source code in src/qgate/adapters/vqe_adapter.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> QuantumCircuit:
    """Build the VQE circuit.

    ``n_subsystems`` = number of qubits (must match ``n_qubits``).
    ``n_cycles`` = number of ansatz layers (depth).

    Returns a :class:`QuantumCircuit`.
    """
    if n_subsystems != self.n_qubits:
        raise ValueError(
            f"n_subsystems ({n_subsystems}) must match n_qubits ({self.n_qubits})"
        )
    if self.algorithm_mode == "standard":
        return self._build_standard(n_subsystems, n_cycles, **kwargs)
    elif self.algorithm_mode == "tsvf":
        seed_offset = kwargs.get("seed_offset", 0)
        return self._build_tsvf(n_subsystems, n_cycles, seed_offset)
    else:
        raise ValueError(
            f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
        )

run(circuit, shots, **kwargs)

Execute the circuit and return a raw result dict.

Tries SamplerV2 first, falls back to backend.run().

Source code in src/qgate/adapters/vqe_adapter.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the circuit and return a raw result dict.

    Tries SamplerV2 first, falls back to ``backend.run()``.
    """
    if self.backend is None:
        raise RuntimeError("No backend configured for VQETSVFAdapter")

    try:
        from qiskit.transpiler.preset_passmanagers import (
            generate_preset_pass_manager,
        )
        from qiskit_ibm_runtime import SamplerV2 as Sampler

        pm = generate_preset_pass_manager(
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        isa = pm.run(circuit)
        job = Sampler(mode=self.backend).run([isa], shots=shots)
        result = job.result()
        pub = result[0]
        return {
            "pub_result": pub,
            "circuit": circuit,
            "shots": shots,
        }
    except Exception:  # SamplerV2 path unavailable or failed; fall back below
        pass

    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    job = self.backend.run(transpiled, shots=shots)
    result = job.result()
    return {
        "counts": result.get_counts(0),
        "circuit": circuit,
        "shots": shots,
    }

parse_results(raw_results, n_subsystems, n_cycles)

Parse raw Qiskit results into ParityOutcome objects.

Each shot → one ParityOutcome. The parity matrix records per-layer, per-qubit: 0 if the qubit contributes to a low-energy configuration, 1 otherwise.

For the TSVF variant the ancilla measurement at each layer provides the "energy quality probe". For the standard variant we evaluate the final measurement against the ground state.

Source code in src/qgate/adapters/vqe_adapter.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse raw Qiskit results into ParityOutcome objects.

    Each shot → one ParityOutcome.  The parity matrix records per-
    layer, per-qubit: 0 if the qubit contributes to a low-energy
    configuration, 1 otherwise.

    For the TSVF variant the ancilla measurement at each layer
    provides the "energy quality probe".  For the standard variant
    we evaluate the final measurement against the ground state.
    """
    counts = self._extract_counts(raw_results)

    outcomes: list[ParityOutcome] = []
    for bitstring, count in counts.items():
        row = self._bitstring_to_parity_row(bitstring, n_subsystems, n_cycles)
        for _ in range(count):
            outcomes.append(
                ParityOutcome(
                    n_subsystems=n_subsystems,
                    n_cycles=n_cycles,
                    parity_matrix=row.copy(),
                )
            )
    return outcomes
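The per-shot expansion and the neighbour-alignment rule can be sketched in plain Python. This is an illustrative stand-in for the adapter's private helpers (the real method returns `ParityOutcome` objects; the names below are hypothetical):

```python
def quality_row(bits: str) -> list[int]:
    """0 = aligned with at least one neighbour (low ZZ energy), 1 = anti-aligned."""
    n = len(bits)
    row = []
    for i in range(n):
        left_ok = i > 0 and bits[i] == bits[i - 1]
        right_ok = i < n - 1 and bits[i] == bits[i + 1]
        row.append(0 if (left_ok or right_ok) else 1)
    return row

def expand_counts(counts: dict[str, int]) -> list[list[int]]:
    """One parity row per shot: each bitstring is replicated `count` times."""
    rows = []
    for bits, count in counts.items():
        row = quality_row(bits)
        rows.extend([row[:] for _ in range(count)])
    return rows

# "0011" is a domain-wall state where every qubit still matches a neighbour;
# "0101" is fully anti-aligned, so every qubit fails.
rows = expand_counts({"0011": 2, "0101": 1})
```

Note that three shots in yield three rows out; scoring downstream then operates shot-by-shot rather than on aggregated counts.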

get_transpiled_depth(circuit)

Return the depth of the transpiled circuit.

Source code in src/qgate/adapters/vqe_adapter.py
def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
    """Return the depth of the transpiled circuit."""
    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    return int(transpiled.depth())

extract_energy(raw_results, postselect=True)

Extract the estimated ZZ energy from measurement results.

Returns (estimated_energy, total_shots_used).

For TSVF mode with postselect=True, only ancilla=1 shots are used.

Source code in src/qgate/adapters/vqe_adapter.py
def extract_energy(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[float, int]:
    """Extract the estimated ZZ energy from measurement results.

    Returns (estimated_energy, total_shots_used).

    For TSVF mode with postselect=True, only ancilla=1 shots are used.
    """
    counts = self._extract_counts(raw_results)

    if not postselect or self.algorithm_mode == "standard":
        search_counts = self._to_search_counts(counts, postselect=False)
        total = sum(search_counts.values())
        if total == 0:
            return 0.0, 0
        e = estimate_energy_from_counts(
            search_counts,
            self.n_qubits,
            self.j_coupling,
            self.h_field,
        )
        return e, total

    # Post-select on ancilla = 1
    search_counts = self._to_search_counts(counts, postselect=True)
    total = sum(search_counts.values())
    if total == 0:
        return 0.0, 0
    e = estimate_energy_from_counts(
        search_counts,
        self.n_qubits,
        self.j_coupling,
        self.h_field,
    )
    return e, total
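TSVF post-selection keeps only the ancilla=1 shots before the energy estimate. A minimal sketch of that filtering step, assuming the space-separated `"<ancilla_bit> <search_bits>"` key format that `_split_ancilla_search` handles:

```python
def postselect_counts(counts: dict[str, int]) -> dict[str, int]:
    """Keep only ancilla=1 shots; return counts keyed by search bits alone.

    Assumes keys of the form "<ancilla_bit> <search_bits>", matching the
    multi-register bitstrings reconstructed from SamplerV2 results.
    """
    kept: dict[str, int] = {}
    for key, val in counts.items():
        anc, search = key.split()
        if anc == "1":
            kept[search] = kept.get(search, 0) + val
    return kept

counts = {"1 0011": 5, "0 0110": 3, "1 1100": 2}
kept = postselect_counts(counts)  # the "0 0110" shots are discarded
```

The returned total (here 7 of 10 shots) is what `extract_energy` reports as `total_shots_used` under post-selection.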

extract_energy_ratio(raw_results, postselect=True)

Extract energy ratio relative to exact ground state.

Returns (energy_ratio, energy_error, n_shots_used). energy_ratio = estimated / exact (closer to 1.0 is better).

Source code in src/qgate/adapters/vqe_adapter.py
def extract_energy_ratio(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[float, float, int]:
    """Extract energy ratio relative to exact ground state.

    Returns (energy_ratio, energy_error, n_shots_used).
    energy_ratio = estimated / exact (closer to 1.0 is better).
    """
    est_energy, n_used = self.extract_energy(raw_results, postselect)
    exact = tfim_exact_ground_energy(
        self.n_qubits,
        self.j_coupling,
        self.h_field,
    )
    ratio = energy_ratio(est_energy, exact)
    err = energy_error(est_energy, exact)
    return ratio, err, n_used

extract_best_bitstring(raw_results, postselect=True)

Find the most-sampled bitstring and its energy.

Returns (bitstring, energy, count).

Source code in src/qgate/adapters/vqe_adapter.py
def extract_best_bitstring(
    self,
    raw_results: dict[str, Any],
    postselect: bool = True,
) -> tuple[str, float, int]:
    """Find the most-sampled bitstring and its energy.

    Returns (bitstring, energy, count).
    """
    counts = self._extract_counts(raw_results)
    best_bs = ""
    best_count = 0
    best_energy = 0.0

    for key, val in counts.items():
        key_str = str(key)
        if self.algorithm_mode == "tsvf" and postselect:
            anc_bit, search_bits = self._split_ancilla_search(key_str)
            if anc_bit != "1":
                continue
        else:
            search_bits = self._extract_search_bits(key_str)

        e = compute_energy_from_bitstring(
            search_bits,
            self.n_qubits,
            self.j_coupling,
            self.h_field,
        )
        if val > best_count or (val == best_count and e < best_energy):
            best_bs = search_bits
            best_count = val
            best_energy = e

    return best_bs, best_energy, best_count
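The count-then-energy tie-break can be seen on a small example. This is a self-contained sketch of the selection loop, with the diagonal ZZ energy recomputed inline rather than via `compute_energy_from_bitstring`:

```python
def zz_energy(bits: str, j: float = 1.0) -> float:
    spins = [1 - 2 * int(b) for b in bits]  # 0 -> +1, 1 -> -1
    return -j * sum(spins[i] * spins[i + 1] for i in range(len(spins) - 1))

def best_bitstring(counts: dict[str, int]) -> tuple[str, float, int]:
    best_bs, best_count, best_energy = "", 0, 0.0
    for bs, val in counts.items():
        e = zz_energy(bs)
        # Prefer higher count; on a count tie, prefer lower energy.
        if val > best_count or (val == best_count and e < best_energy):
            best_bs, best_count, best_energy = bs, val, e
    return best_bs, best_energy, best_count

# "0101" and "0000" are sampled equally often; the tie-break prefers
# the lower-energy "0000" (E = -3) over "0101" (E = +3).
bs, e, c = best_bitstring({"0101": 5, "0000": 5})
```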

get_exact_ground_energy()

Return the exact ground-state energy for this TFIM instance.

Source code in src/qgate/adapters/vqe_adapter.py
def get_exact_ground_energy(self) -> float:
    """Return the exact ground-state energy for this TFIM instance."""
    return tfim_exact_ground_energy(
        self.n_qubits,
        self.j_coupling,
        self.h_field,
    )

tfim_exact_ground_energy(n_qubits, j_coupling=1.0, h_field=1.0)

Compute exact ground-state energy of the 1D TFIM.

H = −J Σ_{i} Z_i Z_{i+1} − h Σ_{i} X_i

Uses sparse Hamiltonian construction + Lanczos (ARPACK) for the ground state, so it scales comfortably to 20+ qubits on a laptop.

For very small systems (≤ 12 qubits) a dense fallback is used because ARPACK can occasionally be slower for tiny matrices.

Returns:

Type Description
float

The minimum eigenvalue of H.

Source code in src/qgate/adapters/vqe_adapter.py
def tfim_exact_ground_energy(
    n_qubits: int,
    j_coupling: float = 1.0,
    h_field: float = 1.0,
) -> float:
    """Compute exact ground-state energy of the 1D TFIM.

    H = −J Σ_{i} Z_i Z_{i+1}  −  h Σ_{i} X_i

    Uses **sparse** Hamiltonian construction + Lanczos (ARPACK) for the
    ground state, so it scales comfortably to 20+ qubits on a laptop.

    For very small systems (≤ 12 qubits) a dense fallback is used because
    ARPACK can occasionally be slower for tiny matrices.

    Returns:
        The minimum eigenvalue of H.
    """
    dim = 2**n_qubits

    # Sparse Pauli matrices
    eye2 = sp.eye(2, format="csc")
    pauli_z = sp.diags([1.0, -1.0], format="csc")
    pauli_x = sp.csc_matrix(np.array([[0.0, 1.0], [1.0, 0.0]]))

    ham = sp.csc_matrix((dim, dim), dtype=np.float64)

    # ZZ coupling: -J Sum Z_i Z_{i+1}
    for i in range(n_qubits - 1):
        ops: list[sp.spmatrix] = [eye2] * n_qubits
        ops[i] = pauli_z
        ops[i + 1] = pauli_z
        ham = ham - j_coupling * _sparse_kron_chain(ops)

    # Transverse field: -h Sum X_i
    for i in range(n_qubits):
        ops = [eye2] * n_qubits
        ops[i] = pauli_x
        ham = ham - h_field * _sparse_kron_chain(ops)

    # For small systems, dense is fine and avoids ARPACK edge cases
    if n_qubits <= 12:
        eigenvalues = np.linalg.eigvalsh(ham.toarray())
        return float(eigenvalues[0])

    # Lanczos / ARPACK for larger systems: find the single smallest eigenvalue
    eigenvalues, _ = spla.eigsh(ham, k=1, which="SA")
    return float(eigenvalues[0])
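As a sanity check on the construction above: the two-qubit open chain with J = h = 1 has exact ground energy −√5 ≈ −2.236. A dense NumPy-only build reproduces this (the real function uses SciPy sparse matrices and only falls back to dense for small systems):

```python
import numpy as np

# Dense 2-qubit TFIM: H = -J Z⊗Z - h (X⊗I + I⊗X)
Z = np.diag([1.0, -1.0])
X = np.array([[0.0, 1.0], [1.0, 0.0]])
I2 = np.eye(2)

j, h = 1.0, 1.0
H = -j * np.kron(Z, Z) - h * (np.kron(X, I2) + np.kron(I2, X))

# Ground-state energy = smallest eigenvalue of H
e0 = float(np.linalg.eigvalsh(H)[0])
```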

compute_energy_from_bitstring(bitstring, n_qubits, j_coupling=1.0, h_field=0.0)

Compute the diagonal (ZZ) energy of a computational-basis state.

Since X_i terms are off-diagonal, they don't contribute to individual computational-basis expectations. The ZZ part gives:

E_ZZ = −J Σ_{i} s_i · s_{i+1}

where s_i = +1 if bit=0, −1 if bit=1.

This is the energy that can be estimated from measurement counts.

Returns:

Type Description
float

The ZZ contribution to the energy.

Source code in src/qgate/adapters/vqe_adapter.py
def compute_energy_from_bitstring(
    bitstring: str,
    n_qubits: int,
    j_coupling: float = 1.0,
    h_field: float = 0.0,
) -> float:
    """Compute the diagonal (ZZ) energy of a computational-basis state.

    Since X_i terms are off-diagonal, they don't contribute to
    individual computational-basis expectations.  The ZZ part gives:
      E_ZZ = −J Σ_{i} s_i · s_{i+1}
    where s_i = +1 if bit=0, −1 if bit=1.

    This is the energy that can be estimated from measurement counts.

    Returns:
        The ZZ contribution to the energy.
    """
    bits = [int(b) for b in bitstring[-n_qubits:]]
    spins = [1 - 2 * b for b in bits]  # 0->+1, 1->-1

    energy = 0.0
    for i in range(len(spins) - 1):
        energy -= j_coupling * spins[i] * spins[i + 1]
    return energy
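Working the formula by hand for a 4-qubit string: "0011" maps to spins (+1, +1, −1, −1), so the three bonds contribute −J, +J, −J and the total is −J. A minimal check, restating the function above so the snippet is self-contained:

```python
def zz_energy(bitstring: str, n_qubits: int, j: float = 1.0) -> float:
    bits = [int(b) for b in bitstring[-n_qubits:]]
    spins = [1 - 2 * b for b in bits]  # 0 -> +1, 1 -> -1
    return -j * sum(spins[i] * spins[i + 1] for i in range(len(spins) - 1))

assert zz_energy("0011", 4) == -1.0  # bonds contribute -1, +1, -1
assert zz_energy("0000", 4) == -3.0  # fully aligned: every bond gives -J
```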

estimate_energy_from_counts(counts, n_qubits, j_coupling=1.0, h_field=0.0)

Estimate the ZZ energy from measurement counts.

Returns the weighted-average diagonal energy.

Source code in src/qgate/adapters/vqe_adapter.py
def estimate_energy_from_counts(
    counts: dict[str, int],
    n_qubits: int,
    j_coupling: float = 1.0,
    h_field: float = 0.0,
) -> float:
    """Estimate the ZZ energy from measurement counts.

    Returns the weighted-average diagonal energy.
    """
    total_shots = sum(counts.values())
    if total_shots == 0:
        return 0.0
    total_energy = 0.0
    for bs, cnt in counts.items():
        e = compute_energy_from_bitstring(bs, n_qubits, j_coupling, h_field)
        total_energy += e * cnt
    return total_energy / total_shots

energy_error(estimated, exact)

Absolute energy error: |estimated − exact|.

Source code in src/qgate/adapters/vqe_adapter.py
def energy_error(
    estimated: float,
    exact: float,
) -> float:
    """Absolute energy error: |estimated − exact|."""
    return abs(estimated - exact)

energy_ratio(estimated, exact)

Energy ratio: estimated / exact.

For ground-state search, a ratio closer to 1.0 is better (the estimated energy approaches the true ground-state energy). The exact ground-state energy is negative for TFIM, so ratio > 1 means the estimate is more negative than the exact value (overshoot), and ratio < 1 means it is not negative enough (undershoot).

Source code in src/qgate/adapters/vqe_adapter.py
def energy_ratio(
    estimated: float,
    exact: float,
) -> float:
    """Energy ratio: estimated / exact.

    For ground-state search, a ratio closer to 1.0 is better
    (the estimated energy approaches the true ground-state energy).
    The exact ground-state energy is negative for TFIM, so
    ratio > 1 means the estimate is more negative than the exact
    value (overshoot), ratio < 1 means it is not negative enough.
    """
    if abs(exact) < 1e-12:
        return 0.0
    return estimated / exact

qgate.adapters.qpe_adapter

qpe_adapter.py — Adapter for QPE / TSVF-QPE experiments.

Maps Quantum Phase Estimation (QPE) circuits with an ancilla-based post-selection probe onto qgate's :class:ParityOutcome model, enabling the full trajectory filtering pipeline (scoring → thresholding → conditioning) to work on eigenvalue estimation.

Problem — Quantum Phase Estimation: Given a unitary U and its eigenstate |ψ⟩ such that U|ψ⟩ = e^{2πiφ}|ψ⟩, QPE estimates the phase φ to t-bit binary precision using a register of t "precision" (counting) qubits.

Target unitary: U = Rz(2πφ), with eigenstate |1⟩ (eigenvalue e^{iπφ}, i.e. e^{2πiφ} up to global phase), or equivalently the diagonal unitary diag(1, e^{2πiφ}).

Mapping to ParityOutcome:

- n_subsystems = number of precision qubits (t).
- n_cycles = 1 (QPE is a single-shot algorithm per run).
- parity_matrix[0, k] = 0 if precision qubit k matches the correct phase bit, 1 otherwise.

This lets qgate's score_fusion, thresholding, and hierarchical conditioning rules apply naturally.

The adapter supports two algorithm variants via algorithm_mode:

- "standard" — Canonical QPE (Hadamards + controlled-U^{2^k} + inverse QFT).
- "tsvf" — QPE + chaotic entangling ansatz + weak-measurement ancilla (post-selection trajectory filter) that rewards phase states close to the correct answer.
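For example, with t = 3 precision qubits and φ = 1/3, rounding φ·2³ to the nearest integer gives the ideal bitstring "011" (this rounding convention is an assumption here, for illustration; the adapter may define the reference bits differently). The per-qubit mismatch row then looks like:

```python
def ideal_phase_bits(phi: float, t: int) -> str:
    # Nearest t-bit approximation of phi (assumed convention, for illustration)
    return format(round(phi * 2 ** t) % (2 ** t), f"0{t}b")

def parity_row(measured: str, ideal: str) -> list[int]:
    # 0 where the precision qubit matches the ideal phase bit, 1 otherwise
    return [int(m != i) for m, i in zip(measured, ideal)]

ideal = ideal_phase_bits(1.0 / 3.0, 3)   # "011"
row_good = parity_row("011", ideal)      # perfect shot: all zeros
row_bad = parity_row("111", ideal)       # top precision bit wrong
```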

Patent pending (see LICENSE)

QPETSVFAdapter

Bases: BaseAdapter

Adapter for QPE / TSVF-QPE phase estimation experiments.

This adapter builds QPE circuits for estimating the eigenphase of a unitary operator, executes them on a Qiskit backend, and maps the raw results onto ParityOutcome objects that the rest of qgate can score and threshold.

Target unitary: U = diag(1, e^{2πiφ}) with eigenstate |1⟩ and eigenphase φ.

Parameters:

Name Type Description Default
backend Any

A Qiskit backend (Aer or IBM Runtime).

None
algorithm_mode str

"standard" or "tsvf" (default "tsvf").

'tsvf'
eigenphase float

The true phase φ ∈ [0, 1) (default 1/3).

1.0 / 3.0
seed int

RNG seed for chaotic ansatz.

42
weak_angle_base float

Base angle for the phase probe (radians).

pi / 4
weak_angle_ramp float

Per-precision-qubit angle increase (radians).

pi / 8
optimization_level int

Transpilation optimisation level (0-3).

1
Source code in src/qgate/adapters/qpe_adapter.py
class QPETSVFAdapter(BaseAdapter):
    """Adapter for QPE / TSVF-QPE phase estimation experiments.

    This adapter builds QPE circuits for estimating the eigenphase of
    a unitary operator, executes them on a Qiskit backend, and maps
    the raw results onto ``ParityOutcome`` objects that the rest of
    qgate can score and threshold.

    **Target unitary:** ``U = diag(1, e^{2πiφ})``
    with eigenstate |1⟩ and eigenphase φ.

    Args:
        backend:           A Qiskit backend (Aer or IBM Runtime).
        algorithm_mode:    ``"standard"`` or ``"tsvf"`` (default ``"tsvf"``).
        eigenphase:        The true phase φ ∈ [0, 1) (default 1/3).
        seed:              RNG seed for chaotic ansatz.
        weak_angle_base:   Base angle for the phase probe (radians).
        weak_angle_ramp:   Per-precision-qubit angle increase (radians).
        optimization_level: Transpilation optimisation level (0-3).
    """

    def __init__(
        self,
        backend: Any = None,
        *,
        algorithm_mode: str = "tsvf",
        eigenphase: float = 1.0 / 3.0,
        seed: int = 42,
        weak_angle_base: float = math.pi / 4,
        weak_angle_ramp: float = math.pi / 8,
        optimization_level: int = 1,
    ) -> None:
        if not _HAS_QISKIT:  # pragma: no cover
            raise ImportError(
                "QPETSVFAdapter requires Qiskit. Install with: pip install qgate[qiskit]"
            )
        self.backend = backend
        self.algorithm_mode = algorithm_mode
        self.eigenphase = eigenphase
        self.seed = seed
        self.weak_angle_base = weak_angle_base
        self.weak_angle_ramp = weak_angle_ramp
        self.optimization_level = optimization_level

    # ------------------------------------------------------------------
    # BaseAdapter interface
    # ------------------------------------------------------------------

    def build_circuit(
        self,
        n_subsystems: int,
        n_cycles: int,
        **kwargs: Any,
    ) -> QuantumCircuit:
        """Build the QPE circuit.

        ``n_subsystems`` = number of precision qubits (t).
        ``n_cycles`` = 1 (QPE is a single-pass algorithm).  Accepted
        but ignored (always 1 effective cycle).

        Returns a :class:`QuantumCircuit`.
        """
        if self.algorithm_mode == "standard":
            return self._build_standard(n_subsystems)
        elif self.algorithm_mode == "tsvf":
            seed_offset = kwargs.get("seed_offset", 0)
            return self._build_tsvf(n_subsystems, seed_offset)
        else:
            raise ValueError(
                f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
            )

    def run(
        self,
        circuit: Any,
        shots: int,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Execute the circuit and return a raw result dict.

        Tries SamplerV2 first, falls back to ``backend.run()``.
        """
        if self.backend is None:
            raise RuntimeError("No backend configured for QPETSVFAdapter")

        try:
            from qiskit.transpiler.preset_passmanagers import (
                generate_preset_pass_manager,
            )
            from qiskit_ibm_runtime import SamplerV2 as Sampler

            pm = generate_preset_pass_manager(
                backend=self.backend,
                optimization_level=self.optimization_level,
            )
            isa = pm.run(circuit)
            job = Sampler(mode=self.backend).run([isa], shots=shots)
            result = job.result()
            pub = result[0]
            return {
                "pub_result": pub,
                "circuit": circuit,
                "shots": shots,
            }
        except Exception:  # includes ImportError; fall back to backend.run() below
            pass

        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        job = self.backend.run(transpiled, shots=shots)
        result = job.result()
        return {
            "counts": result.get_counts(0),
            "circuit": circuit,
            "shots": shots,
        }

    def parse_results(
        self,
        raw_results: Any,
        n_subsystems: int,
        n_cycles: int,
    ) -> list[ParityOutcome]:
        """Parse raw Qiskit results into ParityOutcome objects.

        Each shot → one ParityOutcome.  The parity matrix records per-
        precision-qubit: 0 if the qubit matches the correct phase bit,
        1 otherwise.

        For the TSVF variant the ancilla measurement provides the
        "phase quality probe".  For the standard variant we evaluate
        the final measurement against the ideal phase bits.
        """
        counts = self._extract_counts(raw_results)

        # Determine the correct phase bitstring
        correct_bits = phase_to_binary_fraction(self.eigenphase, n_subsystems)

        outcomes: list[ParityOutcome] = []
        for bitstring, count in counts.items():
            row = self._bitstring_to_parity_row(
                bitstring,
                n_subsystems,
                n_cycles,
                correct_bits,
            )
            for _ in range(count):
                outcomes.append(
                    ParityOutcome(
                        n_subsystems=n_subsystems,
                        n_cycles=max(n_cycles, 1),
                        parity_matrix=row.copy(),
                    )
                )
        return outcomes

    # ------------------------------------------------------------------
    # Public helpers (beyond BaseAdapter)
    # ------------------------------------------------------------------

    def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
        """Return the depth of the transpiled circuit."""
        transpiled = transpile(
            circuit,
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        return int(transpiled.depth())

    def get_correct_phase_bits(self, n_precision: int) -> str:
        """Return the ideal binary-fraction bitstring for the eigenphase."""
        return phase_to_binary_fraction(self.eigenphase, n_precision)

    def extract_phase_metrics(
        self,
        raw_results: dict[str, Any],
        n_precision: int,
        postselect: bool = True,
    ) -> dict[str, float]:
        """Extract comprehensive phase estimation metrics.

        Returns a dict with:
          - ``fidelity``: P(correct phase bitstring)
          - ``mean_phase_error``: weighted-mean circular phase error
          - ``entropy``: Shannon entropy of the phase histogram (bits)
          - ``measured_phase``: most-probable measured phase
          - ``true_phase``: the target eigenphase
          - ``total_shots``: number of shots used (after post-selection)
          - ``acceptance_rate``: fraction of shots accepted (TSVF only)
        """
        counts = self._extract_counts(raw_results)
        correct_bits = phase_to_binary_fraction(self.eigenphase, n_precision)

        if postselect and self.algorithm_mode == "tsvf":
            phase_counts, total_original, accepted_total = self._postselect_phase_counts(
                counts, n_precision
            )
            acceptance_rate = accepted_total / total_original if total_original > 0 else 0.0
        else:
            phase_counts = self._extract_phase_counts(counts, n_precision)
            accepted_total = sum(phase_counts.values())
            acceptance_rate = 1.0

        fid = phase_fidelity(phase_counts, correct_bits)
        mean_err = mean_phase_error(
            phase_counts,
            self.eigenphase,
            n_precision,
        )
        ent = histogram_entropy(phase_counts)

        # Most-probable phase
        if phase_counts:
            best_bs = max(phase_counts, key=phase_counts.get)  # type: ignore[arg-type]
            measured = binary_fraction_to_phase(best_bs[-n_precision:])
        else:
            measured = 0.0

        return {
            "fidelity": fid,
            "mean_phase_error": mean_err,
            "entropy": ent,
            "measured_phase": measured,
            "true_phase": self.eigenphase,
            "total_shots": accepted_total,
            "acceptance_rate": acceptance_rate,
        }

    def extract_best_phase(
        self,
        raw_results: dict[str, Any],
        n_precision: int,
        postselect: bool = True,
    ) -> tuple[str, float, int]:
        """Find the most-sampled phase bitstring.

        Returns (bitstring, phase_value, count).
        """
        counts = self._extract_counts(raw_results)

        if postselect and self.algorithm_mode == "tsvf":
            phase_counts, _, _ = self._postselect_phase_counts(
                counts,
                n_precision,
            )
        else:
            phase_counts = self._extract_phase_counts(counts, n_precision)

        if not phase_counts:
            return "0" * n_precision, 0.0, 0

        best_bs = max(phase_counts, key=phase_counts.get)  # type: ignore[arg-type]
        best_count = phase_counts[best_bs]
        best_phase = binary_fraction_to_phase(best_bs[-n_precision:])
        return best_bs, best_phase, best_count

    # ------------------------------------------------------------------
    # Private circuit builders
    # ------------------------------------------------------------------

    def _build_standard(self, n_precision: int) -> QuantumCircuit:
        """Standard QPE: Hadamards + controlled-U^{2^k} + inverse QFT."""
        # Registers: t precision qubits + 1 eigenstate qubit
        prec_r = QuantumRegister(n_precision, "prec")
        eig_r = QuantumRegister(1, "eig")
        cr = ClassicalRegister(n_precision, "c_phase")
        qc = QuantumCircuit(prec_r, eig_r, cr)

        prec_qubits = list(range(n_precision))
        eig_qubit = n_precision

        # Prepare eigenstate |1⟩
        qc.x(eig_qubit)

        # Hadamard on all precision qubits
        for q in prec_qubits:
            qc.h(q)

        # Controlled-U^{2^k} gates
        # U = diag(1, e^{2πiφ}) → controlled-U^{2^k} is CP(2π·φ·2^k)
        for k in range(n_precision):
            angle = 2 * math.pi * self.eigenphase * (2**k)
            _controlled_phase_rotation(
                qc,
                prec_qubits[k],
                eig_qubit,
                angle,
            )

        # Inverse QFT on precision register
        _inverse_qft(qc, prec_qubits)

        # Measure precision register
        for k in range(n_precision):
            qc.measure(prec_qubits[k], cr[k])

        return qc

    def _build_tsvf(
        self,
        n_precision: int,
        seed_offset: int = 0,
    ) -> QuantumCircuit:
        """TSVF QPE: standard QPE + chaotic ansatz + phase probe ancilla.

        The chaotic ansatz is applied BEFORE the inverse QFT, perturbing
        the phase-encoded state.  The ancilla probe then post-selects
        for trajectories where the precision register still encodes a
        phase close to the correct answer — the TSVF "anchoring" effect.
        """
        prec_r = QuantumRegister(n_precision, "prec")
        eig_r = QuantumRegister(1, "eig")
        anc_r = QuantumRegister(1, "anc")
        cr = ClassicalRegister(n_precision, "c_phase")
        cr_anc = ClassicalRegister(1, "c_anc")
        qc = QuantumCircuit(prec_r, eig_r, anc_r, cr, cr_anc)

        prec_qubits = list(range(n_precision))
        eig_qubit = n_precision
        anc_qubit = n_precision + 1

        rng = np.random.default_rng(self.seed + seed_offset)

        # Prepare eigenstate |1⟩
        qc.x(eig_qubit)

        # Hadamard on all precision qubits
        for q in prec_qubits:
            qc.h(q)

        # Controlled-U^{2^k} gates (same as standard)
        for k in range(n_precision):
            angle = 2 * math.pi * self.eigenphase * (2**k)
            _controlled_phase_rotation(
                qc,
                prec_qubits[k],
                eig_qubit,
                angle,
            )

        qc.barrier()

        # ── TSVF mild perturbation on precision register ──
        _chaotic_qpe_ansatz(
            qc,
            prec_qubits,
            iteration=0,
            rng=rng,
            n_precision=n_precision,
        )
        qc.barrier()

        # ── Inverse QFT on precision register ──
        _inverse_qft(qc, prec_qubits)
        qc.barrier()

        # ── Phase probe ancilla ──
        correct_bits = phase_to_binary_fraction(self.eigenphase, n_precision)
        angle = self.weak_angle_base + self.weak_angle_ramp * min(n_precision, 6)
        _add_phase_probe_ancilla(
            qc,
            prec_qubits,
            anc_qubit,
            cr_anc[0],
            correct_phase_bits=correct_bits,
            weak_angle=angle,
        )
        qc.barrier()

        # Measure precision register
        for k in range(n_precision):
            qc.measure(prec_qubits[k], cr[k])

        return qc

    # ------------------------------------------------------------------
    # Private result parsing helpers
    # ------------------------------------------------------------------

    def _extract_counts(self, raw_results: Any) -> dict[str, int]:
        """Extract a counts dict from raw run() output."""
        if isinstance(raw_results, dict):
            if "counts" in raw_results:
                return self._normalise_counts(raw_results["counts"])
            if "pub_result" in raw_results:
                return self._counts_from_pub(
                    raw_results["pub_result"],
                    raw_results.get("circuit"),
                )
        return self._normalise_counts(raw_results)

    def _normalise_counts(self, counts: dict) -> dict[str, int]:
        """Ensure keys are bitstrings and values are ints."""
        out: dict[str, int] = {}
        for k, v in counts.items():
            out[str(k)] = int(v)
        return out

    def _counts_from_pub(self, pub: Any, circuit: Any) -> dict[str, int]:
        """Extract per-shot combined bitstrings from a SamplerV2 PubResult."""
        creg_names = [cr.name for cr in circuit.cregs] if circuit else []
        if len(creg_names) <= 1:
            name = creg_names[0] if creg_names else "c_phase"
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

        # Multi-register: reconstruct combined bitstrings
        try:
            reg_bitstrings: dict[str, Any] = {}
            for name in creg_names:
                reg_bitstrings[name] = pub.data[name].get_bitstrings()
            num_shots = len(reg_bitstrings[creg_names[0]])
            combined: dict[str, int] = {}
            for i in range(num_shots):
                parts = []
                for name in reversed(creg_names):
                    parts.append(reg_bitstrings[name][i])
                full = " ".join(parts)
                combined[full] = combined.get(full, 0) + 1
            return combined
        except Exception:
            name = creg_names[0]
            try:
                return {str(k): int(v) for k, v in pub.data[name].get_counts().items()}
            except Exception:
                return {}

    def _extract_phase_bits(self, bitstring: str, n_precision: int) -> str:
        """Extract the precision-register bits from a bitstring."""
        key = bitstring.strip()
        if " " in key:
            # Space-separated: last part is first register (c_phase)
            return key.split()[-1]
        return key[-n_precision:]

    def _split_ancilla_phase(
        self,
        bitstring: str,
        n_precision: int,
    ) -> tuple[str, str]:
        """Split bitstring into (ancilla_bit, phase_bits).

        For space-separated keys: "anc_bit phase_bits"
        For concatenated: first char is ancilla, rest is phase.
        """
        key = bitstring.strip()
        if " " in key:
            parts = key.split()
            # parts[0] is the LAST classical register written (c_anc)
            # parts[-1] is the FIRST classical register written (c_phase)
            return parts[0], parts[-1]
        # Concatenated: ancilla is MSB (leftmost)
        return key[0], key[1:]

    def _extract_phase_counts(
        self,
        counts: dict[str, int],
        n_precision: int,
    ) -> dict[str, int]:
        """Extract phase-register-only counts from full bitstrings."""
        phase_counts: dict[str, int] = {}
        for key, val in counts.items():
            if self.algorithm_mode == "tsvf":
                _, phase_bits = self._split_ancilla_phase(key, n_precision)
            else:
                phase_bits = self._extract_phase_bits(key, n_precision)
            phase_counts[phase_bits] = phase_counts.get(phase_bits, 0) + val
        return phase_counts

    def _postselect_phase_counts(
        self,
        counts: dict[str, int],
        n_precision: int,
    ) -> tuple[dict[str, int], int, int]:
        """Post-select on ancilla=1 and return phase counts.

        Returns (phase_counts, total_original, accepted_total).
        """
        total_original = sum(counts.values())
        phase_counts: dict[str, int] = {}
        accepted_total = 0

        for key, val in counts.items():
            anc_bit, phase_bits = self._split_ancilla_phase(key, n_precision)
            if anc_bit == "1":
                accepted_total += val
                phase_counts[phase_bits] = phase_counts.get(phase_bits, 0) + val

        return phase_counts, total_original, accepted_total

    def _bitstring_to_parity_row(
        self,
        bitstring: str,
        n_subsystems: int,
        n_cycles: int,
        correct_bits: str,
    ) -> np.ndarray:
        """Convert a measurement bitstring to a parity matrix.

        For the TSVF variant:
          - Ancilla=1 → compare each precision qubit to the correct
            phase bit: 0 if match, 1 if mismatch.
          - Ancilla=0 → row of 1s (all fail — no evidence of correct phase).

        For the standard variant:
          - Compare each precision qubit to the correct phase bit.

        Shape: (max(n_cycles, 1), n_subsystems).
        """
        effective_cycles = max(n_cycles, 1)

        if self.algorithm_mode == "tsvf":
            anc_bit, phase_bits = self._split_ancilla_phase(
                bitstring,
                n_subsystems,
            )
            if anc_bit == "1":
                qubit_match = self._compute_phase_match(
                    phase_bits,
                    n_subsystems,
                    correct_bits,
                )
                matrix = np.tile(qubit_match, (effective_cycles, 1))
            else:
                matrix = np.ones(
                    (effective_cycles, n_subsystems),
                    dtype=np.int8,
                )
        else:
            phase_bits = self._extract_phase_bits(bitstring, n_subsystems)
            qubit_match = self._compute_phase_match(
                phase_bits,
                n_subsystems,
                correct_bits,
            )
            matrix = np.tile(qubit_match, (effective_cycles, 1))

        return matrix

    def _compute_phase_match(
        self,
        phase_bits: str,
        n_subsystems: int,
        correct_bits: str,
    ) -> np.ndarray:
        """Compute per-qubit phase match: 0 = correct bit, 1 = wrong.

        Returns an array of shape (n_subsystems,).
        """
        match = np.ones(n_subsystems, dtype=np.int8)
        for k in range(min(len(phase_bits), n_subsystems, len(correct_bits))):
            if phase_bits[k] == correct_bits[k]:
                match[k] = 0
        return match

build_circuit(n_subsystems, n_cycles, **kwargs)

Build the QPE circuit.

n_subsystems = number of precision qubits (t). n_cycles = 1 (QPE is a single-pass algorithm); the argument is accepted but ignored (always 1 effective cycle).

Returns a QuantumCircuit.

Source code in src/qgate/adapters/qpe_adapter.py
def build_circuit(
    self,
    n_subsystems: int,
    n_cycles: int,
    **kwargs: Any,
) -> QuantumCircuit:
    """Build the QPE circuit.

    ``n_subsystems`` = number of precision qubits (t).
    ``n_cycles`` = 1 (QPE is a single-pass algorithm).  Accepted
    but ignored (always 1 effective cycle).

    Returns a :class:`QuantumCircuit`.
    """
    if self.algorithm_mode == "standard":
        return self._build_standard(n_subsystems)
    elif self.algorithm_mode == "tsvf":
        seed_offset = kwargs.get("seed_offset", 0)
        return self._build_tsvf(n_subsystems, seed_offset)
    else:
        raise ValueError(
            f"Unknown algorithm_mode: {self.algorithm_mode!r}. Use 'standard' or 'tsvf'."
        )

run(circuit, shots, **kwargs)

Execute the circuit and return a raw result dict.

Tries SamplerV2 first, falls back to backend.run().

Source code in src/qgate/adapters/qpe_adapter.py
def run(
    self,
    circuit: Any,
    shots: int,
    **kwargs: Any,
) -> dict[str, Any]:
    """Execute the circuit and return a raw result dict.

    Tries SamplerV2 first, falls back to ``backend.run()``.
    """
    if self.backend is None:
        raise RuntimeError("No backend configured for QPETSVFAdapter")

    try:
        from qiskit.transpiler.preset_passmanagers import (
            generate_preset_pass_manager,
        )
        from qiskit_ibm_runtime import SamplerV2 as Sampler

        pm = generate_preset_pass_manager(
            backend=self.backend,
            optimization_level=self.optimization_level,
        )
        isa = pm.run(circuit)
        job = Sampler(mode=self.backend).run([isa], shots=shots)
        result = job.result()
        pub = result[0]
        return {
            "pub_result": pub,
            "circuit": circuit,
            "shots": shots,
        }
    except Exception:  # includes ImportError; fall back to backend.run() below
        pass

    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    job = self.backend.run(transpiled, shots=shots)
    result = job.result()
    return {
        "counts": result.get_counts(0),
        "circuit": circuit,
        "shots": shots,
    }

parse_results(raw_results, n_subsystems, n_cycles)

Parse raw Qiskit results into ParityOutcome objects.

Each shot → one ParityOutcome. The parity matrix records per-precision-qubit: 0 if the qubit matches the correct phase bit, 1 otherwise.

For the TSVF variant the ancilla measurement provides the "phase quality probe". For the standard variant we evaluate the final measurement against the ideal phase bits.

Source code in src/qgate/adapters/qpe_adapter.py
def parse_results(
    self,
    raw_results: Any,
    n_subsystems: int,
    n_cycles: int,
) -> list[ParityOutcome]:
    """Parse raw Qiskit results into ParityOutcome objects.

    Each shot → one ParityOutcome.  The parity matrix records per-
    precision-qubit: 0 if the qubit matches the correct phase bit,
    1 otherwise.

    For the TSVF variant the ancilla measurement provides the
    "phase quality probe".  For the standard variant we evaluate
    the final measurement against the ideal phase bits.
    """
    counts = self._extract_counts(raw_results)

    # Determine the correct phase bitstring
    correct_bits = phase_to_binary_fraction(self.eigenphase, n_subsystems)

    outcomes: list[ParityOutcome] = []
    for bitstring, count in counts.items():
        row = self._bitstring_to_parity_row(
            bitstring,
            n_subsystems,
            n_cycles,
            correct_bits,
        )
        for _ in range(count):
            outcomes.append(
                ParityOutcome(
                    n_subsystems=n_subsystems,
                    n_cycles=max(n_cycles, 1),
                    parity_matrix=row.copy(),
                )
            )
    return outcomes

get_transpiled_depth(circuit)

Return the depth of the transpiled circuit.

Source code in src/qgate/adapters/qpe_adapter.py
def get_transpiled_depth(self, circuit: QuantumCircuit) -> int:
    """Return the depth of the transpiled circuit."""
    transpiled = transpile(
        circuit,
        backend=self.backend,
        optimization_level=self.optimization_level,
    )
    return int(transpiled.depth())

get_correct_phase_bits(n_precision)

Return the ideal binary-fraction bitstring for the eigenphase.

Source code in src/qgate/adapters/qpe_adapter.py
def get_correct_phase_bits(self, n_precision: int) -> str:
    """Return the ideal binary-fraction bitstring for the eigenphase."""
    return phase_to_binary_fraction(self.eigenphase, n_precision)

extract_phase_metrics(raw_results, n_precision, postselect=True)

Extract comprehensive phase estimation metrics.

Returns a dict with:
  • fidelity: P(correct phase bitstring)
  • mean_phase_error: weighted-mean circular phase error
  • entropy: Shannon entropy of the phase histogram (bits)
  • measured_phase: most-probable measured phase
  • true_phase: the target eigenphase
  • total_shots: number of shots used (after post-selection)
  • acceptance_rate: fraction of shots accepted (TSVF only)

Source code in src/qgate/adapters/qpe_adapter.py
def extract_phase_metrics(
    self,
    raw_results: dict[str, Any],
    n_precision: int,
    postselect: bool = True,
) -> dict[str, float]:
    """Extract comprehensive phase estimation metrics.

    Returns a dict with:
      - ``fidelity``: P(correct phase bitstring)
      - ``mean_phase_error``: weighted-mean circular phase error
      - ``entropy``: Shannon entropy of the phase histogram (bits)
      - ``measured_phase``: most-probable measured phase
      - ``true_phase``: the target eigenphase
      - ``total_shots``: number of shots used (after post-selection)
      - ``acceptance_rate``: fraction of shots accepted (TSVF only)
    """
    counts = self._extract_counts(raw_results)
    correct_bits = phase_to_binary_fraction(self.eigenphase, n_precision)

    if postselect and self.algorithm_mode == "tsvf":
        phase_counts, total_original, accepted_total = self._postselect_phase_counts(
            counts, n_precision
        )
        acceptance_rate = accepted_total / total_original if total_original > 0 else 0.0
    else:
        phase_counts = self._extract_phase_counts(counts, n_precision)
        accepted_total = sum(phase_counts.values())
        acceptance_rate = 1.0

    fid = phase_fidelity(phase_counts, correct_bits)
    mean_err = mean_phase_error(
        phase_counts,
        self.eigenphase,
        n_precision,
    )
    ent = histogram_entropy(phase_counts)

    # Most-probable phase
    if phase_counts:
        best_bs = max(phase_counts, key=phase_counts.get)  # type: ignore[arg-type]
        measured = binary_fraction_to_phase(best_bs[-n_precision:])
    else:
        measured = 0.0

    return {
        "fidelity": fid,
        "mean_phase_error": mean_err,
        "entropy": ent,
        "measured_phase": measured,
        "true_phase": self.eigenphase,
        "total_shots": accepted_total,
        "acceptance_rate": acceptance_rate,
    }
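
The TSVF post-selection step can be checked in isolation. A minimal sketch with hypothetical space-separated counts ("anc_bit phase_bits" keys, the shape produced by `_split_ancilla_phase`):

```python
# Hypothetical counts: keys are "ancilla_bit phase_bits" (c_anc written last).
counts = {"1 011": 60, "1 100": 10, "0 101": 30}

# Keep only shots where the ancilla probe fired (ancilla == "1").
accepted = {}
for key, val in counts.items():
    anc_bit, phase_bits = key.split()
    if anc_bit == "1":
        accepted[phase_bits] = accepted.get(phase_bits, 0) + val

total = sum(counts.values())
acceptance_rate = sum(accepted.values()) / total
print(accepted)         # {'011': 60, '100': 10}
print(acceptance_rate)  # 0.7
```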

extract_best_phase(raw_results, n_precision, postselect=True)

Find the most-sampled phase bitstring.

Returns (bitstring, phase_value, count).

Source code in src/qgate/adapters/qpe_adapter.py
def extract_best_phase(
    self,
    raw_results: dict[str, Any],
    n_precision: int,
    postselect: bool = True,
) -> tuple[str, float, int]:
    """Find the most-sampled phase bitstring.

    Returns (bitstring, phase_value, count).
    """
    counts = self._extract_counts(raw_results)

    if postselect and self.algorithm_mode == "tsvf":
        phase_counts, _, _ = self._postselect_phase_counts(
            counts,
            n_precision,
        )
    else:
        phase_counts = self._extract_phase_counts(counts, n_precision)

    if not phase_counts:
        return "0" * n_precision, 0.0, 0

    best_bs = max(phase_counts, key=phase_counts.get)  # type: ignore[arg-type]
    best_count = phase_counts[best_bs]
    best_phase = binary_fraction_to_phase(best_bs[-n_precision:])
    return best_bs, best_phase, best_count
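
The "best phase" reduces to an argmax over the phase-count dict; with hypothetical post-selected counts:

```python
# Hypothetical post-selected phase counts (keys are phase-register bitstrings).
phase_counts = {"011": 512, "100": 300, "010": 212}

best_bs = max(phase_counts, key=phase_counts.get)
best_phase = int(best_bs, 2) / 2 ** len(best_bs)  # same arithmetic as binary_fraction_to_phase
print(best_bs, best_phase, phase_counts[best_bs])  # 011 0.375 512
```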

phase_to_binary_fraction(phi, n_bits)

Convert a phase φ ∈ [0, 1) to its best n-bit binary fraction string.

The binary fraction 0.b₁b₂…bₜ represents φ ≈ Σ bₖ / 2ᵏ. We return the string "b₁b₂…bₜ".

Example: φ = 0.375 with n_bits=3 → "011" (0.011₂ = 3/8)

Source code in src/qgate/adapters/qpe_adapter.py
def phase_to_binary_fraction(phi: float, n_bits: int) -> str:
    """Convert a phase φ ∈ [0, 1) to its best n-bit binary fraction string.

    The binary fraction 0.b₁b₂…bₜ represents φ ≈ Σ bₖ / 2ᵏ.
    We return the string "b₁b₂…bₜ".

    Example: φ = 0.375 with n_bits=3 → "011"  (0.011₂ = 3/8)
    """
    val = round(phi * (2**n_bits)) % (2**n_bits)
    return format(val, f"0{n_bits}b")
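
A standalone check of the rounding and wraparound behaviour (the function body is copied from the source above; the inputs are illustrative):

```python
def phase_to_binary_fraction(phi: float, n_bits: int) -> str:
    # Round phi to the nearest multiple of 1/2^n_bits, wrapping at 1.0.
    val = round(phi * (2**n_bits)) % (2**n_bits)
    return format(val, f"0{n_bits}b")

print(phase_to_binary_fraction(0.375, 3))      # 011 (exact: 0.011₂ = 3/8)
print(phase_to_binary_fraction(1.0 / 3.0, 3))  # 011 (nearest 3-bit fraction to 1/3)
print(phase_to_binary_fraction(0.99, 3))       # 000 (rounds up to 1.0, which wraps to 0)
```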

binary_fraction_to_phase(bitstring)

Convert a binary fraction string to a phase value.

"011" → 0.011₂ = 3/8 = 0.375

Source code in src/qgate/adapters/qpe_adapter.py
def binary_fraction_to_phase(bitstring: str) -> float:
    """Convert a binary fraction string to a phase value.

    "011" → 0.011₂ = 3/8 = 0.375
    """
    n = len(bitstring)
    val = int(bitstring, 2)
    return float(val / (2**n))
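
Round-tripping through both conversions recovers the phase to within 1/2^(n_bits+1) (modulo wraparound). A sketch with both function bodies copied from the source above:

```python
def phase_to_binary_fraction(phi: float, n_bits: int) -> str:
    val = round(phi * (2**n_bits)) % (2**n_bits)
    return format(val, f"0{n_bits}b")

def binary_fraction_to_phase(bitstring: str) -> float:
    n = len(bitstring)
    return int(bitstring, 2) / (2**n)

phi = 1.0 / 3.0
bits = phase_to_binary_fraction(phi, 5)
approx = binary_fraction_to_phase(bits)
print(bits, approx)                    # 01011 0.34375 (= 11/32)
print(abs(approx - phi) <= 1 / 2**6)   # True: error is within half a bin
```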

phase_error(measured_phase, true_phase)

Circular phase error in [0, 0.5].

Accounts for the wraparound: |0.9 − 0.1| should be 0.2 not 0.8.

Source code in src/qgate/adapters/qpe_adapter.py
def phase_error(measured_phase: float, true_phase: float) -> float:
    """Circular phase error in [0, 0.5].

    Accounts for the wraparound: |0.9 − 0.1| should be 0.2 not 0.8.
    """
    diff = abs(measured_phase - true_phase)
    return min(diff, 1.0 - diff)
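
The wraparound behaviour in a runnable form (function body copied from the source above):

```python
def phase_error(measured_phase: float, true_phase: float) -> float:
    diff = abs(measured_phase - true_phase)
    return min(diff, 1.0 - diff)

print(round(phase_error(0.9, 0.1), 3))  # 0.2: the short way around the circle
print(phase_error(0.0, 0.5))            # 0.5: the maximal possible error
print(phase_error(0.25, 0.25))          # 0.0: exact match
```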

histogram_entropy(counts)

Shannon entropy of a measurement histogram (in bits).

Lower entropy → sharper distribution (more peaked). For a uniform distribution over 2^t outcomes, entropy = t bits. A perfect delta function has entropy = 0.

Source code in src/qgate/adapters/qpe_adapter.py
def histogram_entropy(counts: dict[str, int]) -> float:
    """Shannon entropy of a measurement histogram (in bits).

    Lower entropy → sharper distribution (more peaked).
    For a uniform distribution over 2^t outcomes, entropy = t bits.
    A perfect delta function has entropy = 0.
    """
    total = sum(counts.values())
    if total == 0:
        return 0.0
    entropy = 0.0
    for cnt in counts.values():
        if cnt > 0:
            p = cnt / total
            entropy -= p * math.log2(p)
    return entropy
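
The two limiting cases from the docstring, checked directly (function body copied from the source above):

```python
import math

def histogram_entropy(counts: dict[str, int]) -> float:
    total = sum(counts.values())
    if total == 0:
        return 0.0
    entropy = 0.0
    for cnt in counts.values():
        if cnt > 0:
            p = cnt / total
            entropy -= p * math.log2(p)
    return entropy

# Uniform over 2^2 = 4 outcomes -> 2 bits; a delta function -> 0 bits.
print(histogram_entropy({"00": 25, "01": 25, "10": 25, "11": 25}))  # 2.0
print(histogram_entropy({"011": 100}))                              # 0.0
```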

phase_fidelity(counts, correct_bitstring)

Fraction of shots that measured the correct phase bitstring.

Source code in src/qgate/adapters/qpe_adapter.py
def phase_fidelity(
    counts: dict[str, int],
    correct_bitstring: str,
) -> float:
    """Fraction of shots that measured the correct phase bitstring."""
    total = sum(counts.values())
    if total == 0:
        return 0.0
    correct_count = 0
    for key, val in counts.items():
        bs = key.strip().replace(" ", "")
        # Handle multi-register keys — take the last n bits
        n = len(correct_bitstring)
        if len(bs) >= n and bs[-n:] == correct_bitstring:
            correct_count += val
    return correct_count / total
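
A check of the multi-register key handling (function body copied from the source above; the counts are hypothetical TSVF-style "anc phase" keys):

```python
def phase_fidelity(counts: dict[str, int], correct_bitstring: str) -> float:
    total = sum(counts.values())
    if total == 0:
        return 0.0
    correct_count = 0
    n = len(correct_bitstring)
    for key, val in counts.items():
        bs = key.strip().replace(" ", "")  # collapse multi-register keys
        if len(bs) >= n and bs[-n:] == correct_bitstring:
            correct_count += val
    return correct_count / total

# Only the trailing (phase-register) bits are compared; the ancilla bit is ignored.
print(phase_fidelity({"1 011": 70, "0 101": 30}, "011"))  # 0.7
```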

mean_phase_error(counts, true_phase, n_bits)

Weighted mean circular phase error over all measurement outcomes.

Source code in src/qgate/adapters/qpe_adapter.py
def mean_phase_error(
    counts: dict[str, int],
    true_phase: float,
    n_bits: int,
) -> float:
    """Weighted mean circular phase error over all measurement outcomes."""
    total = sum(counts.values())
    if total == 0:
        return 0.5
    total_err = 0.0
    for key, val in counts.items():
        bs = key.strip().replace(" ", "")
        measured = binary_fraction_to_phase(bs[-n_bits:])
        total_err += phase_error(measured, true_phase) * val
    return total_err / total
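
A worked example combining the three helpers (bodies copied from the source above; the counts are hypothetical):

```python
def binary_fraction_to_phase(bitstring: str) -> float:
    return int(bitstring, 2) / (2 ** len(bitstring))

def phase_error(measured_phase: float, true_phase: float) -> float:
    diff = abs(measured_phase - true_phase)
    return min(diff, 1.0 - diff)

def mean_phase_error(counts: dict[str, int], true_phase: float, n_bits: int) -> float:
    total = sum(counts.values())
    if total == 0:
        return 0.5  # maximally uninformative
    total_err = 0.0
    for key, val in counts.items():
        bs = key.strip().replace(" ", "")
        measured = binary_fraction_to_phase(bs[-n_bits:])
        total_err += phase_error(measured, true_phase) * val
    return total_err / total

# 3 shots hit the true phase (0.375); 1 shot lands one bin away at 0.5
# (error 0.125), so the mean error is 0.125 / 4 = 0.03125.
print(mean_phase_error({"011": 3, "100": 1}, 0.375, 3))  # 0.03125
```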

Backward-Compatible Modules

qgate.conditioning

conditioning.py — Backward-compatible shim.

Deprecated since 0.3.0: the canonical location is qgate.compat.conditioning. This module re-exports all public symbols so that existing from qgate.conditioning import ... statements continue to work.

Usage unchanged::

from qgate.conditioning import ParityOutcome, decide_global

ConditioningStats dataclass

Aggregated statistics after applying a conditioning rule to many shots.

Source code in src/qgate/compat/conditioning.py
@dataclass
class ConditioningStats:
    """Aggregated statistics after applying a conditioning rule to many shots."""

    variant: str
    total_shots: int = 0
    accepted_shots: int = 0
    scores: list[float] = field(default_factory=list)

    @property
    def acceptance_probability(self) -> float:
        return self.accepted_shots / self.total_shots if self.total_shots else 0.0

    @property
    def tts(self) -> float:
        """Time-to-solution: 1 / acceptance_probability."""
        p = self.acceptance_probability
        return 1.0 / p if p > 0 else float("inf")

    def as_dict(self) -> dict:
        return {
            "variant": self.variant,
            "total_shots": self.total_shots,
            "accepted_shots": self.accepted_shots,
            "acceptance_probability": self.acceptance_probability,
            "TTS": self.tts,
            "mean_score": float(np.mean(self.scores)) if self.scores else None,
        }
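
The acceptance/TTS arithmetic is simple enough to check by hand; a minimal sketch mirroring the two properties above, with hypothetical shot counts:

```python
# Hypothetical run: 50 of 200 shots accepted by the conditioning rule.
total_shots, accepted_shots = 200, 50

acceptance_probability = accepted_shots / total_shots if total_shots else 0.0
tts = 1.0 / acceptance_probability if acceptance_probability > 0 else float("inf")

print(acceptance_probability)  # 0.25
print(tts)                     # 4.0: on average 4 shots per accepted sample
```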

tts property

Time-to-solution: 1 / acceptance_probability.

ParityOutcome dataclass

Parsed mid-circuit measurement outcomes for one shot.

Attributes:

Name Type Description
n_subsystems int

Number of subsystems (Bell pairs or logical units).

n_cycles int

Number of monitoring cycles.

parity_matrix Union[ndarray, list]

Shape (n_cycles, n_subsystems): 0 = pass (even parity), 1 = fail (odd parity). Accepts list[list[int]] or np.ndarray on construction (coerced to np.ndarray).

Source code in src/qgate/compat/conditioning.py
@dataclass
class ParityOutcome:
    """Parsed mid-circuit measurement outcomes for one shot.

    Attributes:
        n_subsystems: Number of subsystems (Bell pairs or logical units).
        n_cycles:     Number of monitoring cycles.
        parity_matrix: Shape ``(n_cycles, n_subsystems)`` — ``0`` = pass
                       (even parity), ``1`` = fail (odd parity).
                       Accepts ``list[list[int]]`` *or* ``np.ndarray``
                       on construction (coerced to ``np.ndarray``).
    """

    n_subsystems: int
    n_cycles: int
    parity_matrix: Union[np.ndarray, list] = field(default_factory=list)  # noqa: UP007

    def __post_init__(self) -> None:
        if not isinstance(self.parity_matrix, np.ndarray):
            self.parity_matrix = np.asarray(self.parity_matrix, dtype=np.int8)
        if self.parity_matrix.ndim == 0 or self.parity_matrix.size == 0:
            self.parity_matrix = np.zeros((self.n_cycles, self.n_subsystems), dtype=np.int8)

    # Convenience -----------------------------------------------------------

    def subsystem_pass_count(self, cycle: int) -> int:
        """Number of subsystems that passed in *cycle*."""
        return int(np.sum(self.parity_matrix[cycle] == 0))

    def subsystem_pass_rate(self, cycle: int) -> float:
        """Fraction of subsystems that passed in *cycle*."""
        return float(1.0 - self.parity_matrix[cycle].mean())

    def cycle_all_pass(self, cycle: int) -> bool:
        """True if every subsystem passed in *cycle*."""
        return bool(np.all(self.parity_matrix[cycle] == 0))

    @property
    def pass_rates(self) -> np.ndarray:
        """Per-cycle pass rate — shape ``(n_cycles,)``."""
        mat: np.ndarray = self.parity_matrix  # type: ignore[assignment]  # coerced in __post_init__
        result: np.ndarray = 1.0 - mat.astype(np.float64).mean(axis=1)
        return result

pass_rates property

Per-cycle pass rate — shape (n_cycles,).
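The `pass_rates` computation reduces to a row-wise mean over the parity matrix, which can be reproduced with plain NumPy (no `qgate` import needed):

```python
import numpy as np

# One shot: 2 cycles x 4 subsystems, 0 = pass, 1 = fail.
parity = np.array([[0, 0, 1, 0],
                   [0, 0, 0, 0]], dtype=np.int8)

# Same arithmetic as ParityOutcome.pass_rates: 1 - mean failure rate per cycle.
pass_rates = 1.0 - parity.astype(np.float64).mean(axis=1)
print(pass_rates)  # [0.75 1.  ]
```

Cycle 0 has one failing subsystem out of four (pass rate 0.75); cycle 1 passes everywhere (pass rate 1.0).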

subsystem_pass_count(cycle)

Number of subsystems that passed in cycle.

Source code in src/qgate/compat/conditioning.py
def subsystem_pass_count(self, cycle: int) -> int:
    """Number of subsystems that passed in *cycle*."""
    return int(np.sum(self.parity_matrix[cycle] == 0))

subsystem_pass_rate(cycle)

Fraction of subsystems that passed in cycle.

Source code in src/qgate/compat/conditioning.py
def subsystem_pass_rate(self, cycle: int) -> float:
    """Fraction of subsystems that passed in *cycle*."""
    return float(1.0 - self.parity_matrix[cycle].mean())

cycle_all_pass(cycle)

True if every subsystem passed in cycle.

Source code in src/qgate/compat/conditioning.py
def cycle_all_pass(self, cycle: int) -> bool:
    """True if every subsystem passed in *cycle*."""
    return bool(np.all(self.parity_matrix[cycle] == 0))

apply_rule_to_batch(outcomes, variant='global', k_fraction=0.9, alpha=0.5, threshold_combined=0.65)

Apply a conditioning rule to a batch of parity outcomes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `outcomes` | `Sequence[ParityOutcome]` | Sequence of `ParityOutcome` (one per shot). | *required* |
| `variant` | `str` | One of `"global"`, `"hierarchical"`, `"score_fusion"`. | `'global'` |
| `k_fraction` | `float` | For the hierarchical rule. | `0.9` |
| `alpha` | `float` | For score fusion. | `0.5` |
| `threshold_combined` | `float` | For score fusion. | `0.65` |

Returns:

| Type | Description |
| --- | --- |
| `ConditioningStats` | `ConditioningStats` with acceptance statistics. |

Source code in src/qgate/compat/conditioning.py
def apply_rule_to_batch(
    outcomes: Sequence[ParityOutcome],
    variant: str = "global",
    k_fraction: float = 0.9,
    alpha: float = 0.5,
    threshold_combined: float = 0.65,
) -> ConditioningStats:
    """Apply a conditioning rule to a batch of parity outcomes.

    Args:
        outcomes:           Sequence of ParityOutcome (one per shot).
        variant:            ``"global"`` | ``"hierarchical"`` | ``"score_fusion"``.
        k_fraction:         For hierarchical rule.
        alpha:              For score fusion.
        threshold_combined: For score fusion.

    Returns:
        ConditioningStats with acceptance statistics.
    """
    stats = ConditioningStats(variant=variant)
    for outcome in outcomes:
        stats.total_shots += 1
        if variant == "global":
            if decide_global(outcome):
                stats.accepted_shots += 1
        elif variant == "hierarchical":
            if decide_hierarchical(outcome, k_fraction):
                stats.accepted_shots += 1
        elif variant == "score_fusion":
            accepted, score = decide_score_fusion(outcome, alpha, threshold_combined)
            stats.scores.append(score)
            if accepted:
                stats.accepted_shots += 1
        else:
            raise ValueError(f"Unknown variant: {variant}")
    return stats
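The batch loop for the `"global"` variant can be sketched standalone (plain NumPy, mirroring the accept/reject counting above):

```python
import numpy as np

# Three shots, each a (2 cycles x 2 subsystems) parity matrix; 0 = pass.
shots = [
    np.array([[0, 0], [0, 0]]),  # every entry passes -> accepted
    np.array([[0, 1], [0, 0]]),  # one odd parity     -> rejected
    np.array([[0, 0], [1, 1]]),  # two odd parities   -> rejected
]

# Global rule: accept a shot only if all subsystems pass in all cycles.
accepted = sum(bool(np.all(m == 0)) for m in shots)
acceptance_probability = accepted / len(shots)
print(accepted)  # 1
```

One of three shots survives, so the batch's acceptance probability is 1/3 and its TTS is 3.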

decide_global(outcome)

Global conditioning — all subsystems pass all cycles.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `outcome` | `ParityOutcome` | Parity outcome for one shot. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the shot should be accepted. |

Example::

outcome = ParityOutcome(n_subsystems=4, n_cycles=2,
                        parity_matrix=[[0,0,0,0], [0,0,0,0]])
assert decide_global(outcome) is True
Source code in src/qgate/compat/conditioning.py
def decide_global(outcome: ParityOutcome) -> bool:
    """Global conditioning — all subsystems pass all cycles.

    Args:
        outcome: Parity outcome for one shot.

    Returns:
        True if the shot should be accepted.

    Example::

        outcome = ParityOutcome(n_subsystems=4, n_cycles=2,
                                parity_matrix=[[0,0,0,0], [0,0,0,0]])
        assert decide_global(outcome) is True
    """
    return bool(np.all(outcome.parity_matrix == 0))

decide_hierarchical(outcome, k_fraction=0.9)

Hierarchical k-of-N conditioning.

Accepts if at least ⌈k·N⌉ subsystems pass in every cycle.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `outcome` | `ParityOutcome` | Parity outcome for one shot. | *required* |
| `k_fraction` | `float` | Required pass fraction (0 < k_fraction ≤ 1). | `0.9` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the shot should be accepted. |

Example::

outcome = ParityOutcome(n_subsystems=4, n_cycles=1,
                        parity_matrix=[[0, 0, 1, 0]])
# ceil(0.75 * 4) = 3  →  3 passed  →  accept
assert decide_hierarchical(outcome, k_fraction=0.75) is True
Source code in src/qgate/compat/conditioning.py
def decide_hierarchical(
    outcome: ParityOutcome,
    k_fraction: float = 0.9,
) -> bool:
    """Hierarchical k-of-N conditioning.

    Accepts if at least ⌈k·N⌉ subsystems pass in **every** cycle.

    Args:
        outcome:    Parity outcome for one shot.
        k_fraction: Required pass fraction (0 < k_fraction ≤ 1).

    Returns:
        True if the shot should be accepted.

    Example::

        outcome = ParityOutcome(n_subsystems=4, n_cycles=1,
                                parity_matrix=[[0, 0, 1, 0]])
        # ceil(0.75 * 4) = 3  →  3 passed  →  accept
        assert decide_hierarchical(outcome, k_fraction=0.75) is True
    """
    if not 0 < k_fraction <= 1:
        raise ValueError(f"k_fraction must be in (0, 1], got {k_fraction}")
    threshold = math.ceil(k_fraction * outcome.n_subsystems)
    # Per-cycle pass counts via vectorised sum
    pass_counts = np.sum(outcome.parity_matrix == 0, axis=1)  # (n_cycles,)
    return bool(np.all(pass_counts >= threshold))
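The k-of-N logic works out as follows on a two-cycle example (standalone arithmetic mirroring the function body above):

```python
import math

import numpy as np

parity = np.array([[0, 0, 1, 0],
                   [0, 1, 0, 0]])                    # 2 cycles x 4 subsystems
k_fraction = 0.75
threshold = math.ceil(k_fraction * parity.shape[1])  # ceil(0.75 * 4) = 3
pass_counts = np.sum(parity == 0, axis=1)            # passes per cycle: [3, 3]
accept = bool(np.all(pass_counts >= threshold))
print(accept)  # True
```

Each cycle has exactly one failing subsystem, so both cycles meet the 3-of-4 requirement and the shot is accepted; with `k_fraction=1.0` the same shot would be rejected.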

decide_score_fusion(outcome, alpha=0.5, threshold_combined=0.65, hf_cycles=None, lf_cycles=None)

Score-fusion conditioning.

Computes a weighted combination of low-frequency (LF) and high-frequency (HF) subsystem pass-rates and compares to a continuous threshold:

    combined = α · mean(LF pass-rates) + (1 − α) · mean(HF pass-rates)

By default:

- HF cycles = every cycle
- LF cycles = every 2nd cycle (0, 2, 4, …)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `outcome` | `ParityOutcome` | Parity outcome for one shot. | *required* |
| `alpha` | `float` | Weight for LF component (0 ≤ α ≤ 1). | `0.5` |
| `threshold_combined` | `float` | Accept if combined ≥ this value. | `0.65` |
| `hf_cycles` | `Sequence[int] \| None` | Override which cycles count as HF. | `None` |
| `lf_cycles` | `Sequence[int] \| None` | Override which cycles count as LF. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[bool, float]` | `(accepted, combined_score)` |

Example::

outcome = ParityOutcome(n_subsystems=2, n_cycles=4,
                        parity_matrix=[[0,0],[0,1],[0,0],[1,0]])
accepted, score = decide_score_fusion(outcome, alpha=0.5)
Source code in src/qgate/compat/conditioning.py
def decide_score_fusion(
    outcome: ParityOutcome,
    alpha: float = 0.5,
    threshold_combined: float = 0.65,
    hf_cycles: Sequence[int] | None = None,
    lf_cycles: Sequence[int] | None = None,
) -> tuple[bool, float]:
    """Score-fusion conditioning.

    Computes a weighted combination of low-frequency (LF) and
    high-frequency (HF) subsystem pass-rates and compares to a
    continuous threshold:

        combined = α · mean(LF pass-rates) + (1-α) · mean(HF pass-rates)

    By default:
      - HF cycles = every cycle
      - LF cycles = every 2nd cycle (0, 2, 4, …)

    Args:
        outcome:            Parity outcome for one shot.
        alpha:              Weight for LF component (0 ≤ α ≤ 1).
        threshold_combined: Accept if combined ≥ this value.
        hf_cycles:          Override which cycles count as HF.
        lf_cycles:          Override which cycles count as LF.

    Returns:
        (accepted, combined_score)

    Example::

        outcome = ParityOutcome(n_subsystems=2, n_cycles=4,
                                parity_matrix=[[0,0],[0,1],[0,0],[1,0]])
        accepted, score = decide_score_fusion(outcome, alpha=0.5)
    """
    if hf_cycles is None:
        hf_cycles = list(range(outcome.n_cycles))
    if lf_cycles is None:
        lf_cycles = [w for w in range(outcome.n_cycles) if w % 2 == 0]

    def _mean_rate(cycles: Sequence[int]) -> float:
        if not cycles:
            return 0.0
        rates = outcome.pass_rates
        return float(np.mean(rates[list(cycles)]))

    score_lf = _mean_rate(lf_cycles)
    score_hf = _mean_rate(hf_cycles)
    combined = alpha * score_lf + (1.0 - alpha) * score_hf
    return combined >= threshold_combined, float(combined)
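Working through the docstring example by hand (standalone arithmetic, same defaults as the function above):

```python
import numpy as np

# 4 cycles x 2 subsystems, as in the docstring example.
parity = np.array([[0, 0], [0, 1], [0, 0], [1, 0]])
rates = 1.0 - parity.mean(axis=1)   # per-cycle pass rates: [1.0, 0.5, 1.0, 0.5]
lf = rates[[0, 2]].mean()           # default LF cycles (0, 2)  -> 1.0
hf = rates.mean()                   # default HF = every cycle  -> 0.75
alpha, threshold_combined = 0.5, 0.65
combined = alpha * lf + (1.0 - alpha) * hf
print(combined >= threshold_combined, combined)  # True 0.875
```

The LF stream sees only the two clean cycles, so it pulls the fused score above the pure HF average; the shot is accepted with score 0.875.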

qgate.monitors

monitors.py — Backward-compatible shim.

Deprecated since 0.3.0: the canonical location is `qgate.compat.monitors`. This module re-exports all public symbols so that existing `from qgate.monitors import ...` statements continue to work.

Usage unchanged::

from qgate.monitors import MultiRateMonitor, score_fusion

MultiRateMonitor dataclass

Stateful monitor tracking HF and LF parity scores across cycles.

Usage::

mon = MultiRateMonitor(n_subsystems=4, alpha=0.5,
                       threshold_combined=0.65)
mon.record_cycle(0, pass_rate=0.75)   # cycle 0 → HF + LF
mon.record_cycle(1, pass_rate=0.50)   # cycle 1 → HF only
mon.record_cycle(2, pass_rate=0.80)   # cycle 2 → HF + LF
accepted, score = mon.fused_decision()

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `n_subsystems` | `int` | Number of subsystems being monitored. |
| `alpha` | `float` | LF weight in fusion formula. |
| `threshold_combined` | `float` | Accept if fused score ≥ this. |
| `hf_scores` | `list[float]` | Recorded HF scores (every cycle). |
| `lf_scores` | `list[float]` | Recorded LF scores (even cycles only). |

Source code in src/qgate/compat/monitors.py
@dataclass
class MultiRateMonitor:
    """Stateful monitor tracking HF and LF parity scores across cycles.

    Usage::

        mon = MultiRateMonitor(n_subsystems=4, alpha=0.5,
                               threshold_combined=0.65)
        mon.record_cycle(0, pass_rate=0.75)   # cycle 0 → HF + LF
        mon.record_cycle(1, pass_rate=0.50)   # cycle 1 → HF only
        mon.record_cycle(2, pass_rate=0.80)   # cycle 2 → HF + LF
        accepted, score = mon.fused_decision()

    Attributes:
        n_subsystems:       Number of subsystems being monitored.
        alpha:              LF weight in fusion formula.
        threshold_combined: Accept if fused score ≥ this.
        hf_scores:          Recorded HF scores (every cycle).
        lf_scores:          Recorded LF scores (even cycles only).
    """

    n_subsystems: int = 1
    alpha: float = 0.5
    threshold_combined: float = 0.65
    hf_scores: list[float] = field(default_factory=list)
    lf_scores: list[float] = field(default_factory=list)

    def record_cycle(self, cycle_idx: int, pass_rate: float) -> None:
        """Record the subsystem pass-rate for a cycle.

        The score is always recorded as HF.  If the cycle index is even,
        it is also recorded as LF.

        Args:
            cycle_idx: Zero-based cycle index.
            pass_rate: Fraction of subsystems that passed (0–1).
        """
        self.hf_scores.append(pass_rate)
        if cycle_idx % 2 == 0:
            self.lf_scores.append(pass_rate)

    def fused_decision(self) -> tuple[bool, float]:
        """Compute the fused decision from accumulated scores.

        Returns:
            (accepted, combined_score)
        """
        lf = float(np.mean(self.lf_scores)) if self.lf_scores else 0.0
        hf = float(np.mean(self.hf_scores)) if self.hf_scores else 0.0
        return score_fusion(lf, hf, self.alpha, self.threshold_combined)

    def reset(self) -> None:
        """Clear all recorded scores."""
        self.hf_scores.clear()
        self.lf_scores.clear()

record_cycle(cycle_idx, pass_rate)

Record the subsystem pass-rate for a cycle.

The score is always recorded as HF. If the cycle index is even, it is also recorded as LF.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cycle_idx` | `int` | Zero-based cycle index. | *required* |
| `pass_rate` | `float` | Fraction of subsystems that passed (0–1). | *required* |
Source code in src/qgate/compat/monitors.py
def record_cycle(self, cycle_idx: int, pass_rate: float) -> None:
    """Record the subsystem pass-rate for a cycle.

    The score is always recorded as HF.  If the cycle index is even,
    it is also recorded as LF.

    Args:
        cycle_idx: Zero-based cycle index.
        pass_rate: Fraction of subsystems that passed (0–1).
    """
    self.hf_scores.append(pass_rate)
    if cycle_idx % 2 == 0:
        self.lf_scores.append(pass_rate)

fused_decision()

Compute the fused decision from accumulated scores.

Returns:

| Type | Description |
| --- | --- |
| `tuple[bool, float]` | `(accepted, combined_score)` |

Source code in src/qgate/compat/monitors.py
def fused_decision(self) -> tuple[bool, float]:
    """Compute the fused decision from accumulated scores.

    Returns:
        (accepted, combined_score)
    """
    lf = float(np.mean(self.lf_scores)) if self.lf_scores else 0.0
    hf = float(np.mean(self.hf_scores)) if self.hf_scores else 0.0
    return score_fusion(lf, hf, self.alpha, self.threshold_combined)

reset()

Clear all recorded scores.

Source code in src/qgate/compat/monitors.py
def reset(self) -> None:
    """Clear all recorded scores."""
    self.hf_scores.clear()
    self.lf_scores.clear()
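The fused decision from the class usage example works out as follows (standalone arithmetic mirroring `record_cycle` and `fused_decision`, with the same defaults):

```python
import numpy as np

# Pass rates recorded at cycles 0, 1, 2, as in the usage example above.
cycle_rates = [(0, 0.75), (1, 0.50), (2, 0.80)]
hf_scores = [r for _, r in cycle_rates]                # every cycle is HF
lf_scores = [r for c, r in cycle_rates if c % 2 == 0]  # even cycles are LF

alpha, threshold_combined = 0.5, 0.65
combined = alpha * np.mean(lf_scores) + (1 - alpha) * np.mean(hf_scores)
accepted = combined >= threshold_combined
print(bool(accepted), round(float(combined), 4))  # True 0.7292
```

The noisy odd cycle only enters the HF average, so the LF stream (mean 0.775) lifts the fused score above the raw HF mean of about 0.683.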

compute_window_metric(times, values, window=1.0, mode='max')

Compute a metric over a trailing time window.

Examines [t_final − window, t_final] and returns the max or mean of values within that interval.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `times` | `ndarray` | 1-D monotonic time array. | *required* |
| `values` | `ndarray` | 1-D values array (same length). | *required* |
| `window` | `float` | Width of the trailing window. | `1.0` |
| `mode` | `Literal['max', 'mean']` | `"max"` or `"mean"`. | `'max'` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[float, float, float]` | `(metric, window_start, window_end)` |

Source code in src/qgate/scoring.py
def compute_window_metric(
    times: np.ndarray,
    values: np.ndarray,
    window: float = 1.0,
    mode: Literal["max", "mean"] = "max",
) -> tuple[float, float, float]:
    """Compute a metric over a trailing time window.

    Examines [t_final − window, t_final] and returns the max or mean
    of *values* within that interval.

    Args:
        times:  1-D monotonic time array.
        values: 1-D values array (same length).
        window: Width of the trailing window.
        mode:   ``"max"`` or ``"mean"``.

    Returns:
        (metric, window_start, window_end)
    """
    t_final = float(times[-1])
    window_start = max(0.0, t_final - window)
    mask = (times >= window_start) & (times <= t_final)
    window_values = values[mask]

    if len(window_values) == 0:
        metric = float(values[-1])
    elif mode == "max":
        metric = float(np.max(window_values))
    elif mode == "mean":
        metric = float(np.mean(window_values))
    else:
        raise ValueError(f"Unknown mode: {mode!r}")

    return metric, window_start, t_final
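The window selection can be illustrated with a small trace (standalone, mirroring the masking logic above in `"max"` mode):

```python
import numpy as np

times = np.array([0.0, 0.5, 1.0, 1.5, 2.0])
values = np.array([0.2, 0.9, 0.4, 0.7, 0.3])

# Trailing window [t_final - 1.0, t_final] = [1.0, 2.0].
window = 1.0
t_final = float(times[-1])
window_start = max(0.0, t_final - window)
mask = (times >= window_start) & (times <= t_final)
print(float(values[mask].max()))  # 0.7
```

The early peak at t = 0.5 (value 0.9) falls outside the trailing window and is ignored; only the samples at t = 1.0, 1.5, 2.0 are examined.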

score_fusion(lf_score, hf_score, alpha=0.5, threshold=0.65)

Compute α-weighted score fusion and compare to threshold.

combined = α · lf_score + (1 − α) · hf_score

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `lf_score` | `float` | Low-frequency monitoring score (0–1). | *required* |
| `hf_score` | `float` | High-frequency monitoring score (0–1). | *required* |
| `alpha` | `float` | Weight for the LF component (0 ≤ α ≤ 1). | `0.5` |
| `threshold` | `float` | Accept if combined ≥ threshold. | `0.65` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[bool, float]` | `(accepted, combined_score)` |

Example::

accepted, score = score_fusion(0.8, 0.6, alpha=0.5, threshold=0.65)
# score = 0.5*0.8 + 0.5*0.6 = 0.70  → accepted = True
Source code in src/qgate/compat/monitors.py
def score_fusion(
    lf_score: float,
    hf_score: float,
    alpha: float = 0.5,
    threshold: float = 0.65,
) -> tuple[bool, float]:
    """Compute α-weighted score fusion and compare to threshold.

    combined = α · lf_score + (1 − α) · hf_score

    Args:
        lf_score:  Low-frequency monitoring score (0–1).
        hf_score:  High-frequency monitoring score (0–1).
        alpha:     Weight for the LF component (0 ≤ α ≤ 1).
        threshold: Accept if combined ≥ threshold.

    Returns:
        (accepted, combined_score)

    Example::

        accepted, score = score_fusion(0.8, 0.6, alpha=0.5, threshold=0.65)
        # score = 0.5*0.8 + 0.5*0.6 = 0.70  → accepted = True
    """
    combined = alpha * lf_score + (1.0 - alpha) * hf_score
    return combined >= threshold, float(combined)

should_abort_batch(probe_pass_rate, theta=0.65)

Decide whether to abort a full batch based on a probe result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `probe_pass_rate` | `float` | Fraction of probe shots that passed (0–1). | *required* |
| `theta` | `float` | Proceed only if probe_pass_rate ≥ θ. | `0.65` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the batch should be **aborted** (i.e. the probe failed). |

Example::

# Probe returned 30% pass-rate  →  abort
assert should_abort_batch(0.30, theta=0.65) is True
Source code in src/qgate/compat/monitors.py
def should_abort_batch(
    probe_pass_rate: float,
    theta: float = 0.65,
) -> bool:
    """Decide whether to abort a full batch based on a probe result.

    Args:
        probe_pass_rate: Fraction of probe shots that passed (0–1).
        theta:           Proceed only if probe_pass_rate ≥ θ.

    Returns:
        True if the batch should be **aborted** (i.e. probe failed).

    Example::

        # Probe returned 30% pass-rate  →  abort
        assert should_abort_batch(0.30, theta=0.65) is True
    """
    return probe_pass_rate < theta