Skip to content

API Reference

Graphs

causalrl.scm.graph.CausalGraph

A causal graph: a DAG over observed variables plus bidirected edges that denote unobserved confounders (an ADMG).

Source code in src/causalrl/scm/graph.py
class CausalGraph:
    """A causal graph: a DAG over observed variables plus bidirected edges that
    denote unobserved confounders (an ADMG)."""

    def __init__(
        self,
        directed_edges: list[tuple[str, str]],
        bidirected_edges: list[tuple[str, str]] | None = None,
        nodes: list[str] | None = None,
    ) -> None:
        self._dag: nx.DiGraph[str] = nx.DiGraph()
        self._bi: nx.Graph[str] = nx.Graph()
        if nodes:
            self._dag.add_nodes_from(nodes)
            self._bi.add_nodes_from(nodes)
        self._dag.add_edges_from(directed_edges)
        for a, b in bidirected_edges or []:
            self._bi.add_node(a)
            self._bi.add_node(b)
            self._bi.add_edge(a, b)
        # keep node sets aligned
        self._bi.add_nodes_from(self._dag.nodes)
        self._dag.add_nodes_from(self._bi.nodes)
        if not nx.is_directed_acyclic_graph(self._dag):
            raise CausalGraphError("directed edges must form a DAG (cycle detected)")

    @property
    def nodes(self) -> list[str]:
        return list(self._dag.nodes)

    @property
    def directed_edges(self) -> list[tuple[str, str]]:
        """The directed edges as ``(parent, child)`` pairs."""
        return [(u, v) for u, v in self._dag.edges]

    @property
    def bidirected_edges(self) -> list[tuple[str, str]]:
        """The bidirected (latent-confounding) edges."""
        return [(u, v) for u, v in self._bi.edges]

    def _check(self, node: str) -> None:
        if node not in self._dag:
            raise CausalGraphError(f"unknown node: {node!r}")

    def parents(self, node: str) -> list[str]:
        self._check(node)
        return list(self._dag.predecessors(node))

    def children(self, node: str) -> list[str]:
        self._check(node)
        return list(self._dag.successors(node))

    def topological_order(self) -> list[str]:
        return list(nx.topological_sort(self._dag))

    def is_confounded(self, a: str, b: str) -> bool:
        self._check(a)
        self._check(b)
        return self._bi.has_edge(a, b)

    def has_bidirected_edges(self) -> bool:
        """Whether this graph contains latent-confounding (bidirected) edges."""
        return self._bi.number_of_edges() > 0

    def has_incident_bidirected_edges(self, node: str) -> bool:
        """Whether `node` is incident to any latent-confounding edge."""
        self._check(node)
        return self._bi.degree(node) > 0

    def c_components(self) -> list[set[str]]:
        """Connected components of the bidirected graph (isolated nodes are singletons)."""
        return [set(c) for c in nx.connected_components(self._bi)]

    def remove_incoming_edges(self, node: str) -> CausalGraph:
        """Return a copy with all directed edges into `node` removed (graph mutilation)."""
        self._check(node)
        directed = [(u, v) for u, v in self._dag.edges if v != node]
        bidirected = list(self._bi.edges)
        return CausalGraph(directed, bidirected, nodes=self.nodes)

    def _as_node_set(self, nodes: str | Iterable[str]) -> set[str]:
        ns = {nodes} if isinstance(nodes, str) else set(nodes)
        for n in ns:
            self._check(n)
        return ns

    def ancestors(self, nodes: str | Iterable[str]) -> set[str]:
        """Ancestors of `nodes` via directed edges, INCLUDING the inputs (the inclusive An(·)
        convention used by the identification/POMIS literature)."""
        ns = self._as_node_set(nodes)
        result = set(ns)
        for n in ns:
            result |= set(nx.ancestors(self._dag, n))  # type: ignore[reportUnknownMemberType]
        return result

    def descendants(self, nodes: str | Iterable[str]) -> set[str]:
        """Strict descendants of `nodes` via directed edges (excludes the inputs)."""
        ns = self._as_node_set(nodes)
        result: set[str] = set()
        for n in ns:
            result |= set(nx.descendants(self._dag, n))  # type: ignore[reportUnknownMemberType]
        return result

    def induced_subgraph(self, nodes: str | Iterable[str]) -> CausalGraph:
        """Subgraph on `nodes`: keep directed/bidirected edges with both endpoints in `nodes`."""
        keep = self._as_node_set(nodes)
        directed = [(u, v) for u, v in self._dag.edges if u in keep and v in keep]
        bidirected = [(u, v) for u, v in self._bi.edges if u in keep and v in keep]
        return CausalGraph(directed, bidirected, nodes=list(keep))

    def do_mutilate(self, intervened: str | Iterable[str]) -> CausalGraph:
        """ADMG mutilation for do(intervened): drop incoming directed edges to each
        intervened node AND every bidirected edge incident to an intervened node
        (intervention severs latent confounding into the set). Distinct from
        ``remove_incoming_edges``, which keeps bidirected edges."""
        x = self._as_node_set(intervened)
        directed = [(u, v) for u, v in self._dag.edges if v not in x]
        bidirected = [(u, v) for u, v in self._bi.edges if u not in x and v not in x]
        return CausalGraph(directed, bidirected, nodes=self.nodes)

    def _kept_reach(self, children: Iterable[str], keep: set[str]) -> set[str]:
        """Kept nodes reachable from `children` via directed edges whose interior nodes are
        all outside `keep` (a kept node is a boundary: it is reached but not expanded)."""
        reached: set[str] = set()
        seen: set[str] = set()
        frontier = list(children)
        while frontier:
            w = frontier.pop()
            if w in keep:
                reached.add(w)
            elif w not in seen:
                seen.add(w)
                frontier.extend(list(self._dag.successors(w)))
        return reached

    def latent_projection(self, keep: str | Iterable[str]) -> CausalGraph:
        """Latent projection onto `keep`: marginalize out every node not in `keep`, adding a
        directed edge for each directed path through removed nodes and a bidirected edge for
        each confounding path through removed nodes (the Tian-Pearl / Verma projection).

        Directed ``Vi -> Vj`` when a directed path from ``Vi`` to ``Vj`` has all interior
        nodes removed; bidirected ``Vi <-> Vj`` when a removed common cause (a marginalized
        node, or the latent behind a bidirected edge) reaches both through removed interiors.
        Removing a collider induces no confounding (its parents are never in its reached set)."""
        keep_set = self._as_node_set(keep)
        directed: list[tuple[str, str]] = []
        for vi in keep_set:
            for w in self._kept_reach(list(self._dag.successors(vi)), keep_set):
                directed.append((vi, w))

        bidirected: set[tuple[str, str]] = set()
        sources = [list(self._dag.successors(w)) for w in self._dag.nodes if w not in keep_set]
        sources += [[a, b] for a, b in self._bi.edges]
        for children in sources:
            reached = sorted(self._kept_reach(children, keep_set))
            for i in range(len(reached)):
                for j in range(i + 1, len(reached)):
                    bidirected.add((reached[i], reached[j]))

        return CausalGraph(directed, list(bidirected), nodes=list(keep_set))

directed_edges property

The directed edges as (parent, child) pairs.

bidirected_edges property

The bidirected (latent-confounding) edges.

has_bidirected_edges()

Whether this graph contains latent-confounding (bidirected) edges.

Source code in src/causalrl/scm/graph.py
def has_bidirected_edges(self) -> bool:
    """Whether this graph contains latent-confounding (bidirected) edges."""
    return self._bi.number_of_edges() > 0

has_incident_bidirected_edges(node)

Whether node is incident to any latent-confounding edge.

Source code in src/causalrl/scm/graph.py
def has_incident_bidirected_edges(self, node: str) -> bool:
    """Whether `node` is incident to any latent-confounding edge."""
    self._check(node)
    return self._bi.degree(node) > 0

c_components()

Connected components of the bidirected graph (isolated nodes are singletons).

Source code in src/causalrl/scm/graph.py
def c_components(self) -> list[set[str]]:
    """Connected components of the bidirected graph (isolated nodes are singletons)."""
    return [set(c) for c in nx.connected_components(self._bi)]

remove_incoming_edges(node)

Return a copy with all directed edges into node removed (graph mutilation).

Source code in src/causalrl/scm/graph.py
def remove_incoming_edges(self, node: str) -> CausalGraph:
    """Return a copy with all directed edges into `node` removed (graph mutilation)."""
    self._check(node)
    directed = [(u, v) for u, v in self._dag.edges if v != node]
    bidirected = list(self._bi.edges)
    return CausalGraph(directed, bidirected, nodes=self.nodes)

ancestors(nodes)

Ancestors of nodes via directed edges, INCLUDING the inputs (the inclusive An(·) convention used by the identification/POMIS literature).

Source code in src/causalrl/scm/graph.py
def ancestors(self, nodes: str | Iterable[str]) -> set[str]:
    """Ancestors of `nodes` via directed edges, INCLUDING the inputs (the inclusive An(·)
    convention used by the identification/POMIS literature)."""
    ns = self._as_node_set(nodes)
    result = set(ns)
    for n in ns:
        result |= set(nx.ancestors(self._dag, n))  # type: ignore[reportUnknownMemberType]
    return result

descendants(nodes)

Strict descendants of nodes via directed edges (excludes the inputs).

Source code in src/causalrl/scm/graph.py
def descendants(self, nodes: str | Iterable[str]) -> set[str]:
    """Strict descendants of `nodes` via directed edges (excludes the inputs)."""
    ns = self._as_node_set(nodes)
    result: set[str] = set()
    for n in ns:
        result |= set(nx.descendants(self._dag, n))  # type: ignore[reportUnknownMemberType]
    return result

induced_subgraph(nodes)

Subgraph on nodes: keep directed/bidirected edges with both endpoints in nodes.

Source code in src/causalrl/scm/graph.py
def induced_subgraph(self, nodes: str | Iterable[str]) -> CausalGraph:
    """Subgraph on `nodes`: keep directed/bidirected edges with both endpoints in `nodes`."""
    keep = self._as_node_set(nodes)
    directed = [(u, v) for u, v in self._dag.edges if u in keep and v in keep]
    bidirected = [(u, v) for u, v in self._bi.edges if u in keep and v in keep]
    return CausalGraph(directed, bidirected, nodes=list(keep))

do_mutilate(intervened)

ADMG mutilation for do(intervened): drop incoming directed edges to each intervened node AND every bidirected edge incident to an intervened node (intervention severs latent confounding into the set). Distinct from remove_incoming_edges, which keeps bidirected edges.

Source code in src/causalrl/scm/graph.py
def do_mutilate(self, intervened: str | Iterable[str]) -> CausalGraph:
    """ADMG mutilation for do(intervened): drop incoming directed edges to each
    intervened node AND every bidirected edge incident to an intervened node
    (intervention severs latent confounding into the set). Distinct from
    ``remove_incoming_edges``, which keeps bidirected edges."""
    x = self._as_node_set(intervened)
    directed = [(u, v) for u, v in self._dag.edges if v not in x]
    bidirected = [(u, v) for u, v in self._bi.edges if u not in x and v not in x]
    return CausalGraph(directed, bidirected, nodes=self.nodes)

latent_projection(keep)

Latent projection onto keep: marginalize out every node not in keep, adding a directed edge for each directed path through removed nodes and a bidirected edge for each confounding path through removed nodes (the Tian-Pearl / Verma projection).

Directed Vi -> Vj when a directed path from Vi to Vj has all interior nodes removed; bidirected Vi <-> Vj when a removed common cause (a marginalized node, or the latent behind a bidirected edge) reaches both through removed interiors. Removing a collider induces no confounding (its parents are never in its reached set).

Source code in src/causalrl/scm/graph.py
def latent_projection(self, keep: str | Iterable[str]) -> CausalGraph:
    """Latent projection onto `keep`: marginalize out every node not in `keep`, adding a
    directed edge for each directed path through removed nodes and a bidirected edge for
    each confounding path through removed nodes (the Tian-Pearl / Verma projection).

    Directed ``Vi -> Vj`` when a directed path from ``Vi`` to ``Vj`` has all interior
    nodes removed; bidirected ``Vi <-> Vj`` when a removed common cause (a marginalized
    node, or the latent behind a bidirected edge) reaches both through removed interiors.
    Removing a collider induces no confounding (its parents are never in its reached set)."""
    keep_set = self._as_node_set(keep)
    directed: list[tuple[str, str]] = []
    for vi in keep_set:
        for w in self._kept_reach(list(self._dag.successors(vi)), keep_set):
            directed.append((vi, w))

    bidirected: set[tuple[str, str]] = set()
    sources = [list(self._dag.successors(w)) for w in self._dag.nodes if w not in keep_set]
    sources += [[a, b] for a, b in self._bi.edges]
    for children in sources:
        reached = sorted(self._kept_reach(children, keep_set))
        for i in range(len(reached)):
            for j in range(i + 1, len(reached)):
                bidirected.add((reached[i], reached[j]))

    return CausalGraph(directed, list(bidirected), nodes=list(keep_set))

Intervention Sets

causalrl.identification.intervention_sets.pomis(graph, reward, manipulable=None)

All POMISs for reward: a deduplicated, canonically sorted list of frozensets.

frozenset() (the observational regime) appears when it is possibly optimal. When manipulable is given, only those variables may be intervened on: by r40's Theorem 4 this is the unconstrained POMIS of the latent projection onto manipulable | {reward}.

Source code in src/causalrl/identification/intervention_sets.py
def pomis(
    graph: CausalGraph, reward: str, manipulable: Iterable[str] | None = None
) -> list[frozenset[str]]:
    """All POMISs for `reward`: a deduplicated, canonically sorted list of frozensets.

    ``frozenset()`` (the observational regime) appears when it is possibly optimal. When
    ``manipulable`` is given, only those variables may be intervened on: by r40's Theorem 4
    this is the unconstrained POMIS of the latent projection onto ``manipulable | {reward}``.
    """
    if manipulable is not None:
        graph = graph.latent_projection(set(manipulable) | {reward})
    graph = graph.induced_subgraph(graph.ancestors(reward))
    territory, border = _muct_ib(graph, reward)
    sub = graph.do_mutilate(border).induced_subgraph(territory | border)
    ws = [w for w in _backward_order(sub) if w in (territory - {reward})]
    result = _sub_pomis(sub, reward, ws, frozenset()) | {border}
    return _canonical(result)

causalrl.identification.intervention_sets.minimal_intervention_sets(graph, reward, manipulable=None)

All MISs for reward: a deduplicated, canonically sorted list of frozensets.

When manipulable is given, the result is filtered to sets that avoid the non-manipulable variables (r40: a constrained MIS is just the filtered unconstrained MIS).

Source code in src/causalrl/identification/intervention_sets.py
def minimal_intervention_sets(
    graph: CausalGraph, reward: str, manipulable: Iterable[str] | None = None
) -> list[frozenset[str]]:
    """All MISs for `reward`: a deduplicated, canonically sorted list of frozensets.

    When ``manipulable`` is given, the result is filtered to sets that avoid the
    non-manipulable variables (r40: a constrained MIS is just the filtered unconstrained MIS).
    """
    graph = graph.induced_subgraph(graph.ancestors(reward))
    ws = [w for w in _backward_order(graph) if w != reward]
    result = _canonical(_sub_miss(graph, reward, frozenset(), ws))
    if manipulable is None:
        return result
    allowed = set(manipulable)
    return [s for s in result if s <= allowed]

causalrl.identification.intervention_sets.requires_experiment(graph, treatment, outcome)

Whether learning P(outcome | do(treatment)) requires experimentation (Task 2, "when").

Returns True exactly when the effect is not identifiable from observational (L1) data, so an online experiment (an L2 intervention) is necessary, and False when offline data already suffices. This is the "when to intervene" companion to POMIS's "where": :func:pomis narrows which intervention sets could be optimal, while this decides whether you must intervene at all. Delegates to the complete ID algorithm (:func:causalrl.identification.id_algorithm.is_identifiable_effect).

Source code in src/causalrl/identification/intervention_sets.py
def requires_experiment(graph: CausalGraph, treatment: Iterable[str], outcome: str) -> bool:
    """Whether learning ``P(outcome | do(treatment))`` *requires* experimentation (Task 2, "when").

    Returns ``True`` exactly when the effect is **not** identifiable from observational (L1) data,
    so an online experiment (an L2 intervention) is necessary, and ``False`` when offline data
    already suffices. This is the "when to intervene" companion to POMIS's "where": :func:`pomis`
    narrows *which* intervention sets could be optimal, while this decides *whether* you must
    intervene at all. Delegates to the complete ID algorithm
    (:func:`causalrl.identification.id_algorithm.is_identifiable_effect`).
    """
    return not is_identifiable_effect(graph, treatment, {outcome})

Structural Causal Models

causalrl.scm.scm.StructuralCausalModel

Executable explicit-latent DAG SCM supporting L1/L2/L3 queries.

Bidirected-edge ADMGs are accepted by :class:CausalGraph for analytical graph algorithms, but they are not executable SCMs: shared latent causes must be represented as explicit parent nodes with their own mechanism and exogenous distribution.

Source code in src/causalrl/scm/scm.py
class StructuralCausalModel:
    """Executable explicit-latent DAG SCM supporting L1/L2/L3 queries.

    Bidirected-edge ADMGs are accepted by :class:`CausalGraph` for analytical graph
    algorithms, but they are not executable SCMs: shared latent causes must be represented
    as explicit parent nodes with their own mechanism and exogenous distribution.
    """

    def __init__(
        self,
        graph: CausalGraph,
        mechanisms: dict[str, Mechanism],
        exogenous: dict[str, Distribution],
    ) -> None:
        if graph.has_bidirected_edges():
            raise CausalGraphError(
                "StructuralCausalModel requires an explicit latent-variable DAG; "
                "bidirected ADMG edges are analytical only. Represent shared latent causes "
                "as explicit latent nodes."
            )
        nodes = set(graph.nodes)
        for name, entries in (("mechanisms", mechanisms), ("exogenous distributions", exogenous)):
            missing = nodes - set(entries)
            extra = set(entries) - nodes
            if missing or extra:
                raise CausalGraphError(
                    f"{name} must exactly cover graph nodes; "
                    f"missing={sorted(missing)}, extra={sorted(extra)}"
                )
        for node in graph.nodes:
            expected = set(graph.parents(node))
            declared = set(mechanisms[node].parents)
            if declared != expected:
                raise CausalGraphError(
                    f"mechanism parents for {node!r} do not match graph parents; "
                    f"expected={sorted(expected)}, declared={sorted(declared)}"
                )
        self.graph = graph
        self.mechanisms = mechanisms
        self.exogenous = exogenous
        self._generator = torch.Generator()  # type: ignore[reportPrivateImportUsage]
        self._generator.seed()

    def _sample_exogenous(self, n: int, seed: int | None) -> dict[str, Tensor]:
        """Sample using a private CPU RNG stream without mutating process-global Torch state."""
        generator = torch.Generator()  # type: ignore[reportPrivateImportUsage]
        if seed is None:
            generator.set_state(self._generator.get_state())
        else:
            generator.manual_seed(seed)
        with torch.random.fork_rng():  # type: ignore[reportUnknownMemberType]
            torch.random.set_rng_state(generator.get_state())  # type: ignore[reportUnknownMemberType]
            out = {
                name: dist.sample((n,)).reshape(n).float() for name, dist in self.exogenous.items()
            }
            if seed is None:
                self._generator.set_state(torch.random.get_rng_state())  # type: ignore[reportUnknownMemberType]
        return out

    def _evaluate(self, noise: dict[str, Tensor]) -> dict[str, Tensor]:
        values: dict[str, Tensor] = {}
        for node in self.graph.topological_order():
            mech = self.mechanisms[node]
            parent_values = {p: values[p] for p in self.graph.parents(node)}
            values[node] = mech(parent_values, noise[node])
        return values

    def do(self, interventions: Mapping[str, Value]) -> StructuralCausalModel:
        """Layer 2: return the mutilated SCM under do(interventions). Original is unchanged."""
        graph = self.graph
        mechanisms = dict(self.mechanisms)
        for node, value in interventions.items():
            if node not in self.mechanisms:
                raise CausalGraphError(f"cannot intervene on unknown node: {node!r}")
            graph = graph.remove_incoming_edges(node)
            if isinstance(value, (int, float, bool)):
                const = float(value)
                mechanisms[node] = FunctionalMechanism(
                    [],
                    lambda pa, u, _c=const: torch.full_like(u, _c),  # type: ignore[reportPrivateImportUsage]
                )
            else:
                # Per-sample vector: pin elementwise, broadcast-checked against the unit count.
                mechanisms[node] = FunctionalMechanism(
                    [],
                    lambda pa, u, _v=value: _broadcast_value(_v, u.shape[0]).to(u.dtype),
                )
        return StructuralCausalModel(graph, mechanisms, self.exogenous)

    def abduct(
        self,
        evidence: dict[str, float] | None = None,
        *,
        known: Mapping[str, Value] | None = None,
        n: int = 20_000,
        seed: int | None = None,
        atol: float = 1e-6,
    ) -> ExogenousPosterior:
        """Layer 3, step 1 — infer the exogenous posterior given evidence/known noise.

        ``known`` pins supplied exogenous values *exactly* (the exact, continuous path: no
        rejection). Remaining exogenous are sampled; if ``evidence`` is given they are
        rejection-filtered so the factual evaluation matches ``evidence`` within ``atol``.
        Returns an :class:`ExogenousPosterior`; call its ``predict(do=...)``.
        """
        known = known or {}
        bad = set(known) - set(self.exogenous)
        if bad:
            raise CausalGraphError(f"unknown exogenous node(s): {sorted(bad)}")
        # Exact path: all-known exogenous, no evidence to reject against.
        if known and not evidence:
            noise = {
                name: (
                    _broadcast_value(known[name], n)
                    if name in known
                    else dist.sample((n,)).reshape(n).float()
                )
                for name, dist in self.exogenous.items()
            }
            return ExogenousPosterior(self, noise)
        # Evidence path: sample, pin any known, reject to match evidence.
        sampled = self._sample_exogenous(n, seed)
        for name, val in known.items():
            sampled[name] = _broadcast_value(val, n)
        factual = self._evaluate(sampled)
        mask = torch.ones(n, dtype=torch.bool)  # type: ignore[reportPrivateImportUsage]
        for node, val in (evidence or {}).items():
            if node not in self.mechanisms:
                raise CausalGraphError(f"unknown evidence node: {node!r}")
            mask &= (factual[node] - float(val)).abs() <= atol
        if int(mask.sum()) == 0:
            raise RealizabilityError(
                f"no exogenous draws match evidence {evidence!r}; "
                "increase n or check that the evidence has nonzero probability"
            )
        return ExogenousPosterior(self, {name: u[mask] for name, u in sampled.items()})

    def counterfactual(
        self,
        evidence: dict[str, float],
        interventions: Mapping[str, Value],
        n: int,
        *,
        seed: int | None = None,
        atol: float = 1e-6,
    ) -> dict[str, Tensor]:
        """Layer 3: abduction-action-prediction. Sugar over :meth:`abduct` + predict."""
        post = self.abduct(evidence, n=n, seed=seed, atol=atol)
        return post.predict(do=interventions or None)

    def see(self, n: int, *, seed: int | None = None) -> dict[str, Tensor]:
        """Layer 1: draw n observational samples P(V)."""
        noise = self._sample_exogenous(n, seed)
        return self._evaluate(noise)

