#!/usr/bin/env python3 import os import sys import csv import time import getpass import re from datetime import datetime sys.path.insert(0, 'lib') try: import paramiko HAS_PARAMIKO = True except ImportError: HAS_PARAMIKO = False try: import openpyxl from openpyxl.styles import Font, PatternFill from openpyxl.utils import get_column_letter HAS_EXCEL = True except ImportError: HAS_EXCEL = False def _strip_ansi(text): text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z=]', '', text) text = re.sub(r'\x1b[=>]', '', text) text = re.sub(r'\r', '', text) text = re.sub(r'\x00+', '', text) return text def load_dg_structure(path='configs/dg_structure.csv'): if not os.path.exists(path): print(f"[ERROR] {path} not found") return [] dgs = {} with open(path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) headers = reader.fieldnames print(f"[INFO] CSV columns: {headers}") def find_col(headers, candidates): for h in headers: hl = h.lower().strip() for c in candidates: if hl == c or hl.startswith(c + ' ') or hl.endswith(' ' + c): return h return None name_col = find_col(headers, ['name']) ip_col = find_col(headers, ['ip address', 'ipaddress', 'ip']) parent_col = find_col(headers, ['parent dg', 'parent device group', 'parent']) missing = [] if not name_col: missing.append('Name') if not ip_col: missing.append('IP Address') if missing: print(f"[ERROR] Missing required columns: {missing}. Found: {headers}") return [] if not parent_col: print("[WARNING] No Parent DG column found — all rows with Name and IP will be included") for row in reader: dg_name = (row.get(name_col) or '').strip() ip_raw = (row.get(ip_col) or '').strip() parent_dg = (row.get(parent_col) or '').strip() if parent_col else None if not dg_name or not ip_raw: continue if parent_col is not None and not parent_dg: continue ip = ip_raw.split(';')[0].strip() if not ip: continue if dg_name not in dgs: dgs[dg_name] = ip result = [(dg, ip) for dg, ip in dgs.items()] print(f"[INFO] Device groups loaded: {len(result)}") return result def load_delete_objects(report_path): if not HAS_EXCEL: print("[ERROR] openpyxl not available") return [] wb = openpyxl.load_workbook(report_path, read_only=True) if 'DELETE_Objects' not in wb.sheetnames: print("[ERROR] DELETE_Objects sheet not found in report") return [] ws = wb['DELETE_Objects'] headers = None objects = [] for row in ws.iter_rows(values_only=True): if headers is None: headers = [str(h) if h else '' for h in row] continue obj = dict(zip(headers, row)) name = (obj.get('object_name') or '').strip() if name: objects.append({ 'object_name': name, 'type': (obj.get('type') or '').strip(), }) print(f"[INFO] DELETE objects loaded: {len(objects)}") return objects def collect_from_device(hostname, username, password, dg_name, output_dir): if not HAS_PARAMIKO: print("[ERROR] paramiko not installed") return False output_file = os.path.join(output_dir, f"{dg_name}.txt") try: ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname, username=username, password=password, timeout=10) channel = ssh.invoke_shell() time.sleep(2) if channel.recv_ready(): channel.recv(65535) channel.send('set cli pager off\n') time.sleep(1) if channel.recv_ready(): channel.recv(65535) channel.send('show config pushed-shared-policy\n') time.sleep(5) output = '' empty_count = 0 for _ in range(300): if channel.recv_ready(): chunk = channel.recv(65535).decode('utf-8', errors='ignore') output += chunk empty_count = 0 else: empty_count += 1 if empty_count > 10: break time.sleep(0.1) channel.close() ssh.close() clean = _strip_ansi(output) with open(output_file, 'w', encoding='utf-8') as f: f.write(clean) lines = len(clean.splitlines()) print(f" [SUCCESS] {dg_name} — {lines} lines saved") return True except Exception as e: print(f" [FAILED] {dg_name} ({hostname}): {e}") return False def phase1_collect(): if not HAS_PARAMIKO: print("[ERROR] paramiko not installed") return dgs = load_dg_structure() if not dgs: return print(f"\nUsername: ", end='') username = input().strip() password = getpass.getpass("Password: ") ts = datetime.now().strftime('%Y%m%d_%H%M%S') output_dir = os.path.join('data', ts, 'devices') os.makedirs(output_dir, exist_ok=True) print(f"\n[INFO] Output: {output_dir}") print(f"[INFO] Connecting to {len(dgs)} device groups...\n") success = 0 failed = 0 for dg_name, ip in dgs: print(f"[{dg_name}] {ip}") ok = collect_from_device(ip, username, password, dg_name, output_dir) if ok: success += 1 else: failed += 1 print(f"\n{'='*50}") print(f" Total : {len(dgs)}") print(f" Success : {success}") print(f" Failed : {failed}") print(f"{'='*50}") print(f"\n[INFO] Data saved to: {output_dir}") print("[INFO] Run option 2 to analyze") def phase2_analyze(): if not HAS_EXCEL: print("[ERROR] openpyxl not available") return reports = sorted( [f for f in os.listdir('reports') if f.endswith('.xlsx')], reverse=True ) if os.path.exists('reports') else [] if not reports: print("[ERROR] No reports found. Run mode 2 first") return print("\nReports:") for idx, r in enumerate(reports[:10], 1): print(f" [{idx}] {r}") choice = input(f"\nSelect [1-{min(10, len(reports))}]: ").strip() try: report_path = f"reports/{reports[int(choice)-1]}" except: print("[ERROR] Invalid") return device_dirs = [] if os.path.exists('data'): for ts_dir in sorted(os.listdir('data'), reverse=True): dev_path = os.path.join('data', ts_dir, 'devices') if os.path.isdir(dev_path) and os.listdir(dev_path): device_dirs.append((ts_dir, dev_path)) if not device_dirs: print("[ERROR] No device data found. Run option 1 first") return print("\nDevice data directories:") for idx, (ts, path) in enumerate(device_dirs[:10], 1): files = len(os.listdir(path)) print(f" [{idx}] {ts} ({files} devices)") choice = input(f"\nSelect [1-{min(10, len(device_dirs))}]: ").strip() try: _, data_dir = device_dirs[int(choice)-1] except: print("[ERROR] Invalid") return objects = load_delete_objects(report_path) if not objects: return device_files = { os.path.splitext(f)[0]: os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt') } print(f"\n[INFO] Analyzing {len(objects)} objects across {len(device_files)} device groups...") results = {} for obj in objects: name = obj['object_name'] found_in = [] for dg_name, filepath in device_files.items(): try: content = open(filepath, 'r', encoding='utf-8', errors='ignore').read() pattern = re.compile( r'(?