
| import os import time import subprocess from subprocess import Popen, PIPE, STARTUPINFO, STARTF_USESHOWWINDOW import logging
# Address that main() passes to ping_host() to decide whether the network is up.
TARGET_IP = "192.168.10.2"
# Interface name handed to netsh when checking or restarting the adapter.
NIC_NAME = "share"
# Number of consecutive ping failures main() tolerates before acting on the adapter.
MAX_RETRIES = 10
# Ping reply timeout in seconds; ping_host() converts it to milliseconds for `ping -w`.
PING_TIMEOUT = 1
# Seconds main() sleeps between two checks.
CHECK_INTERVAL = 1
# Log file written by the logger and rewritten in place by log_action().
LOG_FILE = "net_monitor.log"
# Size threshold in bytes (1 MiB); the log handler empties the file once it grows past this.
MAX_LOG_SIZE = 1024 * 1024
def setup_logger():
    """Configure and return this module's logger.

    The logger is set to INFO and gets a UTF-8 file handler on ``LOG_FILE``
    that formats records as ``<asctime> - <levelname> - <message>``. Before
    writing a record the handler empties the log file if it has grown past
    ``MAX_LOG_SIZE`` bytes.

    Calling this function more than once is safe: a handler for ``LOG_FILE``
    is attached only if the logger does not already have one, so records are
    never written twice.

    Returns:
        The configured ``logging.Logger`` instance.
    """

    class TruncatingFileHandler(logging.FileHandler):
        """File handler that empties its file once it exceeds MAX_LOG_SIZE."""

        def emit(self, record):
            try:
                # EAFP: ask for the size directly instead of exists()+getsize(),
                # which can race with the file disappearing in between.
                if os.path.getsize(self.baseFilename) > MAX_LOG_SIZE:
                    # Opening in 'w' mode truncates the file to zero length.
                    with open(self.baseFilename, 'w'):
                        pass
            except FileNotFoundError:
                # Nothing to truncate.
                pass
            except OSError:
                # Report through logging's own error hook rather than letting
                # a housekeeping failure propagate into the caller.
                self.handleError(record)
            super().emit(record)

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    # FileHandler stores the absolute path in baseFilename, so compare like
    # with like when looking for an existing handler on the same file.
    log_path = os.path.abspath(LOG_FILE)
    already_attached = any(
        isinstance(existing, logging.FileHandler)
        and existing.baseFilename == log_path
        for existing in logger.handlers
    )
    if not already_attached:
        handler = TruncatingFileHandler(LOG_FILE, encoding='utf-8')
        handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        )
        logger.addHandler(handler)
    return logger
# Module-wide logger, configured once at import time by setup_logger().
logger = setup_logger()
# Most recent message passed to log_action(); lets it detect a repeated
# "network OK" message and refresh the last log line instead of appending.
last_log_message = None
def log_action(message):
    """Write ``message`` to the log, collapsing repeated "network OK" entries.

    When ``message`` is the "network OK" text and it is identical to the
    previously logged message, the last line of ``LOG_FILE`` is rewritten
    with a fresh timestamp instead of appending a new line, so a healthy
    connection does not flood the log. Every other message is logged at INFO
    level through the module logger.

    If the in-place refresh is not possible (log file missing, unreadable,
    or empty, e.g. right after it was truncated), the message is logged
    normally instead of being lost or raising.

    Args:
        message: Text to record.
    """
    global last_log_message

    is_repeated_ok = message == "网络连接正常。" and message == last_log_message
    if not (is_repeated_ok and _refresh_last_log_line(message)):
        logger.info(message)
    last_log_message = message


