1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
| import os import time import subprocess from subprocess import Popen, PIPE, STARTUPINFO, STARTF_USESHOWWINDOW import logging
# Host that is pinged on every monitoring cycle to decide whether the link is up.
TARGET_IP = "192.168.10.2"
# Name of the network interface handed to netsh for status checks and restarts.
NIC_NAME = "share"
# Number of consecutive ping failures that triggers the adapter check/restart.
MAX_RETRIES = 10
# Per-ping wait in seconds; converted to milliseconds for ping's -w option.
PING_TIMEOUT = 1
# Pause in seconds between two monitoring cycles.
CHECK_INTERVAL = 1
# File that receives the monitor's log output.
LOG_FILE = "net_monitor.log"
# Size in bytes (1 MiB) above which the log file is emptied before the next write.
MAX_LOG_SIZE = 1024 * 1024
def setup_logger():
    """Create the module logger and attach a size-capped file handler to it.

    The logger is set to INFO level and writes UTF-8 text to ``LOG_FILE``
    using the layout ``<asctime> - <levelname> - <message>``.

    Returns:
        The configured ``logging.Logger`` named after this module.
    """

    class TruncatingFileHandler(logging.FileHandler):
        """FileHandler that empties its file once it grows past MAX_LOG_SIZE."""

        def emit(self, record):
            log_path = self.baseFilename
            oversized = (
                os.path.exists(log_path)
                and os.path.getsize(log_path) > MAX_LOG_SIZE
            )
            if oversized:
                # Re-opening in write mode wipes the file; the record below
                # is then written by the handler's own stream.
                with open(log_path, 'w') as f:
                    f.truncate()
            super().emit(record)

    log = logging.getLogger(__name__)
    log.setLevel(logging.INFO)

    file_handler = TruncatingFileHandler(LOG_FILE, encoding='utf-8')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    )
    log.addHandler(file_handler)
    return log
# Module-wide logger; building it attaches the file handler for LOG_FILE.
logger = setup_logger()
# Text of the most recent message passed to log_action(), used to spot repeats.
last_log_message = None
def _refresh_last_log_line(message):
    """Re-stamp the final line of LOG_FILE with the current time.

    The last line is replaced by ``<asctime> - INFO - <message>`` only when it
    already is an INFO entry for ``message``; any other content is left alone
    so unrelated log lines are never overwritten.

    Args:
        message: The log text expected on (and rewritten to) the last line.

    Returns:
        True if the last line was rewritten, False if the file is missing,
        empty, or does not end with an INFO entry for ``message``.
    """
    try:
        with open(LOG_FILE, 'r', encoding='utf-8') as f:
            lines = f.readlines()
    except FileNotFoundError:
        return False

    if not lines:
        return False
    if not lines[-1].rstrip("\r\n").endswith(f" - INFO - {message}"):
        return False

    # Format an empty record purely to obtain a timestamp in logging's
    # default asctime layout, matching what the file handler writes.
    timestamp = logging.Formatter('%(asctime)s').format(logging.LogRecord(
        name=__name__,
        level=logging.INFO,
        pathname='',
        lineno=0,
        msg='',
        args=(),
        exc_info=None,
    ))
    lines[-1] = f"{timestamp} - INFO - {message}\n"
    with open(LOG_FILE, 'w', encoding='utf-8') as f:
        f.writelines(lines)
    return True


def log_action(message):
    """Log ``message`` at INFO level, collapsing repeated "network OK" entries.

    When ``message`` is the heartbeat text "网络连接正常。" and it was also the
    previous message, the last line of LOG_FILE is re-stamped in place instead
    of appending another identical line. If that in-place update is not
    possible (file missing, empty, or its last line is something else), the
    message is logged normally so it is never silently dropped.

    Args:
        message: The text to record.

    Side effects:
        Updates the module-level ``last_log_message`` to ``message``.
    """
    global last_log_message

    repeated_heartbeat = (
        message == "网络连接正常。" and message == last_log_message
    )
    if not (repeated_heartbeat and _refresh_last_log_line(message)):
        logger.info(message)
    last_log_message = message
