这个代码的ARP扫描有什么问题?求指点

作者在 2025-05-24 16:39:52 发布以下内容
import sys
import time
import ctypes
import socket
import struct
import platform
import threading
from tkinter import *
from tkinter import ttk, scrolledtext, messagebox
from concurrent.futures import ThreadPoolExecutor
from uuid import getnode




# ------------------------- 基础工具函数 -------------------------
def get_local_ip():
    """获取本机IPv4地址(排除回环地址)"""
    try:
        # 通过UDP连接公共DNS服务器获取本机IP
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip
    except Exception as e:
        return "127.0.0.1"  # 失败时返回回环地址


# ------------------------- ARP扫描模块 -------------------------
def scan_ips():
    """ARP扫描局域网活动主机"""
    base_ip = '.'.join(get_local_ip().split('.')[:-1]) + '.'
    print(f"[ARP]开始扫描 {base_ip}0/24...")


    local_mac = _get_local_mac()
    local_ip = get_local_ip()


    try:
        # 创建原始套接字(需要管理员权限)
        sock = socket.socket(
            socket.AF_INET, 
            socket.SOCK_RAW, 
            socket.IPPROTO_RAW if platform.system() == 'Windows' else socket.IPPROTO_ARP
        )
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(3)
    except Exception as e:
        print(f"[-]创建原始套接字失败: {e}(请用管理员权限运行)")
        return


    # 构建并发送ARP广播包
    arp_packet = _build_arp_request(local_mac, local_ip, base_ip + "0")
    try:
        sock.sendto(arp_packet, (base_ip + '255', 0))
        print("[ARP]广播请求已发送")
    except Exception as e:
        print(f"[-]发送失败: {e}")
        sock.close()
        return


    # 接收响应处理
    active_ips = set()
    start_time = time.time()
    while time.time() - start_time < 3:
        try:
            packet, addr = sock.recvfrom(2048)
            if ip := _parse_arp_response(packet, local_mac):
                active_ips.add(ip)
                print(f"[+]活动主机: {ip}")
        except socket.timeout:
            break
        except:
            continue


    sock.close()
    
    # 排序输出结果
    for ip in sorted(active_ips, key=lambda x: list(map(int, x.split('.')))):
        print(f"[+]活动主机: {ip}")


def _build_arp_request(src_mac, src_ip, target_ip):
    """构造ARP请求包"""
    # 以太网帧头
    eth_header = (
        b'\xff' * 6 +                      # 目标MAC(广播)
        bytes.fromhex(src_mac.replace(':', '')) +  # 源MAC
        struct.pack('!H', 0x0806)           # 协议类型(ARP)
    )
    
    # ARP包头
    arp_header = struct.pack('!HHBBH', 
        0x0001, 0x0800, 6, 4, 0x0001)      # 硬件类型|协议类型|MAC长度|IP长度|操作码
    
    # 填充地址信息
    return eth_header + arp_header + (
        bytes.fromhex(src_mac.replace(':', '')) +  # 源MAC
        socket.inet_aton(src_ip) +                 # 源IP
        b'\x00' * 6 +                             # 目标MAC(空)
        socket.inet_aton(target_ip)                # 目标IP
    )


def _parse_arp_response(packet, local_mac):
    """解析ARP响应包"""
    if len(packet) < 42 or struct.unpack('!H', packet[12:14])[0] != 0x0806:
        return None


    arp_data = packet[14:42]
    src_mac = ':'.join(f"{b:02x}" for b in arp_data[6:12])
    src_ip = socket.inet_ntoa(arp_data[14:18])
    
    return src_ip if src_mac.lower() != local_mac.lower() else None


def _get_local_mac():
    """获取本机MAC地址(跨平台)"""
    if platform.system() == 'Windows':
        return _get_windows_mac()
    return ':'.join(("%012X" % getnode())[i:i+2] for i in range(0, 12, 2)).lower()


def _get_windows_mac():
    """Windows系统获取MAC地址"""
    class IP_ADAPTER_INFO(ctypes.Structure):
        _fields_ = [
            ("next", ctypes.c_void_p),
            ("combo_index", ctypes.c_uint),
            ("adapter_name", ctypes.c_char * 260),
            ("description", ctypes.c_char * 132),
            ("address_length", ctypes.c_uint),
            ("address", ctypes.c_ubyte * 8),
            ("index", ctypes.c_uint),
        ]


    buffer = ctypes.create_string_buffer(4096)
    size = ctypes.c_uint(ctypes.sizeof(buffer))
    if ctypes.windll.iphlpapi.GetAdaptersInfo(buffer, ctypes.byref(size)) != 0:
        return "00:00:00:00:00:00"
    
    adapter = ctypes.cast(buffer, ctypes.POINTER(IP_ADAPTER_INFO)).contents
    return ':'.join(f"{b:02X}" for b in adapter.address[:adapter.address_length]).lower()


