Integration

Event Forwarder works best as a source adapter in front of an existing collector. The collector should treat stdout as newline-delimited JSON and should ignore or separately route stderr diagnostics.

  • Vector from Datadog

  • Grafana Alloy

  • Fluent Bit

  • Generic collector pattern

Vector has an exec source that can run Event Forwarder on a schedule and parse stdout as JSON lines.

Use Vector’s scheduled exec mode for one-shot forwarder runs. Use Event Forwarder’s daemon mode when Vector, Kubernetes, Docker, or another collector is scraping a long-running process stdout stream.

Minimal Vector example:

data_dir: "${VECTOR_DATA_DIR:-/var/lib/vector}"

sources:
  service_events_api:
    type: exec
    command:
      - /usr/local/bin/event-forwarder
    mode: scheduled
    include_stderr: false
    environment:
      SERVICE_ENDPOINT: "${SERVICE_ENDPOINT:?SERVICE_ENDPOINT must be set}"
      SERVICE_API_KEY: "${SERVICE_API_KEY:?SERVICE_API_KEY must be set}"
      SERVICE_API_ID: "${SERVICE_API_ID:-admin}"
      SERVICE_STATE_FILE: "${SERVICE_STATE_FILE:-/var/lib/vector/event-forwarder-state.json}"
      SERVICE_PAGE_SIZE: "${SERVICE_PAGE_SIZE:-50}"
      SERVICE_TIMEOUT: "${SERVICE_TIMEOUT:-30s}"
      SERVICE_LOG_LEVEL: "${SERVICE_LOG_LEVEL:-info}"
    framing:
      method: newline_delimited
    decoding:
      codec: json
    scheduled:
      exec_interval_secs: ${SERVICE_SCRAPE_INTERVAL_SECS:-10}

transforms:
  service_normalize:
    type: remap
    inputs:
      - service_events_api
    drop_on_error: true
    source: |
      .event_id = del(._id) ?? .event_id
      .timestamp_ms = to_int(.timestamp_ms) ?? to_int!(.timestamp)
      .timestamp = from_unix_timestamp!(.timestamp_ms, unit: "milliseconds")
      .source = "service"

sinks:
  console:
    type: console
    inputs:
      - service_normalize
    encoding:
      codec: json

Replace the console sink with your existing Vector sink, such as Datadog Logs, Splunk HEC, Kafka, S3, HTTP, or another destination.

Grafana Alloy is usually a good fit when the forwarder runs as a container or Kubernetes workload and Alloy scrapes that workload’s stdout. In this pattern, Event Forwarder keeps its state on a mounted volume, while Alloy reads the container log stream and forwards events to Loki or another configured pipeline.

Example Docker run:

docker run -d \
  --name event-forwarder \
  --label logs.service=event-forwarder \
  -e SERVICE_ENDPOINT="https://service.example.com/api/v1/events/search" \
  -e SERVICE_API_KEY="$SERVICE_API_KEY" \
  -e SERVICE_STATE_FILE="/var/lib/event-forwarder/state.json" \
  -v event-forwarder-state:/var/lib/event-forwarder \
  registry.evertrust.io/event-forwarder:latest \
  --daemon \
  --poll-interval 10s

Example Alloy pipeline that discovers the Docker container logs and parses the forwarded JSON:

discovery.docker "containers" {
  host = "unix:///var/run/docker.sock"
}

discovery.relabel "event_forwarder" {
  targets = discovery.docker.containers.targets

  rule {
    source_labels = ["__meta_docker_container_label_logs_service"]
    regex         = "event-forwarder"
    action        = "keep"
  }
}

loki.source.docker "event_forwarder" {
  host    = "unix:///var/run/docker.sock"
  targets = discovery.relabel.event_forwarder.output
  forward_to = [loki.process.service_events.receiver]
}

loki.process "service_events" {
  stage.json {
    expressions = {
      event_id = "_id",
      timestamp_ms = "timestamp",
    }
  }

  stage.labels {
    values = {
      event_id = "",
    }
  }

  forward_to = [loki.write.default.receiver]
}

loki.write "default" {
  endpoint {
    url = "https://loki.example.com/loki/api/v1/push"
  }
}

The same pattern applies in Kubernetes: run Event Forwarder as a deployment or sidecar, mount a persistent volume for SERVICE_STATE_FILE, and configure Alloy to collect that pod’s stdout.

Fluent Bit can either run Event Forwarder with the exec input plugin or read the forwarder’s container logs through its normal container log inputs. The exec input is useful for lightweight host-based deployments.

Example fluent-bit.conf:

[SERVICE]
    Flush        5
    Log_Level    info
    Parsers_File parsers.conf

[INPUT]
    Name     exec
    Tag      service.events
    Command  /usr/local/bin/event-forwarder
    Interval_Sec 10
    Parser   json

[FILTER]
    Name    modify
    Match   service.events
    Add     source service

[OUTPUT]
    Name    stdout
    Match   service.events
    Format  json_lines

Example parsers.conf:

[PARSER]
    Name   json
    Format json

Set the Event Forwarder configuration in the Fluent Bit service environment:

export SERVICE_ENDPOINT="https://service.example.com/api/v1/events/search"
export SERVICE_API_KEY="replace-me"
export SERVICE_STATE_FILE="/var/lib/fluent-bit/event-forwarder-state.json"

Then replace the stdout output with your production output, such as Splunk, HTTP, Kafka, S3, OpenSearch, or another Fluent Bit destination.

For any other logging stack, use one of these patterns:

  • Run event-forwarder directly as a scheduled command and decode stdout as newline-delimited JSON.

  • Run event-forwarder --daemon as a long-running process and scrape stdout.

  • Run event-forwarder as a container and scrape the container stdout stream.

  • Run event-forwarder from a systemd timer or cron job and pipe stdout into a collector-supported input.

Keep these rules consistent across integrations:

  • preserve stdout for event JSON only;

  • keep stderr separate for logs and diagnostics;

  • configure a persistent SERVICE_STATE_FILE;

  • protect SERVICE_API_KEY with the collector or orchestrator’s secret mechanism;

  • use collector-side transforms only for routing, enrichment, normalization, and destination-specific schema requirements.

Official collector references: