"""Tiny Prometheus exporter that periodically runs local scripts.

Every ``task_interval`` seconds all scripts found in ``script_dirs`` are
executed concurrently; their combined stdout is cached in the module-level
``prometheus_metrics`` string and served on ``/`` and ``/metrics``.
"""
import concurrent.futures
import json
import logging
import os
import signal
import socket
import subprocess
import threading
import time
from logging.handlers import RotatingFileHandler

import requests
from flask import Flask, Response
from flask.logging import default_handler

# Listener configuration.
# NOTE: "falsk" is a historical misspelling of "flask"; the names are kept
# unchanged so that anything referring to them keeps working.
falsk_addr = "0.0.0.0"  # address Flask listens on
falsk_port = 8000  # port Flask listens on

# Task configuration.
task_interval = 30  # seconds between two task runs
# Seconds a whole task may take. A running task blocks the next one, so this
# should not be configured too large.
task_timeout = 60

# Script configuration.
script_dirs = ['/opt/monitor/']  # directories that are scanned for scripts
# Maps a file extension to the interpreter used to run such a script.
script_file = {'py': 'python3', 'pyc': 'python3', 'sh': '/bin/bash'}
script_timeout = 15  # seconds a single script may run
script_concurrent = 3  # number of scripts that run concurrently
# Seconds to wait after each termination signal before escalating.
script_kill_timeout = 5


