1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450
| import requests import time import os import logging from logging.handlers import RotatingFileHandler import subprocess from typing import Optional, Dict, Any, Tuple from bs4 import BeautifulSoup from playwright.sync_api import sync_playwright
MODEM_IP = "192.168.1.1" MODEM_USERNAME = "user" MODEM_PASSWORD = "yYxxxxxx" MODEM_BASE_URL = f"http://{MODEM_IP}"
ROUTER_IP = "192.168.31.1" ROUTER_USERNAME = "admin" ROUTER_ENCRYPTED_PASSWORD = "a93423bed470c10f9799f60xxxxxxxxxxxx006db1942471d43ba83a19" ROUTER_MAC = "d4:35:38:1c:ae:76"
ROUTER_BASE_URL = f"http://{ROUTER_IP}" ROUTER_LOGIN_URL = f"{ROUTER_BASE_URL}/cgi-bin/luci/api/xqsystem/login" ROUTER_SET_WAN6_URL = f"{ROUTER_BASE_URL}/cgi-bin/luci/;stok={{}}/api/xqnetwork/set_wan6" ROUTER_LOGOUT_URL = f"{ROUTER_BASE_URL}/cgi-bin/luci/;stok={{}}/web/logout" ROUTER_LOGIN_PARAMS = "?username={}&password={}&logtype=2&nonce=0_58:11:22:4a:95:91_1759321406_9326"
PING_INTERVAL = 60 LAST_KNOWN_GUA = None
LOG_FILE = "ipv6-sync.log"
class IPv6SyncService: def __init__(self): self.setup_logging() self.current_gua = None
def setup_logging(self): """配置日志系统""" log_dir = os.path.dirname(LOG_FILE) if log_dir: os.makedirs(log_dir, exist_ok=True)
self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.INFO)
if self.logger.handlers: self.logger.handlers.clear()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler = RotatingFileHandler( LOG_FILE, maxBytes=5 * 1024 * 1024, backupCount=2, encoding='utf-8' ) file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler() console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler) self.logger.addHandler(console_handler)
def ping_ipv6(self, ipv6_addr: str) -> bool: """ping检测IPv6地址是否可达(兼容Windows和Linux)""" if not ipv6_addr: self.logger.warning("ping目标为空,跳过检测") return False
try: if os.name == "nt": result = subprocess.run( ["ping", "-6", "-n", "3", "-w", "3000", ipv6_addr], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) else: result = subprocess.run( ["ping6", "-c", "3", "-W", "3", ipv6_addr], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True )
return result.returncode == 0
except FileNotFoundError: self.logger.error("未找到ping命令,请检查系统是否支持IPv6 ping") return False except Exception as e: self.logger.error(f"ping检测失败: {str(e)}") return False
def get_ipv6_info_from_modem(self) -> Optional[Dict[str, Any]]: """从光猫获取IPv6信息(仅核心使用公网GUA地址,保留其他信息供扩展)""" try: self.logger.info("启动浏览器,准备访问光猫管理页面...") with sync_playwright() as playwright: static_chrome_path = "/home/jinxinhao/static-chromium/chrome-linux/chrome" if os.path.exists(static_chrome_path): browser = playwright.chromium.launch( executable_path=static_chrome_path, headless=True, args=["--no-sandbox", "--disable-setuid-sandbox"] ) else: browser = playwright.chromium.launch( headless=True, args=["--no-sandbox", "--disable-setuid-sandbox"] )
context = browser.new_context() page = context.new_page()
try: self.logger.info(f"访问光猫登录页: {MODEM_BASE_URL}") page.goto(MODEM_BASE_URL, timeout=30000)
self.logger.info("输入光猫登录信息...") page.locator("#Frm_Username").fill(MODEM_USERNAME) page.locator("#Frm_Password").fill(MODEM_PASSWORD) self.logger.info("提交登录请求...") page.get_by_role("button", name="登 录").click() page.wait_for_load_state("networkidle", timeout=30000)
self.logger.info("导航到IPv6连接信息页面...") main_frame = page.frame(name="mainFrame") if not main_frame: self.logger.error("错误:未找到mainFrame iframe") return None
main_frame.get_by_text("网络侧信息").click() main_frame.get_by_text("IPv6连接信息").click() page.wait_for_load_state("networkidle", timeout=30000)
self.logger.info("提取光猫IPv6信息...") table_html = main_frame.locator("#Tbl_WANstauts1").inner_html(timeout=10000) if not table_html: self.logger.error("错误:未能获取IPv6信息表格") return None
soup = BeautifulSoup(table_html, 'html.parser') ipv6_info = {}
gua_input = soup.find('input', {'id': 'Sta_PPP_GUA1_1'}) if gua_input and 'value' in gua_input.attrs: ipv6_info['modem_gua_ipv6'] = gua_input['value'].strip() self.logger.debug(f"提取到光猫公网IPv6: {ipv6_info['modem_gua_ipv6']}") else: self.logger.error("未找到光猫公网IPv6地址(GUA)") return None
prefix_td = soup.find('td', string='前缀') if prefix_td and prefix_td.find_next_sibling('td'): ipv6_info['prefix'] = prefix_td.find_next_sibling('td').text.strip() else: self.logger.warning("未找到IPv6前缀信息(不影响Native模式配置)")
dns1_input = soup.find('input', {'id': 'Sta_PPP_DNS1_1'}) ipv6_info['dns1'] = dns1_input['value'].strip() if (dns1_input and 'value' in dns1_input.attrs) else ""
dns2_input = soup.find('input', {'id': 'Sta_PPP_DNS2_1'}) ipv6_info['dns2'] = dns2_input['value'].strip() if (dns2_input and 'value' in dns2_input.attrs) else ""
self.logger.info("退出光猫登录...") main_frame.get_by_role("link", name="退出").click() page.wait_for_load_state("networkidle", timeout=10000)
self.logger.info("✅ 成功从光猫获取核心IPv6信息") self.logger.info(f" 光猫公网IPv6(GUA): {ipv6_info['modem_gua_ipv6']}")
return ipv6_info
finally: context.close() browser.close() self.logger.info("光猫操作浏览器会话已关闭")
except Exception as e: self.logger.error(f"获取光猫IPv6信息失败: {str(e)}", exc_info=True) return None
def split_prefix_to_64(self, original_prefix: str, subnet_index: int = 1) -> Tuple[str, int]: """保留前缀拆分函数,供静态配置扩展使用""" if '/' not in original_prefix: raise ValueError(f"无效前缀格式: {original_prefix}")
pure_prefix, prefix_len = original_prefix.split('/') if int(prefix_len) != 60: raise ValueError(f"仅支持/60前缀,实际为{prefix_len}位")
parts = pure_prefix.split(':') while len(parts) < 8: parts.insert(parts.index('') if '' in parts else -1, '0')
fourth_part = parts[3] subnet_bits = fourth_part[-1] new_subnet_bits = hex(int(subnet_bits, 16) + subnet_index)[2:] if len(new_subnet_bits) > 1: new_subnet_bits = new_subnet_bits[-1] prefix_part = fourth_part[:-1] new_prefix_part = hex(int(prefix_part, 16) + 1)[2:].zfill(3) new_fourth_part = new_prefix_part + new_subnet_bits else: new_fourth_part = fourth_part[:-1] + new_subnet_bits
parts[3] = new_fourth_part simplified_parts = parts[:4] + ['', ''] lan_prefix = ':'.join(simplified_parts).replace(':::', '::')
return lan_prefix, 64
def login_router(self) -> Optional[Tuple[requests.Session, str]]: """登录路由器并返回会话和stok""" session = requests.Session() session.timeout = 30
try: self.logger.info("正在登录路由器...") login_url = ROUTER_LOGIN_URL + ROUTER_LOGIN_PARAMS.format( ROUTER_USERNAME, ROUTER_ENCRYPTED_PASSWORD ) headers = { "User-Agent": "Mozilla/5.0 (compatible; IPv6-Sync-Service/1.0)", "Accept": "*/*" }
response = session.get(login_url, headers=headers) response.raise_for_status() result = response.json()
if result.get("code") == 0: stok = result.get("token") self.logger.info(f"✅ 路由器登录成功,stok: {stok}") return session, stok else: self.logger.error(f"❌ 路由器登录失败: {result.get('msg')}") return None
except Exception as e: self.logger.error(f"路由器登录错误: {str(e)}") return None finally: if not session: session.close()
def logout_router(self, session: requests.Session, stok: str) -> bool: """注销路由器会话""" try: self.logger.info("注销路由器会话...") response = session.post( ROUTER_LOGOUT_URL.format(stok), timeout=5, allow_redirects=False ) return response.status_code == 302 except Exception as e: self.logger.warning(f"路由器注销失败: {str(e)}") return False
def set_router_static_ipv6(self, session: requests.Session, stok: str, ipv6_info: Dict[str, Any]) -> bool: """保留静态配置函数,供后续扩展使用""" if not all(key in ipv6_info for key in ['modem_gua_ipv6', 'prefix', 'dns1', 'dns2']): self.logger.error("光猫信息不完整,无法配置静态IPv6") return False
try: original_prefix = ipv6_info['prefix'] lan_prefix, lan_prefix_len = self.split_prefix_to_64(original_prefix) self.logger.info(f"将光猫{original_prefix}拆分为LAN侧64位子网: {lan_prefix}/{lan_prefix_len}")
pure_prefix, _ = original_prefix.split('/') clean_mac = ROUTER_MAC.replace(':', '').lower() wan_ip = f"{pure_prefix[:-1]}ffff:{clean_mac[:4]}:{clean_mac[4:8]}:{clean_mac[8:]}" self.logger.info(f"生成路由器WAN口IPv6: {wan_ip}")
params = { "wanType": "static", "ipaddr": wan_ip, "gw": ipv6_info['modem_gua_ipv6'], "prefix": lan_prefix, "assign": lan_prefix_len, "dns1": ipv6_info['dns1'], "dns2": ipv6_info['dns2'] }
api_url = ROUTER_SET_WAN6_URL.format(stok) headers = {"Content-Type": "application/x-www-form-urlencoded"} params_str = "&".join([f"{k}={v}" for k, v in params.items()])
self.logger.info("路由器静态IPv6配置参数:") for k, v in params.items(): self.logger.info(f" {k}: {v}")
response = session.post(api_url, data=params_str, headers=headers) response.raise_for_status() result = response.json()
if result.get("code") == 0: self.logger.info("✅ 路由器静态IPv6配置成功") return True else: self.logger.error(f"❌ 路由器静态配置失败: {result}") return False
except Exception as e: self.logger.error(f"配置静态IPv6出错: {str(e)}", exc_info=True) return False
def set_router_native_ipv6(self, session: requests.Session, stok: str) -> bool: """核心函数:配置路由器IPv6为Native模式""" try: params = { "wanType": "native", "autosetipv6": 0 }
api_url = ROUTER_SET_WAN6_URL.format(stok) headers = {"Content-Type": "application/x-www-form-urlencoded"} params_str = "&".join([f"{k}={v}" for k, v in params.items()])
self.logger.info("路由器Native模式配置参数:") for k, v in params.items(): self.logger.info(f" {k}: {v}")
response = session.post(api_url, data=params_str, headers=headers) response.raise_for_status() result = response.json()
if result.get("code") == 0: self.logger.info("✅ 路由器IPv6模式已成功设置为Native") return True else: self.logger.error(f"❌ 路由器Native模式配置失败: {result}") return False
except Exception as e: self.logger.error(f"配置Native模式出错: {str(e)}", exc_info=True) return False
def check_and_sync(self): """优化逻辑:精简步骤编号,日志更连贯,减少光猫登录次数""" global LAST_KNOWN_GUA
self.logger.info("=== 开始IPv6状态检查 ===") current_gua = None modem_info = None need_config = False
self.logger.info("【步骤1】检查历史光猫公网IPv6地址...") if LAST_KNOWN_GUA: self.logger.info(f"使用上次保存的地址: {LAST_KNOWN_GUA} 进行ping测试") is_reachable = self.ping_ipv6(LAST_KNOWN_GUA) if is_reachable: self.logger.info(f"✅ 历史地址({LAST_KNOWN_GUA})可达,无需进一步操作") self.logger.info("=== IPv6状态检查完成 ===\n") return else: self.logger.warning(f"❌ 历史地址({LAST_KNOWN_GUA})不可达,需获取新地址") else: self.logger.info("无历史地址记录,直接获取光猫最新信息")
self.logger.info("【步骤2】登录光猫获取最新公网IPv6地址...") modem_info = self.get_ipv6_info_from_modem() if not modem_info: self.logger.error("❌ 未能获取光猫信息,本次检查终止") self.logger.info("=== IPv6状态检查完成 ===\n") return
current_gua = modem_info['modem_gua_ipv6'] self.current_gua = current_gua self.logger.info(f"✅ 获取到最新光猫公网IPv6: {current_gua}")
self.logger.info("【步骤3】测试新获取的光猫公网IPv6可达性...") is_reachable = self.ping_ipv6(current_gua) if is_reachable: self.logger.info(f"✅ 新地址({current_gua})可达,更新历史记录") LAST_KNOWN_GUA = current_gua else: self.logger.warning(f"❌ 新地址({current_gua})仍不可达,需要配置路由器") need_config = True
if need_config: self.logger.info("【步骤4】配置路由器IPv6为Native模式...") router_login = self.login_router() if not router_login: self.logger.error("❌ 路由器登录失败,配置终止") self.logger.info("=== IPv6状态检查完成 ===\n") return
session, stok = router_login try: config_success = self.set_router_native_ipv6(session, stok) if config_success: LAST_KNOWN_GUA = current_gua self.logger.info("✅ 路由器Native模式配置成功") else: self.logger.error("❌ 路由器Native模式配置失败") finally: self.logout_router(session, stok) session.close() self.logger.info("路由器会话已关闭")
self.logger.info("=== IPv6状态检查完成 ===\n")
def run_service(self): """服务主循环:定时执行检查逻辑""" self.logger.info(f"IPv6同步服务启动,每{PING_INTERVAL}秒执行一次检查") while True: try: self.check_and_sync() except Exception as e: self.logger.error(f"服务运行异常: {str(e)}", exc_info=True) self.logger.info(f"等待{PING_INTERVAL}秒后进行下一次检查...\n") time.sleep(PING_INTERVAL)
if __name__ == "__main__": service = IPv6SyncService() service.run_service()
|