def _refresh_last_log_line(message):
    """Replace the last line of LOG_FILE with ``message`` and a new timestamp.

    Returns:
        True if the file was rewritten, False if it was empty or could not
        be read or written (the caller then falls back to normal logging).
    """
    try:
        with open(LOG_FILE, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        if not lines:
            return False

        # Format the timestamp with logging's own machinery so the refreshed
        # line looks exactly like the ones the file handler writes.
        timestamp = logging.Formatter('%(asctime)s').format(logging.LogRecord(
            name=__name__,
            level=logging.INFO,
            pathname='',
            lineno=0,
            msg='',
            args=(),
            exc_info=None,
        ))
        lines[-1] = f"{timestamp} - INFO - {message}\n"

        with open(LOG_FILE, 'w', encoding='utf-8') as f:
            f.writelines(lines)
    except (OSError, UnicodeError):
        return False
    return True
def ping_host(ip):
    """Send one ICMP echo request to ``ip`` and report whether it was answered.

    Runs the Windows ``ping`` command (``-n 1`` for a single request, ``-w``
    for the reply timeout in milliseconds derived from ``PING_TIMEOUT``)
    without opening a console window.

    Args:
        ip: Address or host name to ping.

    Returns:
        True if the command output contains ``TTL=``, False otherwise or if
        the command could not be run (the error is logged).
    """
    # An argument list with no shell avoids interpolating ``ip`` into a
    # command line that cmd.exe would parse.
    command = ["ping", "-n", "1", "-w", str(int(PING_TIMEOUT * 1000)), str(ip)]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
    except Exception as e:
        log_action(f"Ping 失败: {str(e)}")
        return False

    # Only replies from the target carry a TTL field. The marker is pure
    # ASCII, so it is matched on the raw bytes and the result does not depend
    # on the console code page.
    return b'TTL=' in completed.stdout
def is_nic_disconnected(nic_name):
    """Report whether netsh shows the adapter ``nic_name`` as disconnected.

    Runs ``netsh interface show interface <nic_name>`` with its window hidden
    and looks for the state text "已断开连接" in the output.

    Args:
        nic_name: Interface name as known to netsh.

    Returns:
        True if the disconnected marker appears in the output, False
        otherwise or if the command could not be run (the error is logged).
    """
    disconnected_marker = "已断开连接"
    try:
        si = subprocess.STARTUPINFO()
        si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        si.wShowWindow = subprocess.SW_HIDE
        completed = subprocess.run(
            ["netsh", "interface", "show", "interface", nic_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            startupinfo=si,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
        raw_output = completed.stdout
        # The output encoding follows the console code page. Match the marker
        # as bytes in both GBK and UTF-8 so the check works either way
        # instead of silently never matching when the guess is wrong.
        return any(
            disconnected_marker.encode(encoding) in raw_output
            for encoding in ("gbk", "utf-8")
        )
    except Exception as e:
        log_action(f"检查网卡状态出错: {str(e)}")
        return False
def restart_nic(nic_name):
    """Disable and re-enable the adapter ``nic_name`` through netsh.

    The adapter is disabled, given two seconds to settle, then enabled again.
    The enable step is always attempted, even if disabling reported an error,
    so the adapter is not left switched off on purpose.

    Args:
        nic_name: Interface name as known to netsh.

    Returns:
        True only if both netsh invocations exited with code 0; False if
        either failed or could not be run. The outcome is logged either way.
    """
    try:
        log_action("正在尝试重启网络适配器...")
        disable_rc = _set_nic_admin_state(nic_name, "disable")
        time.sleep(2)
        enable_rc = _set_nic_admin_state(nic_name, "enable")
    except Exception as e:
        log_action(f"重启网卡失败: {str(e)}")
        return False

    if disable_rc != 0 or enable_rc != 0:
        # A non-zero exit code means netsh rejected the request; report it
        # instead of claiming success.
        log_action(
            f"重启网卡失败: netsh 退出码 disable={disable_rc}, enable={enable_rc}"
        )
        return False

    log_action("网卡重启成功。")
    return True


def _set_nic_admin_state(nic_name, state):
    """Run ``netsh interface set interface <nic_name> admin=<state>``.

    Output is discarded and no console window is opened.

    Returns:
        The exit code of the netsh process.
    """
    completed = subprocess.run(
        ["netsh", "interface", "set", "interface", nic_name, f"admin={state}"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        creationflags=subprocess.CREATE_NO_WINDOW,
    )
    return completed.returncode
def main():
    """Monitor the connection to TARGET_IP in an endless loop.

    Each pass pings the target once and then sleeps CHECK_INTERVAL seconds.
    A successful ping resets the failure counter and logs the "network OK"
    message. A failed ping increments the counter and logs it; once
    MAX_RETRIES consecutive failures are reached, the adapter NIC_NAME is
    restarted unless netsh already reports it as disconnected, and the
    counter starts over.
    """
    consecutive_failures = 0
    while True:
        if ping_host(TARGET_IP):
            consecutive_failures = 0
            log_action("网络连接正常。")
        else:
            consecutive_failures += 1
            log_action(
                f"Ping 失败。当前连续失败次数: {consecutive_failures}/{MAX_RETRIES}"
            )
            if consecutive_failures >= MAX_RETRIES:
                if not is_nic_disconnected(NIC_NAME):
                    restarted = restart_nic(NIC_NAME)
                    outcome = '成功' if restarted else '失败'
                    log_action(f"重启 {outcome}")
                else:
                    # A disconnected adapter cannot be fixed by restarting it.
                    log_action("网卡已断开连接,跳过重启操作。")
                consecutive_failures = 0
        time.sleep(CHECK_INTERVAL)
# Script entry point: run the monitor and record the reason if it ever stops
# because of an unexpected error.
if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        log_action(f"程序终止: {exc}")
|