# ------------------------- 端口扫描模块 -------------------------
def scan_ports(params):
    """多线程端口扫描"""
    required = ['ip', 'startport', 'endport']
    if any(r not in params for r in required):
        print(f"[-]缺少必要参数: {required}")
        return


    ip = params['ip']
    start = int(params['startport'])
    end = int(params['endport'])
    parallel = params.get('parallel', 'false').lower() == 'true'
    maxthread = min(int(params.get('maxthread', 5000)), 5000)


    print(f"[端口]开始扫描 {ip}:{start}-{end}...")


    def check_port(port):
        """尝试连接指定端口"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1)
                s.connect((ip, port))
                return port, True
        except:
            return port, False


    with ThreadPoolExecutor(max_workers=maxthread if parallel else 1) as executor:
        futures = [executor.submit(check_port, p) for p in range(start, end+1)]
        for future in futures:
            port, status = future.result()
            if status:
                print(f"[+]端口 {port}")


# ------------------------- 命令行处理模块 -------------------------
def parse_params(args):
    """解析命令行参数"""
    return {k:v for k,v in (a.split('=',1) for a in args if '=' in a)}


def print_usage():
    """显示详细的使用说明"""
    usage = """
=== 网络工具箱 Netool 使用说明 ===


[命令列表]
1. listen    - 启动TCP监听服务
2. send      - 发送TCP数据包
3. scan      - 执行网络扫描
4. gui       - 启动图形界面


=== 详细参数说明 ===


### 1. 监听服务 (listen)
格式:netool listen ip=<IP地址> port=<端口>
参数说明:
  - ip    : 监听IP地址(默认:0.0.0.0)
  - port  : 监听端口(必需)


示例:
  netool listen ip=0.0.0.0 port=8080


### 2. 发送数据 (send)
格式:netool send ip=<目标IP> port=<目标端口> [可选参数]
参数说明:
  - ip        : 目标IP地址(必需)
  - port      : 目标端口(必需)
  - msg       : 发送消息内容(默认:测试数据)
  - times     : 发送次数(默认:1)
  - parallel  : 并行发送(true/false,默认:false)
  - maxthread : 最大线程数(并行时有效,默认:100)


示例:
  netool send ip=192.168.1.100 port=8080 msg="Hello" times=10 parallel=true


### 3. 网络扫描 (scan)
格式:netool scan type=<扫描类型> [类型参数]
参数说明:
  - type      : 扫描类型(ip/port,必需)
  
当 type=ip 时:
  (自动扫描本机所在网段)


当 type=port 时:
  - ip        : 目标IP地址(必需)
  - startport : 起始端口(必需)
  - endport   : 结束端口(必需)
  - parallel  : 并行扫描(true/false,默认:false)
  - maxthread : 最大线程数(默认:500)


示例:
  # ARP扫描局域网
  netool scan type=ip
  
  # 端口扫描
  netool scan type=port ip=192.168.1.1 startport=1 endport=100


### 4. 图形界面 (gui)
格式:netool gui


=== 注意事项 ===
1. ARP扫描需要管理员权限:
   Windows:以管理员身份运行cmd(24H2以上可以使用 sudo 命令)
   Linux  :使用 sudo 执行