do(interventions)

Layer 2: return the mutilated SCM under do(interventions). Original is unchanged.

Source code in src/causalrl/scm/scm.py
def do(self, interventions: Mapping[str, Value]) -> StructuralCausalModel:
    """Layer 2: return the mutilated SCM under do(interventions). Original is unchanged."""
    graph = self.graph
    mechanisms = dict(self.mechanisms)
    for node, value in interventions.items():
        if node not in self.mechanisms:
            raise CausalGraphError(f"cannot intervene on unknown node: {node!r}")
        graph = graph.remove_incoming_edges(node)
        if isinstance(value, (int, float, bool)):
            const = float(value)
            mechanisms[node] = FunctionalMechanism(
                [],
                lambda pa, u, _c=const: torch.full_like(u, _c),  # type: ignore[reportPrivateImportUsage]
            )
        else:
            # Per-sample vector: pin elementwise, broadcast-checked against the unit count.
            mechanisms[node] = FunctionalMechanism(
                [],
                lambda pa, u, _v=value: _broadcast_value(_v, u.shape[0]).to(u.dtype),
            )
    return StructuralCausalModel(graph, mechanisms, self.exogenous)

abduct(evidence=None, *, known=None, n=20000, seed=None, atol=1e-06)

Layer 3, step 1 — infer the exogenous posterior given evidence/known noise.

known pins supplied exogenous values exactly (the exact, continuous path: no rejection). Remaining exogenous are sampled; if evidence is given they are rejection-filtered so the factual evaluation matches evidence within atol. Returns an :class:ExogenousPosterior; call its predict(do=...).

Source code in src/causalrl/scm/scm.py
def abduct(
    self,
    evidence: dict[str, float] | None = None,
    *,
    known: Mapping[str, Value] | None = None,
    n: int = 20_000,
    seed: int | None = None,
    atol: float = 1e-6,
) -> ExogenousPosterior:
    """Layer 3, step 1 — infer the exogenous posterior given evidence/known noise.

    ``known`` pins supplied exogenous values *exactly* (the exact, continuous path: no
    rejection). Remaining exogenous are sampled; if ``evidence`` is given they are
    rejection-filtered so the factual evaluation matches ``evidence`` within ``atol``.
    Returns an :class:`ExogenousPosterior`; call its ``predict(do=...)``.
    """
    known = known or {}
    bad = set(known) - set(self.exogenous)
    if bad:
        raise CausalGraphError(f"unknown exogenous node(s): {sorted(bad)}")
    # Exact path: all-known exogenous, no evidence to reject against.
    if known and not evidence:
        noise = {
            name: (
                _broadcast_value(known[name], n)
                if name in known
                else dist.sample((n,)).reshape(n).float()
            )
            for name, dist in self.exogenous.items()
        }
        return ExogenousPosterior(self, noise)
    # Evidence path: sample, pin any known, reject to match evidence.
    sampled = self._sample_exogenous(n, seed)
    for name, val in known.items():
        sampled[name] = _broadcast_value(val, n)
    factual = self._evaluate(sampled)
    mask = torch.ones(n, dtype=torch.bool)  # type: ignore[reportPrivateImportUsage]
    for node, val in (evidence or {}).items():
        if node not in self.mechanisms:
            raise CausalGraphError(f"unknown evidence node: {node!r}")
        mask &= (factual[node] - float(val)).abs() <= atol
    if int(mask.sum()) == 0:
        raise RealizabilityError(
            f"no exogenous draws match evidence {evidence!r}; "
            "increase n or check that the evidence has nonzero probability"
        )
    return ExogenousPosterior(self, {name: u[mask] for name, u in sampled.items()})

counterfactual(evidence, interventions, n, *, seed=None, atol=1e-06)

Layer 3: abduction-action-prediction. Sugar over :meth:abduct + predict.

Source code in src/causalrl/scm/scm.py
def counterfactual(
    self,
    evidence: dict[str, float],
    interventions: Mapping[str, Value],
    n: int,
    *,
    seed: int | None = None,
    atol: float = 1e-6,
) -> dict[str, Tensor]:
    """Layer 3: abduction-action-prediction. Sugar over :meth:`abduct` + predict."""
    post = self.abduct(evidence, n=n, seed=seed, atol=atol)
    return post.predict(do=interventions or None)

see(n, *, seed=None)

Layer 1: draw n observational samples P(V).

Source code in src/causalrl/scm/scm.py
def see(self, n: int, *, seed: int | None = None) -> dict[str, Tensor]:
    """Layer 1: draw n observational samples P(V)."""
    noise = self._sample_exogenous(n, seed)
    return self._evaluate(noise)

Assumption-Aware Agents

causalrl.agents.dovi.DOVI

Bases: Agent

Deconfounded Optimistic Value Iteration (Wang, Yang, Wang, Bareinboim 2021), tabular.

Finite-horizon backward induction (LSVI form) whose optimism is capped by the Manski upper causal bound on each (state, action)'s immediate reward, deconfounding the value estimate. With horizon H::