class FlaskApp(Flask):
    """Flask application that refreshes the cached metrics in the background."""

    def __init__(self, *args, **kwargs):
        """Configure logging and run the first refresh synchronously."""
        super().__init__(*args, **kwargs)
        self.logger = self.log_config()
        # These two attributes are initialised here but not updated anywhere
        # in this file; they are kept for compatibility.
        self.task_failed_count = 0
        self.script_status = dict()
        self.refresh_metrics()

    def log_config(self):
        """Return the ``werkzeug`` logger, reconfigured to log to the console.

        All handlers already attached to that logger (including the default
        console handler) are removed first, so each record is emitted once
        with the format defined here.
        """
        log_level = "INFO"
        logger = logging.getLogger('werkzeug')
        # Iterate over a copy because removeHandler mutates logger.handlers.
        for handler in logger.handlers[:]:
            logger.removeHandler(handler)
        logger.setLevel(log_level)

        log_fmt = "%(asctime)s %(levelname)s [%(funcName)s] - %(message)s"
        log_datefmt = "%Y-%m-%d %H:%M:%S"
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(logging.Formatter(log_fmt, log_datefmt))
        logger.addHandler(stream_handler)
        return logger

    def refresh_metrics(self):
        """Run the task once, cache its output and schedule the next run.

        The next run is always scheduled, even if this one failed; on failure
        the previously cached metrics are kept.
        """
        global prometheus_metrics
        start_time = time.monotonic()
        try:
            # run_task always returns a string, "or ''" is pure defence so the
            # view never ends up with None.
            prometheus_metrics = self.run_task() or ""
        except Exception:
            self.logger.exception("刷新指标失败,保留上一次的指标!")
        finally:
            cost_time = time.monotonic() - start_time
            self.logger.info("本次任务耗时%d秒。" % cost_time)
            timer = threading.Timer(max(0, task_interval - cost_time),
                                    self.refresh_metrics)
            # Daemon thread: a pending refresh must not keep the process
            # alive once the main thread (the web server) has exited.
            timer.daemon = True
            timer.start()

    def _discover_scripts(self):
        """Scan ``script_dirs`` and split the entries into runnable and ignored.

        Returns:
            ``(includes, excludes)`` where ``includes`` is a list of
            ``(interpreter, path)`` tuples and ``excludes`` a list of paths.
        """
        script_includes, script_excludes = [], []
        for script_dir in script_dirs:
            if not os.path.isdir(script_dir):
                self.logger.info(f"忽略非目录或者不存在的目录:{script_dir}!")
                continue
            try:
                filenames = os.listdir(script_dir)
            except OSError as err:
                self.logger.warning(f"读取脚本目录失败,跳过:{script_dir},异常信息: {err}")
                continue
            for filename in filenames:
                script_name = os.path.join(script_dir, filename)
                script_interpreter = script_file.get(filename.split('.')[-1])
                runnable = (
                    not filename.startswith('.')
                    and '.' in filename
                    and script_interpreter
                    # A directory named e.g. "foo.py" is not a script.
                    and os.path.isfile(script_name)
                )
                if runnable:
                    script_includes.append((script_interpreter, script_name))
                else:
                    script_excludes.append(script_name)
        return script_includes, script_excludes

    def run_task(self):
        """Run every discovered script and return the merged stdout.

        Returns:
            The stripped stdout of all successful scripts joined by newlines,
            or an empty string when there is nothing to run or nothing
            succeeded. Output collected before a task timeout is still
            returned.
        """
        script_includes, script_excludes = self._discover_scripts()
        if not script_includes:
            self.logger.debug("本次任务忽略以下文件:%s后,无需运行的脚本文件,跳过!" % script_excludes)
            return ""
        self.logger.debug("本次任务忽略以下文件:%s,即将运行以下脚本文件:%s," % (script_excludes, script_includes))

        outputs = []
        self.logger.debug("开始执行任务...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=script_concurrent) as executor:
            future_script = {
                executor.submit(self.run_script, script_interpreter, script_name): script_name
                for script_interpreter, script_name in script_includes
            }
            try:
                for future in concurrent.futures.as_completed(future_script, timeout=task_timeout):
                    script_name = future_script[future]
                    try:
                        script_exception, script_stdout = future.result()
                    except Exception as err:
                        # One broken script must not discard the others.
                        self.logger.warning(f'脚本{script_name}执行出错,已跳过!异常信息: {err}')
                        continue
                    if script_exception or not script_stdout:
                        continue
                    stripped = script_stdout.strip()
                    if stripped:
                        outputs.append(stripped)
            except concurrent.futures.TimeoutError as err:
                self.logger.warning(f'任务执行超时,强制结束任务!异常信息: {err}')
                self.kill_task(future_script)
        self.logger.debug("结束执行任务!")
        return "\n".join(outputs)

    def run_script(self, script_interpreter, script_name):
        """Run one script in its own session and capture its output.

        Args:
            script_interpreter: executable used to run the script.
            script_name: path of the script.

        Returns:
            ``(exception, stdout_data)``. ``exception`` is ``None`` on success,
            otherwise a short description of the failure (could not start,
            timeout, runtime error, non-zero exit code or empty output).
            ``stdout_data`` is whatever stdout could be captured, possibly
            ``None``.
        """
        self.logger.debug(f"开始运行脚本: {script_interpreter} {script_name}")
        exception, return_code, stdout_data, stderr_data = None, None, None, None

        try:
            # start_new_session puts the script into its own process group so
            # that kill_script can signal it together with its children.
            proc = subprocess.Popen([script_interpreter, script_name], shell=False,
                                    encoding='utf-8', start_new_session=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError as err:
            # E.g. the interpreter does not exist or is not executable.
            self.logger.warning(f'脚本{script_name}运行出错!标准输出: {stdout_data}, 错误输出: {stderr_data}, 异常信息: {err}')
            return "脚本运行出错", stdout_data

        try:
            stdout_data, stderr_data = proc.communicate(timeout=script_timeout)
            return_code = proc.returncode
            self.logger.debug(f"脚本{script_name}运行结束:return_code: {return_code} ,stdout_data: {stdout_data}, stderr_data: {stderr_data}")
        except subprocess.TimeoutExpired as err:
            exception = "脚本运行超时"
            stdout_data, stderr_data = self.kill_script(proc)
            self.logger.warning(f'脚本{script_name}运行超时!标准输出: {stdout_data}, 错误输出: {stderr_data}, 异常信息: {err}')
        except Exception as err:
            # E.g. the output is not valid UTF-8.
            exception = "脚本运行出错"
            stdout_data, stderr_data = self.kill_script(proc)
            self.logger.warning(f'脚本{script_name}运行出错!标准输出: {stdout_data}, 错误输出: {stderr_data}, 异常信息: {err}')

        if exception is None:
            if return_code:
                exception = "脚本退出状态码非零"
                self.logger.warning(f'脚本{script_name}退出状态码非零!标准输出: {stdout_data}, 错误输出: {stderr_data}, 退出状态码: {return_code}')
            elif not stdout_data:
                exception = "脚本输出结果为空"
                self.logger.warning(f'脚本{script_name}输出结果为空!标准输出: {stdout_data}, 错误输出: {stderr_data}, 退出状态码: {return_code}')
        return exception, stdout_data

    def kill_task(self, future_script):
        """Cancel every future of the task that has not finished yet.

        ``Future.cancel`` only prevents futures that have not started from
        running; scripts that are already running end through their own
        ``script_timeout``.
        """
        for future in future_script:
            if not future.done():
                future.cancel()

    def kill_script(self, proc):
        """Terminate the process group of ``proc`` and collect its output.

        SIGTERM is sent first; if the process has not exited after
        ``script_kill_timeout`` seconds, SIGKILL follows.

        Returns:
            ``(stdout_data, stderr_data)``; both are ``None`` when the output
            could not be collected.
        """
        for sig in (signal.SIGTERM, signal.SIGKILL):
            try:
                os.killpg(os.getpgid(proc.pid), sig)
            except ProcessLookupError:
                # The process is already gone; just collect what it left.
                pass
            try:
                return proc.communicate(timeout=script_kill_timeout)
            except subprocess.TimeoutExpired:
                continue
            except (OSError, ValueError) as err:
                # ValueError covers undecodable output (UnicodeDecodeError)
                # and pipes that are already closed.
                self.logger.warning(f"读取脚本进程(pid={proc.pid})输出失败,异常信息: {err}")
                return None, None
        self.logger.warning(f"脚本进程(pid={proc.pid})在SIGKILL后仍未退出!")
        return None, None


# Cache of the most recent task output. It has to exist before FlaskApp is
# instantiated, because the constructor triggers the first refresh.
prometheus_metrics = ""
app = FlaskApp(__name__)


def get_metrics():
    """Serve the cached metrics in the Prometheus text exposition format."""
    body = prometheus_metrics or ""
    # The text format requires the last line to end with a line feed.
    if body and not body.endswith("\n"):
        body += "\n"
    return Response(body, content_type="text/plain; version=0.0.4; charset=utf-8")


app.add_url_rule('/', view_func=get_metrics)
app.add_url_rule('/metrics', view_func=get_metrics)

if __name__ == '__main__':
    app.run(falsk_addr, falsk_port)