2. 端口扫描线程数不宜超过2000
3. 发送模式parallel=true时建议maxthread<=5000
"""
    print(usage)


# ------------------------- 网络服务模块 -------------------------
def handle_listen(params):
    """启动TCP监听服务"""
    if 'port' not in params:
        print("[-]缺少必要参数: port")
        return


    ip = params.get('ip', '127.0.0.1')
    port = int(params['port'])


    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((ip, port))
        s.listen(5)
        print(f"[监听]启动于 {ip}:{port}")


        def client_handler(conn, addr):
            print(f"[连接]来自 {addr}")
            with conn:
                while True:
                    try:
                        if data := conn.recv(1024):
                            print(f"[数据]{data.decode()}")
                            print("--------新数据--------")
                        else:
                            break
                    except:
                        break


        while True:
            conn, addr = s.accept()
            threading.Thread(target=client_handler, args=(conn, addr), daemon=True).start()


    except Exception as e:
        print(f"[-]监听失败: {e}")


def handle_send(params):
    """发送TCP数据包"""
    required = ['ip', 'port']
    if any(r not in params for r in required):
        print(f"[-]缺少必要参数: {required}")
        return


    ip = params['ip']
    port = int(params['port'])
    msg = params.get('msg', 'Python-Requests')
    times = int(params.get('times', 1))
    parallel = params.get('parallel', 'false').lower() == 'true'
    maxthread = min(int(params.get('maxthread', 500)), 500)


    def send_packet():
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(2)
                s.connect((ip, port))
                s.sendall(msg.encode())
                print(f"[发送]至{ip}:{port}")
        except Exception as e:
            print(f"[-]发送失败: {e}")


    if parallel:
        with ThreadPoolExecutor(max_workers=maxthread) as executor:
            [executor.submit(send_packet) for _ in range(times)]
    else:
        for _ in range(times):
            send_packet()
# ------------------------- 扫描命令处理模块 -------------------------
def handle_scan(params):
    """处理扫描命令"""
    if 'type' not in params:
        print("[-]缺少扫描类型参数")
        return
    
    scan_type = params['type']
    if scan_type == 'ip':
        scan_ips()
    elif scan_type == 'port':
        scan_ports(params)
    else:
        print(f"[-]无效的扫描类型: {scan_type}")


# ------------------------- 图形界面模块 -------------------------
class NetoolGUI(Tk):
    """主图形界面"""
    def __init__(self):
        super().__init__()
        self.title("Netool")
        self.geometry("800x600")
        
        # 创建选项卡
        self.notebook = ttk.Notebook(self)
        self._create_listen_tab()
        self._create_send_tab()
        self._create_scan_tab()
        self.notebook.pack(expand=True, fill=BOTH)
        
        # 日志区域
        self.log_area = scrolledtext.ScrolledText(self, height=10)
        self.log_area.pack(side=BOTTOM, fill=X)


    def _create_listen_tab(self):
        """监听选项卡"""
        frame = ttk.Frame(self.notebook)
        
        ttk.Label(frame, text="监听IP:").grid(row=0, column=0, padx=5, pady=5)
        self.listen_ip = ttk.Entry(frame)
        self.listen_ip.insert(0, '0.0.0.0')
        self.listen_ip.grid(row=0, column=1)
        
        ttk.Label(frame, text="端口:").grid(row=1, column=0)
        self.listen_port = ttk.Entry(frame)
        self.listen_port.grid(row=1, column=1)
        
        ttk.Button(frame, text="开始监听", command=self.start_listen)\
            .grid(row=2, columnspan=2, pady=10)
        
        self.notebook.add(frame, text="监听")


    def _create_send_tab(self):
        """发送选项卡"""
        frame = ttk.Frame(self.notebook)
        fields = [
            ('目标IP:', 'ip'), ('目标端口:', 'port'),
            ('消息内容:', 'msg'), ('发送次数:', 'times'),
            ('并行发送:', 'parallel'), ('最大线程:', 'maxthread')
        ]
        
        for i, (label, name) in enumerate(fields):
            ttk.Label(frame, text=label).grid(row=i, column=0, padx=5, pady=2, sticky=W)
            entry = ttk.Entry(frame)
            entry.grid(row=i, column=1, padx=5, pady=2)
            setattr(self, f'send_{name}', entry)
        
        self.send_msg.insert(0, '测试数据')
        self.send_times.insert(0, '1')
        self.send_parallel.insert(0, 'false')
        self.send_maxthread.insert(0, '100')
        
        ttk.Button(frame, text="开始发送", command=self.start_send)\
            .grid(row=len(fields), columnspan=2)
        
        self.notebook.add(frame, text="发送")


    def _create_scan_tab(self):
        """扫描选项卡"""
        frame = ttk.Frame(self.notebook)
        
        # 扫描类型选择
        ttk.Label(frame, text="扫描类型:").grid(row=0, column=0, padx=5, pady=5)
        self.scan_type = ttk.Combobox(frame, values=['ip', 'port'], state='readonly')
        self.scan_type.current(0)
        self.scan_type.grid(row=0, column=1)
        
        # 动态字段
        self.scan_fields = {
            'ip': (ttk.Label(frame, text="目标IP:"), ttk.Entry(frame)),
            'startport': (ttk.Label(frame, text="起始端口:"), ttk.Entry(frame)),
            'endport': (ttk.Label(frame, text="结束端口:"), ttk.Entry(frame)),
            'parallel': (ttk.Label(frame, text="并行扫描:"), 
                        ttk.Combobox(frame, values=['true', 'false'], state='readonly')),
            'maxthread': (ttk.Label(frame, text="最大线程:"), ttk.Entry(frame))
        }
        
        # 初始布局
        for i, (key, (label, entry)) in enumerate(self.scan_fields.items(), 1):
            label.grid(row=i, column=0, padx=5, pady=2, sticky=W)
            entry.grid(row=i, column=1, padx=5, pady=2)
            if key == 'parallel':
                entry.set('false')
            elif key == 'maxthread':
                entry.insert(0, '500')
        
        # 扫描按钮
        ttk.Button(frame, text="开始扫描", command=self.start_scan)\
            .grid(row=6, columnspan=2, pady=10)
        
        # 类型切换事件
        self.scan_type.bind('<<ComboboxSelected>>', self._update_scan_ui)
        self._update_scan_ui()
        self.notebook.add(frame, text="扫描")


    def _update_scan_ui(self, event=None):
        """更新扫描界面"""
        scan_type = self.scan_type.get()
        visible = scan_type == 'port'
        
        for key in ['ip', 'startport', 'endport', 'parallel', 'maxthread']:
            self.scan_fields[key][0].grid_remove() if not visible else self.scan_fields[key][0].grid()
            self.scan_fields[key][1].grid_remove() if not visible else self.scan_fields[key][1].grid()


    def log(self, message):
        """日志输出"""
        self.log_area.insert(END, f"{message}\n")
        self.log_area.see(END)


    def start_listen(self):
        """启动监听线程"""
        ip = self.listen_ip.get().strip() or '0.0.0.0'
        port = self.listen_port.get().strip()
        
        if not port.isdigit():
            messagebox.showerror("错误", "端口号必须为数字")
            return
        
        threading.Thread(
            target=self._run_listen, 
            args=(ip, int(port)), 
            daemon=True
        ).start()
        self.log(f"开始监听 {ip}:{port}")


    def _run_listen(self, ip, port):
        """监听服务运行逻辑"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind((ip, port))
                s.listen(5)
                self.log(f"监听服务已启动")
                
                while True:
                    conn, addr = s.accept()
                    self.log(f"[+]新连接: {addr[0]}:{addr[1]}")
                    threading.Thread(
                        target=self._handle_client,
                        args=(conn,),
                        daemon=True
                    ).start()
        except Exception as e:
            self.log(f"[-]监听错误: {e}")


    def _handle_client(self, conn):
        """处理客户端连接"""
        with conn:
            while True:
                try:
                    data = conn.recv(1024)
                    if data:
                        self.log(f"[+]收到数据: {data.decode()}")
                    else:
                        break
                except:
                    break


    def start_send(self):
        """启动发送线程"""
        params = {
            'ip': self.send_ip.get(),
            'port': self.send_port.get(),
            'msg': self.send_msg.get(),
            'times': self.send_times.get(),
            'parallel': self.send_parallel.get(),
            'maxthread': self.send_maxthread.get()
        }
        threading.Thread(
            target=handle_send, 
            args=(params,), 
            daemon=True
        ).start()
        self.log("开始发送数据...")


    def start_scan(self):
        """启动扫描线程"""
        scan_type = self.scan_type.get()
        params = {
            'type': scan_type,
            'ip': self.scan_fields['ip'][1].get() if scan_type == 'port' else '',
            'startport': self.scan_fields['startport'][1].get(),
            'endport': self.scan_fields['endport'][1].get(),
            'parallel': self.scan_fields['parallel'][1].get(),
            'maxthread': self.scan_fields['maxthread'][1].get()
        }
        threading.Thread(
            target=handle_scan, 
            args=(params,), 
            daemon=True
        ).start()
        self.log(f"开始{scan_type.upper()}扫描...")


# ------------------------- 主程序入口 -------------------------
def main():
    if len(sys.argv) < 2:
        print_usage()
        return
    
    command = sys.argv[1].lower()
    params = parse_params(sys.argv[2:])
    
    try:
        commands = {
            'listen': handle_listen,
            'send': handle_send,
            'scan': handle_scan,
            'gui': lambda _: NetoolGUI().mainloop()
        }
        if command in commands:
            commands[command](params)
        else:
            print(f"[-]未知命令: {command}")
            print_usage()
    except Exception as e:
        print(f"[-]运行时错误: {e}")


if __name__ == "__main__":
    main()


默认分类 | 阅读 14 次
文章评论,共1条
XXGGZZQQ(作者)
5小时前
1
如果不用ARP,那么ICMP该怎么写?
游客请输入验证码
文章分类
文章归档