Source code for mllm_shap.observability.bridge

"""Bridge between TelemetryProbe and structured observability sinks."""

from dataclasses import dataclass

from ..shap.core.telemetry import TelemetryProbe
from .events import StageSpan, TraceEvent
from .sink import ObservabilitySink


@dataclass(frozen=True)
class TelemetryBridge:
    """Dual-writes telemetry events to structured sink.

    Each event or span emitted through the bridge carries ``run_id`` and is
    forwarded to ``sink``.
    """

    run_id: str
    """Unique identifier for the SHAP run, used to correlate events and spans
    in the observability sink."""

    sink: ObservabilitySink
    """The observability sink to which telemetry events will be emitted.

    This should be an instance of a class that implements the
    ObservabilitySink interface, such as InMemoryObservabilitySink or a
    custom implementation that sends events to an external monitoring system.
    """

    def custom_metric(self, key: str, value: float | int | str | bool) -> None:
        """Emit a metric update as event.

        Args:
            key: Name of the metric; stored under ``attrs["key"]``.
            value: Metric value; stored unchanged under ``attrs["value"]``.
        """
        self.sink.emit_event(
            TraceEvent(
                name="metric",
                run_id=self.run_id,
                attrs={"key": key, "value": value},
            )
        )

    def stage_span(self, stage: str, elapsed_ms: float) -> None:
        """Emit explicit stage duration span.

        Args:
            stage: Name of the stage the span describes.
            elapsed_ms: Duration value for the span; coerced with ``float()``
                before being handed to the sink.
        """
        self.sink.emit_span(
            StageSpan(
                run_id=self.run_id,
                stage=stage,
                elapsed_ms=float(elapsed_ms),
            )
        )

    @classmethod
    def from_probe(
        cls,
        probe: TelemetryProbe | None,
        run_id: str,
        sink: ObservabilitySink,
    ) -> "TelemetryBridge":
        """Create bridge and backfill known probe metrics if available.

        Args:
            probe: Telemetry probe to snapshot, or ``None`` to skip the
                backfill entirely.
            run_id: Identifier attached to the bridge and to every
                backfilled event.
            sink: Sink that receives the backfilled events and is stored on
                the returned bridge.

        Returns:
            A new bridge bound to ``run_id`` and ``sink``. When called on a
            subclass, an instance of that subclass is returned.
        """
        # Build through ``cls`` so subclasses get an instance of their own
        # type rather than a hard-coded TelemetryBridge.
        bridge = cls(run_id=run_id, sink=sink)
        if probe is None:
            return bridge

        # One "probe_snapshot" event per top-level entry of the metrics dict.
        metrics = probe.get_metrics().to_dict()
        for area, payload in metrics.items():
            bridge.sink.emit_event(
                TraceEvent(
                    name="probe_snapshot",
                    run_id=run_id,
                    attrs={"area": area, "payload": payload},
                )
            )
        return bridge