Source code for mllm_shap.observability.sink

"""Observability sink interfaces and in-memory implementation."""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field

from .events import StageSpan, TraceEvent


class ObservabilitySink(ABC):
    """Interface for objects that receive explainability trace output.

    Concrete sinks must implement both :meth:`emit_event` and
    :meth:`emit_span`; the class cannot be instantiated until they do.
    """

    @abstractmethod
    def emit_event(self, event: TraceEvent) -> None:
        """Receive a single structured trace event.

        Args:
            event: The trace event handed to the sink.
        """

    @abstractmethod
    def emit_span(self, span: StageSpan) -> None:
        """Receive a single structured stage span.

        Args:
            span: The stage span handed to the sink.
        """
@dataclass
class InMemoryObservabilitySink(ObservabilitySink):
    """Sink for tests and debugging that retains everything it is given.

    Emitted items are appended to plain lists in arrival order and are
    never removed by this class.
    """

    # Every event passed to ``emit_event``, oldest first.
    events: list[TraceEvent] = field(default_factory=list)
    # Every span passed to ``emit_span``, oldest first.
    spans: list[StageSpan] = field(default_factory=list)

    def emit_event(self, event: TraceEvent) -> None:
        """Append ``event`` to :attr:`events`."""
        recorded_events = self.events
        recorded_events.append(event)

    def emit_span(self, span: StageSpan) -> None:
        """Append ``span`` to :attr:`spans`."""
        recorded_spans = self.spans
        recorded_spans.append(span)