---
# ECK-managed Logstash instance for the "tools" namespace.
#
# Pipeline "main":
#   inputs  - Kafka (application + ingress-nginx topics), two TCP json_lines
#             listeners (4590, 4560) and one Beats listener (4584).
#   filters - events are routed on [fields][items]; Java application logs are
#             parsed with grok and re-timestamped, ingress-nginx logs are
#             decoded from JSON.
#   outputs - one daily index per [fields][items] value
#             ("<items>-prod-YYYY.MM.dd"); everything else goes to
#             "other-YYYY.MM.dd". Every event is also printed to stdout.
#
# NOTE(review): the Elasticsearch user/password are hard-coded in the pipeline
# below. Consider moving them to `secureSettings` (Logstash keystore) or to the
# variables ECK derives from `elasticsearchRefs` - confirm which cluster is
# actually meant before changing hosts/credentials.
apiVersion: logstash.k8s.elastic.co/v1alpha1
kind: Logstash
metadata:
  name: my-logstash
  namespace: tools
spec:
  version: 8.15.2
  count: 1
  # NOTE(review): this association is declared but the pipeline below talks to
  # es001..es003 directly and does not use it - confirm it is still needed.
  elasticsearchRefs:
    - clusterName: my-elastic
      name: my-logstash
  pipelines:
    - pipeline.id: main
      config.string: |
        input {
          kafka {
            bootstrap_servers => "my-cluster-kafka-bootstrap.kafka:9092"
            client_id => "tools-logstash"
            group_id => "d1-prod"
            auto_offset_reset => "latest"
            consumer_threads => 3
            decorate_events => false
            topics => ["wfm-java","xsf-java","mpos-java","sxzs-java","dgjs-java","epos-java","wfbs-java","csps-java","uen-java","ingress-nginx"]
            codec => "json"
          }
          tcp {
            host => "0.0.0.0"
            port => 4590
            mode => "server"
            codec => json_lines
          }
          tcp {
            host => "0.0.0.0"
            port => 4560
            mode => "server"
            codec => json_lines
          }
          beats {
            port => 4584
          }
        }

        filter {
          # Java application logs share one line layout:
          #   <timestamp> [<thread>:<traceId>] <LEVEL> <logger>[<stack>] - <message>
          # NOTE(review): "epos-java" is consumed and indexed but was never part
          # of this parsing chain; it is intentionally left out here to keep the
          # existing behaviour - add it to the list if it uses the same layout.
          if [fields][items] in ["wfm-java","xsf-java","mpos-java","sxzs-java","dgjs-java","wfbs-java","csps-java","uen-java"] {
            grok {
              # The timestamp is captured as "SysTemLog-Time" so that the date
              # filter below has a field to read.
              match => { "message" => "%{TIMESTAMP_ISO8601:SysTemLog-Time}\s*\[%{DATA:thread_name}:%{DATA:traceId}\]\s*%{LOGLEVEL:level}\s*%{DATA:logger_name}\[%{DATA:stack_trace}\] - %{GREEDYDATA:message}" }
              overwrite => ["message"]
            }
            date {
              match => ["SysTemLog-Time","ISO8601"]
            }
          } else if [fields][items] == "ingress-nginx" {
            json {
              source => "message"
              remove_field => ["message"]
            }
          }
        }

        output {
          # NOTE(review): prints every event to the pod log; consider removing
          # once the pipeline is verified.
          stdout { codec => rubydebug }

          if [fields][items] in ["wfm-java","xsf-java","mpos-java","sxzs-java","dgjs-java","epos-java","wfbs-java","csps-java","uen-java","ingress-nginx"] {
            elasticsearch {
              hosts => ["es001:9200","es002:9200","es003:9200"]
              user => "elastic"
              password => "elastic"
              # Resolves to e.g. "wfm-java-prod-2024.01.31".
              index => "%{[fields][items]}-prod-%{+YYYY.MM.dd}"
            }
          } else {
            elasticsearch {
              hosts => ["es001:9200","es002:9200","es003:9200"]
              user => "elastic"
              password => "elastic"
              index => "other-%{+YYYY.MM.dd}"
            }
          }
        }