--- apiVersion: v1 data: aggregator.yaml: | data_dir: /vector-data-dir api: enabled: true address: 127.0.0.1:8686 playground: false sources: kafka_logs: type: kafka bootstrap_servers: ${BOOTSTRAP_SERVERS} group_id: vector-aggregator topics: - xspay-java - jhpay-java - ingress-nginx decoding: codec: json transforms: kafka_transform: type: remap inputs: [kafka_logs] drop_on_abort: true metric_tag_values: single source: |- if .fields.items == "mpos-java" { . = parse_regex!(.message, r'^(?P\d+-\d+-\d+ \d+:\d+:\d+\,\d+) \[(?P[^\]]+)(?::(?P[a-zA-Z0-9]+))?\] (?P\w+) \[(?P[^\]]+)\] - (?P.*)$') } else if .fields.items == "xspay-java" { . = parse_regex!(.message, r'^(?P\d+-\d+-\d+ \d+:\d+:\d+\.\d+) \[(?P[^\]]+)(?::(?P[a-zA-Z0-9]+))?\] (?P\w+) \[(?P[^\]]+)\] - (?P.*)$') del(.message) } else if .fields.items == "ingress-nginx" { .messages = parse_json!(.message) .time_local = .messages.time_local .domain_name = .messages.domain_name .status = .messages.status .upstream_status = .messages.upstream_status .uri = .messages.uri .client_ip = .messages.client_ip .remote_addr = .messages.remote_addr .server_protocol = .messages.server_protocol del(.message) del(.messages) } sinks: prom_exporter: type: prometheus_exporter inputs: [kafka_transform] address: 0.0.0.0:9090 elasticsearch: type: elasticsearch inputs: [kafka_transform] api_version: auto compression: none acknowledgements: enabled: true doc_type: _doc endpoints: ["${ELASTICSEARCH_ENDPOINT}"] auth: strategy: basic user: "${ELASTICSEARCH_USER}" password: "${ELASTICSEARCH_PASSWORD}" id_key: id mode: bulk bulk: index: "{{fields.tems}}-${K8S_CLUSTER}-%Y.%m.%d" kind: ConfigMap metadata: labels: app.kubernetes.io/component: Aggregator app.kubernetes.io/instance: vector-aggregator app.kubernetes.io/name: vector-aggregator app.kubernetes.io/version: 0.43.0-distroless-libc name: vector-aggregator-config namespace: vector