r̃(s,a)   = min( mean_online(s,a) + ucb_bonus(s,a),  hi_Manski(s,a) )
Q_h(s,a) = r̃(s,a) + Σ_s' P̂(s'|s,a) · V_{h+1}(s')
V_h(s)   = max_a Q_h(s,a),    V_{H+1} ≡ 0,    h = H, H-1, …, 1

is the empirical next-state distribution pooled from offline logs and online steps; transitions that ended an episode (done) carry zero future value (the absorbing terminal). At H = 1 the backup reduces exactly to v0.2's immediate Manski ceiling, so the horizon-1 DTR result is preserved.

Bound validity: the backup is a certified optimistic bound on the return only when the dynamics do not depend on the hidden confounder (true of SequentialDTREnv, whose transitions are U-independent). On ConfoundedGridworld the behavior policy steers around the hidden hazard, so is confounded and the backup is heuristic value propagation, not a certified bound.

Source code in src/causalrl/agents/dovi.py
class DOVI(Agent):
    """Deconfounded Optimistic Value Iteration (Wang, Yang, Wang, Bareinboim 2021), tabular.

    Finite-horizon backward induction (LSVI form) whose optimism is capped by the Manski
    upper causal bound on each (state, action)'s immediate reward, deconfounding the value
    estimate. With horizon ``H``::

        r̃(s,a)   = min( mean_online(s,a) + ucb_bonus(s,a),  hi_Manski(s,a) )
        Q_h(s,a) = r̃(s,a) + Σ_s' P̂(s'|s,a) · V_{h+1}(s')
        V_h(s)   = max_a Q_h(s,a),    V_{H+1} ≡ 0,    h = H, H-1, …, 1

    ``P̂`` is the empirical next-state distribution pooled from offline logs and online steps;
    transitions that ended an episode (``done``) carry zero future value (the absorbing
    terminal). At ``H = 1`` the backup reduces exactly to v0.2's immediate Manski ceiling, so
    the horizon-1 DTR result is preserved.

    Bound validity: the backup is a certified optimistic bound on the return only when the
    dynamics do not depend on the hidden confounder (true of ``SequentialDTREnv``, whose
    transitions are U-independent). On ``ConfoundedGridworld`` the behavior policy steers
    around the hidden hazard, so ``P̂`` is confounded and the backup is heuristic value
    propagation, not a certified bound.
    """

    def __init__(
        self,
        n_states: int,
        n_actions: int,
        horizon: int,
        seed: int | None = None,
        *,
        reward_max: float = 1.0,
        transition_assumption: TransitionAssumption = "unknown",
        allow_heuristic: bool = False,
    ) -> None:
        if transition_assumption not in ("unknown", "unconfounded"):
            raise ValueError(
                "transition_assumption must be 'unknown' or 'unconfounded', "
                f"got {transition_assumption!r}"
            )
        self.n_states = n_states
        self.n_actions = n_actions
        self.horizon = horizon
        self.reward_max = reward_max
        self.transition_assumption = transition_assumption
        self.allow_heuristic = allow_heuristic
        self._counts = np.zeros((n_states, n_actions))
        self._sums = np.zeros((n_states, n_actions))
        # Transition model: non-terminal next-state counts, and total transitions seen.
        self._trans = np.zeros((n_states, n_actions, n_states))
        self._trans_n = np.zeros((n_states, n_actions))
        self._t = 1
        self._rng = np.random.default_rng(seed)
        self._ceiling = np.ones((n_states, n_actions))  # Manski upper bound on immediate reward
        self._q: np.ndarray | None = None  # cached plan, shape (H+1, S, A), indexed h = 1..H

    @property
    def is_certified(self) -> bool:
        """Whether value propagation is within the documented causal guarantee."""
        return self.horizon <= 1 or self.transition_assumption == "unconfounded"

    def ingest_offline(self, dataset: ConfoundedTrajectoryDataset) -> None:
        if not self.is_certified and not self.allow_heuristic:
            raise UnverifiedAssumptionError(
                "multi-stage DOVI requires transition_assumption='unconfounded' for "
                "certified value propagation; set allow_heuristic=True to run without "
                "that transition guarantee"
            )
        for (s, a), (_lo, hi) in bounds_table(dataset).items():
            self._ceiling[s, a] = hi
        for tr in dataset.transitions:
            self._trans_n[tr.state, tr.action] += 1.0
            if not tr.done:
                self._trans[tr.state, tr.action, tr.next_state] += 1.0
        self._q = None

    def _r_tilde_table(self) -> np.ndarray:
        n = self._counts
        safe_n = np.maximum(n, 1.0)
        mean = np.where(n > 0, self._sums / safe_n, 0.0)
        bonus = np.where(
            n > 0,
            self.reward_max * np.sqrt(2.0 * math.log(self._t) / safe_n),
            self.reward_max,
        )
        return np.minimum(mean + bonus, self._ceiling)

    def _plan(self) -> np.ndarray:
        h_max, n_states, n_actions = self.horizon, self.n_states, self.n_actions
        rtil = self._r_tilde_table()
        denom = np.maximum(self._trans_n, 1.0)  # (S, A)
        q = np.zeros((h_max + 1, n_states, n_actions))
        v_next = np.zeros(n_states)  # V_{H+1}
        for h in range(h_max, 0, -1):
            # boot[s,a] = (Σ_s' trans[s,a,s'] · V_next[s']) / total transitions seen at (s,a)
            boot = (self._trans @ v_next) / denom
            q[h] = rtil + boot
            v_next = q[h].max(axis=1)
        return q

    def optimistic_q(self, state: int, action: int, h: int = 1) -> float:
        if self._q is None:
            self._q = self._plan()
        return float(self._q[h, state, action])

    def act(self, observation: dict[str, Any]) -> int:
        s = int(observation["state"])
        h = min(int(observation.get("t", 0)) + 1, self.horizon)
        if self._q is None:
            self._q = self._plan()
        scores = self._q[h, s]
        best = float(scores.max())
        winners = [a for a in range(self.n_actions) if scores[a] >= best - 1e-12]
        return int(self._rng.choice(winners))

    def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
        s = int(observation["state"])
        self._counts[s, action] += 1.0
        self._sums[s, action] += reward
        self._t += 1
        self._q = None

    def observe_transition(self, state: int, action: int, next_state: int, done: bool) -> None:
        self._trans_n[state, action] += 1.0
        if not done:
            self._trans[state, action, next_state] += 1.0
        self._q = None

is_certified property

Whether value propagation is within the documented causal guarantee.

causalrl.agents.scbandit.POMISThompsonSampling

Bases: _ArmSubsetThompsonSampling

Thompson sampling over only the arms whose intervened-variable set is a POMIS.

manipulable (the variables you may intervene on) must be given explicitly; non-manipulable variables are handled by the POMIS engine via latent projection (r40). When every non-reward variable is manipulable this matches the unconstrained POMIS.

Source code in src/causalrl/agents/scbandit.py
class POMISThompsonSampling(_ArmSubsetThompsonSampling):
    """Thompson sampling over only the arms whose intervened-variable set is a POMIS.

    ``manipulable`` (the variables you may intervene on) must be given explicitly; non-manipulable
    variables are handled by the POMIS engine via latent projection (r40). When every non-reward
    variable is manipulable this matches the unconstrained POMIS.
    """

    def __init__(
        self,
        graph: CausalGraph,
        reward: str,
        arms: list[dict[str, int]],
        seed: int | None = None,
        *,
        manipulable: Iterable[str] | None = None,
    ) -> None:
        arm_variables = {v for arm in arms for v in arm}
        if manipulable is None:
            raise ValueError(
                "POMISThompsonSampling requires manipulable=<intervenable variables>; "
                "inferring it from the arms is no longer supported"
            )
        permitted = set(manipulable)
        unexpected = arm_variables - permitted
        if unexpected:
            raise ValueError(f"arms intervene on non-manipulable variables: {sorted(unexpected)}")
        pomis_sets = set(pomis(graph, reward, manipulable=permitted))
        allowed = [i for i, arm in enumerate(arms) if frozenset(arm.keys()) in pomis_sets]
        super().__init__(allowed, seed)

Benchmark Reports

causalrl.eval.benchmark.BenchmarkEstimate dataclass

A per-seed benchmark measurement with simple descriptive uncertainty.

Source code in src/causalrl/eval/benchmark.py
@dataclass(frozen=True)
class BenchmarkEstimate:
    """A per-seed benchmark measurement with simple descriptive uncertainty."""

    name: str
    seeds: tuple[int, ...]
    values: tuple[float, ...]
    mean: float
    std: float
    ci95_low: float
    ci95_high: float

    @classmethod
    def from_values(
        cls, name: str, *, seeds: Sequence[int], values: Sequence[float]
    ) -> BenchmarkEstimate:
        seed_tuple = tuple(seeds)
        value_tuple = tuple(float(value) for value in values)
        if not seed_tuple or len(seed_tuple) != len(value_tuple):
            raise ValueError("seeds and values must be non-empty sequences of equal length")
        mean = statistics.fmean(value_tuple)
        std = statistics.stdev(value_tuple) if len(value_tuple) > 1 else 0.0
        margin = 1.96 * std / math.sqrt(len(value_tuple))
        return cls(name, seed_tuple, value_tuple, mean, std, mean - margin, mean + margin)

    def to_dict(self) -> dict[str, object]:
        return {
            "name": self.name,
            "seeds": list(self.seeds),
            "values": list(self.values),
            "mean": self.mean,
            "std": self.std,
            "ci95_low": self.ci95_low,
            "ci95_high": self.ci95_high,
        }

causalrl.eval.benchmark.run_confounded_chain_benchmark(*, seeds=(0, 1, 2, 3, 4), n_steps=8000, tail_window=2000, n_mc=2000)

Report POMIS, brute-force, and fixed-set behavior on the confounded-chain demo.

Source code in src/causalrl/eval/benchmark.py
def run_confounded_chain_benchmark(
    *,
    seeds: Sequence[int] = (0, 1, 2, 3, 4),
    n_steps: int = 8000,
    tail_window: int = 2000,
    n_mc: int = 2000,
) -> dict[str, BenchmarkEstimate]:
    """Report POMIS, brute-force, and fixed-set behavior on the confounded-chain demo."""

    def env_factory(seed: int) -> StructuralCausalBanditEnv:
        return make_confounded_chain_env(seed=seed, n_mc=n_mc)

    return {
        "pomis": _tail_mean_rewards(
            "pomis",
            seeds=seeds,
            n_steps=n_steps,
            tail_window=tail_window,
            env_factory=env_factory,
            agent_factory=lambda env, seed: POMISThompsonSampling(
                env.graph, env.reward, env.arms, seed=seed, manipulable=env.manipulable
            ),
        ),
        "brute_force": _tail_mean_rewards(
            "brute_force",
            seeds=seeds,
            n_steps=n_steps,
            tail_window=tail_window,
            env_factory=env_factory,
            agent_factory=lambda env, seed: BruteForceInterventionTS(env.arms, seed=seed),
        ),
        "fixed_set": _tail_mean_rewards(
            "fixed_set",
            seeds=seeds,
            n_steps=n_steps,
            tail_window=tail_window,
            env_factory=env_factory,
            agent_factory=lambda env, seed: FixedSetThompsonSampling(env.arms, {"X3"}, seed=seed),
        ),
    }

causalrl.eval.benchmark.run_frontdoor_benchmark(*, seeds=(0, 1, 2, 3, 4), n_steps=30000, tail_window=10000, n_mc=20000)

Report manipulability-aware and naive-filter behavior on the front-door demo.

Source code in src/causalrl/eval/benchmark.py
def run_frontdoor_benchmark(
    *,
    seeds: Sequence[int] = (0, 1, 2, 3, 4),
    n_steps: int = 30000,
    tail_window: int = 10000,
    n_mc: int = 20000,
) -> dict[str, BenchmarkEstimate]:
    """Report manipulability-aware and naive-filter behavior on the front-door demo."""

    def env_factory(seed: int) -> StructuralCausalBanditEnv:
        return make_frontdoor_env(seed=seed, n_mc=n_mc)

    return {
        "manipulability_aware": _tail_mean_rewards(
            "manipulability_aware",
            seeds=seeds,
            n_steps=n_steps,
            tail_window=tail_window,
            env_factory=env_factory,
            agent_factory=lambda env, seed: POMISThompsonSampling(
                env.graph, env.reward, env.arms, seed=seed, manipulable=env.manipulable
            ),
        ),
        "naive_filter": _tail_mean_rewards(
            "naive_filter",
            seeds=seeds,
            n_steps=n_steps,
            tail_window=tail_window,
            env_factory=env_factory,
            agent_factory=lambda env, seed: NaivePOMISThompsonSampling(
                env.graph, env.reward, env.arms, seed=seed
            ),
        ),
    }

Counterfactual Decision-Making (L3 / ETT)

causalrl.identification.counterfactual.counterfactual_expectation(scm, *, outcome, intervention, evidence, n=20000, seed=None)

Return E[ outcome_{do(intervention)} | evidence ] (a Layer-3 counterfactual mean).

Wraps :meth:StructuralCausalModel.counterfactual (abduction-action-prediction) and averages the outcome over the retained, evidence-consistent units. With empty evidence this reduces to the interventional mean E[outcome_{do(intervention)}].

Source code in src/causalrl/identification/counterfactual.py
def counterfactual_expectation(
    scm: StructuralCausalModel,
    *,
    outcome: str,
    intervention: dict[str, float],
    evidence: dict[str, float],
    n: int = 20_000,
    seed: int | None = None,
) -> float:
    """Return ``E[ outcome_{do(intervention)} | evidence ]`` (a Layer-3 counterfactual mean).

    Wraps :meth:`StructuralCausalModel.counterfactual` (abduction-action-prediction) and averages
    the outcome over the retained, evidence-consistent units. With empty ``evidence`` this reduces
    to the interventional mean ``E[outcome_{do(intervention)}]``.
    """
    _validate_nodes(scm, {outcome} | set(intervention) | set(evidence))
    result = scm.counterfactual(evidence, intervention, n, seed=seed)
    return float(result[outcome].float().mean().item())

causalrl.identification.counterfactual.effect_of_treatment_on_treated(scm, *, treatment, outcome, treated, control, n=20000, seed=None)

Effect of Treatment on the Treated: E[Y_{treated} - Y_{control} | treatment = treated].

The treatment effect among the subpopulation that actually received treated (Pearl, Causality §8.2.1). Under confounding this differs from the average treatment effect. Both potential outcomes use the same seed (common random numbers), so they are evaluated on the same abducted units and the difference is matched and low-variance.

Source code in src/causalrl/identification/counterfactual.py
def effect_of_treatment_on_treated(
    scm: StructuralCausalModel,
    *,
    treatment: str,
    outcome: str,
    treated: float,
    control: float,
    n: int = 20_000,
    seed: int | None = None,
) -> float:
    """Effect of Treatment on the Treated: ``E[Y_{treated} - Y_{control} | treatment = treated]``.

    The treatment effect among the subpopulation that actually received ``treated`` (Pearl,
    *Causality* §8.2.1). Under confounding this differs from the average treatment effect. Both
    potential outcomes use the same ``seed`` (common random numbers), so they are evaluated on the
    same abducted units and the difference is matched and low-variance.
    """
    factual = counterfactual_expectation(
        scm,
        outcome=outcome,
        intervention={treatment: treated},
        evidence={treatment: treated},
        n=n,
        seed=seed,
    )
    counterfactual = counterfactual_expectation(
        scm,
        outcome=outcome,
        intervention={treatment: control},
        evidence={treatment: treated},
        n=n,
        seed=seed,
    )
    return factual - counterfactual

causalrl.agents.counterfactual.CounterfactualOptimalPolicy

Bases: Agent

Plays argmax_a E[Y_{do(action_node=a)} | intent] from a known SCM.

The Layer-3 oracle: it precomputes the Regret Decision Criterion table once at construction and then acts greedily on the observed intent. update is a no-op — the model is known, so there is nothing to learn online. The computed table is exposed as decision_table for inspection.

Source code in src/causalrl/agents/counterfactual.py
class CounterfactualOptimalPolicy(Agent):
    """Plays ``argmax_a E[Y_{do(action_node=a)} | intent]`` from a known SCM.

    The Layer-3 oracle: it precomputes the Regret Decision Criterion table once at construction and
    then acts greedily on the observed intent. ``update`` is a no-op — the model is known, so there
    is nothing to learn online. The computed table is exposed as ``decision_table`` for inspection.
    """

    def __init__(
        self,
        scm: StructuralCausalModel,
        *,
        outcome: str,
        action_node: str,
        intent_node: str,
        arms: Sequence[int],
        intents: Sequence[int],
        intent_key: str = "intuition",
        n: int = 20_000,
        seed: int | None = None,
    ) -> None:
        self._intent_key = intent_key
        self.decision_table = regret_decision_table(
            scm,
            outcome=outcome,
            action_node=action_node,
            intent_node=intent_node,
            arms=arms,
            intents=intents,
            n=n,
            seed=seed,
        )
        self._best_arm: dict[int, int] = {
            intent: max(row, key=lambda arm: row[arm])
            for intent, row in self.decision_table.items()
        }

    def act(self, observation: dict[str, Any]) -> int:
        return self._best_arm[int(observation[self._intent_key])]

    def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
        """No-op: the SCM is known, so there is nothing to learn online."""

update(observation, action, reward)

No-op: the SCM is known, so there is nothing to learn online.

Source code in src/causalrl/agents/counterfactual.py
def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
    """No-op: the SCM is known, so there is nothing to learn online."""

Transportability

causalrl.identification.transport.SelectionDiagram dataclass

A causal graph plus the variables whose mechanism differs between source and target.

Each selection variable carries an implicit selection node S -> variable (Bareinboim & Pearl). selection_variables must be a subset of graph.nodes.

Source code in src/causalrl/identification/transport.py
@dataclass(frozen=True)
class SelectionDiagram:
    """A causal graph plus the variables whose mechanism differs between source and target.

    Each selection variable carries an implicit selection node ``S -> variable`` (Bareinboim &
    Pearl). ``selection_variables`` must be a subset of ``graph.nodes``.
    """

    graph: CausalGraph
    selection_variables: frozenset[str]

    def __post_init__(self) -> None:
        unknown = set(self.selection_variables) - set(self.graph.nodes)
        if unknown:
            raise CausalGraphError(f"selection variables not in graph: {sorted(unknown)}")
        if any(n.startswith(SEL) or n.startswith(LAT) for n in self.graph.nodes):
            raise CausalGraphError(f"node names must not start with {SEL!r} or {LAT!r}")

causalrl.identification.transport.transport_formula(diagram, *, treatment, outcome, max_adjustment_size=3)

Return the transport formula for P*(outcome | do(treatment)), or None if it is not provably transportable within the supported class (direct / S-admissible adjustment).

Source code in src/causalrl/identification/transport.py
def transport_formula(
    diagram: SelectionDiagram,
    *,
    treatment: str,
    outcome: str,
    max_adjustment_size: int = 3,
) -> TransportFormula | None:
    """Return the transport formula for ``P*(outcome | do(treatment))``, or ``None`` if it is not
    provably transportable within the supported class (direct / S-admissible adjustment)."""
    graph = diagram.graph
    for name in (treatment, outcome):
        if name not in graph.nodes:
            raise CausalGraphError(f"unknown node: {name!r}")
    selection = diagram.selection_variables
    g_bar_x = graph.do_mutilate(treatment)  # the interventional graph G_{\bar X}
    s_nodes = selection_nodes(selection)

    # Case 1: direct transportability — the interventional law is invariant (S ⊥ Y | X).
    if not selection or d_separated(g_bar_x, s_nodes, {outcome}, {treatment}, selection):
        return TransportFormula(
            "direct",
            frozenset(),
            f"P*({outcome}|do({treatment})) = P({outcome}|do({treatment}))",
        )

    # Case 2: an S-admissible adjustment set Z (back-door admissible and S ⊥ Y | Z, X).
    descendants = graph.descendants(treatment)
    candidates = sorted(
        v for v in graph.nodes if v not in descendants and v not in {treatment, outcome}
    )
    for size in range(min(max_adjustment_size, len(candidates)) + 1):
        for combo in combinations(candidates, size):
            z = set(combo)
            if not is_backdoor_admissible(graph, treatment, outcome, z):
                continue
            if not d_separated(g_bar_x, s_nodes, {outcome}, z | {treatment}, selection):
                continue
            expr = f"P*({outcome}|do {treatment}) = sum_z P({outcome}|{treatment},z) P*(z)"
            return TransportFormula("adjustment", frozenset(z), expr)
    return None

causalrl.identification.transport.is_transportable(diagram, *, treatment, outcome)

Whether the target effect is provably transportable (see :func:transport_formula).

Source code in src/causalrl/identification/transport.py
def is_transportable(diagram: SelectionDiagram, *, treatment: str, outcome: str) -> bool:
    """Whether the target effect is provably transportable (see :func:`transport_formula`)."""
    return transport_formula(diagram, treatment=treatment, outcome=outcome) is not None

causalrl.identification.transport.transported_effect(formula, *, treatment, treated_value, outcome, source, target, n=40000, seed=None)

Compute E*[outcome | do(treatment=treated_value)] via the transport formula.

direct: the source interventional mean transfers unchanged. adjustment: reweight the source conditionals E[outcome | treatment, z] by the target covariate marginal P*(z) (discrete z assumed; the demo uses binary covariates). Strata absent from the source sample are skipped.

Source code in src/causalrl/identification/transport.py
def transported_effect(
    formula: TransportFormula,
    *,
    treatment: str,
    treated_value: float,
    outcome: str,
    source: StructuralCausalModel,
    target: StructuralCausalModel,
    n: int = 40_000,
    seed: int | None = None,
) -> float:
    """Compute ``E*[outcome | do(treatment=treated_value)]`` via the transport `formula`.

    ``direct``: the source interventional mean transfers unchanged. ``adjustment``: reweight the
    source conditionals ``E[outcome | treatment, z]`` by the *target* covariate marginal ``P*(z)``
    (discrete `z` assumed; the demo uses binary covariates). Strata absent from the source sample
    are skipped.
    """
    if formula.kind == "direct":
        return counterfactual_expectation(
            source,
            outcome=outcome,
            intervention={treatment: treated_value},
            evidence={},
            n=n,
            seed=seed,
        )

    z_vars = sorted(formula.adjustment_set)
    src = source.see(n, seed=seed)
    tgt = target.see(n, seed=None if seed is None else seed + 1)
    x_mask = src[treatment] == float(treated_value)
    if not z_vars:
        kept = int(x_mask.sum().item())
        return float(src[outcome][x_mask].float().mean().item()) if kept else 0.0

    value_lists: list[list[float]] = []
    for z in z_vars:
        column: list[float] = src[z].tolist()  # type: ignore[reportUnknownMemberType]
        value_lists.append(sorted(set(column)))
    total = 0.0
    for combo in product(*value_lists):
        src_sel = x_mask.clone()
        tgt_sel: torch.Tensor | None = None
        for z, value in zip(z_vars, combo, strict=True):
            src_sel = src_sel & (src[z] == value)
            cond = tgt[z] == value
            tgt_sel = cond if tgt_sel is None else tgt_sel & cond
        assert tgt_sel is not None
        if int(src_sel.sum().item()) == 0:
            continue
        e_y = float(src[outcome][src_sel].float().mean().item())
        p_z = float(tgt_sel.float().mean().item())
        total += e_y * p_z
    return total

causalrl.identification.transport.is_backdoor_admissible(graph, treatment, outcome, z)

Back-door criterion: z has no descendant of treatment and blocks every back-door path (treatment ⊥ outcome | z in the graph with treatment's outgoing edges removed).

Source code in src/causalrl/identification/transport.py
def is_backdoor_admissible(graph: CausalGraph, treatment: str, outcome: str, z: set[str]) -> bool:
    """Back-door criterion: `z` has no descendant of `treatment` and blocks every back-door path
    (``treatment ⊥ outcome | z`` in the graph with `treatment`'s outgoing edges removed)."""
    if z & graph.descendants(treatment):
        return False
    underline = CausalGraph(
        [(u, v) for u, v in graph.directed_edges if u != treatment],
        graph.bidirected_edges,
        nodes=graph.nodes,
    )
    return d_separated(underline, {treatment}, {outcome}, z)

General Transportability (sID)

causalrl.identification.transport.transport_estimand(diagram, *, treatment, outcome)

The general (sID) transport estimand for P*(outcome | do(treatment)) over diagram.

A :class:SelectionDiagram adapter over :func:causalrl.identification.id_algorithm.identify_transport: each target c-factor is taken from the source if its mechanism is invariant, else identified from the target. Raises :class:~causalrl.exceptions.NotIdentifiableError when not transportable. Generalizes the direct / S-admissible-adjustment :func:transport_formula (which returns a readable closed form for those two cases).

Source code in src/causalrl/identification/transport.py
def transport_estimand(diagram: SelectionDiagram, *, treatment: str, outcome: str) -> Estimand:
    """The general (sID) transport estimand for ``P*(outcome | do(treatment))`` over ``diagram``.

    A :class:`SelectionDiagram` adapter over
    :func:`causalrl.identification.id_algorithm.identify_transport`: each target c-factor is taken
    from the source if its mechanism is invariant, else identified from the target. Raises
    :class:`~causalrl.exceptions.NotIdentifiableError` when not transportable. Generalizes the
    direct / S-admissible-adjustment :func:`transport_formula` (which returns a readable closed form
    for those two cases).
    """
    return identify_transport(diagram.graph, [treatment], [outcome], diagram.selection_variables)

causalrl.identification.id_algorithm.identify_transport(graph, treatment, outcome, selection)

Return an :class:Estimand for the target effect P*(outcome | do(treatment)) across a selection diagram, or raise :class:NotIdentifiableError.

selection lists the variables whose mechanism differs between source and target (each carries an implicit selection node). The target effect is decomposed into c-factors; a c-factor with no selection-marked variable is invariant and taken from the source distribution, while one that contains a selection-marked variable is identified from the target's observational distribution. With an empty selection this reduces to source identification (the ID algorithm).

This is the sound c-factor-routing transportability algorithm; it subsumes direct and S-admissible-adjustment transportability. It is not the complete sID (a c-factor identifiable only by combining source and target is reported as non-transportable rather than guessed).

Faithful to J. Pearl, E. Bareinboim, Transportability of Causal and Statistical Relations, AAAI 2011 / External Validity, Statistical Science 2014, via Tian's c-factor decomposition.

Source code in src/causalrl/identification/id_algorithm.py
def identify_transport(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    selection: Iterable[str],
) -> Estimand:
    """Return an :class:`Estimand` for the *target* effect ``P*(outcome | do(treatment))`` across a
    selection diagram, or raise :class:`NotIdentifiableError`.

    ``selection`` lists the variables whose mechanism differs between source and target (each
    carries an implicit selection node). The target effect is decomposed into c-factors; a c-factor
    with no selection-marked variable is invariant and taken from the **source** distribution, while
    one that contains a selection-marked variable is identified from the **target**'s observational
    distribution. With an empty ``selection`` this reduces to source identification (the ID
    algorithm).

    This is the sound c-factor-routing transportability algorithm; it subsumes direct and
    S-admissible-adjustment transportability. It is not the complete sID (a c-factor identifiable
    only by combining source and target is reported as non-transportable rather than guessed).

    Faithful to J. Pearl, E. Bareinboim, *Transportability of Causal and Statistical Relations*,
    AAAI 2011 / *External Validity*, Statistical Science 2014, via Tian's c-factor decomposition.
    """
    x, y, s = frozenset(treatment), frozenset(outcome), frozenset(selection)
    unknown = (x | y | s) - set(graph.nodes)
    if unknown:
        raise CausalGraphError(f"unknown nodes: {sorted(unknown)}")
    if not y:
        raise CausalGraphError("outcome must be non-empty")
    if x & y:
        raise CausalGraphError(f"treatment and outcome overlap: {sorted(x & y)}")
    return _transport(y, x, graph, s)

causalrl.identification.id_algorithm.is_transportable_effect(graph, treatment, outcome, selection)

Whether the target effect P*(outcome | do(treatment)) is transportable (see :func:identify_transport).

Source code in src/causalrl/identification/id_algorithm.py
def is_transportable_effect(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    selection: Iterable[str],
) -> bool:
    """Whether the target effect ``P*(outcome | do(treatment))`` is transportable (see
    :func:`identify_transport`)."""
    try:
        identify_transport(graph, treatment, outcome, selection)
    except NotIdentifiableError:
        return False
    return True

causalrl.identification.id_algorithm.estimate_transported_effect(graph, treatment, outcome, selection, source_data, target_data, *, do)

Estimate the target effect P*(outcome | do(treatment = do)) from source + target data.

Identifies the transport estimand (:func:identify_transport), then evaluates it with source c-factors read from source_data and target c-factors from target_data (both discrete integer columns over graph.nodes). Returns the outcome distribution keyed in sorted(outcome) order.

Source code in src/causalrl/identification/id_algorithm.py
def estimate_transported_effect(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    selection: Iterable[str],
    source_data: Mapping[str, np.ndarray],
    target_data: Mapping[str, np.ndarray],
    *,
    do: Mapping[str, int],
) -> dict[tuple[int, ...], float]:
    """Estimate the target effect ``P*(outcome | do(treatment = do))`` from source + target data.

    Identifies the transport estimand (:func:`identify_transport`), then evaluates it with source
    c-factors read from ``source_data`` and target c-factors from ``target_data`` (both discrete
    integer columns over ``graph.nodes``). Returns the outcome distribution keyed in
    ``sorted(outcome)`` order.
    """
    estimand = identify_transport(graph, treatment, outcome, selection)
    domains = {
        "source": _empirical_joint(source_data, graph.nodes),
        "target": _empirical_joint(target_data, graph.nodes),
    }
    context = _EvalContext(domains["target"], {}, domains, {})
    factor = _evaluate(estimand, context).condition(do)
    targets = sorted(outcome)
    factor = factor.marginalize(set(factor.variables) - set(targets)).reorder(targets)
    return dict(factor.table)

Multiple Domains And Experiments (mz / meta)

causalrl.identification.id_algorithm.Domain dataclass

A source domain relative to the target: which mechanisms differ, and what data it offers.

selection lists the variables whose mechanism differs from the target (each carries an implicit selection node S -> v); a c-factor transfers from this domain only if it touches no selection-marked variable. experiments lists intervention targets available here (each a set of variables that can be randomized). The target domain is always implicitly available observationally as a fallback, so it need not be listed.

Source code in src/causalrl/identification/id_algorithm.py
@dataclass(frozen=True)
class Domain:
    """A source domain relative to the target: which mechanisms differ, and what data it offers.

    ``selection`` lists the variables whose mechanism differs from the target (each carries an
    implicit selection node ``S -> v``); a c-factor transfers from this domain only if it touches no
    selection-marked variable. ``experiments`` lists intervention targets available here (each a set
    of variables that can be randomized). The target domain is always implicitly available
    observationally as a fallback, so it need not be listed.
    """

    name: str
    selection: frozenset[str] = frozenset()
    experiments: frozenset[frozenset[str]] = frozenset()

causalrl.identification.id_algorithm.identify_transport_general(target_graph, treatment, outcome, domains)

Return an :class:Estimand for the target effect P*(outcome | do(treatment)) combining one or more source domains with the target's own data, or raise :class:NotIdentifiableError.

Generalizes :func:identify_transport to multiple domains (meta-transportability) and to surrogate experiments (mz-transportability): each target c-factor is taken from any domain whose mechanism for it is invariant and that can identify it from its observational or experimental data, with the target as the fallback. With a single observational source it coincides with :func:identify_transport; with no selection and no experiments it reduces to the ID algorithm.

Faithful to E. Bareinboim & J. Pearl, A General Algorithm for Deciding Transportability of Experimental Results (Journal of Causal Inference 2013) and Meta-Transportability of Causal Effects: A Formal Approach (AISTATS 2013), unified via the surrogate-experiment view of S. Lee, J. Correa & E. Bareinboim (UAI 2019). No external code is ported.

Source code in src/causalrl/identification/id_algorithm.py
def identify_transport_general(
    target_graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    domains: Iterable[Domain],
) -> Estimand:
    """Return an :class:`Estimand` for the target effect ``P*(outcome | do(treatment))`` combining
    one or more source ``domains`` with the target's own data, or raise
    :class:`NotIdentifiableError`.

    Generalizes :func:`identify_transport` to multiple domains (meta-transportability) and to
    surrogate experiments (mz-transportability): each target c-factor is taken from any domain whose
    mechanism for it is invariant and that can identify it from its observational or experimental
    data, with the target as the fallback. With a single observational source it coincides with
    :func:`identify_transport`; with no selection and no experiments it reduces to the ID algorithm.

    Faithful to E. Bareinboim & J. Pearl, *A General Algorithm for Deciding Transportability of
    Experimental Results* (Journal of Causal Inference 2013) and *Meta-Transportability of Causal
    Effects: A Formal Approach* (AISTATS 2013), unified via the surrogate-experiment view of S. Lee,
    J. Correa & E. Bareinboim (UAI 2019). No external code is ported.
    """
    x, y = frozenset(treatment), frozenset(outcome)
    doms = list(domains)
    referenced = set(x) | set(y)
    for d in doms:
        referenced |= d.selection
        for exp in d.experiments:
            referenced |= exp
    unknown = referenced - set(target_graph.nodes)
    if unknown:
        raise CausalGraphError(f"unknown nodes: {sorted(unknown)}")
    if not y:
        raise CausalGraphError("outcome must be non-empty")
    if x & y:
        raise CausalGraphError(f"treatment and outcome overlap: {sorted(x & y)}")
    return _general_transport(y, x, target_graph, doms)

causalrl.identification.id_algorithm.is_transportable_general(target_graph, treatment, outcome, domains)

Whether P*(outcome | do(treatment)) is transportable from domains (see :func:identify_transport_general).

Source code in src/causalrl/identification/id_algorithm.py
def is_transportable_general(
    target_graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    domains: Iterable[Domain],
) -> bool:
    """Whether ``P*(outcome | do(treatment))`` is transportable from ``domains`` (see
    :func:`identify_transport_general`)."""
    try:
        identify_transport_general(target_graph, treatment, outcome, domains)
    except NotIdentifiableError:
        return False
    return True

causalrl.identification.id_algorithm.estimate_transport_general(target_graph, treatment, outcome, domains, *, domain_data, experiment_data=None, do)

Estimate the target effect P*(outcome | do(treatment = do)) across domains.

domain_data maps each domain name (including "target") to an observational dataset over target_graph.nodes; experiment_data maps (domain, frozenset(target)) to a randomized experimental dataset. Identifies the estimand via :func:identify_transport_general, evaluates it against the supplied data, and returns the outcome distribution keyed in sorted(outcome) order.

Source code in src/causalrl/identification/id_algorithm.py
def estimate_transport_general(
    target_graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    domains: Iterable[Domain],
    *,
    domain_data: Mapping[str, Mapping[str, np.ndarray]],
    experiment_data: Mapping[tuple[str, frozenset[str]], Mapping[str, np.ndarray]] | None = None,
    do: Mapping[str, int],
) -> dict[tuple[int, ...], float]:
    """Estimate the target effect ``P*(outcome | do(treatment = do))`` across ``domains``.

    ``domain_data`` maps each domain name (including ``"target"``) to an observational dataset over
    ``target_graph.nodes``; ``experiment_data`` maps ``(domain, frozenset(target))`` to a randomized
    experimental dataset. Identifies the estimand via :func:`identify_transport_general`, evaluates
    it against the supplied data, and returns the outcome distribution keyed in ``sorted(outcome)``
    order.
    """
    doms = list(domains)
    estimand = identify_transport_general(target_graph, treatment, outcome, doms)
    if "target" not in domain_data:
        raise CausalGraphError("domain_data must include 'target'")
    nodes = target_graph.nodes
    domains_f = {name: _empirical_joint(rows, nodes) for name, rows in domain_data.items()}
    exps_f = {key: _empirical_joint(rows, nodes) for key, rows in (experiment_data or {}).items()}
    ctx = _EvalContext(domains_f["target"], {}, domains_f, exps_f)
    factor = _evaluate(estimand, ctx).condition(do)
    targets = sorted(outcome)
    factor = factor.marginalize(set(factor.variables) - set(targets)).reorder(targets)
    return dict(factor.table)

General Identification (ID Algorithm)

causalrl.identification.id_algorithm.identify_effect(graph, treatment, outcome)

Return a do-free :class:Estimand for P(outcome | do(treatment)), or raise.

Runs the ID algorithm. Raises :class:NotIdentifiableError (with the witnessing hedge attached as .witness) when the effect is not non-parametrically identifiable, and :class:CausalGraphError for malformed inputs (unknown nodes, empty outcome, or a treatment and outcome that overlap).

Source code in src/causalrl/identification/id_algorithm.py
def identify_effect(
    graph: CausalGraph, treatment: Iterable[str], outcome: Iterable[str]
) -> Estimand:
    """Return a do-free :class:`Estimand` for ``P(outcome | do(treatment))``, or raise.

    Runs the ID algorithm. Raises :class:`NotIdentifiableError` (with the witnessing hedge attached
    as ``.witness``) when the effect is not non-parametrically identifiable, and
    :class:`CausalGraphError` for malformed inputs (unknown nodes, empty outcome, or a treatment and
    outcome that overlap).
    """
    x, y = frozenset(treatment), frozenset(outcome)
    unknown = (x | y) - set(graph.nodes)
    if unknown:
        raise CausalGraphError(f"unknown nodes: {sorted(unknown)}")
    if not y:
        raise CausalGraphError("outcome must be non-empty")
    if x & y:
        raise CausalGraphError(f"treatment and outcome overlap: {sorted(x & y)}")
    return _id(y, x, graph, _Joint(frozenset(graph.nodes)))

causalrl.identification.id_algorithm.is_identifiable_effect(graph, treatment, outcome)

Whether P(outcome | do(treatment)) is identifiable from observational data.

Source code in src/causalrl/identification/id_algorithm.py
def is_identifiable_effect(
    graph: CausalGraph, treatment: Iterable[str], outcome: Iterable[str]
) -> bool:
    """Whether ``P(outcome | do(treatment))`` is identifiable from observational data."""
    try:
        identify_effect(graph, treatment, outcome)
    except NotIdentifiableError:
        return False
    return True

causalrl.identification.id_algorithm.estimate_effect(graph, treatment, outcome, data, *, do)

Estimate P(outcome | do(treatment = do)) from observational data via the ID estimand.

Identifies the effect (raising if it cannot), then evaluates the resulting estimand on the empirical joint of data (discrete integer columns over graph.nodes) at the intervention do. Returns the outcome distribution as {assignment: probability} with assignments keyed in sorted(outcome) order.

Source code in src/causalrl/identification/id_algorithm.py
def estimate_effect(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    data: Mapping[str, np.ndarray],
    *,
    do: Mapping[str, int],
) -> dict[tuple[int, ...], float]:
    """Estimate ``P(outcome | do(treatment = do))`` from observational ``data`` via the ID estimand.

    Identifies the effect (raising if it cannot), then evaluates the resulting estimand on the
    empirical joint of ``data`` (discrete integer columns over ``graph.nodes``) at the intervention
    ``do``. Returns the outcome distribution as ``{assignment: probability}`` with assignments keyed
    in ``sorted(outcome)`` order.
    """
    estimand = identify_effect(graph, treatment, outcome)
    factor = _eval_observational(estimand, _empirical_joint(data, graph.nodes))
    factor = factor.condition(do)
    targets = sorted(outcome)
    factor = factor.marginalize(set(factor.variables) - set(targets)).reorder(targets)
    return dict(factor.table)

causalrl.identification.id_algorithm.Estimand

A do-free expression for an interventional distribution over the observational law P(V).

Render it with :meth:render; evaluate it on data with :func:estimate_effect.

Source code in src/causalrl/identification/id_algorithm.py
class Estimand:
    """A do-free expression for an interventional distribution over the observational law ``P(V)``.

    Render it with :meth:`render`; evaluate it on data with :func:`estimate_effect`.
    """

    def render(self) -> str:
        raise NotImplementedError

From Surrogate Experiments (gID)

causalrl.identification.id_algorithm.identify_effect_with_experiments(graph, treatment, outcome, experiments)

Return an :class:Estimand for P(outcome | do(treatment)) using surrogate experiments.

This is general identification (gID): it runs the ID recursion but, where observational data hits a hedge, it tries to obtain the needed c-factor from one of the available experiments (each a set of variables you can intervene on; observational data is always available too). Raises :class:NotIdentifiableError if no combination of observational data and experiments identifies the effect.

Faithful to S. Lee, J. Correa, E. Bareinboim, General Identifiability with Arbitrary Surrogate Experiments, UAI 2019, building on Tian & Pearl's c-factor identification. No code is ported.

Source code in src/causalrl/identification/id_algorithm.py
def identify_effect_with_experiments(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    experiments: Iterable[Iterable[str]],
) -> Estimand:
    """Return an :class:`Estimand` for ``P(outcome | do(treatment))`` using surrogate experiments.

    This is general identification (gID): it runs the ID recursion but, where observational data
    hits a hedge, it tries to obtain the needed c-factor from one of the available ``experiments``
    (each a set of variables you can intervene on; observational data is always available too).
    Raises :class:`NotIdentifiableError` if no combination of observational data and experiments
    identifies the effect.

    Faithful to S. Lee, J. Correa, E. Bareinboim, *General Identifiability with Arbitrary Surrogate
    Experiments*, UAI 2019, building on Tian & Pearl's c-factor identification. No code is ported.
    """
    x, y = frozenset(treatment), frozenset(outcome)
    nodes = set(graph.nodes)
    exps = [frozenset(z) for z in experiments]
    referenced = set(x) | set(y)
    for z in exps:
        referenced |= z
    unknown = referenced - nodes
    if unknown:
        raise CausalGraphError(f"unknown nodes: {sorted(unknown)}")
    if not y:
        raise CausalGraphError("outcome must be non-empty")
    if x & y:
        raise CausalGraphError(f"treatment and outcome overlap: {sorted(x & y)}")
    return _id(y, x, graph, _Joint(frozenset(graph.nodes)), experiments=exps, original=graph)

causalrl.identification.id_algorithm.is_gid_identifiable(graph, treatment, outcome, experiments)

Whether P(outcome | do(treatment)) is identifiable from data plus those experiments.

Source code in src/causalrl/identification/id_algorithm.py
def is_gid_identifiable(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    experiments: Iterable[Iterable[str]],
) -> bool:
    """Whether ``P(outcome | do(treatment))`` is identifiable from data plus those experiments."""
    try:
        identify_effect_with_experiments(graph, treatment, outcome, experiments)
    except NotIdentifiableError:
        return False
    return True

causalrl.identification.id_algorithm.estimate_effect_with_experiments(graph, treatment, outcome, data, experiments_data, *, do)

Estimate P(outcome | do(treatment = do)) from observational data plus experiments.

experiments_data maps each available intervention target (a frozenset of variables) to a dataset drawn from do(target) over graph.nodes. Identifies the effect via gID (:func:identify_effect_with_experiments), then evaluates the estimand against the empirical observational joint and each experiment's empirical c-factor. Returns the outcome distribution keyed in sorted(outcome) order.

Source code in src/causalrl/identification/id_algorithm.py
def estimate_effect_with_experiments(
    graph: CausalGraph,
    treatment: Iterable[str],
    outcome: Iterable[str],
    data: Mapping[str, np.ndarray],
    experiments_data: Mapping[frozenset[str], Mapping[str, np.ndarray]],
    *,
    do: Mapping[str, int],
) -> dict[tuple[int, ...], float]:
    """Estimate ``P(outcome | do(treatment = do))`` from observational ``data`` plus experiments.

    ``experiments_data`` maps each available intervention target (a ``frozenset`` of variables) to a
    dataset drawn from ``do(target)`` over ``graph.nodes``. Identifies the effect via gID
    (:func:`identify_effect_with_experiments`), then evaluates the estimand against the empirical
    observational joint and each experiment's empirical c-factor. Returns the outcome distribution
    keyed in ``sorted(outcome)`` order.
    """
    estimand = identify_effect_with_experiments(graph, treatment, outcome, experiments_data.keys())
    obs = _empirical_joint(data, graph.nodes)
    experiments = {
        target: _empirical_joint(rows, graph.nodes) for target, rows in experiments_data.items()
    }
    factor = _evaluate(estimand, _EvalContext(obs, experiments, {}, {})).condition(do)
    targets = sorted(outcome)
    factor = factor.marginalize(set(factor.variables) - set(targets)).reorder(targets)
    return dict(factor.table)

Causal Discovery

causalrl.discovery.discover(data, variables, *, threshold=0.01, max_conditioning_size=3)

Discover the CPDAG over variables from data via the PC algorithm.

threshold is the conditional-mutual-information cutoff below which two variables are judged independent; max_conditioning_size caps the separating-set search.

Source code in src/causalrl/discovery.py
def discover(
    data: Mapping[str, np.ndarray],
    variables: Sequence[str],
    *,
    threshold: float = 0.01,
    max_conditioning_size: int = 3,
) -> CPDAG:
    """Discover the CPDAG over `variables` from `data` via the PC algorithm.

    ``threshold`` is the conditional-mutual-information cutoff below which two variables are judged
    independent; ``max_conditioning_size`` caps the separating-set search.
    """
    nodes = list(variables)
    adj, sepset = _pc_skeleton(
        data, nodes, threshold=threshold, max_conditioning_size=max_conditioning_size
    )

    # Orient unshielded colliders a -> c <- b (a, b non-adjacent and c not in their separating set).
    directed: set[tuple[str, str]] = set()
    undirected: set[frozenset[str]] = {frozenset((a, b)) for a in nodes for b in adj[a] if a < b}
    for c in nodes:
        for a, b in combinations(sorted(adj[c]), 2):
            if b in adj[a]:
                continue  # shielded triple
            if c not in sepset.get(frozenset((a, b)), ()):
                _orient(a, c, directed, undirected)
                _orient(b, c, directed, undirected)

    _apply_meek_rules(nodes, directed, undirected)
    return CPDAG(tuple(nodes), frozenset(directed), frozenset(undirected))

causalrl.discovery.discover_interventional(observational, interventions, variables, *, threshold=0.01, shift_threshold=0.05, max_conditioning_size=3)

Discover the interventional essential graph from observational and experimental data.

Runs the observational PC algorithm (:func:discover), then orients the edges incident to each intervention target by the invariance principle: under a perfect intervention do(T) a child of T shifts its marginal, while a parent (a non-descendant) stays invariant. Each incident edge T - B is oriented T -> B if B shifts and B -> T if not; Meek's rules R1-R3 then propagate the new orientations, so the result refines the observational CPDAG toward the true DAG as more targets are experimented on.

interventions maps each intervened target T to a dataset drawn from do(T) (a perfect intervention covering every variable in variables). shift_threshold is the total-variation cutoff above which an endpoint's marginal is judged to have shifted.

Source code in src/causalrl/discovery.py
def discover_interventional(
    observational: Mapping[str, np.ndarray],
    interventions: Mapping[str, Mapping[str, np.ndarray]],
    variables: Sequence[str],
    *,
    threshold: float = 0.01,
    shift_threshold: float = 0.05,
    max_conditioning_size: int = 3,
) -> CPDAG:
    """Discover the interventional essential graph from observational and experimental data.

    Runs the observational PC algorithm (:func:`discover`), then orients the edges incident to each
    intervention target by the invariance principle: under a perfect intervention ``do(T)`` a
    *child* of ``T`` shifts its marginal, while a *parent* (a non-descendant) stays invariant. Each
    incident edge ``T - B`` is oriented ``T -> B`` if ``B`` shifts and ``B -> T`` if not; Meek's
    rules R1-R3 then propagate the new orientations, so the result refines the observational CPDAG
    toward the true DAG as more targets are experimented on.

    ``interventions`` maps each intervened target ``T`` to a dataset drawn from ``do(T)`` (a perfect
    intervention covering every variable in ``variables``). ``shift_threshold`` is the
    total-variation cutoff above which an endpoint's marginal is judged to have shifted.
    """
    nodes = list(variables)
    cpdag = discover(
        observational, nodes, threshold=threshold, max_conditioning_size=max_conditioning_size
    )
    directed = set(cpdag.directed_edges)
    undirected = set(cpdag.undirected_edges)

    for target, idata in interventions.items():
        if target not in nodes:
            raise CausalGraphError(f"intervention target not in variables: {target!r}")
        for edge in list(undirected):
            if target not in edge:
                continue
            other = next(iter(edge - {target}))
            if other not in observational or other not in idata:
                raise CausalGraphError(f"variable not in data: {other!r}")
            shifted = _total_variation(observational[other], idata[other]) >= shift_threshold
            undirected.discard(edge)
            directed.add((target, other) if shifted else (other, target))

    _apply_meek_rules(nodes, directed, undirected)
    return CPDAG(tuple(nodes), frozenset(directed), frozenset(undirected))

causalrl.discovery.discover_latent(data, variables, *, threshold=0.01, max_conditioning_size=3)

Discover a PAG over variables from data via the FCI algorithm (allows latents).

Unlike :func:discover, FCI does not assume causal sufficiency: it learns the PC skeleton, then refines it with the Possible-D-SEP step (sound under latent confounders), re-orients unshielded colliders, and applies the complete orientation rules R1-R10 (Zhang 2008 — sound and complete for latent confounders and selection bias). The result is a :class:PAG: a <-> b witnesses a latent confounder; a circle endpoint is undetermined by the equivalence class.

threshold and max_conditioning_size mirror :func:discover.

Source code in src/causalrl/discovery.py
def discover_latent(
    data: Mapping[str, np.ndarray],
    variables: Sequence[str],
    *,
    threshold: float = 0.01,
    max_conditioning_size: int = 3,
) -> PAG:
    """Discover a PAG over ``variables`` from ``data`` via the FCI algorithm (allows latents).

    Unlike :func:`discover`, FCI does not assume causal sufficiency: it learns the PC skeleton, then
    refines it with the Possible-D-SEP step (sound under latent confounders), re-orients unshielded
    colliders, and applies the complete orientation rules R1-R10 (Zhang 2008 — sound and complete
    for latent confounders and selection bias). The result is a :class:`PAG`: ``a <-> b``
    witnesses a latent confounder; a circle endpoint is undetermined by the equivalence class.

    ``threshold`` and ``max_conditioning_size`` mirror :func:`discover`.
    """
    nodes = list(variables)
    adj, sepset = _pc_skeleton(
        data, nodes, threshold=threshold, max_conditioning_size=max_conditioning_size
    )
    marks: dict[tuple[str, str], str] = {}
    for a in nodes:
        for b in adj[a]:
            marks[(a, b)] = _CIRCLE
    _orient_colliders(marks, nodes, sepset)
    _refine_skeleton_pds(
        data, marks, sepset, nodes, threshold=threshold, max_conditioning_size=max_conditioning_size
    )
    for key in marks:
        marks[key] = _CIRCLE
    _orient_colliders(marks, nodes, sepset)
    _apply_fci_rules(marks, sepset)
    return PAG(tuple(nodes), marks)

causalrl.discovery.CPDAG dataclass

A completed partially directed acyclic graph (a Markov equivalence class).

Source code in src/causalrl/discovery.py
@dataclass(frozen=True)
class CPDAG:
    """A completed partially directed acyclic graph (a Markov equivalence class)."""

    variables: tuple[str, ...]
    directed_edges: frozenset[tuple[str, str]]
    undirected_edges: frozenset[frozenset[str]]

    def to_causal_graph(self) -> CausalGraph:
        """Convert to a :class:`CausalGraph`; raises if any edge is still unoriented."""
        if self.undirected_edges:
            remaining = sorted(tuple(sorted(e)) for e in self.undirected_edges)
            raise CausalGraphError(
                f"CPDAG is not fully oriented; undirected edges remain: {remaining}"
            )
        return CausalGraph(directed_edges=sorted(self.directed_edges), nodes=list(self.variables))

to_causal_graph()

Convert to a :class:CausalGraph; raises if any edge is still unoriented.

Source code in src/causalrl/discovery.py
def to_causal_graph(self) -> CausalGraph:
    """Convert to a :class:`CausalGraph`; raises if any edge is still unoriented."""
    if self.undirected_edges:
        remaining = sorted(tuple(sorted(e)) for e in self.undirected_edges)
        raise CausalGraphError(
            f"CPDAG is not fully oriented; undirected edges remain: {remaining}"
        )
    return CausalGraph(directed_edges=sorted(self.directed_edges), nodes=list(self.variables))

causalrl.discovery.PAG dataclass

A partial ancestral graph: the complete Markov-equivalence class of MAGs (the FCI output).

marks[(a, b)] is the mark on the b end of edge a—b — a circle o (undetermined by the equivalence class), an arrowhead >, or a tail -. An edge exists iff both (a, b) and (b, a) are present. a -> b is tail-at-a / arrow-at-b; a <-> b (arrowheads at both ends) witnesses a latent confounder; a o-o b is fully unoriented.

Source code in src/causalrl/discovery.py
@dataclass(frozen=True)
class PAG:
    """A partial ancestral graph: the complete Markov-equivalence class of MAGs (the FCI output).

    ``marks[(a, b)]`` is the mark on the ``b`` end of edge ``a—b`` — a circle ``o`` (undetermined by
    the equivalence class), an arrowhead ``>``, or a tail ``-``. An edge exists iff both ``(a, b)``
    and ``(b, a)`` are present. ``a -> b`` is tail-at-``a`` / arrow-at-``b``; ``a <-> b``
    (arrowheads at both ends) witnesses a latent confounder; ``a o-o b`` is fully unoriented.
    """

    variables: tuple[str, ...]
    marks: Mapping[tuple[str, str], str]

    def __post_init__(self) -> None:
        for (a, b), mark in self.marks.items():
            if mark not in (_CIRCLE, _ARROW, _TAIL):
                raise CausalGraphError(f"invalid PAG mark {mark!r} on edge {a}-{b}")
            if (b, a) not in self.marks:
                raise CausalGraphError(f"PAG edge {a}-{b} is missing endpoint ({b!r}, {a!r})")

    def adjacent(self, a: str, b: str) -> bool:
        return (a, b) in self.marks

    def is_directed(self, a: str, b: str) -> bool:
        """Whether ``a -> b`` (tail at ``a``, arrowhead at ``b``)."""
        return self.marks.get((a, b)) == _ARROW and self.marks.get((b, a)) == _TAIL

    def is_bidirected(self, a: str, b: str) -> bool:
        """Whether ``a <-> b`` (arrowheads at both ends — a latent confounder)."""
        return self.marks.get((a, b)) == _ARROW and self.marks.get((b, a)) == _ARROW

    def edges(self) -> list[tuple[str, str, str, str]]:
        """``(a, b, mark_at_a, mark_at_b)`` for each edge, with ``a < b``."""
        out: list[tuple[str, str, str, str]] = []
        seen: set[frozenset[str]] = set()
        for a, b in self.marks:
            key = frozenset((a, b))
            if key in seen:
                continue
            seen.add(key)
            x, y = sorted((a, b))
            out.append((x, y, self.marks[(y, x)], self.marks[(x, y)]))
        return sorted(out)

    def render(self) -> str:
        left = {_CIRCLE: "o", _ARROW: "<", _TAIL: "-"}
        right = {_CIRCLE: "o", _ARROW: ">", _TAIL: "-"}
        return ", ".join(f"{x} {left[ma]}-{right[mb]} {y}" for x, y, ma, mb in self.edges())

is_directed(a, b)

Whether a -> b (tail at a, arrowhead at b).

Source code in src/causalrl/discovery.py
def is_directed(self, a: str, b: str) -> bool:
    """Whether ``a -> b`` (tail at ``a``, arrowhead at ``b``)."""
    return self.marks.get((a, b)) == _ARROW and self.marks.get((b, a)) == _TAIL

is_bidirected(a, b)

Whether a <-> b (arrowheads at both ends — a latent confounder).

Source code in src/causalrl/discovery.py
def is_bidirected(self, a: str, b: str) -> bool:
    """Whether ``a <-> b`` (arrowheads at both ends — a latent confounder)."""
    return self.marks.get((a, b)) == _ARROW and self.marks.get((b, a)) == _ARROW

edges()

(a, b, mark_at_a, mark_at_b) for each edge, with a < b.

Source code in src/causalrl/discovery.py
def edges(self) -> list[tuple[str, str, str, str]]:
    """``(a, b, mark_at_a, mark_at_b)`` for each edge, with ``a < b``."""
    out: list[tuple[str, str, str, str]] = []
    seen: set[frozenset[str]] = set()
    for a, b in self.marks:
        key = frozenset((a, b))
        if key in seen:
            continue
        seen.add(key)
        x, y = sorted((a, b))
        out.append((x, y, self.marks[(y, x)], self.marks[(x, y)]))
    return sorted(out)

causalrl.discovery.conditional_mutual_information(data, x, y, z)

Empirical I(X; Y | Z) in nats (discrete columns; 0 iff X ⊥ Y | Z).

Source code in src/causalrl/discovery.py
def conditional_mutual_information(
    data: Mapping[str, np.ndarray], x: str, y: str, z: Sequence[str]
) -> float:
    """Empirical ``I(X; Y | Z)`` in nats (discrete columns; ``0`` iff ``X ⊥ Y | Z``)."""
    xs, ys = data[x], data[y]
    n = len(xs)
    zcols = [data[zi] for zi in z]
    joint: dict[tuple[int, int, tuple[int, ...]], int] = defaultdict(int)
    xz: dict[tuple[int, tuple[int, ...]], int] = defaultdict(int)
    yz: dict[tuple[int, tuple[int, ...]], int] = defaultdict(int)
    zc: dict[tuple[int, ...], int] = defaultdict(int)
    for i in range(n):
        xv, yv = int(xs[i]), int(ys[i])
        zk = tuple(int(c[i]) for c in zcols)
        joint[(xv, yv, zk)] += 1
        xz[(xv, zk)] += 1
        yz[(yv, zk)] += 1
        zc[zk] += 1
    cmi = 0.0
    for (xv, yv, zk), count in joint.items():
        ratio = (count * zc[zk]) / (xz[(xv, zk)] * yz[(yv, zk)])
        cmi += (count / n) * math.log(ratio)
    return max(cmi, 0.0)

Causal Imitation Learning

causalrl.imitation.is_imitable(graph, *, action, outcome, observable)

Whether the expert is imitable: an observed back-door-admissible set exists.

Source code in src/causalrl/imitation.py
def is_imitable(
    graph: CausalGraph, *, action: str, outcome: str, observable: Iterable[str]
) -> bool:
    """Whether the expert is imitable: an observed back-door-admissible set exists."""
    return (
        imitation_backdoor_set(graph, action=action, outcome=outcome, observable=observable)
        is not None
    )

causalrl.imitation.imitation_backdoor_set(graph, *, action, outcome, observable, max_size=3)

Smallest observed back-door-admissible set for action -> outcome, or None.

A set Z ⊆ observable \ {action, outcome} is admissible when it has no descendant of action and blocks every back-door path (action ⊥ outcome | Z with action's outgoing edges removed). Cloning P(action | Z) then reproduces the expert's outcome distribution.

Source code in src/causalrl/imitation.py
def imitation_backdoor_set(
    graph: CausalGraph,
    *,
    action: str,
    outcome: str,
    observable: Iterable[str],
    max_size: int = 3,
) -> frozenset[str] | None:
    """Smallest observed back-door-admissible set for ``action -> outcome``, or ``None``.

    A set ``Z ⊆ observable \\ {action, outcome}`` is admissible when it has no descendant of
    ``action`` and blocks every back-door path (``action ⊥ outcome | Z`` with ``action``'s outgoing
    edges removed). Cloning ``P(action | Z)`` then reproduces the expert's outcome distribution.
    """
    for name in (action, outcome):
        if name not in graph.nodes:
            raise CausalGraphError(f"unknown node: {name!r}")
    observed = set(observable)
    unknown = observed - set(graph.nodes)
    if unknown:
        raise CausalGraphError(f"unknown observable node(s): {sorted(unknown)}")
    candidates = sorted(observed - {action, outcome})
    for size in range(min(max_size, len(candidates)) + 1):
        for combo in combinations(candidates, size):
            z = set(combo)
            if is_backdoor_admissible(graph, action, outcome, z):
                return frozenset(z)
    return None

causalrl.imitation.CausalImitator

Bases: Agent

Clones P(A | Z) for a back-door-admissible observed set Z (the adjustment set).

Conditioning the cloned policy on Z reproduces the confounding the expert responded to, so deployment matches the expert's reward. Unseen Z values fall back to the marginal P(A).

Source code in src/causalrl/imitation.py
class CausalImitator(Agent):
    """Clones ``P(A | Z)`` for a back-door-admissible observed set ``Z`` (the adjustment set).

    Conditioning the cloned policy on ``Z`` reproduces the confounding the expert responded to, so
    deployment matches the expert's reward. Unseen ``Z`` values fall back to the marginal ``P(A)``.
    """

    def __init__(self, n_actions: int, adjustment: Sequence[str], seed: int | None = None) -> None:
        self.n_actions = n_actions
        self._adjustment = list(adjustment)
        self._rng = np.random.default_rng(seed)
        self._marginal = np.ones(n_actions) / n_actions
        self._conditional: dict[tuple[int, ...], np.ndarray] = {}

    def fit(self, demonstrations: Mapping[str, np.ndarray], *, action: str) -> None:
        actions = np.asarray(demonstrations[action]).astype(int)
        marginal = np.bincount(actions, minlength=self.n_actions).astype(float)
        self._marginal = marginal / marginal.sum()
        z_columns = [np.asarray(demonstrations[z]).astype(int) for z in self._adjustment]
        counts: dict[tuple[int, ...], np.ndarray] = {}
        for i in range(len(actions)):
            key = tuple(int(column[i]) for column in z_columns)
            if key not in counts:
                counts[key] = np.zeros(self.n_actions)
            counts[key][int(actions[i])] += 1.0
        self._conditional = {key: row / row.sum() for key, row in counts.items()}

    def act(self, observation: dict[str, Any]) -> int:
        key = tuple(int(observation[z]) for z in self._adjustment)
        probs = self._conditional.get(key, self._marginal)
        return int(self._rng.choice(self.n_actions, p=probs))

    def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
        """No-op: imitation is fit offline from demonstrations."""

update(observation, action, reward)

No-op: imitation is fit offline from demonstrations.

Source code in src/causalrl/imitation.py
def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
    """No-op: imitation is fit offline from demonstrations."""

causalrl.imitation.BehavioralCloning

Bases: Agent

Naive imitator: clones the marginal action distribution P(A), ignoring covariates.

Source code in src/causalrl/imitation.py
class BehavioralCloning(Agent):
    """Naive imitator: clones the marginal action distribution ``P(A)``, ignoring covariates."""

    def __init__(self, n_actions: int, seed: int | None = None) -> None:
        self.n_actions = n_actions
        self._rng = np.random.default_rng(seed)
        self._probs = np.ones(n_actions) / n_actions

    def fit(self, demonstrations: Mapping[str, np.ndarray], *, action: str) -> None:
        actions = np.asarray(demonstrations[action]).astype(int)
        counts = np.bincount(actions, minlength=self.n_actions).astype(float)
        self._probs = counts / counts.sum()

    def act(self, observation: dict[str, Any]) -> int:
        return int(self._rng.choice(self.n_actions, p=self._probs))

    def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
        """No-op: imitation is fit offline from demonstrations."""

update(observation, action, reward)

No-op: imitation is fit offline from demonstrations.

Source code in src/causalrl/imitation.py
def update(self, observation: dict[str, Any], action: int, reward: float) -> None:
    """No-op: imitation is fit offline from demonstrations."""

Causal Curriculum Learning

causalrl.curriculum.causal_curriculum(graph, goal=None)

A curriculum (skill order) respecting the causal structure: a topological order in which every parent (prerequisite) precedes its children. With goal, restrict to the goal and its ancestors — the skills the goal depends on — still in topological order.

Source code in src/causalrl/curriculum.py
def causal_curriculum(graph: CausalGraph, goal: str | None = None) -> list[str]:
    """A curriculum (skill order) respecting the causal structure: a topological order in which
    every parent (prerequisite) precedes its children. With ``goal``, restrict to the goal and its
    ancestors — the skills the goal depends on — still in topological order."""
    order = graph.topological_order()
    if goal is None:
        return order
    if goal not in graph.nodes:
        raise CausalGraphError(f"unknown goal skill: {goal!r}")
    relevant = graph.ancestors(goal)  # inclusive of goal
    return [node for node in order if node in relevant]

causalrl.curriculum.curriculum_q_learning(tasks, *, episodes_per_task, alpha=0.5, epsilon=0.1, seed=None)

Learn the target policy by Q-learning through a curriculum of subtasks, easiest first.

tasks is ordered from the simplest subtask to the target (the last element); all tasks share the same state and action spaces. The Q-table is carried forward between stages (warm-start transfer), so value learned on the easy subtasks bootstraps the harder ones. This is the causal curriculum applied to RL: order subgoals by prerequisite structure (see :func:causal_curriculum), then train in that order to reach a target policy that flat learning on the sparse target alone struggles to find. Returns the greedy policy on the target task.

Faithful to Y. Bengio, J. Louradour, R. Collobert, J. Weston, Curriculum Learning, ICML 2009; the Q-learning update matches :func:causalrl.shaping.q_learning. No external code is ported.

Source code in src/causalrl/curriculum.py
def curriculum_q_learning(
    tasks: Sequence[TabularMDP],
    *,
    episodes_per_task: int,
    alpha: float = 0.5,
    epsilon: float = 0.1,
    seed: int | None = None,
) -> dict[int, int]:
    """Learn the target policy by Q-learning through a curriculum of subtasks, easiest first.

    ``tasks`` is ordered from the simplest subtask to the target (the last element); all tasks share
    the same state and action spaces. The Q-table is carried forward between stages (warm-start
    transfer), so value learned on the easy subtasks bootstraps the harder ones. This is the causal
    curriculum applied to RL: order subgoals by prerequisite structure (see
    :func:`causal_curriculum`), then train in that order to reach a target policy that flat learning
    on the sparse target alone struggles to find. Returns the greedy policy on the target task.

    Faithful to Y. Bengio, J. Louradour, R. Collobert, J. Weston, *Curriculum Learning*, ICML 2009;
    the Q-learning update matches :func:`causalrl.shaping.q_learning`. No external code is ported.
    """
    if not tasks:
        raise CausalGraphError("curriculum must contain at least one task")
    target = tasks[-1]
    rng = np.random.default_rng(seed)
    q = np.zeros((target.n_states, target.n_actions))
    for task in tasks:
        for _ in range(episodes_per_task):
            s = 0
            for _ in range(4 * task.n_states):
                if s in task.terminals:
                    break
                if rng.random() < epsilon:
                    a = int(rng.integers(0, task.n_actions))
                else:
                    a = int(np.argmax(q[s]))
                s_next = task.transitions[(s, a)]
                bootstrap = 0.0 if s_next in task.terminals else float(np.max(q[s_next]))
                q[s, a] += alpha * (task.rewards[(s, a)] + task.gamma * bootstrap - q[s, a])
                s = s_next
    return {s: int(np.argmax(q[s])) for s in range(target.n_states) if s not in target.terminals}

causalrl.curriculum.is_valid_curriculum(graph, order)

Whether order respects prerequisites: each skill appears after every parent of it that is also in order.

Source code in src/causalrl/curriculum.py
def is_valid_curriculum(graph: CausalGraph, order: Sequence[str]) -> bool:
    """Whether ``order`` respects prerequisites: each skill appears after every parent of it that is
    also in ``order``."""
    present = set(order)
    seen: set[str] = set()
    for skill in order:
        prerequisites = set(graph.parents(skill)) & present
        if not prerequisites <= seen:
            return False
        seen.add(skill)
    return True

causalrl.curriculum.PrerequisiteLearner

Causally-gated skill acquisition: walking the curriculum left to right, a skill is mastered iff all of its parents (prerequisites) are already mastered. Deterministic and order-faithful, so the effect of the ordering is exactly readable from the mastered set.

Source code in src/causalrl/curriculum.py
class PrerequisiteLearner:
    """Causally-gated skill acquisition: walking the curriculum left to right, a skill is mastered
    iff all of its parents (prerequisites) are already mastered. Deterministic and order-faithful,
    so the effect of the ordering is exactly readable from the mastered set."""

    def __init__(self, graph: CausalGraph) -> None:
        self._graph = graph
        self._mastered: set[str] = set()

    def train(self, curriculum: Sequence[str]) -> frozenset[str]:
        """Process the curriculum once and return the set of mastered skills."""
        self._mastered = set()
        for skill in curriculum:
            if set(self._graph.parents(skill)) <= self._mastered:
                self._mastered.add(skill)
        return frozenset(self._mastered)

    def masters(self, skill: str) -> bool:
        """Whether ``skill`` was mastered by the most recent :meth:`train`."""
        return skill in self._mastered

train(curriculum)

Process the curriculum once and return the set of mastered skills.

Source code in src/causalrl/curriculum.py
def train(self, curriculum: Sequence[str]) -> frozenset[str]:
    """Process the curriculum once and return the set of mastered skills."""
    self._mastered = set()
    for skill in curriculum:
        if set(self._graph.parents(skill)) <= self._mastered:
            self._mastered.add(skill)
    return frozenset(self._mastered)

masters(skill)

Whether skill was mastered by the most recent :meth:train.

Source code in src/causalrl/curriculum.py
def masters(self, skill: str) -> bool:
    """Whether ``skill`` was mastered by the most recent :meth:`train`."""
    return skill in self._mastered

Causal Reward Shaping

causalrl.shaping.apply_potential_shaping(mdp, potential)

Return a new MDP with rewards[s, a] += gamma * Phi(s') - Phi(s) (policy-invariant).

Source code in src/causalrl/shaping.py
def apply_potential_shaping(mdp: TabularMDP, potential: Mapping[int, float]) -> TabularMDP:
    """Return a new MDP with ``rewards[s, a] += gamma * Phi(s') - Phi(s)`` (policy-invariant)."""
    shaped = dict(mdp.rewards)
    for (s, a), s_next in mdp.transitions.items():
        shaped[(s, a)] = mdp.rewards[(s, a)] + mdp.gamma * potential[s_next] - potential[s]
    return replace(mdp, rewards=shaped)

causalrl.shaping.causal_potential(mdp)

The ideal (causal) shaping potential: V* with terminal states pinned to 0 (the condition under which potential shaping is policy-invariant for episodic tasks).

Source code in src/causalrl/shaping.py
def causal_potential(mdp: TabularMDP) -> dict[int, float]:
    """The ideal (causal) shaping potential: ``V*`` with terminal states pinned to ``0`` (the
    condition under which potential shaping is policy-invariant for episodic tasks)."""
    values, _ = value_iteration(mdp)
    return {s: (0.0 if s in mdp.terminals else values[s]) for s in range(mdp.n_states)}

causalrl.shaping.value_iteration(mdp, *, tol=1e-09, max_iter=1000)

Return (V*, greedy optimal policy) for the deterministic MDP.

Source code in src/causalrl/shaping.py
def value_iteration(
    mdp: TabularMDP, *, tol: float = 1e-9, max_iter: int = 1000
) -> tuple[dict[int, float], dict[int, int]]:
    """Return ``(V*, greedy optimal policy)`` for the deterministic MDP."""
    values = dict.fromkeys(range(mdp.n_states), 0.0)
    for _ in range(max_iter):
        delta = 0.0
        for s in range(mdp.n_states):
            if s in mdp.terminals:
                continue
            best = max(
                mdp.rewards[(s, a)] + mdp.gamma * values[mdp.transitions[(s, a)]]
                for a in range(mdp.n_actions)
            )
            delta = max(delta, abs(best - values[s]))
            values[s] = best
        if delta < tol:
            break
    policy: dict[int, int] = {}
    for s in range(mdp.n_states):
        if s in mdp.terminals:
            continue
        q = [
            mdp.rewards[(s, a)] + mdp.gamma * values[mdp.transitions[(s, a)]]
            for a in range(mdp.n_actions)
        ]
        policy[s] = int(np.argmax(q))
    return values, policy

causalrl.shaping.q_learning(mdp, *, episodes, potential=None, alpha=0.5, epsilon=0.1, max_steps=None, seed=None)

Tabular epsilon-greedy Q-learning from state 0. With potential given, the reward is shaped online by gamma * Phi(s') - Phi(s). Returns the greedy policy.

Source code in src/causalrl/shaping.py
def q_learning(
    mdp: TabularMDP,
    *,
    episodes: int,
    potential: Mapping[int, float] | None = None,
    alpha: float = 0.5,
    epsilon: float = 0.1,
    max_steps: int | None = None,
    seed: int | None = None,
) -> dict[int, int]:
    """Tabular epsilon-greedy Q-learning from state ``0``. With ``potential`` given, the reward is
    shaped online by ``gamma * Phi(s') - Phi(s)``. Returns the greedy policy."""
    rng = np.random.default_rng(seed)
    q = np.zeros((mdp.n_states, mdp.n_actions))
    steps_cap = max_steps if max_steps is not None else 4 * mdp.n_states
    for _ in range(episodes):
        s = 0
        for _ in range(steps_cap):
            if s in mdp.terminals:
                break
            if rng.random() < epsilon:
                a = int(rng.integers(0, mdp.n_actions))
            else:
                a = int(np.argmax(q[s]))
            s_next = mdp.transitions[(s, a)]
            reward = mdp.rewards[(s, a)]
            if potential is not None:
                reward = reward + mdp.gamma * potential[s_next] - potential[s]
            bootstrap = 0.0 if s_next in mdp.terminals else float(np.max(q[s_next]))
            q[s, a] += alpha * (reward + mdp.gamma * bootstrap - q[s, a])
            s = s_next
    return {s: int(np.argmax(q[s])) for s in range(mdp.n_states) if s not in mdp.terminals}

causalrl.shaping.TabularMDP dataclass

A finite deterministic MDP: transitions/rewards are keyed by (state, action).

Source code in src/causalrl/shaping.py
@dataclass(frozen=True)
class TabularMDP:
    """A finite deterministic MDP: ``transitions``/``rewards`` are keyed by ``(state, action)``."""

    n_states: int
    n_actions: int
    transitions: dict[tuple[int, int], int]
    rewards: dict[tuple[int, int], float]
    terminals: frozenset[int]
    gamma: float

Causal Game Theory

causalrl.games.CausalGame dataclass

A finite game as a causal influence diagram.

utilities[agent] maps each joint action profile (a tuple in agents order) to agent's payoff; graph is the (M)ACID with a decision and utility node per agent.

Source code in src/causalrl/games.py
@dataclass(frozen=True)
class CausalGame:
    """A finite game as a causal influence diagram.

    ``utilities[agent]`` maps each joint action profile (a tuple in ``agents`` order) to ``agent``'s
    payoff; ``graph`` is the (M)ACID with a decision and utility node per agent.
    """

    agents: tuple[str, ...]
    actions: Mapping[str, tuple[int, ...]]
    utilities: Mapping[str, Mapping[tuple[int, ...], float]]
    graph: CausalGraph

    def __post_init__(self) -> None:
        profiles = set(product(*(self.actions[a] for a in self.agents)))
        for agent in self.agents:
            if agent not in self.utilities:
                raise CausalGraphError(f"missing utilities for agent {agent!r}")
            if set(self.utilities[agent]) != profiles:
                raise CausalGraphError(
                    f"utilities for agent {agent!r} must cover the joint action space exactly"
                )
            for node in (decision_node(agent), utility_node(agent)):
                if node not in self.graph.nodes:
                    raise CausalGraphError(f"influence diagram is missing node {node!r}")

causalrl.games.pure_nash_equilibria(game)

All pure-strategy Nash equilibria, by enumeration over the finite joint action space.

Source code in src/causalrl/games.py
def pure_nash_equilibria(game: CausalGame) -> list[dict[str, int]]:
    """All pure-strategy Nash equilibria, by enumeration over the finite joint action space."""
    equilibria: list[dict[str, int]] = []
    for combo in product(*(game.actions[a] for a in game.agents)):
        profile = dict(zip(game.agents, combo, strict=True))
        if is_nash_equilibrium(game, profile):
            equilibria.append(profile)
    return equilibria

causalrl.games.mixed_nash_equilibria(game)

All mixed-strategy Nash equilibria (pure and properly mixed), by support enumeration.

Two-player games are solved exactly over rational arithmetic (:class:fractions.Fraction), so symmetric games yield exact mixes (matching pennies gives 0.5/0.5). Games with three or more agents are solved by support enumeration with a numerical (Newton) solve of the multilinear indifference system; every returned profile is then verified to be an ε-Nash equilibrium (no agent can gain more than 1e-6 by deviating to a pure action). Each equilibrium maps an agent to {action: probability} with off-support actions at zero.

Assumes a non-degenerate game; degenerate games may admit a continuum of equilibria, of which only support-extreme points are enumerated, and the numerical solver may miss a support whose system is ill-conditioned. Raises :class:CausalGraphError for fewer than two agents (use :func:pure_nash_equilibria for pure equilibria of any game).

Faithful to the support-enumeration method (R. Porter, E. Nudelman, Y. Shoham, Simple Search Methods for Finding a Nash Equilibrium, Games and Economic Behavior 2008; B. von Stengel, Computing Equilibria for Two-Person Games, Handbook of Game Theory 2002). No code is ported.

Source code in src/causalrl/games.py
def mixed_nash_equilibria(game: CausalGame) -> list[dict[str, dict[int, float]]]:
    """All mixed-strategy Nash equilibria (pure and properly mixed), by support enumeration.

    Two-player games are solved **exactly** over rational arithmetic (:class:`fractions.Fraction`),
    so symmetric games yield exact mixes (matching pennies gives ``0.5``/``0.5``). Games with three
    or more agents are solved by support enumeration with a **numerical** (Newton) solve of the
    multilinear indifference system; every returned profile is then verified to be an ε-Nash
    equilibrium (no agent can gain more than ``1e-6`` by deviating to a pure action). Each
    equilibrium maps an agent to ``{action: probability}`` with off-support actions at zero.

    Assumes a non-degenerate game; degenerate games may admit a continuum of equilibria, of which
    only support-extreme points are enumerated, and the numerical solver may miss a support whose
    system is ill-conditioned. Raises :class:`CausalGraphError` for fewer than two agents (use
    :func:`pure_nash_equilibria` for pure equilibria of any game).

    Faithful to the support-enumeration method (R. Porter, E. Nudelman, Y. Shoham, *Simple Search
    Methods for Finding a Nash Equilibrium*, Games and Economic Behavior 2008; B. von Stengel,
    *Computing Equilibria for Two-Person Games*, Handbook of Game Theory 2002). No code is ported.
    """
    if len(game.agents) < 2:
        raise CausalGraphError("mixed_nash_equilibria needs at least two agents")
    if len(game.agents) == 2:
        return _mixed_nash_two_player(game)
    return _mixed_nash_general(game)

causalrl.games.best_response(game, agent, profile)

The actions maximizing agent's payoff given the other agents' actions in profile.

Source code in src/causalrl/games.py
def best_response(game: CausalGame, agent: str, profile: Mapping[str, int]) -> frozenset[int]:
    """The actions maximizing ``agent``'s payoff given the other agents' actions in ``profile``."""
    index = game.agents.index(agent)
    base = [profile[a] for a in game.agents]
    payoffs: dict[int, float] = {}
    for action in game.actions[agent]:
        joint = list(base)
        joint[index] = action
        payoffs[action] = game.utilities[agent][tuple(joint)]
    best = max(payoffs.values())
    return frozenset(action for action, value in payoffs.items() if value == best)

causalrl.games.is_nash_equilibrium(game, profile)

Whether every agent's action in profile is a best response to the others.

Source code in src/causalrl/games.py
def is_nash_equilibrium(game: CausalGame, profile: Mapping[str, int]) -> bool:
    """Whether every agent's action in ``profile`` is a best response to the others."""
    return all(profile[agent] in best_response(game, agent, profile) for agent in game.agents)

Causal Gymnasium Wrapper

causalrl.envs.wrapper.CausalEnvWrapper

Bases: Wrapper[Any, Any, Any, Any]

A Gymnasium wrapper that exposes the wrapped environment's causal structure.

Parameters

env: Any gymnasium.Env. When the env carries a non-None .scm attribute and reward_node is provided (and valid), the full causal interface is enabled. When env.scm is None or reward_node is None, construction still succeeds but the causal interface is disabled (pass-through mode). reward_node: The SCM variable name that represents the reward / return signal. Optional — None disables the causal interface. If the env has a live SCM and reward_node is supplied but not present in the graph, a ValueError is raised.

Attributes

reward_node: The SCM node treated as the reward/return, or None when not set. has_causal_interface: True iff the wrapped env exposes a non-None SCM and reward_node is set and present in the graph. All causal methods require this to be True. reward_parents: Direct SCM parents of reward_node in topological order. Requires has_causal_interface. scm: The underlying :class:~causalrl.scm.scm.StructuralCausalModel (live reference to the wrapped env's .scm), or None when not available. active_interventions: The currently stored intervention mapping, or None if no persistent intervention is active.

Source code in src/causalrl/envs/wrapper.py
class CausalEnvWrapper(gym.Wrapper[Any, Any, Any, Any]):
    """A Gymnasium wrapper that exposes the wrapped environment's causal structure.

    Parameters
    ----------
    env:
        Any ``gymnasium.Env``.  When the env carries a non-``None`` ``.scm`` attribute and
        ``reward_node`` is provided (and valid), the full causal interface is enabled.
        When ``env.scm is None`` **or** ``reward_node`` is ``None``, construction still
        succeeds but the causal interface is disabled (pass-through mode).
    reward_node:
        The SCM variable name that represents the reward / return signal.  Optional —
        ``None`` disables the causal interface.  If the env has a live SCM and
        ``reward_node`` is supplied but not present in the graph, a ``ValueError`` is raised.

    Attributes
    ----------
    reward_node:
        The SCM node treated as the reward/return, or ``None`` when not set.
    has_causal_interface:
        ``True`` iff the wrapped env exposes a non-``None`` SCM *and* ``reward_node`` is set
        and present in the graph.  All causal methods require this to be ``True``.
    reward_parents:
        Direct SCM parents of ``reward_node`` in topological order.  Requires
        ``has_causal_interface``.
    scm:
        The underlying :class:`~causalrl.scm.scm.StructuralCausalModel` (live reference to
        the wrapped env's ``.scm``), or ``None`` when not available.
    active_interventions:
        The currently stored intervention mapping, or ``None`` if no persistent
        intervention is active.
    """

    def __init__(
        self,
        env: gym.Env[Any, Any],
        *,
        reward_node: str | None = None,
    ) -> None:
        super().__init__(env)
        # Validate reward_node only when both the SCM and reward_node are provided.
        raw_scm = getattr(env, "scm", None)
        if reward_node is not None and raw_scm is not None:
            scm = cast("StructuralCausalModel", raw_scm)
            if reward_node not in scm.graph.nodes:
                raise ValueError(
                    f"reward_node {reward_node!r} is not a node in the SCM graph "
                    f"(nodes: {sorted(scm.graph.nodes)})"
                )
        self.reward_node: str | None = reward_node
        # Persistent-intervention state.
        self._active_interventions: Mapping[str, Value] | None = None
        self._mutilated_scm: StructuralCausalModel | None = None

    # ------------------------------------------------------------------
    # Causal interface availability
    # ------------------------------------------------------------------

    @property
    def has_causal_interface(self) -> bool:
        """``True`` iff the causal interface is fully operational.

        Requires the wrapped env to expose a non-``None`` ``.scm`` **and** ``reward_node``
        to be set and present in the graph.
        """
        raw_scm = getattr(self.env, "scm", None)
        return raw_scm is not None and self.reward_node is not None

    def _require_causal_interface(self, method: str) -> None:
        """Raise :class:`~causalrl.exceptions.CausalInterfaceUnavailableError` if disabled."""
        if not self.has_causal_interface:
            raw_scm = getattr(self.env, "scm", None)
            if raw_scm is None:
                reason = (
                    f"the wrapped env {type(self.env).__name__!r} has scm=None "
                    "(confounded MDPs and envs without an explicit SCM do not support "
                    "the causal interface)"
                )
            else:
                reason = "reward_node was not provided at construction time"
            raise CausalInterfaceUnavailableError(
                f"{method} requires has_causal_interface=True, but {reason}. "
                "Construct CausalEnvWrapper with a SCM-backed env and a valid reward_node "
                "to enable the causal interface."
            )

    # ------------------------------------------------------------------
    # Causal interface — pure graph queries
    # ------------------------------------------------------------------

    @property
    def scm(self) -> StructuralCausalModel | None:
        """The underlying SCM (live reference to the wrapped env's ``.scm``), or ``None``."""
        return cast("StructuralCausalModel | None", getattr(self.env, "scm", None))

    @property
    def reward_parents(self) -> list[str]:
        """Direct SCM parents of ``reward_node`` in graph-topological order.

        These are the variables that causally determine the immediate reward signal.  Pass
        their names as the ``factor_nodes`` argument of
        :func:`~causalrl.agents.factored_advantage.factored_advantage`.

        Raises
        ------
        CausalInterfaceUnavailableError
            When ``has_causal_interface`` is ``False``.
        """
        self._require_causal_interface("reward_parents")
        scm = cast("StructuralCausalModel", self.scm)
        reward_node = cast(str, self.reward_node)
        return scm.graph.parents(reward_node)

    def do(self, interventions: Mapping[str, Value]) -> StructuralCausalModel:
        """Return a *new* SCM mutilated by ``do(interventions)``.

        The running environment's SCM is NOT modified.  This is a pure causal-graph query
        suitable for off-policy reasoning or shaping.

        Parameters
        ----------
        interventions:
            Mapping ``{node_name: value}`` passed to
            :meth:`~causalrl.scm.scm.StructuralCausalModel.do`.

        Returns
        -------
        StructuralCausalModel
            The mutilated SCM under the specified do-intervention.

        Raises
        ------
        CausalInterfaceUnavailableError
            When ``has_causal_interface`` is ``False``.
        """
        self._require_causal_interface("do")
        return cast("StructuralCausalModel", self.scm).do(interventions)

    def intervene(self, node: str, value: Value) -> StructuralCausalModel:
        """Convenience wrapper: ``do({node: value})``.

        Parameters
        ----------
        node:
            The SCM variable to intervene on.
        value:
            The value to pin it to (scalar, sequence, or Tensor).

        Returns
        -------
        StructuralCausalModel
            The mutilated SCM under ``do({node: value})``.

        Raises
        ------
        CausalInterfaceUnavailableError
            When ``has_causal_interface`` is ``False``.
        """
        return self.do({node: value})

    # ------------------------------------------------------------------
    # Persistent interventional rollouts
    # ------------------------------------------------------------------

    @property
    def active_interventions(self) -> Mapping[str, Value] | None:
        """The currently stored intervention mapping, or ``None`` if none is active."""
        return self._active_interventions

    def set_intervention(self, interventions: Mapping[str, Value]) -> None:
        """Store a persistent intervention that affects subsequent ``reset`` and ``step``.

        After this call, every ``reset()`` and ``step()`` swaps the wrapped env's ``.scm``
        to the pre-computed mutilated SCM for the duration of the call, then restores the
        original SCM in a ``finally`` block.  Precomputed baselines stored on the env
        (e.g. ``arm_values``) are **not** recomputed.

        Parameters
        ----------
        interventions:
            Mapping ``{node_name: value}`` to pin persistently.

        Raises
        ------
        CausalInterfaceUnavailableError
            When ``has_causal_interface`` is ``False``.
        """
        self._require_causal_interface("set_intervention")
        self._active_interventions = dict(interventions)
        # Pre-compute the mutilated SCM once so set_intervention is idempotent-fast.
        self._mutilated_scm = cast("StructuralCausalModel", self.scm).do(interventions)

    def clear_intervention(self) -> None:
        """Remove the persistent intervention; subsequent calls use the unintervened SCM."""
        self._active_interventions = None
        self._mutilated_scm = None

    # ------------------------------------------------------------------
    # Gymnasium pass-through with optional SCM swap
    # ------------------------------------------------------------------

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[Any, dict[str, Any]]:
        """Reset the wrapped environment, forwarding ``seed`` and ``options``.

        When a persistent intervention is active (``set_intervention`` was called), the
        wrapped env's ``.scm`` is temporarily replaced with the mutilated SCM for the
        duration of this call, then restored unconditionally.
        """
        if self._mutilated_scm is not None:
            orig_scm = cast("StructuralCausalModel", self.scm)
            self.env.scm = self._mutilated_scm  # type: ignore[union-attr]
            try:
                return self.env.reset(seed=seed, options=options)  # type: ignore[return-value]
            finally:
                self.env.scm = orig_scm  # type: ignore[union-attr]
        return self.env.reset(seed=seed, options=options)  # type: ignore[return-value]

    def step(self, action: Any) -> tuple[Any, float, bool, bool, dict[str, Any]]:
        """Step the wrapped environment.

        When a persistent intervention is active (``set_intervention`` was called), the
        wrapped env's ``.scm`` is temporarily replaced with the mutilated SCM for the
        duration of this call, then restored unconditionally.
        """
        if self._mutilated_scm is not None:
            orig_scm = cast("StructuralCausalModel", self.scm)
            self.env.scm = self._mutilated_scm  # type: ignore[union-attr]
            try:
                return self.env.step(action)  # type: ignore[return-value]
            finally:
                self.env.scm = orig_scm  # type: ignore[union-attr]
        return self.env.step(action)  # type: ignore[return-value]

has_causal_interface property

True iff the causal interface is fully operational.

Requires the wrapped env to expose a non-None .scm and reward_node to be set and present in the graph.

scm property

The underlying SCM (live reference to the wrapped env's .scm), or None.

reward_parents property

Direct SCM parents of reward_node in graph-topological order.

These are the variables that causally determine the immediate reward signal. Pass their names as the factor_nodes argument of :func:~causalrl.agents.factored_advantage.factored_advantage.

Raises

CausalInterfaceUnavailableError When has_causal_interface is False.

active_interventions property

The currently stored intervention mapping, or None if none is active.

do(interventions)

Return a new SCM mutilated by do(interventions).

The running environment's SCM is NOT modified. This is a pure causal-graph query suitable for off-policy reasoning or shaping.

Parameters

interventions: Mapping {node_name: value} passed to :meth:~causalrl.scm.scm.StructuralCausalModel.do.

Returns

StructuralCausalModel The mutilated SCM under the specified do-intervention.

Raises

CausalInterfaceUnavailableError When has_causal_interface is False.

Source code in src/causalrl/envs/wrapper.py
def do(self, interventions: Mapping[str, Value]) -> StructuralCausalModel:
    """Return a *new* SCM mutilated by ``do(interventions)``.

    The running environment's SCM is NOT modified.  This is a pure causal-graph query
    suitable for off-policy reasoning or shaping.

    Parameters
    ----------
    interventions:
        Mapping ``{node_name: value}`` passed to
        :meth:`~causalrl.scm.scm.StructuralCausalModel.do`.

    Returns
    -------
    StructuralCausalModel
        The mutilated SCM under the specified do-intervention.

    Raises
    ------
    CausalInterfaceUnavailableError
        When ``has_causal_interface`` is ``False``.
    """
    self._require_causal_interface("do")
    return cast("StructuralCausalModel", self.scm).do(interventions)

intervene(node, value)

Convenience wrapper: do({node: value}).

Parameters

node: The SCM variable to intervene on. value: The value to pin it to (scalar, sequence, or Tensor).

Returns

StructuralCausalModel The mutilated SCM under do({node: value}).

Raises

CausalInterfaceUnavailableError When has_causal_interface is False.

Source code in src/causalrl/envs/wrapper.py
def intervene(self, node: str, value: Value) -> StructuralCausalModel:
    """Convenience wrapper: ``do({node: value})``.

    Parameters
    ----------
    node:
        The SCM variable to intervene on.
    value:
        The value to pin it to (scalar, sequence, or Tensor).

    Returns
    -------
    StructuralCausalModel
        The mutilated SCM under ``do({node: value})``.

    Raises
    ------
    CausalInterfaceUnavailableError
        When ``has_causal_interface`` is ``False``.
    """
    return self.do({node: value})

set_intervention(interventions)

Store a persistent intervention that affects subsequent reset and step.

After this call, every reset() and step() swaps the wrapped env's .scm to the pre-computed mutilated SCM for the duration of the call, then restores the original SCM in a finally block. Precomputed baselines stored on the env (e.g. arm_values) are not recomputed.

Parameters

interventions: Mapping {node_name: value} to pin persistently.

Raises

CausalInterfaceUnavailableError When has_causal_interface is False.

Source code in src/causalrl/envs/wrapper.py
def set_intervention(self, interventions: Mapping[str, Value]) -> None:
    """Store a persistent intervention that affects subsequent ``reset`` and ``step``.

    After this call, every ``reset()`` and ``step()`` swaps the wrapped env's ``.scm``
    to the pre-computed mutilated SCM for the duration of the call, then restores the
    original SCM in a ``finally`` block.  Precomputed baselines stored on the env
    (e.g. ``arm_values``) are **not** recomputed.

    Parameters
    ----------
    interventions:
        Mapping ``{node_name: value}`` to pin persistently.

    Raises
    ------
    CausalInterfaceUnavailableError
        When ``has_causal_interface`` is ``False``.
    """
    self._require_causal_interface("set_intervention")
    self._active_interventions = dict(interventions)
    # Pre-compute the mutilated SCM once so set_intervention is idempotent-fast.
    self._mutilated_scm = cast("StructuralCausalModel", self.scm).do(interventions)

clear_intervention()

Remove the persistent intervention; subsequent calls use the unintervened SCM.

Source code in src/causalrl/envs/wrapper.py
def clear_intervention(self) -> None:
    """Remove the persistent intervention; subsequent calls use the unintervened SCM."""
    self._active_interventions = None
    self._mutilated_scm = None

reset(*, seed=None, options=None)

Reset the wrapped environment, forwarding seed and options.

When a persistent intervention is active (set_intervention was called), the wrapped env's .scm is temporarily replaced with the mutilated SCM for the duration of this call, then restored unconditionally.

Source code in src/causalrl/envs/wrapper.py
def reset(
    self,
    *,
    seed: int | None = None,
    options: dict[str, Any] | None = None,
) -> tuple[Any, dict[str, Any]]:
    """Reset the wrapped environment, forwarding ``seed`` and ``options``.

    When a persistent intervention is active (``set_intervention`` was called), the
    wrapped env's ``.scm`` is temporarily replaced with the mutilated SCM for the
    duration of this call, then restored unconditionally.
    """
    if self._mutilated_scm is not None:
        orig_scm = cast("StructuralCausalModel", self.scm)
        self.env.scm = self._mutilated_scm  # type: ignore[union-attr]
        try:
            return self.env.reset(seed=seed, options=options)  # type: ignore[return-value]
        finally:
            self.env.scm = orig_scm  # type: ignore[union-attr]
    return self.env.reset(seed=seed, options=options)  # type: ignore[return-value]

step(action)

Step the wrapped environment.

When a persistent intervention is active (set_intervention was called), the wrapped env's .scm is temporarily replaced with the mutilated SCM for the duration of this call, then restored unconditionally.

Source code in src/causalrl/envs/wrapper.py
def step(self, action: Any) -> tuple[Any, float, bool, bool, dict[str, Any]]:
    """Step the wrapped environment.

    When a persistent intervention is active (``set_intervention`` was called), the
    wrapped env's ``.scm`` is temporarily replaced with the mutilated SCM for the
    duration of this call, then restored unconditionally.
    """
    if self._mutilated_scm is not None:
        orig_scm = cast("StructuralCausalModel", self.scm)
        self.env.scm = self._mutilated_scm  # type: ignore[union-attr]
        try:
            return self.env.step(action)  # type: ignore[return-value]
        finally:
            self.env.scm = orig_scm  # type: ignore[union-attr]
    return self.env.step(action)  # type: ignore[return-value]

Causal Graph-Factored Advantage (CGFA)

causalrl.agents.factored_advantage.factored_advantage(factor_values, baselines, *, config=None, aggregation='sum', weights=None)

Compute causal graph-factored advantages from per-factor value estimates.

Implements the SCM-aligned critic target from CGFA-PPO (arXiv:2605.06066, §3.2). Given K causal parent factors of the return, and for each rollout step a vector of per-factor value estimates V_1, …, V_K and a scalar baseline b, the per-factor advantage is A_i = V_i - b and the combined advantage is their (weighted) sum or mean.

When K = 1 (single factor) the output reduces exactly to the standard advantage A = V - b, so this is a strict generalisation of the scalar advantage.

Parameters

factor_values: Array of shape (T, K) where T is the number of rollout steps and K is the number of causal factors (SCM parents of the return). Each column [:,i] is the critic's value estimate for factor i. baselines: Array of shape (T,) — the shared scalar baseline for each step (typically the current value-function estimate V(s_t)). config: A :class:FactoredAdvantageConfig that carries factor_nodes, aggregation, and optional weights. When provided, aggregation and weights keyword arguments are ignored (config takes precedence). aggregation: Used when config is None. "sum" (default) or "mean". weights: Used when config is None. Per-factor weights, shape (K,). None means uniform unit weights.

Returns

NDArray[np.float64] Shape (T,) — the combined causal-graph-factored advantage for each step.

Raises

ValueError If factor_values is not 2-D, if baselines length does not match T, or if weights shape does not match K.

Examples

Single-factor (reduces to standard advantage):

import numpy as np V = np.array([[2.0], [3.0], [1.0]]) # (T=3, K=1) b = np.array([1.5, 2.5, 0.5]) factored_advantage(V, b) array([0.5, 0.5, 0.5])

Two-factor sum (CGFA-PPO with two SCM parents of the return):

V2 = np.array([[2.0, 1.0], [3.0, 0.5]]) # (T=2, K=2) b2 = np.array([1.5, 2.0]) factored_advantage(V2, b2) # A_i = V_i - b; sum over i array([ 0. , -0.5])

References

  • Cristiano da Costa Cunha, Ajmal Mian, Tim French, and Wei Liu (2026). "Causal Reinforcement Learning for Complex Card Games: A Magic: The Gathering Benchmark." arXiv:2605.06066.
Source code in src/causalrl/agents/factored_advantage.py
def factored_advantage(
    factor_values: NDArray[np.float64],
    baselines: NDArray[np.float64],
    *,
    config: FactoredAdvantageConfig | None = None,
    aggregation: Literal["sum", "mean"] = "sum",
    weights: NDArray[np.float64] | None = None,
) -> NDArray[np.float64]:
    """Compute causal graph-factored advantages from per-factor value estimates.

    Implements the SCM-aligned critic target from CGFA-PPO (arXiv:2605.06066, §3.2).
    Given ``K`` causal parent factors of the return, and for each rollout step a vector of
    per-factor value estimates ``V_1, …, V_K`` and a scalar baseline ``b``, the per-factor
    advantage is ``A_i = V_i - b`` and the combined advantage is their (weighted) sum or
    mean.

    When ``K = 1`` (single factor) the output reduces exactly to the standard advantage
    ``A = V - b``, so this is a strict generalisation of the scalar advantage.

    Parameters
    ----------
    factor_values:
        Array of shape ``(T, K)`` where ``T`` is the number of rollout steps and ``K`` is
        the number of causal factors (SCM parents of the return).  Each column ``[:,i]``
        is the critic's value estimate for factor ``i``.
    baselines:
        Array of shape ``(T,)`` — the shared scalar baseline for each step (typically the
        current value-function estimate ``V(s_t)``).
    config:
        A :class:`FactoredAdvantageConfig` that carries ``factor_nodes``, ``aggregation``,
        and optional ``weights``.  When provided, ``aggregation`` and ``weights`` keyword
        arguments are ignored (config takes precedence).
    aggregation:
        Used when ``config`` is ``None``.  ``"sum"`` (default) or ``"mean"``.
    weights:
        Used when ``config`` is ``None``.  Per-factor weights, shape ``(K,)``.  ``None``
        means uniform unit weights.

    Returns
    -------
    NDArray[np.float64]
        Shape ``(T,)`` — the combined causal-graph-factored advantage for each step.

    Raises
    ------
    ValueError
        If ``factor_values`` is not 2-D, if ``baselines`` length does not match ``T``, or
        if ``weights`` shape does not match ``K``.

    Examples
    --------
    Single-factor (reduces to standard advantage):

    >>> import numpy as np
    >>> V = np.array([[2.0], [3.0], [1.0]])   # (T=3, K=1)
    >>> b = np.array([1.5, 2.5, 0.5])
    >>> factored_advantage(V, b)
    array([0.5, 0.5, 0.5])

    Two-factor sum (CGFA-PPO with two SCM parents of the return):

    >>> V2 = np.array([[2.0, 1.0], [3.0, 0.5]])  # (T=2, K=2)
    >>> b2 = np.array([1.5, 2.0])
    >>> factored_advantage(V2, b2)   # A_i = V_i - b; sum over i
    array([ 0. , -0.5])

    References
    ----------
    * Cristiano da Costa Cunha, Ajmal Mian, Tim French, and Wei Liu (2026). "Causal
      Reinforcement Learning for Complex Card Games: A Magic: The Gathering Benchmark."
      arXiv:2605.06066.
    """
    fv = np.asarray(factor_values, dtype=np.float64)
    bl = np.asarray(baselines, dtype=np.float64)

    if fv.ndim != 2:
        raise ValueError(f"factor_values must be 2-D (T, K); got shape {fv.shape}")
    t, k = fv.shape
    if bl.shape != (t,):
        raise ValueError(
            f"baselines must have shape ({t},) to match factor_values rows; got {bl.shape}"
        )

    # Resolve aggregation and weights.
    if config is not None:
        agg = config.aggregation
        w = config.weights_array
        if len(w) != k:
            raise ValueError(
                f"FactoredAdvantageConfig has {len(w)} factors but factor_values has K={k} columns"
            )
    else:
        agg = aggregation
        if weights is not None:
            w = np.asarray(weights, dtype=np.float64)
            if w.shape != (k,):
                raise ValueError(f"weights must have shape ({k},) to match K; got {w.shape}")
        else:
            w = np.ones(k, dtype=np.float64)

    # Per-factor advantages: A_i(t) = V_i(t) - b(t).  Shape (T, K).
    per_factor: NDArray[np.float64] = fv - bl[:, np.newaxis]  # broadcast baseline

    # Combine: weighted sum or weighted mean over the K factors.
    combined = per_factor @ w  # (T,)
    if agg == "mean":
        combined = combined / float(w.sum()) if w.sum() != 0.0 else combined

    return combined

causalrl.agents.factored_advantage.FactoredAdvantageConfig dataclass

Configuration bundle for :func:factored_advantage.

Parameters

factor_nodes: Ordered list of SCM parent-node names whose per-factor value estimates are passed to :func:factored_advantage. The order must match the columns of the factor_values array. aggregation: How to combine per-factor advantages into the final scalar advantage.

* ``"sum"`` (default): ``A = Σ_i A_i`` — the formulation in arXiv:2605.06066.
* ``"mean"``: ``A = mean_i A_i`` — normalises when the number of factors varies.

weights: Optional per-factor weights w_i (length must match factor_nodes when provided). The combined advantage becomes A = Σ_i w_i * A_i for aggregation="sum" or the weighted mean for aggregation="mean". None means uniform unit weights.

Source code in src/causalrl/agents/factored_advantage.py
@dataclass
class FactoredAdvantageConfig:
    """Configuration bundle for :func:`factored_advantage`.

    Parameters
    ----------
    factor_nodes:
        Ordered list of SCM parent-node names whose per-factor value estimates are passed to
        :func:`factored_advantage`.  The order must match the columns of the
        ``factor_values`` array.
    aggregation:
        How to combine per-factor advantages into the final scalar advantage.

        * ``"sum"`` (default): ``A = Σ_i A_i`` — the formulation in arXiv:2605.06066.
        * ``"mean"``: ``A = mean_i A_i`` — normalises when the number of factors varies.
    weights:
        Optional per-factor weights ``w_i`` (length must match ``factor_nodes`` when
        provided).  The combined advantage becomes ``A = Σ_i w_i * A_i`` for
        ``aggregation="sum"`` or the weighted mean for ``aggregation="mean"``.  ``None``
        means uniform unit weights.
    """

    factor_nodes: list[str]
    aggregation: Literal["sum", "mean"] = "sum"
    weights: list[float] | None = None
    # derived / validated at post-init
    _weights_arr: NDArray[np.float64] = field(init=False, repr=False)

    def __post_init__(self) -> None:
        n = len(self.factor_nodes)
        if self.weights is not None:
            if len(self.weights) != n:
                raise ValueError(
                    f"weights length ({len(self.weights)}) must match factor_nodes length ({n})"
                )
            self._weights_arr = np.asarray(self.weights, dtype=np.float64)
        else:
            self._weights_arr = np.ones(n, dtype=np.float64)

    @property
    def weights_array(self) -> NDArray[np.float64]:
        """The validated per-factor weight vector (uniform unit weights when unset)."""
        return self._weights_arr

weights_array property

The validated per-factor weight vector (uniform unit weights when unset).

Gymnasium Env Registration

causalrl.envs.registration.register_envs()

Register causalrl demo environments in the Gymnasium registry.

Calling this function more than once in the same process is safe (idempotent).

After calling this function (or importing causalrl), you can use::

import gymnasium
import causalrl  # triggers register_envs()

env = gymnasium.make("causalrl/StructuralCausalBandit-v0")
vec = gymnasium.make_vec("causalrl/StructuralCausalBandit-v0", num_envs=2)
Source code in src/causalrl/envs/registration.py
def register_envs() -> None:
    """Register causalrl demo environments in the Gymnasium registry.

    Calling this function more than once in the same process is safe (idempotent).

    After calling this function (or importing ``causalrl``), you can use::

        import gymnasium
        import causalrl  # triggers register_envs()

        env = gymnasium.make("causalrl/StructuralCausalBandit-v0")
        vec = gymnasium.make_vec("causalrl/StructuralCausalBandit-v0", num_envs=2)
    """
    global _registered
    if _registered:
        return

    # gymnasium.register has an incompletely typed `kwargs` dict parameter in the stubs;
    # suppress the resulting pyright reportUnknownMemberType at each call site.
    gymnasium.register(  # type: ignore[reportUnknownMemberType]
        id="causalrl/StructuralCausalBandit-v0",
        entry_point="causalrl.envs.suite.scbandit:make_confounded_chain_env",
        # disable_env_checker keeps CI fast; callers can re-enable via check_env.
        disable_env_checker=True,
    )

    gymnasium.register(  # type: ignore[reportUnknownMemberType]
        id="causalrl/FrontdoorBandit-v0",
        entry_point="causalrl.envs.suite.scbandit:make_frontdoor_env",
        disable_env_checker=True,
    )

    _registered = True

Exceptions

causalrl.exceptions.CausalRLError

Bases: Exception

Base class for all causalrl errors.

Source code in src/causalrl/exceptions.py
class CausalRLError(Exception):
    """Base class for all causalrl errors."""

causalrl.exceptions.CausalInterfaceUnavailableError

Bases: CausalRLError

The causal interface is not available on this wrapper.

Raised when a method that requires a live SCM and a named reward node is called on a :class:~causalrl.envs.wrapper.CausalEnvWrapper that was constructed without them (e.g. wrapping a :class:~causalrl.envs.base.ConfoundedMDP that carries scm=None, or without passing a reward_node).

Source code in src/causalrl/exceptions.py
class CausalInterfaceUnavailableError(CausalRLError):
    """The causal interface is not available on this wrapper.

    Raised when a method that requires a live SCM and a named reward node is called
    on a :class:`~causalrl.envs.wrapper.CausalEnvWrapper` that was constructed without
    them (e.g. wrapping a :class:`~causalrl.envs.base.ConfoundedMDP` that carries
    ``scm=None``, or without passing a ``reward_node``).
    """

causalrl.exceptions.NotIdentifiableError

Bases: CausalRLError

A causal query is not identifiable from the available data.

Source code in src/causalrl/exceptions.py
class NotIdentifiableError(CausalRLError):
    """A causal query is not identifiable from the available data."""

    def __init__(self, message: str, witness: object | None = None) -> None:
        super().__init__(message)
        self.witness = witness

causalrl.exceptions.CausalGraphError

Bases: CausalRLError

Invalid graph operation (unknown node, cycle, malformed edge).

Source code in src/causalrl/exceptions.py
class CausalGraphError(CausalRLError):
    """Invalid graph operation (unknown node, cycle, malformed edge)."""

causalrl.exceptions.RealizabilityError

Bases: CausalRLError

A counterfactual query cannot be realized from the given evidence.

Source code in src/causalrl/exceptions.py
class RealizabilityError(CausalRLError):
    """A counterfactual query cannot be realized from the given evidence."""

causalrl.exceptions.UnverifiedAssumptionError

Bases: CausalRLError

A method's claimed guarantee requires an assumption the caller has not declared.

Source code in src/causalrl/exceptions.py
class UnverifiedAssumptionError(CausalRLError):
    """A method's claimed guarantee requires an assumption the caller has not declared."""

Partial-Identification And OPE Bounds

causalrl.identification.bounds.manski_bounds(data, *, treatment, outcome, action, outcome_range=(0.0, 1.0))

Sharp no-assumptions bounds on E[outcome | do(treatment = action)] (Manski 1990).

From observational data (integer treatment column, numeric outcome in outcome_range): the units that took action contribute their observed mean, while the rest are bounded only by the outcome range. With p = P(treatment = action) and m = E[outcome | treatment = action] the bounds are [m*p + y_min*(1-p), m*p + y_max*(1-p)] — sharp, collapsing to a point when every unit took action. The observational counterpart of :func:causal_q_bounds.

Source code in src/causalrl/identification/bounds.py
def manski_bounds(
    data: Mapping[str, Sequence[float]],
    *,
    treatment: str,
    outcome: str,
    action: int,
    outcome_range: tuple[float, float] = (0.0, 1.0),
) -> Interval:
    """Sharp no-assumptions bounds on ``E[outcome | do(treatment = action)]`` (Manski 1990).

    From observational ``data`` (integer ``treatment`` column, numeric ``outcome`` in
    ``outcome_range``): the units that took ``action`` contribute their observed mean, while
    the rest are bounded only by the outcome range. With ``p = P(treatment = action)`` and
    ``m = E[outcome | treatment = action]`` the bounds are
    ``[m*p + y_min*(1-p), m*p + y_max*(1-p)]`` — sharp, collapsing to a point when every unit took
    ``action``. The observational counterpart of :func:`causal_q_bounds`.
    """
    x = np.asarray(data[treatment])
    y = np.asarray(data[outcome], dtype=float)
    y_min, y_max = outcome_range
    mask = x == action
    p = float(mask.mean())
    observed = float(y[mask].mean()) if bool(mask.any()) else 0.0
    return Interval(observed * p + y_min * (1.0 - p), observed * p + y_max * (1.0 - p))

causalrl.identification.bounds.ipw_sensitivity_bounds(outcomes, propensities, *, gamma)

Marginal-sensitivity-model bounds on the treated counterfactual mean E[Y(1)].

outcomes and propensities are the treated units' outcomes Y_i and nominal propensities e(Z_i) = P(treated | Z_i) (what an unconfounded model fits). Under Tan's marginal sensitivity model the true inverse-propensity weight lies within an odds-ratio factor gamma >= 1 of the nominal, giving a_i in [1 + (1/g)(1/e_i - 1), 1 + g(1/e_i - 1)]; the bounds are the extreme stabilized (Hájek) weighted means over that range. At gamma = 1 the interval collapses to the IPW point estimate; it widens monotonically with gamma and contains E[Y(1)] whenever the true confounding odds ratio is at most gamma.

Faithful to Z. Tan, A Distributional Approach for Causal Inference Using Propensity Scores (JASA 2006) and Q. Zhao, D. Small, B. Bhattacharya, Sensitivity Analysis for Inverse Probability Weighting Estimators via the Percentile Bootstrap (JRSS-B 2019). No code is ported.

Source code in src/causalrl/identification/bounds.py
def ipw_sensitivity_bounds(
    outcomes: Sequence[float], propensities: Sequence[float], *, gamma: float
) -> Interval:
    """Marginal-sensitivity-model bounds on the treated counterfactual mean ``E[Y(1)]``.

    ``outcomes`` and ``propensities`` are the treated units' outcomes ``Y_i`` and *nominal*
    propensities ``e(Z_i) = P(treated | Z_i)`` (what an unconfounded model fits). Under Tan's
    marginal sensitivity model the true inverse-propensity weight lies within an odds-ratio factor
    ``gamma >= 1`` of the nominal, giving ``a_i in [1 + (1/g)(1/e_i - 1), 1 + g(1/e_i - 1)]``; the
    bounds are the extreme stabilized (Hájek) weighted means over that range. At ``gamma = 1`` the
    interval collapses to the IPW point estimate; it widens monotonically with ``gamma`` and
    contains ``E[Y(1)]`` whenever the true confounding odds ratio is at most ``gamma``.

    Faithful to Z. Tan, *A Distributional Approach for Causal Inference Using Propensity Scores*
    (JASA 2006) and Q. Zhao, D. Small, B. Bhattacharya, *Sensitivity Analysis for Inverse
    Probability Weighting Estimators via the Percentile Bootstrap* (JRSS-B 2019). No code is ported.
    """
    if gamma < 1.0:
        raise ValueError("gamma must be >= 1")
    y = np.asarray(outcomes, dtype=float)
    e = np.asarray(propensities, dtype=float)
    odds = (1.0 - e) / e
    lo_w = 1.0 + odds / gamma
    hi_w = 1.0 + odds * gamma
    return Interval(
        _fractional_extreme(y, lo_w, hi_w, maximize=False),
        _fractional_extreme(y, lo_w, hi_w, maximize=True),
    )

causalrl.identification.bounds.causal_q_bounds(dataset, state, action, *, require_identified=False)

Manski natural bounds on E[return | do(action), state] from confounded logs.

For a return in [0, 1] with empirical mean m = E[R|s,a] and propensity p = P(a|s): lower = m * p, upper = m * p + (1 - p). A never-logged action (p = 0) yields the vacuous [0, 1] — not identifiable from the logs alone. With require_identified=True, a vacuous bound raises NotIdentifiableError carrying (state, action) as the witness.

Source code in src/causalrl/identification/bounds.py
def causal_q_bounds(
    dataset: ConfoundedTrajectoryDataset,
    state: int,
    action: int,
    *,
    require_identified: bool = False,
) -> Interval:
    """Manski natural bounds on E[return | do(action), state] from confounded logs.

    For a return in [0, 1] with empirical mean m = E[R|s,a] and propensity p = P(a|s):
        lower = m * p,  upper = m * p + (1 - p).
    A never-logged action (p = 0) yields the vacuous [0, 1] — not identifiable from the
    logs alone. With `require_identified=True`, a vacuous bound raises NotIdentifiableError
    carrying (state, action) as the witness.
    """
    p = dataset.behavior_propensity(state, action)
    m = dataset.mean_reward(state, action)
    lower = m * p
    upper = m * p + (1.0 - p)
    if require_identified and p == 0.0:
        raise NotIdentifiableError(
            f"E[R|do(a={action}), s={state}] is not identifiable: action never logged "
            f"in this state (vacuous bound [0, 1])",
            witness=(state, action),
        )
    return Interval(lower, upper)

causalrl.identification.bounds.msm_policy_value_bounds(outcomes, logging_propensities, target_propensities, *, gamma)

Marginal-sensitivity-model bounds on an off-policy value V(pi_t) = E[(pi_t/e0) Y].

Self-normalised (Hájek) off-policy value of a target policy pi_t estimated from logs of a logging policy with nominal propensities e0(a|x) = P(a | x) (a valid probability in (0, 1]). outcomes are the logged rewards Y_i; target_propensities are pi_t(a_i | x_i) at the logged action. Under Tan's marginal sensitivity model the true logging propensity deviates from nominal by an odds-ratio at most gamma >= 1, so the true inverse weight 1/ẽ0 lies in [1 + odds/gamma, 1 + odds*gamma] with odds = (1-e0)/e0; each unit's contribution weight is pi_t(a_i|x_i) * (1/ẽ0). The bounds are the extreme stabilised weighted means of Y over those per-unit weight ranges.

Reduces to :func:ipw_sensitivity_bounds when pi_t is constant across the logged actions (the treated / uniform-target mean — the constant cancels in the self-normalised ratio), and collapses to the self-normalised IPS point at gamma = 1. The off-policy generalisation of Tan's MSM in the spirit of N. Kallus & A. Zhou, Confounding-Robust Policy Evaluation in Infinite-Horizon Reinforcement Learning (NeurIPS 2020). The caller supplies pi_t and the nominal e0; no code is ported.

Source code in src/causalrl/identification/bounds.py
def msm_policy_value_bounds(
    outcomes: Sequence[float],
    logging_propensities: Sequence[float],
    target_propensities: Sequence[float],
    *,
    gamma: float,
) -> Interval:
    """Marginal-sensitivity-model bounds on an off-policy value ``V(pi_t) = E[(pi_t/e0) Y]``.

    Self-normalised (Hájek) off-policy value of a target policy ``pi_t`` estimated from logs of a
    logging policy with *nominal* propensities ``e0(a|x) = P(a | x)`` (a valid probability in
    ``(0, 1]``). ``outcomes`` are the logged rewards ``Y_i``; ``target_propensities`` are
    ``pi_t(a_i | x_i)`` at the logged action. Under Tan's marginal sensitivity model the true
    logging propensity deviates from nominal by an odds-ratio at most ``gamma >= 1``, so the true
    inverse weight ``1/ẽ0`` lies in ``[1 + odds/gamma, 1 + odds*gamma]`` with ``odds = (1-e0)/e0``;
    each unit's contribution weight is ``pi_t(a_i|x_i) * (1/ẽ0)``. The bounds are the extreme
    stabilised weighted means of ``Y`` over those per-unit weight ranges.

    Reduces to :func:`ipw_sensitivity_bounds` when ``pi_t`` is constant across the logged actions
    (the treated / uniform-target mean — the constant cancels in the self-normalised ratio), and
    collapses to the self-normalised IPS point at ``gamma = 1``. The off-policy generalisation of
    Tan's MSM in the spirit of N. Kallus & A. Zhou, *Confounding-Robust Policy Evaluation in
    Infinite-Horizon Reinforcement Learning* (NeurIPS 2020). The caller supplies ``pi_t`` and the
    nominal ``e0``; no code is ported.
    """
    if gamma < 1.0:
        raise ValueError("gamma must be >= 1")
    y = np.asarray(outcomes, dtype=float)
    e0 = np.asarray(logging_propensities, dtype=float)
    pt = np.asarray(target_propensities, dtype=float)
    odds = (1.0 - e0) / e0
    lo_w = pt * (1.0 + odds / gamma)
    hi_w = pt * (1.0 + odds * gamma)
    return Interval(
        _fractional_extreme(y, lo_w, hi_w, maximize=False),
        _fractional_extreme(y, lo_w, hi_w, maximize=True),
    )

causalrl.identification.bounds.msm_contribution_bounds(outcomes, logging_propensities, target_propensities_on, target_propensities_off, *, gamma)

Marginal-sensitivity-model bounds on a contribution V(pi_on) - V(pi_off).

The off-policy value DIFFERENCE between two target rules, estimated from confounded logs under Tan's marginal sensitivity model — e.g. a per-agent credit or per-factor contribution E[Y_{do(F=1)}] - E[Y_{do(F=0)}]. Each arm is bounded by :func:msm_policy_value_bounds (target_propensities_on = pi_on(a_i | x_i) at the logged action, ..._off likewise, shared nominal e0) and the contribution interval is the difference

[ on.lower - off.upper ,  on.upper - off.lower ].

Always valid (it contains the true difference for any targets); sharp when the two target supports are disjoint — e.g. the deterministic one-hot arms 1{F=1} / 1{F=0} that partition the logged units, so the two arms' weight perturbations are independent — and conservative otherwise. Collapses to the difference of the two self-normalised IPS points at gamma = 1 and widens monotonically with gamma.

Source code in src/causalrl/identification/bounds.py
def msm_contribution_bounds(
    outcomes: Sequence[float],
    logging_propensities: Sequence[float],
    target_propensities_on: Sequence[float],
    target_propensities_off: Sequence[float],
    *,
    gamma: float,
) -> Interval:
    """Marginal-sensitivity-model bounds on a *contribution* ``V(pi_on) - V(pi_off)``.

    The off-policy value DIFFERENCE between two target rules, estimated from confounded logs
    under Tan's marginal sensitivity model — e.g. a per-agent credit or per-factor contribution
    ``E[Y_{do(F=1)}] - E[Y_{do(F=0)}]``. Each arm is bounded by :func:`msm_policy_value_bounds`
    (``target_propensities_on`` = ``pi_on(a_i | x_i)`` at the logged action, ``..._off`` likewise,
    shared nominal ``e0``) and the contribution interval is the difference

        [ on.lower - off.upper ,  on.upper - off.lower ].

    Always *valid* (it contains the true difference for any targets); **sharp** when the two
    target supports are disjoint — e.g. the deterministic one-hot arms ``1{F=1}`` / ``1{F=0}``
    that partition the logged units, so the two arms' weight perturbations are independent — and
    *conservative* otherwise. Collapses to the difference of the two self-normalised IPS points at
    ``gamma = 1`` and widens monotonically with ``gamma``.
    """
    if gamma < 1.0:
        raise ValueError("gamma must be >= 1")
    on = msm_policy_value_bounds(
        outcomes, logging_propensities, target_propensities_on, gamma=gamma
    )
    off = msm_policy_value_bounds(
        outcomes, logging_propensities, target_propensities_off, gamma=gamma
    )
    return Interval(on.lower - off.upper, on.upper - off.lower)

causalrl.identification.bounds.msm_per_step_bounds(rewards_by_step, propensities_by_step, *, gamma)

Per-step marginal-sensitivity-model bounds on a cumulative (summed) reward.

Each element of rewards_by_step / propensities_by_step is one time step's per-unit rewards r_t and nominal propensities e_t; the cumulative-reward MSM bound is the sum over steps of the per-step :func:ipw_sensitivity_bounds. This is the additive (per-step) cumulative-reward MSM: each step is bounded independently under the sensitivity model and the bounds add, which is tight for sparse / per-step rewards.

Reusable kernel of the per-step cumulative-reward MSM used for confounded multi-step OPE (Bennett & Kallus, Efficient and Sharp OPE in Robust MDPs, NeurIPS 2024). The experiment supplies the per-step nominal propensities; no code is ported.

Source code in src/causalrl/identification/bounds.py
def msm_per_step_bounds(
    rewards_by_step: Sequence[Sequence[float]],
    propensities_by_step: Sequence[Sequence[float]],
    *,
    gamma: float,
) -> Interval:
    """Per-step marginal-sensitivity-model bounds on a cumulative (summed) reward.

    Each element of ``rewards_by_step`` / ``propensities_by_step`` is one time step's
    per-unit rewards ``r_t`` and nominal propensities ``e_t``; the cumulative-reward MSM
    bound is the sum over steps of the per-step :func:`ipw_sensitivity_bounds`. This is the
    additive (per-step) cumulative-reward MSM: each step is bounded independently under the
    sensitivity model and the bounds add, which is tight for sparse / per-step rewards.

    Reusable kernel of the per-step cumulative-reward MSM used for confounded multi-step OPE
    (Bennett & Kallus, *Efficient and Sharp OPE in Robust MDPs*, NeurIPS 2024). The
    experiment supplies the per-step nominal propensities; no code is ported.
    """
    if gamma < 1.0:
        raise ValueError("gamma must be >= 1")
    if len(rewards_by_step) != len(propensities_by_step):
        raise ValueError("rewards_by_step and propensities_by_step must have equal length")
    lower = upper = 0.0
    for r_t, e_t in zip(rewards_by_step, propensities_by_step, strict=True):
        iv = ipw_sensitivity_bounds(r_t, e_t, gamma=gamma)
        lower += iv.lower
        upper += iv.upper
    return Interval(lower, upper)

causalrl.identification.bounds.msm_stratified_bounds(values, propensities, strata, target_weights, *, gamma)

Stratified marginal-sensitivity-model bounds: Σ_s w_s · MSM_s.

Compute the MSM bound within each stratum (units sharing a strata label) and combine them with target_weights (the target stratum marginal w_s, e.g. a uniform initial state distribution). Strata absent from the data contribute nothing. Because conditioning removes between-stratum weight variation, the stratified bound is never wider than the pooled :func:ipw_sensitivity_bounds and never under-covers (THEORY.md, Prop 1).

The reusable kernel of the stratified cumulative-reward MSM; the experiment supplies the stratum labels (e.g. initial state) and nominal propensities.

Source code in src/causalrl/identification/bounds.py
def msm_stratified_bounds(
    values: Sequence[float],
    propensities: Sequence[float],
    strata: Sequence[int],
    target_weights: Mapping[int, float],
    *,
    gamma: float,
) -> Interval:
    """Stratified marginal-sensitivity-model bounds: ``Σ_s w_s · MSM_s``.

    Compute the MSM bound within each stratum (units sharing a ``strata`` label) and combine
    them with ``target_weights`` (the target stratum marginal ``w_s``, e.g. a uniform initial
    state distribution). Strata absent from the data contribute nothing. Because conditioning
    removes between-stratum weight variation, the stratified bound is never wider than the
    pooled :func:`ipw_sensitivity_bounds` and never under-covers (THEORY.md, Prop 1).

    The reusable kernel of the stratified cumulative-reward MSM; the experiment supplies the
    stratum labels (e.g. initial state) and nominal propensities.
    """
    if gamma < 1.0:
        raise ValueError("gamma must be >= 1")
    v = np.asarray(values, dtype=float)
    e = np.asarray(propensities, dtype=float)
    s = np.asarray(strata)
    lower = upper = 0.0
    for label, w in target_weights.items():
        mask = s == label
        if not bool(mask.any()):
            continue
        iv = ipw_sensitivity_bounds(v[mask].tolist(), e[mask].tolist(), gamma=gamma)
        lower += w * iv.lower
        upper += w * iv.upper
    return Interval(lower, upper)

Decision Certificates

The decision stack — certify whether a confounded / off-policy decision ("is the treated arm better than the control arm?") is robust to hidden confounding, cheapest layer first. certify_decision is the one-call front door over the layers below.

causalrl.identification.decision.certify_decision(outcomes, treated, *, confounder_bins=None, mi_cap=None, propensities=None, gamma_max=10.0)

Certify whether a binary decision from confounded logs is robust to hidden confounding.

outcomes are logged rewards Y_i; treated is the binary arm indicator (1 = the arm under test, 0 = baseline). The naive decision is the sign of the logged contrast E[Y | F=1] - E[Y | F=0]. Supply at least one evidence source — they compose:

  • confounder_bins (a measured hidden variable Z) or mi_cap (a structural cap on the information channel MI(I; Z)) runs the sign-robustness layer (:func:pivotality_certificate): certifies iff no hidden confounder consistent with that information can flip the contrast's sign. One-sided — failure to certify is not evidence of a flip.
  • propensities (the nominal logging propensities e0(a_i | x_i) at the logged action) runs the MSM tipping layer: the smallest Tan odds-ratio Gamma at which the off-policy (IPS) value contrast band first includes zero (:func:tipping_gamma over the sharp one-hot :func:msm_contribution_bounds). tipping_gamma is None means the decision is robust to confounding at least as strong as gamma_max.

With informative propensities the MSM layer concerns the inverse-propensity-weighted off-policy contrast, which coincides with the raw logged contrast only under uniform logging.

Source code in src/causalrl/identification/decision.py
def certify_decision(
    outcomes: Sequence[float],
    treated: Sequence[int],
    *,
    confounder_bins: Sequence[int] | None = None,
    mi_cap: float | None = None,
    propensities: Sequence[float] | None = None,
    gamma_max: float = 10.0,
) -> DecisionCertificate:
    """Certify whether a binary decision from confounded logs is robust to hidden confounding.

    ``outcomes`` are logged rewards ``Y_i``; ``treated`` is the binary arm indicator (1 = the
    arm under test, 0 = baseline). The naive decision is the sign of the logged contrast
    ``E[Y | F=1] - E[Y | F=0]``. Supply at least one evidence source — they compose:

    * ``confounder_bins`` (a measured hidden variable ``Z``) or ``mi_cap`` (a structural cap on
      the information channel ``MI(I; Z)``) runs the **sign-robustness layer**
      (:func:`pivotality_certificate`): certifies iff no hidden confounder consistent with that
      information can flip the contrast's sign. One-sided — failure to certify is not evidence of
      a flip.
    * ``propensities`` (the nominal logging propensities ``e0(a_i | x_i)`` at the logged action)
      runs the **MSM tipping layer**: the smallest Tan odds-ratio ``Gamma`` at which the
      off-policy (IPS) value contrast band first includes zero (:func:`tipping_gamma` over the
      sharp one-hot :func:`msm_contribution_bounds`). ``tipping_gamma is None`` means the decision
      is robust to confounding at least as strong as ``gamma_max``.

    With informative propensities the MSM layer concerns the inverse-propensity-weighted
    off-policy contrast, which coincides with the raw logged contrast only under uniform logging.
    """
    y = np.asarray(outcomes, dtype=float)
    f = np.asarray(treated)
    fb = f.astype(bool)
    if not (fb.any() and (~fb).any()):
        raise ValueError("both arms must be present in `treated`")
    if confounder_bins is None and mi_cap is None and propensities is None:
        raise ValueError(
            "supply at least one evidence source: confounder_bins (measured Z), "
            "mi_cap (structural channel cap), or propensities (MSM sensitivity)"
        )

    naive = float(y[fb].mean() - y[~fb].mean())
    decision = "prefer treated" if naive > 0 else "prefer control" if naive < 0 else "indifferent"

    pivot: PivotalityCertificate | None = None
    if confounder_bins is not None or mi_cap is not None:
        pivot = pivotality_certificate(outcomes, treated, confounder_bins, mi_cap=mi_cap)

    g_tip: float | None = None
    msm_certified: bool | None = None
    if propensities is not None:
        on = fb.astype(float).tolist()
        off = (~fb).astype(float).tolist()
        logging_props = list(propensities)  # concrete (non-Optional) capture for the closure

        def _band(g: float) -> Interval:
            return msm_contribution_bounds(outcomes, logging_props, on, off, gamma=g)

        g_tip = tipping_gamma(_band, reference=0.0, gamma_max=gamma_max)
        msm_certified = g_tip is None

    certified = pivot.certified if pivot is not None else bool(msm_certified)
    summary = _summarise(decision, naive, pivot, g_tip, msm_certified, gamma_max)
    return DecisionCertificate(
        decision=decision,
        naive_contrast=naive,
        certified=certified,
        pivotality=pivot,
        tipping_gamma=g_tip,
        msm_certified=msm_certified,
        summary=summary,
    )

causalrl.identification.decision.DecisionCertificate

Bases: NamedTuple

Result of :func:certify_decision.

certified is the headline: is the naive (logged) decision robust to hidden confounding, by the strongest layer that ran? When the structural/measured layer ran it carries its one-sided guarantee (no confounder consistent with the supplied information can flip the sign); otherwise it reports whether the MSM layer found the decision robust up to gamma_max. The component fields and summary make the exact guarantee explicit.

Source code in src/causalrl/identification/decision.py
class DecisionCertificate(NamedTuple):
    """Result of :func:`certify_decision`.

    ``certified`` is the headline: is the naive (logged) decision robust to hidden confounding,
    by the strongest layer that ran? When the structural/measured layer ran it carries its
    one-sided guarantee (no confounder consistent with the supplied information can flip the
    sign); otherwise it reports whether the MSM layer found the decision robust up to ``gamma_max``.
    The component fields and ``summary`` make the exact guarantee explicit.
    """

    decision: str  # "prefer treated" | "prefer control" | "indifferent" (sign of naive_contrast)
    naive_contrast: float  # E[Y | F=1] - E[Y | F=0]
    certified: bool
    pivotality: PivotalityCertificate | None  # structural/measured sign-robustness layer, if run
    tipping_gamma: float | None  # MSM odds-ratio at which the decision tips; None if not run/robust
    msm_certified: bool | None  # MSM layer robust to gamma_max? None if the MSM layer did not run
    summary: str

causalrl.identification.bounds.pivotality_certificate(outcomes, treated, confounder_bins=None, *, mi_cap=None)

One-sided sign-robustness certificate for a naive contrast under hidden confounding.

certified=True means: no hidden variable consistent with the supplied information can flip the sign of E[Y|F=1] - E[Y|F=0]. Two modes:

  • confounder_bins given (a measured hidden variable, e.g. post-hoc showdown/oracle data): certify iff the TV-form :func:confounding_bias_bound is strictly below |naive|; mi_measured reports the plug-in channel.
  • mi_cap given (a structural cap on MI(I;Z) from the environment's information rules — the data-processing route): certify iff mi_cap < mi_flip computed with the outcome-span relaxation (no Z needed anywhere).

The cheapest layer of the decision stack — certificate, then MSM band (:func:msm_contribution_bounds), then abstention (:func:tipping_gamma). One-sided: failure to certify is NOT evidence of a flip. Verified against measured ground truth in three game-log regimes in experiments/games/theory/verify_pivotality.py.

Source code in src/causalrl/identification/bounds.py
def pivotality_certificate(
    outcomes: Sequence[float],
    treated: Sequence[int],
    confounder_bins: Sequence[int] | None = None,
    *,
    mi_cap: float | None = None,
) -> PivotalityCertificate:
    """One-sided sign-robustness certificate for a naive contrast under hidden confounding.

    ``certified=True`` means: no hidden variable consistent with the supplied information can
    flip the sign of ``E[Y|F=1] - E[Y|F=0]``. Two modes:

    * ``confounder_bins`` given (a *measured* hidden variable, e.g. post-hoc showdown/oracle
      data): certify iff the TV-form :func:`confounding_bias_bound` is strictly below
      ``|naive|``; ``mi_measured`` reports the plug-in channel.
    * ``mi_cap`` given (a *structural* cap on ``MI(I;Z)`` from the environment's information
      rules — the data-processing route): certify iff ``mi_cap < mi_flip`` computed with the
      outcome-span relaxation (no ``Z`` needed anywhere).

    The cheapest layer of the decision stack — certificate, then MSM band
    (:func:`msm_contribution_bounds`), then abstention (:func:`tipping_gamma`). One-sided:
    failure to certify is NOT evidence of a flip. Verified against measured ground truth in
    three game-log regimes in ``experiments/games/theory/verify_pivotality.py``.
    """
    y = np.asarray(outcomes, dtype=float)
    f = np.asarray(treated, dtype=bool)
    if not (f.any() and (~f).any()):
        raise ValueError("both arms must be present")
    p = float(f.mean())
    naive = float(y[f].mean() - y[~f].mean())
    if confounder_bins is not None:
        bound = confounding_bias_bound(outcomes, treated, confounder_bins, form="tv")
        z = np.asarray(confounder_bins)
        bins = np.unique(z)
        pz = np.array([(z == b).mean() for b in bins])
        pz1 = np.array([((z == b) & f).sum() / f.sum() for b in bins])
        pz0 = np.array([((z == b) & ~f).sum() / (~f).sum() for b in bins])
        mi_measured: float | None = _plugin_mi(pz, pz1, pz0, p)
        m1 = [float(y[(z == b) & f].mean()) for b in bins]
        m0 = [float(y[(z == b) & ~f].mean()) for b in bins]
        mi_flip = mi_flip_threshold(naive, max(m1) - min(m1), max(m0) - min(m0), p)
    elif mi_cap is not None:
        if mi_cap < 0.0:
            raise ValueError("mi_cap must be >= 0")
        span = float(y.max() - y.min())
        mi_flip = mi_flip_threshold(naive, span, span, p)
        # sharp MI form with M1 = M0 = span: span * sqrt(C / (2 p (1-p))), trivially capped
        bound = min(span * float(np.sqrt(mi_cap / (2 * p * (1 - p)))), 2 * span)
        mi_measured = None
    else:
        raise ValueError("supply confounder_bins (measured Z) or mi_cap (structural cap)")
    return PivotalityCertificate(
        certified=bool(bound < abs(naive)),
        naive=naive,
        bias_bound=float(bound),
        mi_flip=float(mi_flip),
        mi_measured=mi_measured,
    )

causalrl.identification.bounds.confounding_bias_bound(outcomes, treated, confounder_bins, *, form='tv')

Upper bound on the omitted-variable bias |naive - Z-adjusted| from logged rows.

Omitted-variable bias in total-variation form:

``|bias| <= M1 * TV(P_{Z|F=1}, P_Z) + M0 * TV(P_{Z|F=0}, P_Z)``

with M_f the span over Z-bins of E[Y | F=f, Z] (each signed measure integrates to zero, so the midrange trick gives the span, not the sup; sharp — attained with equality by an explicit two-point family at every parameter value). form="mi" applies Pinsker with the KL budget p*KL1 + (1-p)*KL0 = MI(F;Z) split optimally across arms (sharp small-MI constant):

``|bias| <= sqrt( MI/2 * (M1^2/p + M0^2/(1-p)) )``  (capped at the trivial ``M1+M0``),

the (looser-than-TV) form whose budget the environment's information structure can cap. Every stratum must contain both arms (positivity); restrict to an overlap population first — a logger that conditions hard on Z destroys overlap, which is a finding, not a nuisance.

Source code in src/causalrl/identification/bounds.py
def confounding_bias_bound(
    outcomes: Sequence[float],
    treated: Sequence[int],
    confounder_bins: Sequence[int],
    *,
    form: str = "tv",
) -> float:
    """Upper bound on the omitted-variable bias ``|naive - Z-adjusted|`` from logged rows.

    Omitted-variable bias in total-variation form:

        ``|bias| <= M1 * TV(P_{Z|F=1}, P_Z) + M0 * TV(P_{Z|F=0}, P_Z)``

    with ``M_f`` the span over ``Z``-bins of ``E[Y | F=f, Z]`` (each signed measure integrates
    to zero, so the midrange trick gives the span, not the sup; sharp — attained with equality
    by an explicit two-point family at every parameter value). ``form="mi"`` applies Pinsker
    with the KL budget ``p*KL1 + (1-p)*KL0 = MI(F;Z)`` split optimally across arms (sharp
    small-MI constant):

        ``|bias| <= sqrt( MI/2 * (M1^2/p + M0^2/(1-p)) )``  (capped at the trivial ``M1+M0``),

    the (looser-than-TV) form whose budget the environment's information structure can cap.
    Every stratum must contain both arms (positivity); restrict to an overlap population first —
    a logger that conditions hard on ``Z`` destroys overlap, which is a finding, not a nuisance.
    """
    if form not in ("tv", "mi"):
        raise ValueError("form must be 'tv' or 'mi'")
    y = np.asarray(outcomes, dtype=float)
    f = np.asarray(treated, dtype=bool)
    z = np.asarray(confounder_bins)
    if not (y.shape == f.shape == z.shape):
        raise ValueError("outcomes, treated, confounder_bins must have equal length")
    if not (f.any() and (~f).any()):
        raise ValueError("both arms must be present")
    p = float(f.mean())
    bins = np.unique(z)
    m1: list[float] = []
    m0: list[float] = []
    pz: list[float] = []
    pz1: list[float] = []
    pz0: list[float] = []
    for b in bins:
        in_b = z == b
        if not ((in_b & f).any() and (in_b & ~f).any()):
            raise ValueError(
                f"stratum {b!r} lacks one arm (positivity violated) — restrict to an overlap "
                "population before bounding"
            )
        m1.append(float(y[in_b & f].mean()))
        m0.append(float(y[in_b & ~f].mean()))
        pz.append(float(in_b.mean()))
        pz1.append(float((in_b & f).sum() / f.sum()))
        pz0.append(float((in_b & ~f).sum() / (~f).sum()))
    span1 = max(m1) - min(m1)
    span0 = max(m0) - min(m0)
    pz_a = np.asarray(pz, dtype=float)
    pz1_a = np.asarray(pz1, dtype=float)
    pz0_a = np.asarray(pz0, dtype=float)
    if form == "tv":
        tv1 = 0.5 * float(np.abs(pz1_a - pz_a).sum())
        tv0 = 0.5 * float(np.abs(pz0_a - pz_a).sum())
        return span1 * tv1 + span0 * tv0
    mi = _plugin_mi(pz_a, pz1_a, pz0_a, p)
    sharp = float(np.sqrt(mi / 2.0 * (span1**2 / p + span0**2 / (1.0 - p))))
    return min(sharp, span1 + span0)  # trivial cap: TV_f <= 1

causalrl.identification.bounds.mi_flip_threshold(naive, span_treated, span_control, p_treated)

Channel capacity (nats) below which NO hidden confounder can flip the naive sign.

The decision-pivotality threshold (sharp form)

``MI_flip = 2 naive^2 / (M1^2/p + M0^2/(1-p))``:

if the mutual information between the logged binary action and a hidden variable Z is below this value, the omitted-Z bias is strictly smaller than |naive| (omitted- variable bias in TV form + Pinsker with the KL budget split optimally across arms), so the Z-adjusted contrast has the same sign as the naive one. This constant is SHARP: a binary-channel family attains the corresponding bias bound with ratio -> 1 in the small-MI limit (THEORY_pivotality.md, tightness section); the additive form 2(|naive|/(M1/sqrt(p)+M0/sqrt(1-p)))^2 is its (looser) Cauchy-Schwarz relaxation. Combined with the data-processing inequality MI(F;Z) <= MI(I;Z) — the logging actor's policy reads only its information set I — the information structure of the environment caps the reachable confounding budget.

span_treated / span_control are the spans of E[Y | F=f, Z=z] over z (the regression spans M1 / M0); with Z unmeasured, use the outcome span for both (valid, looser).

Source code in src/causalrl/identification/bounds.py
def mi_flip_threshold(
    naive: float,
    span_treated: float,
    span_control: float,
    p_treated: float,
) -> float:
    """Channel capacity (nats) below which NO hidden confounder can flip the naive sign.

    The decision-pivotality threshold (sharp form)

        ``MI_flip = 2 naive^2 / (M1^2/p + M0^2/(1-p))``:

    if the mutual information between the logged binary action and a hidden variable ``Z`` is
    below this value, the omitted-``Z`` bias is strictly smaller than ``|naive|`` (omitted-
    variable bias in TV form + Pinsker with the KL budget split optimally across arms), so the
    ``Z``-adjusted contrast has the same sign as the naive one. This constant is SHARP: a
    binary-channel family attains the corresponding bias bound with ratio -> 1 in the small-MI
    limit (THEORY_pivotality.md, tightness section); the additive form
    ``2(|naive|/(M1/sqrt(p)+M0/sqrt(1-p)))^2`` is its (looser) Cauchy-Schwarz relaxation.
    Combined with the data-processing inequality ``MI(F;Z) <= MI(I;Z)`` — the logging actor's
    policy reads only its information set ``I`` — the *information structure of the
    environment* caps the reachable confounding budget.

    ``span_treated`` / ``span_control`` are the spans of ``E[Y | F=f, Z=z]`` over ``z`` (the
    regression spans ``M1`` / ``M0``); with ``Z`` unmeasured, use the outcome span for both
    (valid, looser).
    """
    if not 0.0 < p_treated < 1.0:
        raise ValueError("p_treated must be in (0, 1)")
    if span_treated < 0.0 or span_control < 0.0:
        raise ValueError("spans must be non-negative")
    denom2 = span_treated**2 / p_treated + span_control**2 / (1.0 - p_treated)
    if denom2 == 0.0:
        return float("inf")
    return float(2.0 * naive**2 / denom2)

causalrl.identification.bounds.tipping_gamma(bound, *, reference=0.0, gamma_max=10.0, tol=0.001)

Sensitivity tipping point: the smallest gamma >= 1 at which the partial-ID interval bound(gamma) first contains reference.

This is the causal-sensitivity reporting companion to the MSM bound kernels: it answers "how strong would unmeasured confounding have to be (on the MSM/Rosenbaum odds-ratio scale) to overturn the conclusion that the estimand lies strictly on one side of reference?". A larger tipping gamma ⇒ a more robust conclusion — the odds-ratio-scale analog of the E-value (VanderWeele & Ding, Ann. Intern. Med. 2017) and of Rosenbaum's Gamma.

bound maps a sensitivity level gamma to an :class:Interval; it must collapse to a point at gamma = 1 and widen monotonically with gamma (as every MSM kernel here does — e.g. lambda g: msm_contribution_bounds(y, e0, on, off, gamma=g)). Returns 1.0 if the point already sits on reference, and None if the interval never reaches reference by gamma_max (the conclusion is robust to confounding at least that strong).

Source code in src/causalrl/identification/bounds.py
def tipping_gamma(
    bound: Callable[[float], Interval],
    *,
    reference: float = 0.0,
    gamma_max: float = 10.0,
    tol: float = 1e-3,
) -> float | None:
    """Sensitivity tipping point: the smallest ``gamma >= 1`` at which the partial-ID interval
    ``bound(gamma)`` first contains ``reference``.

    This is the causal-sensitivity *reporting* companion to the MSM bound kernels: it answers
    "how strong would unmeasured confounding have to be (on the MSM/Rosenbaum odds-ratio scale)
    to overturn the conclusion that the estimand lies strictly on one side of ``reference``?".
    A larger tipping ``gamma`` ⇒ a more robust conclusion — the odds-ratio-scale analog of the
    E-value (VanderWeele & Ding, *Ann. Intern. Med.* 2017) and of Rosenbaum's ``Gamma``.

    ``bound`` maps a sensitivity level ``gamma`` to an :class:`Interval`; it must collapse to a
    point at ``gamma = 1`` and widen monotonically with ``gamma`` (as every MSM kernel here does —
    e.g. ``lambda g: msm_contribution_bounds(y, e0, on, off, gamma=g)``). Returns ``1.0`` if the
    point already sits on ``reference``, and ``None`` if the interval never reaches ``reference``
    by ``gamma_max`` (the conclusion is robust to confounding at least that strong).
    """
    if gamma_max < 1.0:
        raise ValueError("gamma_max must be >= 1")
    eps = 1e-12
    at1 = bound(1.0)
    if at1.lower - eps <= reference <= at1.upper + eps:
        return 1.0
    if not (bound(gamma_max).lower - eps <= reference <= bound(gamma_max).upper + eps):
        return None  # robust: never reaches `reference` within [1, gamma_max]
    lo, hi = 1.0, gamma_max
    while hi - lo > tol:
        mid = 0.5 * (lo + hi)
        iv = bound(mid)
        if iv.lower - eps <= reference <= iv.upper + eps:
            hi = mid
        else:
            lo = mid
    return float(hi)