def ping_host(ip):
    """Send one ping to ``ip`` and report whether a reply was received.

    Runs the Windows ``ping`` command (``-n 1`` for a single echo, ``-w`` for
    the reply wait in milliseconds derived from ``PING_TIMEOUT``) without a
    console window and looks for ``TTL=`` in its output, which only appears
    in reply lines.

    The command is passed as an argument list rather than a shell string so
    no intermediate ``cmd.exe`` is spawned and ``ip`` is never interpreted by
    a shell.

    Args:
        ip: Address or host name to ping.

    Returns:
        True if the output contains ``TTL=``, otherwise False. Any exception
        while launching or reading the command is logged and yields False.
    """
    try:
        wait_ms = int(PING_TIMEOUT * 1000)
        completed = subprocess.run(
            ['ping', '-n', '1', '-w', str(wait_ms), str(ip)],
            stdout=PIPE,
            stderr=PIPE,
            # Same value as the former literal 0x08000000.
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
        output = completed.stdout.decode('gbk', errors='ignore')
        return 'TTL=' in output
    except Exception as e:
        log_action(f"Ping 失败: {str(e)}")
        return False
def is_nic_disconnected(nic_name):
    """Report whether netsh shows the interface ``nic_name`` as disconnected.

    Runs ``netsh interface show interface <nic_name>`` with a hidden window
    and searches the output for the Chinese status text "已断开连接".

    The output is decoded as GBK first (the code page ping_host in this file
    also assumes) and as UTF-8 second. Decoding GBK bytes as UTF-8 with
    ``errors='ignore'`` discards the Chinese characters, so a UTF-8-only
    decode could never find the marker on a GBK console.
    NOTE(review): assumes a Chinese-locale Windows whose console tools emit
    GBK or UTF-8 - confirm on the target machine.

    Args:
        nic_name: Interface name as known to netsh.

    Returns:
        True if the marker text is found, otherwise False. Any exception while
        launching or reading the command is logged and yields False.
    """
    try:
        si = STARTUPINFO()
        si.dwFlags |= STARTF_USESHOWWINDOW
        si.wShowWindow = subprocess.SW_HIDE
        completed = subprocess.run(
            ['netsh', 'interface', 'show', 'interface', str(nic_name)],
            stdout=PIPE,
            stderr=PIPE,
            startupinfo=si,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
        raw_output = completed.stdout
        return any(
            "已断开连接" in raw_output.decode(encoding, errors='ignore')
            for encoding in ('gbk', 'utf-8')
        )
    except Exception as e:
        log_action(f"检查网卡状态出错: {str(e)}")
        return False
def restart_nic(nic_name):
    """Disable and re-enable the interface ``nic_name`` through netsh.

    The adapter is disabled, left off for two seconds, then enabled again.
    The enable command is always attempted, even if disabling reported an
    error, so the adapter is not left switched off on purpose.

    ``os.system`` does not raise when the command fails, so the exit status
    of both netsh calls is checked; a non-zero status (for example when the
    process lacks administrator rights) is reported as a failure instead of
    being logged as success.

    Args:
        nic_name: Interface name as known to netsh.

    Returns:
        True if both netsh commands exited with status 0, otherwise False.
    """
    try:
        log_action("正在尝试重启网络适配器...")
        disable_status = os.system(
            f'netsh interface set interface "{nic_name}" admin=disable >nul 2>&1'
        )
        time.sleep(2)
        enable_status = os.system(
            f'netsh interface set interface "{nic_name}" admin=enable >nul 2>&1'
        )
        if disable_status != 0 or enable_status != 0:
            log_action(
                "重启网卡失败: netsh 退出码 "
                f"disable={disable_status}, enable={enable_status}"
            )
            return False
        log_action("网卡重启成功。")
        return True
    except Exception as e:
        log_action(f"重启网卡失败: {str(e)}")
        return False
def main():
    """Run the monitoring loop forever.

    Each cycle pings TARGET_IP. A successful ping clears the failure counter
    and logs the heartbeat. A failed ping increments the counter and, once it
    reaches MAX_RETRIES, either skips the restart (adapter reported as
    disconnected) or restarts the adapter, then clears the counter. The loop
    sleeps CHECK_INTERVAL seconds between cycles and never returns.
    """
    failure_count = 0
    while True:
        reachable = ping_host(TARGET_IP)
        if reachable:
            failure_count = 0
            log_action("网络连接正常。")
        else:
            failure_count += 1
            log_action(f"Ping 失败。当前连续失败次数: {failure_count}/{MAX_RETRIES}")
            threshold_reached = failure_count >= MAX_RETRIES
            if threshold_reached:
                if not is_nic_disconnected(NIC_NAME):
                    success = restart_nic(NIC_NAME)
                    log_action(f"重启 {'成功' if success else '失败'}")
                else:
                    # Restarting cannot help while the adapter is disconnected.
                    log_action("网卡已断开连接,跳过重启操作。")
                # Start a fresh count after either outcome.
                failure_count = 0
        time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
    # Script entry point: run the monitor and record why it stopped if an
    # ordinary exception escapes the loop.
    try:
        main()
    except Exception as e:
        termination_notice = f"程序终止: {str(e)}"
        log_action(termination_notice)
|