Bridge

The Bridge is a Python-based service that provides the core log processing pipeline. It consumes raw OTLP JSON from Kafka, normalizes it into flat documents, detects anomalies, correlates traces, and indexes everything into OpenSearch.

Architecture

The Bridge runs 4 concurrent threads, each handling a distinct stage of the pipeline:
      Kafka "raw-logs"
            │
            ▼
┌───────────────────────┐
│  Thread 1: OTLP ETL   │  Flatten OTLP JSON → normalized documents
│  (consumer group:     │  Write to "enriched-logs" topic
│   logclaw-bridge-etl) │
└───────────┬───────────┘
            │
   Kafka "enriched-logs"
       │           │
       ▼           ▼
┌────────────┐  ┌──────────────────────┐
│ Thread 2:  │  │ Thread 3: Indexer    │
│ Anomaly    │  │ Bulk write to        │
│ Detector   │  │ OpenSearch           │
│ (Z-score)  │  │ logclaw-logs-*       │
└────────────┘  └──────────────────────┘

┌──────────────────────┐
│ Thread 4: Lifecycle  │
│ Request correlation  │
│ (5-layer trace)      │
└──────────────────────┘
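The four-thread layout above can be sketched as follows. This is illustrative only: the stage bodies here are placeholders, not the service's actual worker loops, and the function names are assumptions.

```python
import threading
import time

def make_stage(name: str, stop: threading.Event) -> threading.Thread:
    """Illustrative worker: each real stage runs its own consume/process loop."""
    def loop():
        while not stop.is_set():
            time.sleep(0.01)  # stand-in for poll/process work
    return threading.Thread(target=loop, name=name, daemon=True)

def start_pipeline() -> tuple[list[threading.Thread], threading.Event]:
    """Start the 4 pipeline stages as daemon threads, sharing a stop signal."""
    stop = threading.Event()
    threads = [make_stage(n, stop)
               for n in ("etl", "anomaly", "indexer", "lifecycle")]
    for t in threads:
        t.start()
    return threads, stop
```

Running the stages as daemon threads with a shared stop event keeps shutdown simple: set the event, join each thread, exit.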

OTLP ETL (Thread 1)

The ETL thread consumes OTLP JSON messages from the raw-logs Kafka topic and flattens them into canonical log documents. OTLP unwrapping path:
resourceLogs → scopeLogs → logRecords → flatten each record
Field mapping:

| OTLP Field | Output Field | Description |
|---|---|---|
| resource.attributes["service.name"] | service | Service name |
| logRecord.body.stringValue | message | Log message |
| logRecord.severityText | level | Log level (INFO, WARN, ERROR) |
| logRecord.timeUnixNano | timestamp | ISO-8601 timestamp |
| logRecord.traceId | trace_id | Distributed trace ID |
| logRecord.spanId | span_id | Span ID |
| resource.attributes["host.name"] | host | Hostname |
| resource.attributes["tenant_id"] | tenant_id | Tenant identifier |
| logRecord.attributes[*] | (flattened) | Custom attributes as top-level fields |
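The unwrapping path and field mapping can be sketched as a flattening function. This is a minimal illustration of the mapping above, not the service's actual code: the function name is an assumption, and only string-valued attributes are handled.

```python
from datetime import datetime, timezone

def flatten_otlp(payload: dict) -> list[dict]:
    """Sketch: unwrap resourceLogs -> scopeLogs -> logRecords and
    map each record to a flat document per the field-mapping table."""
    docs = []
    for resource_logs in payload.get("resourceLogs", []):
        # Resource attributes arrive as a list of {key, value} pairs in OTLP JSON.
        res_attrs = {
            kv["key"]: kv["value"].get("stringValue")
            for kv in resource_logs.get("resource", {}).get("attributes", [])
        }
        for scope_logs in resource_logs.get("scopeLogs", []):
            for record in scope_logs.get("logRecords", []):
                ts_ns = int(record.get("timeUnixNano", 0))
                doc = {
                    "service": res_attrs.get("service.name"),
                    "host": res_attrs.get("host.name"),
                    "tenant_id": res_attrs.get("tenant_id"),
                    "message": record.get("body", {}).get("stringValue"),
                    "level": record.get("severityText"),
                    # timeUnixNano is nanoseconds since epoch -> ISO-8601 UTC
                    "timestamp": datetime.fromtimestamp(
                        ts_ns / 1e9, tz=timezone.utc
                    ).isoformat(),
                    "trace_id": record.get("traceId"),
                    "span_id": record.get("spanId"),
                }
                # Custom record attributes are promoted to top-level fields,
                # without overwriting the canonical fields above.
                for kv in record.get("attributes", []):
                    doc.setdefault(kv["key"], kv["value"].get("stringValue"))
                docs.append(doc)
    return docs
```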

Anomaly Detection (Thread 2)

Uses a sliding-window Z-score algorithm to detect anomalous error rate spikes per service. Configuration:
| Parameter | Env Var | Default | Description |
|---|---|---|---|
| Threshold | ANOMALY_THRESHOLD | 2.5 | Z-score threshold for anomaly flagging |
| Window Size | WINDOW_SIZE | 50 | Number of data points in sliding window |
When an anomaly is detected, the document is enriched with:
  • anomaly_score — the computed Z-score
  • is_anomaly — boolean flag
The enriched document is also written to the anomalies topic for the Ticketing Agent.
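A sliding-window Z-score detector can be sketched as below. This is a generic illustration of the technique with the defaults from the table above, not the Bridge's exact implementation; the class name is an assumption.

```python
from collections import deque
import math

class ZScoreDetector:
    """Per-service sliding-window Z-score detection (sketch).

    Keeps the last `window_size` error-rate samples; a new sample is
    anomalous when it deviates from the window mean by more than
    `threshold` standard deviations.
    """

    def __init__(self, window_size: int = 50, threshold: float = 2.5):
        self.window: deque[float] = deque(maxlen=window_size)
        self.threshold = threshold

    def observe(self, error_rate: float) -> tuple[float, bool]:
        """Return (anomaly_score, is_anomaly) for the new data point."""
        score = 0.0
        if len(self.window) >= 2:
            mean = sum(self.window) / len(self.window)
            var = sum((x - mean) ** 2 for x in self.window) / len(self.window)
            std = math.sqrt(var)
            if std > 0:
                score = (error_rate - mean) / std
        self.window.append(error_rate)
        return score, abs(score) > self.threshold
```

Because the window is bounded, the baseline adapts over time: a sustained elevated error rate eventually becomes the new normal rather than flagging forever.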

Request Lifecycle Engine (Thread 4)

The lifecycle engine performs 5-layer trace correlation to group related log entries into request timelines:
  1. Trace ID grouping — group logs sharing the same trace_id
  2. Temporal proximity — cluster logs within a time window
  3. Service dependency mapping — map caller→callee relationships
  4. Error propagation tracking — trace error cascades across services
  5. Blast radius computation — determine affected services and endpoints
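The first two layers can be sketched in a few lines. These helpers are illustrative assumptions (names, and the use of epoch-second timestamps under a `ts` key for brevity), not the engine's actual internals.

```python
from collections import defaultdict

def group_by_trace(docs: list[dict]) -> dict[str, list[dict]]:
    """Layer 1 sketch: group documents sharing a trace_id (untraced logs skipped)."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for doc in docs:
        if doc.get("trace_id"):
            groups[doc["trace_id"]].append(doc)
    return dict(groups)

def cluster_temporal(docs: list[dict], window_s: float = 5.0) -> list[list[dict]]:
    """Layer 2 sketch: cluster logs whose timestamp is within window_s seconds
    of the previous log. Timestamps are epoch seconds under "ts" here."""
    clusters: list[list[dict]] = []
    for doc in sorted(docs, key=lambda d: d["ts"]):
        if clusters and doc["ts"] - clusters[-1][-1]["ts"] <= window_s:
            clusters[-1].append(doc)
        else:
            clusters.append([doc])
    return clusters
```

Layers 3–5 build on these groupings: once logs are keyed by trace and ordered in time, caller→callee edges, error cascades, and the affected-service set can be derived per request.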

Prometheus Metrics

The Bridge exposes Prometheus-format metrics at GET /metrics:
| Metric | Type | Description |
|---|---|---|
| logclaw_bridge_etl_consumed_total | Counter | Kafka messages (batches) consumed from raw-logs |
| logclaw_bridge_etl_records_received_total | Counter | Individual OTLP log records flattened |
| logclaw_bridge_etl_produced_total | Counter | Enriched documents written to enriched-logs |
| logclaw_bridge_anomalies_detected_total | Counter | Anomalies detected |
| logclaw_bridge_opensearch_indexed_total | Counter | Documents indexed into OpenSearch |
| logclaw_bridge_opensearch_errors_total | Counter | OpenSearch indexing errors |
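The Prometheus text exposition format these counters are served in is simple enough to sketch by hand. The class below is a stand-in to show the format, not what the service uses (a real implementation would typically use the prometheus_client library).

```python
class ExpositionCounter:
    """Minimal stand-in for a Prometheus counter and its text exposition."""

    def __init__(self, name: str, help_text: str):
        self.name = name          # counter names conventionally end in _total
        self.help_text = help_text
        self.value = 0

    def inc(self, amount: int = 1) -> None:
        self.value += amount      # counters only ever increase

    def expose(self) -> str:
        # HELP and TYPE comment lines, then the sample line, as served
        # at GET /metrics.
        return (f"# HELP {self.name} {self.help_text}\n"
                f"# TYPE {self.name} counter\n"
                f"{self.name} {self.value}\n")
```

For example, after three batches, `logclaw_bridge_etl_consumed_total` would be exposed as a `# HELP` line, a `# TYPE ... counter` line, and the sample `logclaw_bridge_etl_consumed_total 3`.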

Configuration

Environment Variables

| Variable | Required | Default | Description |
|---|---|---|---|
| KAFKA_BROKERS | Yes | — | Kafka bootstrap servers |
| KAFKA_TOPIC_RAW | No | raw-logs | Topic to consume raw OTLP JSON |
| KAFKA_TOPIC_ENRICHED | No | enriched-logs | Topic to produce enriched documents |
| OPENSEARCH_ENDPOINT | Yes | — | OpenSearch cluster URL |
| OPENSEARCH_USERNAME | No | — | OpenSearch Basic Auth username |
| OPENSEARCH_PASSWORD | No | — | OpenSearch Basic Auth password |
| ANOMALY_THRESHOLD | No | 2.5 | Z-score threshold |
| WINDOW_SIZE | No | 50 | Sliding window size |
| PORT | No | 8080 | HTTP server port |
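Loading this configuration might look like the sketch below. The function name and the returned keys are assumptions; the variable names, required flags, and defaults come from the table above.

```python
import os

def load_config() -> dict:
    """Sketch: read the Bridge's environment variables. KAFKA_BROKERS and
    OPENSEARCH_ENDPOINT are required; the rest fall back to their defaults."""
    def require(name: str) -> str:
        value = os.environ.get(name)
        if not value:
            raise RuntimeError(f"missing required environment variable: {name}")
        return value

    return {
        "kafka_brokers": require("KAFKA_BROKERS"),
        "topic_raw": os.environ.get("KAFKA_TOPIC_RAW", "raw-logs"),
        "topic_enriched": os.environ.get("KAFKA_TOPIC_ENRICHED", "enriched-logs"),
        "opensearch_endpoint": require("OPENSEARCH_ENDPOINT"),
        "opensearch_username": os.environ.get("OPENSEARCH_USERNAME"),
        "opensearch_password": os.environ.get("OPENSEARCH_PASSWORD"),
        "anomaly_threshold": float(os.environ.get("ANOMALY_THRESHOLD", "2.5")),
        "window_size": int(os.environ.get("WINDOW_SIZE", "50")),
        "port": int(os.environ.get("PORT", "8080")),
    }
```

Failing fast on the two required variables surfaces misconfiguration at startup instead of as downstream Kafka or OpenSearch connection errors.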

Runtime Configuration

The Bridge supports dynamic runtime configuration via the /config endpoint:
# Get current config
curl http://localhost:8080/config

# Update anomaly threshold
curl -X PATCH http://localhost:8080/config \
  -H "Content-Type: application/json" \
  -d '{"anomalyThreshold": 3.0, "windowSize": 100}'

Helm Values

logclaw-bridge:
  bridge:
    kafkaBrokers: "logclaw-kafka-kafka-bootstrap:9093"
    opensearchEndpoint: "https://logclaw-opensearch:9200"
    anomalyThreshold: 2.5
    windowSize: 50

Health Check

curl http://localhost:8080/health
Returns:
{
  "status": "ok",
  "threads": {
    "etl": "running",
    "anomaly": "running",
    "indexer": "running",
    "lifecycle": "running"
  },
  "kafka": "connected",
  "opensearch": "connected"
}