---
# ConfigMap holding the Elasticsearch-to-Prometheus probe scripts.
# Each script queries Elasticsearch and prints samples in the Prometheus text
# exposition format on stdout; diagnostics go to stderr.
apiVersion: v1
data:
  nginx_error_status.py: |-
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Print nginx 5xx hits found in Elasticsearch as Prometheus samples."""
    import os
    import sys
    import requests
    import json

    # Index pattern -> id exported as the `es_index_id` label
    # (presumably the Kibana index-pattern id; verify).
    es_index_dict = {
        "ingress-nginx-prod-*": "259f6359-38aa-477d-9666-46fc7db8e047"
    }

    # NOTE(review): endpoint and credentials are hard-coded and overwrite
    # whatever the container environment provides. Consider injecting them
    # from a Secret instead of storing them in this ConfigMap.
    # Alternative endpoint:
    #os.environ['ES_URL'] = "http://elasticsearch-es-http.elastic-system:9200"
    os.environ['ES_URL'] = "http://d1-es.uenpay.com"
    os.environ['ES_USER'] = "uenpay"
    os.environ['ES_PASSWORD'] = "123456"
    os.environ['DURATION'] = "5m"
    os.environ['define_description'] = "nginx状态码5xx"
    os.environ['QUERY_CONDITION'] = "(status:5*)"
    os.environ['METRIC_NAME'] = "nginx_error_status"

    # Seconds to wait for Elasticsearch before giving up, so a stalled
    # connection cannot block the script forever.
    REQUEST_TIMEOUT = 30


    def _escape_label_value(value):
        """Escape backslash, double quote and newline for a label value."""
        return (str(value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n'))


    def add_custom_labels(metric_name, base_labels, count):
        """Print one sample line: metric_name{labels} count.

        Every environment variable named `define_<label>` is added as label
        `<label>`; such labels override entries of `base_labels` with the
        same name.
        """
        custom_labels = {
            key.replace('define_', '', 1): value
            for key, value in os.environ.items()
            if key.startswith('define_')
        }
        all_labels = {**base_labels, **custom_labels}
        labels_str = ",".join(
            f'{k}="{_escape_label_value(v)}"' for k, v in all_labels.items()
        )
        print(f'{metric_name}{{{labels_str}}} {count}')


    def es_query(es_index_name, query_condition):
        """Run `query_condition` against an index for the last DURATION.

        Returns the decoded JSON response, or None when the request fails
        or the status code is not 200.
        """
        result = None
        headers = {'Content-Type': 'application/json'}
        es_auth = (os.getenv('ES_USER'), os.getenv('ES_PASSWORD'))
        es_url = f"{os.getenv('ES_URL')}/{es_index_name}/_search"
        duration = "now-" + os.getenv('DURATION', '15m')
        body_template = {
            "size": 10000,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": duration
                            }
                        }
                    },
                    "must": [
                        {
                            "query_string": {
                                "query": query_condition
                            }
                        }
                    ]
                }
            }
        }
        try:
            r = requests.post(url=es_url, headers=headers, auth=es_auth,
                              json=body_template, timeout=REQUEST_TIMEOUT)
            if r.status_code == 200:
                result = r.json()
            else:
                print(f"es_query got HTTP {r.status_code} from {es_url}",
                      file=sys.stderr)
        except Exception as err:
            print(f"An error occurred in es_query: {err}", file=sys.stderr)
        return result


    def counting():
        """Print the total hit count once per distinct label set.

        The sample value is the total reported by Elasticsearch for the
        query, not a per-label count. Hits without `domain_name` produce a
        single unlabelled sample.
        """
        query_condition = os.getenv('QUERY_CONDITION')
        metric_name = os.getenv('METRIC_NAME')
        for index_name in es_index_dict:
            try:
                result = es_query(index_name, query_condition)
                if not result or 'hits' not in result:
                    continue
                # NOTE(review): Elasticsearch caps this total at 10000 unless
                # track_total_hits is requested.
                count = result['hits']['total']['value']
                es_index_id = es_index_dict[index_name]
                # Label sets already printed; identical samples are emitted
                # only once instead of once per hit.
                emitted = set()
                for hit in result['hits']['hits']:
                    if '_source' in hit and 'domain_name' in hit['_source']:
                        source = hit['_source']
                        base_labels = {
                            'domain_name': source["domain_name"],
                            'status': source["status"],
                            'request_method': source["request_method"],
                            'uri': source["uri"],
                            'es_index_id': es_index_id,
                            'query_condition': query_condition.replace(" ", "%20").replace('"', ''),
                        }
                        series_key = tuple(str(v) for v in base_labels.values())
                        if series_key in emitted:
                            continue
                        emitted.add(series_key)
                        add_custom_labels(metric_name, base_labels, count)
                    elif None not in emitted:
                        emitted.add(None)
                        print('%s %d' % (metric_name, count))
            except Exception as err:
                print(f"An error occurred in counting: {err}", file=sys.stderr)


    if __name__ == '__main__':
        counting()
  nginx_error_status.py.ba2: |-
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Backup variant of nginx_error_status.py: one sample per hit."""
    import os
    import sys
    import requests
    import json

    # Index pattern -> id exported as the `es_index_id` label
    # (presumably the Kibana index-pattern id; verify).
    es_index_dict = {
        "ingress-nginx-prod-*": "51b4d440-734f-11ee-942a-c96909d7cda5"
    }

    # NOTE(review): endpoint and credentials are hard-coded and overwrite
    # whatever the container environment provides.
    os.environ['ES_URL'] = "http://elasticsearch-es-http.mid:9200"
    # Alternative endpoint:
    #os.environ['ES_URL'] = "http://es.uenpay.com"
    os.environ['ES_USER'] = "kibana"
    os.environ['ES_PASSWORD'] = "123456"
    os.environ['DURATION'] = "5m"
    # NOTE(review): the description mentions 4xx, but QUERY_CONDITION below
    # only matches status:5*.
    os.environ['define_description'] = "nginx状态码4xx或5xx"
    os.environ['QUERY_CONDITION'] = "(status:5*)"
    os.environ['METRIC_NAME'] = "nginx_error_status"

    # Seconds to wait for Elasticsearch before giving up.
    REQUEST_TIMEOUT = 30


    def _escape_label_value(value):
        """Escape backslash, double quote and newline for a label value."""
        return (str(value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n'))


    def add_custom_labels(metric_name, base_labels, count):
        """Print one sample line: metric_name{labels} count.

        Every environment variable named `define_<label>` is added as label
        `<label>`; such labels override entries of `base_labels` with the
        same name.
        """
        custom_labels = {
            key.replace('define_', '', 1): value
            for key, value in os.environ.items()
            if key.startswith('define_')
        }
        all_labels = {**base_labels, **custom_labels}
        labels_str = ",".join(
            f'{k}="{_escape_label_value(v)}"' for k, v in all_labels.items()
        )
        print(f'{metric_name}{{{labels_str}}} {count}')


    def es_query(es_index_name, query_condition):
        """Run `query_condition` against an index for the last DURATION.

        Returns the decoded JSON response, or None when the request fails
        or the status code is not 200.
        """
        result = None
        headers = {'Content-Type': 'application/json'}
        es_auth = (os.getenv('ES_USER'), os.getenv('ES_PASSWORD'))
        es_url = f"{os.getenv('ES_URL')}/{es_index_name}/_search"
        duration = "now-" + os.getenv('DURATION', '15m')
        body_template = {
            "size": 10000,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": duration
                            }
                        }
                    },
                    "must": [
                        {
                            "query_string": {
                                "query": query_condition
                            }
                        }
                    ]
                }
            }
        }
        try:
            r = requests.post(url=es_url, headers=headers, auth=es_auth,
                              json=body_template, timeout=REQUEST_TIMEOUT)
            if r.status_code == 200:
                result = r.json()
            else:
                print(f"es_query got HTTP {r.status_code} from {es_url}",
                      file=sys.stderr)
        except Exception as err:
            print(f"An error occurred in es_query: {err}", file=sys.stderr)
        return result


    def counting():
        """Print a sample with value 1 for every hit that is not a static asset.

        Hits without `domain_name` print the unlabelled total instead.
        """
        query_condition = os.getenv('QUERY_CONDITION')
        metric_name = os.getenv('METRIC_NAME')
        ignore_extensions = ['.js', '.css', '.png']
        for index_name in es_index_dict:
            try:
                result = es_query(index_name, query_condition)
                if result and 'hits' in result.keys():
                    count = result['hits']['total']['value']
                    for hit in result['hits']['hits']:
                        if '_source' in hit and 'domain_name' in hit['_source']:
                            domain_name = hit["_source"]["domain_name"]
                            status = hit["_source"]["status"]
                            request_method = hit["_source"]["request_method"]
                            uri = hit["_source"]["uri"]
                            if not any(uri.lower().endswith(ext) for ext in ignore_extensions):
                                es_index_id = es_index_dict[index_name]
                                base_labels = {
                                    'domain_name': domain_name,
                                    'status': status,
                                    'request_method': request_method,
                                    'uri': uri,
                                    'es_index_id': es_index_id,
                                    'query_condition': query_condition.replace(" ", "%20").replace('"', '')
                                }
                                add_custom_labels(metric_name, base_labels, 1)
                            else:
                                # Skip URIs ending in .js, .css, .png and the like.
                                continue
                        else:
                            print('%s %d' % (metric_name, count))
            except Exception as err:
                print(f"An error occurred in counting: {err}", file=sys.stderr)


    if __name__ == '__main__':
        counting()
  nginx_uri_ip_top1.py: |-
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Count external requests per (domain, client IP, URI, status, method)."""
    import os
    import sys
    import requests
    import json
    from collections import defaultdict

    # Index pattern -> id (the id is not used by this script).
    es_index_dict = {
        "ingress-nginx-prod-*": "259f6359-38aa-477d-9666-46fc7db8e047"
    }

    # NOTE(review): endpoint and credentials are hard-coded and overwrite
    # whatever the container environment provides. Consider injecting them
    # from a Secret instead of storing them in this ConfigMap.
    # Alternative endpoint:
    #os.environ['ES_URL'] = "http://elasticsearch-es-http.elastic-system:9200"
    os.environ['ES_URL'] = "http://d1-es.uenpay.com"
    os.environ['ES_USER'] = "uenpay"
    os.environ['ES_PASSWORD'] = "123456"
    os.environ['DURATION'] = "5m"
    os.environ['QUERY_CONDITION'] = '(domain_name.keyword:hk.uenpay.com) AND NOT (remote_addr:10.10.*) AND NOT (remote_addr:192.168.*)'
    os.environ['METRIC_NAME'] = "nginx_uri_ip_top"

    # Seconds to wait for Elasticsearch before giving up, so a stalled
    # connection cannot block the script forever.
    REQUEST_TIMEOUT = 30

    # URIs with these suffixes are left out of the counts.
    IGNORED_SUFFIXES = ('.js', '.css', '.png')


    def _escape_label_value(value):
        """Escape backslash, double quote and newline for a label value."""
        return (str(value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n'))


    def add_custom_labels(metric_name, base_labels, count):
        """Print one sample line: metric_name{labels} count.

        Every environment variable named `define_<label>` is added as label
        `<label>`; such labels override entries of `base_labels` with the
        same name.
        """
        custom_labels = {
            key.replace('define_', '', 1): value
            for key, value in os.environ.items()
            if key.startswith('define_')
        }
        all_labels = {**base_labels, **custom_labels}
        labels_str = ",".join(
            f'{k}="{_escape_label_value(v)}"' for k, v in all_labels.items()
        )
        print(f'{metric_name}{{{labels_str}}} {count}')


    def es_query(es_index_name, query_condition):
        """Run `query_condition` against an index for the last DURATION.

        Returns the decoded JSON response, or None when the request fails
        or the status code is not 200.
        """
        result = None
        headers = {'Content-Type': 'application/json'}
        es_auth = (os.getenv('ES_USER'), os.getenv('ES_PASSWORD'))
        es_url = f"{os.getenv('ES_URL')}/{es_index_name}/_search"
        duration = "now-" + os.getenv('DURATION', '15m')
        body_template = {
            "size": 10000,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": duration
                            }
                        }
                    },
                    "must": [
                        {
                            "query_string": {
                                "query": query_condition
                            }
                        }
                    ]
                }
            }
        }
        try:
            r = requests.post(url=es_url, headers=headers, auth=es_auth,
                              json=body_template, timeout=REQUEST_TIMEOUT)
            if r.status_code == 200:
                result = r.json()
            else:
                print(f"es_query got HTTP {r.status_code} from {es_url}",
                      file=sys.stderr)
        except Exception as err:
            print(f"An error occurred in es_query: {err}", file=sys.stderr)
        return result


    def counting():
        """Print one sample per (domain, client IP, URI, status, method).

        The value is the number of returned hits sharing that combination.
        Samples are printed in descending order of count.
        """
        query_condition = os.getenv('QUERY_CONDITION')
        metric_name = os.getenv('METRIC_NAME')

        # Occurrences of (domain_name, client_ip, uri, status, request_method),
        # accumulated over every index.
        ip_uri_count = defaultdict(int)

        for index_name in es_index_dict:
            try:
                result = es_query(index_name, query_condition)
                if result and 'hits' in result and 'hits' in result['hits']:
                    for hit in result['hits']['hits']:
                        if '_source' not in hit:
                            continue
                        source = hit['_source']
                        domain_name = source.get("domain_name")
                        client_ip = source.get("client_ip")
                        uri = source.get("uri")
                        status = source.get("status")
                        request_method = source.get("request_method")

                        # Exclude missing URIs and static assets.
                        if not uri or uri.endswith(IGNORED_SUFFIXES):
                            continue
                        if domain_name and client_ip and status and request_method:
                            ip_uri_count[(domain_name, client_ip, uri, status, request_method)] += 1
                else:
                    # stdout carries the metrics, so diagnostics go to stderr.
                    print(f"No results found for index: {index_name}", file=sys.stderr)
            except Exception as err:
                print(f"An error occurred in counting: {err}", file=sys.stderr)

        sorted_ip_uri_count = sorted(ip_uri_count.items(), key=lambda item: item[1], reverse=True)

        # All results are emitted; slice the list (e.g. [:30]) to cap the
        # output at the top N.
        # Emitting once, after every index was read, keeps a series from
        # being printed more than once.
        for (domain_name, client_ip, uri, status, request_method), count in sorted_ip_uri_count:
            base_labels = {
                'domain_name': domain_name,
                'client_ip': client_ip,
                'uri': uri,
                'status': status,
                'request_method': request_method
            }
            add_custom_labels(metric_name, base_labels, count)


    if __name__ == '__main__':
        counting()
  nginx_uri_ip_top2.py: |-
    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """Count external requests per (domain, client IP, URI, status, method)."""
    import os
    import sys
    import requests
    import json
    from collections import defaultdict

    # Index pattern -> id (the id is not used by this script).
    es_index_dict = {
        "ingress-nginx-prod-*": "259f6359-38aa-477d-9666-46fc7db8e047"
    }

    # NOTE(review): endpoint and credentials are hard-coded and overwrite
    # whatever the container environment provides. Consider injecting them
    # from a Secret instead of storing them in this ConfigMap.
    # Alternative endpoint:
    #os.environ['ES_URL'] = "http://elasticsearch-es-http.elastic-system:9200"
    os.environ['ES_URL'] = "http://d1-es.uenpay.com"
    os.environ['ES_USER'] = "uenpay"
    os.environ['ES_PASSWORD'] = "123456"
    os.environ['DURATION'] = "5m"
    os.environ['QUERY_CONDITION'] = '(domain_name.keyword:hk.uenpay.hkrt.cn) AND NOT (remote_addr:10.10.*) AND NOT (remote_addr:192.168.*)'
    os.environ['METRIC_NAME'] = "nginx_uri_ip_top"

    # Seconds to wait for Elasticsearch before giving up, so a stalled
    # connection cannot block the script forever.
    REQUEST_TIMEOUT = 30

    # URIs with these suffixes are left out of the counts.
    IGNORED_SUFFIXES = ('.js', '.css', '.png')


    def _escape_label_value(value):
        """Escape backslash, double quote and newline for a label value."""
        return (str(value)
                .replace('\\', '\\\\')
                .replace('"', '\\"')
                .replace('\n', '\\n'))


    def add_custom_labels(metric_name, base_labels, count):
        """Print one sample line: metric_name{labels} count.

        Every environment variable named `define_<label>` is added as label
        `<label>`; such labels override entries of `base_labels` with the
        same name.
        """
        custom_labels = {
            key.replace('define_', '', 1): value
            for key, value in os.environ.items()
            if key.startswith('define_')
        }
        all_labels = {**base_labels, **custom_labels}
        labels_str = ",".join(
            f'{k}="{_escape_label_value(v)}"' for k, v in all_labels.items()
        )
        print(f'{metric_name}{{{labels_str}}} {count}')


    def es_query(es_index_name, query_condition):
        """Run `query_condition` against an index for the last DURATION.

        Returns the decoded JSON response, or None when the request fails
        or the status code is not 200.
        """
        result = None
        headers = {'Content-Type': 'application/json'}
        es_auth = (os.getenv('ES_USER'), os.getenv('ES_PASSWORD'))
        es_url = f"{os.getenv('ES_URL')}/{es_index_name}/_search"
        duration = "now-" + os.getenv('DURATION', '15m')
        body_template = {
            "size": 10000,
            "query": {
                "bool": {
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": duration
                            }
                        }
                    },
                    "must": [
                        {
                            "query_string": {
                                "query": query_condition
                            }
                        }
                    ]
                }
            }
        }
        try:
            r = requests.post(url=es_url, headers=headers, auth=es_auth,
                              json=body_template, timeout=REQUEST_TIMEOUT)
            if r.status_code == 200:
                result = r.json()
            else:
                print(f"es_query got HTTP {r.status_code} from {es_url}",
                      file=sys.stderr)
        except Exception as err:
            print(f"An error occurred in es_query: {err}", file=sys.stderr)
        return result


    def counting():
        """Print one sample per (domain, client IP, URI, status, method).

        The value is the number of returned hits sharing that combination.
        Samples are printed in descending order of count.
        """
        query_condition = os.getenv('QUERY_CONDITION')
        metric_name = os.getenv('METRIC_NAME')

        # Occurrences of (domain_name, client_ip, uri, status, request_method),
        # accumulated over every index.
        ip_uri_count = defaultdict(int)

        for index_name in es_index_dict:
            try:
                result = es_query(index_name, query_condition)
                if result and 'hits' in result and 'hits' in result['hits']:
                    for hit in result['hits']['hits']:
                        if '_source' not in hit:
                            continue
                        source = hit['_source']
                        domain_name = source.get("domain_name")
                        client_ip = source.get("client_ip")
                        uri = source.get("uri")
                        status = source.get("status")
                        request_method = source.get("request_method")

                        # Exclude missing URIs and static assets.
                        if not uri or uri.endswith(IGNORED_SUFFIXES):
                            continue
                        if domain_name and client_ip and status and request_method:
                            ip_uri_count[(domain_name, client_ip, uri, status, request_method)] += 1
                else:
                    # stdout carries the metrics, so diagnostics go to stderr.
                    print(f"No results found for index: {index_name}", file=sys.stderr)
            except Exception as err:
                print(f"An error occurred in counting: {err}", file=sys.stderr)

        sorted_ip_uri_count = sorted(ip_uri_count.items(), key=lambda item: item[1], reverse=True)

        # All results are emitted; slice the list (e.g. [:30]) to cap the
        # output at the top N.
        # Emitting once, after every index was read, keeps a series from
        # being printed more than once.
        for (domain_name, client_ip, uri, status, request_method), count in sorted_ip_uri_count:
            base_labels = {
                'domain_name': domain_name,
                'client_ip': client_ip,
                'uri': uri,
                'status': status,
                'request_method': request_method
            }
            add_custom_labels(metric_name, base_labels, count)


    if __name__ == '__main__':
        counting()
kind: ConfigMap
metadata:
  name: script-monitor1-file
  namespace: monitoring