作者在 2025-05-24 16:39:52 发布以下内容
import sys
import time
import ctypes
import socket
import struct
import platform
import threading
from tkinter import *
from tkinter import ttk, scrolledtext, messagebox
from concurrent.futures import ThreadPoolExecutor
from uuid import getnode
# ------------------------- 基础工具函数 -------------------------
def get_local_ip():
"""获取本机IPv4地址(排除回环地址)"""
try:
# 通过UDP连接公共DNS服务器获取本机IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception as e:
return "127.0.0.1" # 失败时返回回环地址
# ------------------------- ARP扫描模块 -------------------------
def scan_ips():
"""ARP扫描局域网活动主机"""
base_ip = '.'.join(get_local_ip().split('.')[:-1]) + '.'
print(f"[ARP]开始扫描 {base_ip}0/24...")
local_mac = _get_local_mac()
local_ip = get_local_ip()
try:
# 创建原始套接字(需要管理员权限)
sock = socket.socket(
socket.AF_INET,
socket.SOCK_RAW,
socket.IPPROTO_RAW if platform.system() == 'Windows' else socket.IPPROTO_ARP
)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(3)
except Exception as e:
print(f"[-]创建原始套接字失败: {e}(请用管理员权限运行)")
return
# 构建并发送ARP广播包
arp_packet = _build_arp_request(local_mac, local_ip, base_ip + "0")
try:
sock.sendto(arp_packet, (base_ip + '255', 0))
print("[ARP]广播请求已发送")
except Exception as e:
print(f"[-]发送失败: {e}")
sock.close()
return
# 接收响应处理
active_ips = set()
start_time = time.time()
while time.time() - start_time < 3:
try:
packet, addr = sock.recvfrom(2048)
if ip := _parse_arp_response(packet, local_mac):
active_ips.add(ip)
print(f"[+]活动主机: {ip}")
except socket.timeout:
break
except:
continue
sock.close()
# 排序输出结果
for ip in sorted(active_ips, key=lambda x: list(map(int, x.split('.')))):
print(f"[+]活动主机: {ip}")
def _build_arp_request(src_mac, src_ip, target_ip):
"""构造ARP请求包"""
# 以太网帧头
eth_header = (
b'\xff' * 6 + # 目标MAC(广播)
bytes.fromhex(src_mac.replace(':', '')) + # 源MAC
struct.pack('!H', 0x0806) # 协议类型(ARP)
)
# ARP包头
arp_header = struct.pack('!HHBBH',
0x0001, 0x0800, 6, 4, 0x0001) # 硬件类型|协议类型|MAC长度|IP长度|操作码
# 填充地址信息
return eth_header + arp_header + (
bytes.fromhex(src_mac.replace(':', '')) + # 源MAC
socket.inet_aton(src_ip) + # 源IP
b'\x00' * 6 + # 目标MAC(空)
socket.inet_aton(target_ip) # 目标IP
)
def _parse_arp_response(packet, local_mac):
"""解析ARP响应包"""
if len(packet) < 42 or struct.unpack('!H', packet[12:14])[0] != 0x0806:
return None
arp_data = packet[14:42]
src_mac = ':'.join(f"{b:02x}" for b in arp_data[6:12])
src_ip = socket.inet_ntoa(arp_data[14:18])
return src_ip if src_mac.lower() != local_mac.lower() else None
def _get_local_mac():
"""获取本机MAC地址(跨平台)"""
if platform.system() == 'Windows':
return _get_windows_mac()
return ':'.join(("%012X" % getnode())[i:i+2] for i in range(0, 12, 2)).lower()
def _get_windows_mac():
"""Windows系统获取MAC地址"""
class IP_ADAPTER_INFO(ctypes.Structure):
_fields_ = [
("next", ctypes.c_void_p),
("combo_index", ctypes.c_uint),
("adapter_name", ctypes.c_char * 260),
("description", ctypes.c_char * 132),
("address_length", ctypes.c_uint),
("address", ctypes.c_ubyte * 8),
("index", ctypes.c_uint),
]
buffer = ctypes.create_string_buffer(4096)
size = ctypes.c_uint(ctypes.sizeof(buffer))
if ctypes.windll.iphlpapi.GetAdaptersInfo(buffer, ctypes.byref(size)) != 0:
return "00:00:00:00:00:00"
adapter = ctypes.cast(buffer, ctypes.POINTER(IP_ADAPTER_INFO)).contents
return ':'.join(f"{b:02X}" for b in adapter.address[:adapter.address_length]).lower()
# ------------------------- 端口扫描模块 -------------------------
def scan_ports(params):
"""多线程端口扫描"""
required = ['ip', 'startport', 'endport']
if any(r not in params for r in required):
print(f"[-]缺少必要参数: {required}")
return
ip = params['ip']
start = int(params['startport'])
end = int(params['endport'])
parallel = params.get('parallel', 'false').lower() == 'true'
maxthread = min(int(params.get('maxthread', 5000)), 5000)
print(f"[端口]开始扫描 {ip}:{start}-{end}...")
def check_port(port):
"""尝试连接指定端口"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
s.connect((ip, port))
return port, True
except:
return port, False
with ThreadPoolExecutor(max_workers=maxthread if parallel else 1) as executor:
futures = [executor.submit(check_port, p) for p in range(start, end+1)]
for future in futures:
port, status = future.result()
if status:
print(f"[+]端口 {port}")
# ------------------------- 命令行处理模块 -------------------------
def parse_params(args):
"""解析命令行参数"""
return {k:v for k,v in (a.split('=',1) for a in args if '=' in a)}
def print_usage():
"""显示详细的使用说明"""
usage = """
=== 网络工具箱 Netool 使用说明 ===
[命令列表]
1. listen - 启动TCP监听服务
2. send - 发送TCP数据包
3. scan - 执行网络扫描
4. gui - 启动图形界面
=== 详细参数说明 ===
### 1. 监听服务 (listen)
格式:netool listen ip=<IP地址> port=<端口>
参数说明:
- ip : 监听IP地址(默认:0.0.0.0)
- port : 监听端口(必需)
示例:
netool listen ip=0.0.0.0 port=8080
### 2. 发送数据 (send)
格式:netool send ip=<目标IP> port=<目标端口> [可选参数]
参数说明:
- ip : 目标IP地址(必需)
- port : 目标端口(必需)
- msg : 发送消息内容(默认:测试数据)
- times : 发送次数(默认:1)
- parallel : 并行发送(true/false,默认:false)
- maxthread : 最大线程数(并行时有效,默认:100)
示例:
netool send ip=192.168.1.100 port=8080 msg="Hello" times=10 parallel=true
### 3. 网络扫描 (scan)
格式:netool scan type=<扫描类型> [类型参数]
参数说明:
- type : 扫描类型(ip/port,必需)
当 type=ip 时:
(自动扫描本机所在网段)
当 type=port 时:
- ip : 目标IP地址(必需)
- startport : 起始端口(必需)
- endport : 结束端口(必需)
- parallel : 并行扫描(true/false,默认:false)
- maxthread : 最大线程数(默认:500)
示例:
# ARP扫描局域网
netool scan type=ip
# 端口扫描
netool scan type=port ip=192.168.1.1 startport=1 endport=100
### 4. 图形界面 (gui)
格式:netool gui
=== 注意事项 ===
1. ARP扫描需要管理员权限:
Windows:以管理员身份运行cmd(24H2以上可以使用 sudo 命令)
Linux :使用 sudo 执行
2. 端口扫描线程数不宜超过2000
3. 发送模式parallel=true时建议maxthread<=5000
"""
print(usage)
# ------------------------- 网络服务模块 -------------------------
def handle_listen(params):
"""启动TCP监听服务"""
if 'port' not in params:
print("[-]缺少必要参数: port")
return
ip = params.get('ip', '127.0.0.1')
port = int(params['port'])
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, port))
s.listen(5)
print(f"[监听]启动于 {ip}:{port}")
def client_handler(conn, addr):
print(f"[连接]来自 {addr}")
with conn:
while True:
try:
if data := conn.recv(1024):
print(f"[数据]{data.decode()}")
print("--------新数据--------")
else:
break
except:
break
while True:
conn, addr = s.accept()
threading.Thread(target=client_handler, args=(conn, addr), daemon=True).start()
except Exception as e:
print(f"[-]监听失败: {e}")
def handle_send(params):
"""发送TCP数据包"""
required = ['ip', 'port']
if any(r not in params for r in required):
print(f"[-]缺少必要参数: {required}")
return
ip = params['ip']
port = int(params['port'])
msg = params.get('msg', 'Python-Requests')
times = int(params.get('times', 1))
parallel = params.get('parallel', 'false').lower() == 'true'
maxthread = min(int(params.get('maxthread', 500)), 500)
def send_packet():
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2)
s.connect((ip, port))
s.sendall(msg.encode())
print(f"[发送]至{ip}:{port}")
except Exception as e:
print(f"[-]发送失败: {e}")
if parallel:
with ThreadPoolExecutor(max_workers=maxthread) as executor:
[executor.submit(send_packet) for _ in range(times)]
else:
for _ in range(times):
send_packet()
# ------------------------- 扫描命令处理模块 -------------------------
def handle_scan(params):
"""处理扫描命令"""
if 'type' not in params:
print("[-]缺少扫描类型参数")
return
scan_type = params['type']
if scan_type == 'ip':
scan_ips()
elif scan_type == 'port':
scan_ports(params)
else:
print(f"[-]无效的扫描类型: {scan_type}")
# ------------------------- 图形界面模块 -------------------------
class NetoolGUI(Tk):
"""主图形界面"""
def __init__(self):
super().__init__()
self.title("Netool")
self.geometry("800x600")
# 创建选项卡
self.notebook = ttk.Notebook(self)
self._create_listen_tab()
self._create_send_tab()
self._create_scan_tab()
self.notebook.pack(expand=True, fill=BOTH)
# 日志区域
self.log_area = scrolledtext.ScrolledText(self, height=10)
self.log_area.pack(side=BOTTOM, fill=X)
def _create_listen_tab(self):
"""监听选项卡"""
frame = ttk.Frame(self.notebook)
ttk.Label(frame, text="监听IP:").grid(row=0, column=0, padx=5, pady=5)
self.listen_ip = ttk.Entry(frame)
self.listen_ip.insert(0, '0.0.0.0')
self.listen_ip.grid(row=0, column=1)
ttk.Label(frame, text="端口:").grid(row=1, column=0)
self.listen_port = ttk.Entry(frame)
self.listen_port.grid(row=1, column=1)
ttk.Button(frame, text="开始监听", command=self.start_listen)\
.grid(row=2, columnspan=2, pady=10)
self.notebook.add(frame, text="监听")
def _create_send_tab(self):
"""发送选项卡"""
frame = ttk.Frame(self.notebook)
fields = [
('目标IP:', 'ip'), ('目标端口:', 'port'),
('消息内容:', 'msg'), ('发送次数:', 'times'),
('并行发送:', 'parallel'), ('最大线程:', 'maxthread')
]
for i, (label, name) in enumerate(fields):
ttk.Label(frame, text=label).grid(row=i, column=0, padx=5, pady=2, sticky=W)
entry = ttk.Entry(frame)
entry.grid(row=i, column=1, padx=5, pady=2)
setattr(self, f'send_{name}', entry)
self.send_msg.insert(0, '测试数据')
self.send_times.insert(0, '1')
self.send_parallel.insert(0, 'false')
self.send_maxthread.insert(0, '100')
ttk.Button(frame, text="开始发送", command=self.start_send)\
.grid(row=len(fields), columnspan=2)
self.notebook.add(frame, text="发送")
def _create_scan_tab(self):
"""扫描选项卡"""
frame = ttk.Frame(self.notebook)
# 扫描类型选择
ttk.Label(frame, text="扫描类型:").grid(row=0, column=0, padx=5, pady=5)
self.scan_type = ttk.Combobox(frame, values=['ip', 'port'], state='readonly')
self.scan_type.current(0)
self.scan_type.grid(row=0, column=1)
# 动态字段
self.scan_fields = {
'ip': (ttk.Label(frame, text="目标IP:"), ttk.Entry(frame)),
'startport': (ttk.Label(frame, text="起始端口:"), ttk.Entry(frame)),
'endport': (ttk.Label(frame, text="结束端口:"), ttk.Entry(frame)),
'parallel': (ttk.Label(frame, text="并行扫描:"),
ttk.Combobox(frame, values=['true', 'false'], state='readonly')),
'maxthread': (ttk.Label(frame, text="最大线程:"), ttk.Entry(frame))
}
# 初始布局
for i, (key, (label, entry)) in enumerate(self.scan_fields.items(), 1):
label.grid(row=i, column=0, padx=5, pady=2, sticky=W)
entry.grid(row=i, column=1, padx=5, pady=2)
if key == 'parallel':
entry.set('false')
elif key == 'maxthread':
entry.insert(0, '500')
# 扫描按钮
ttk.Button(frame, text="开始扫描", command=self.start_scan)\
.grid(row=6, columnspan=2, pady=10)
# 类型切换事件
self.scan_type.bind('<<ComboboxSelected>>', self._update_scan_ui)
self._update_scan_ui()
self.notebook.add(frame, text="扫描")
def _update_scan_ui(self, event=None):
"""更新扫描界面"""
scan_type = self.scan_type.get()
visible = scan_type == 'port'
for key in ['ip', 'startport', 'endport', 'parallel', 'maxthread']:
self.scan_fields[key][0].grid_remove() if not visible else self.scan_fields[key][0].grid()
self.scan_fields[key][1].grid_remove() if not visible else self.scan_fields[key][1].grid()
def log(self, message):
"""日志输出"""
self.log_area.insert(END, f"{message}\n")
self.log_area.see(END)
def start_listen(self):
"""启动监听线程"""
ip = self.listen_ip.get().strip() or '0.0.0.0'
port = self.listen_port.get().strip()
if not port.isdigit():
messagebox.showerror("错误", "端口号必须为数字")
return
threading.Thread(
target=self._run_listen,
args=(ip, int(port)),
daemon=True
).start()
self.log(f"开始监听 {ip}:{port}")
def _run_listen(self, ip, port):
"""监听服务运行逻辑"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, port))
s.listen(5)
self.log(f"监听服务已启动")
while True:
conn, addr = s.accept()
self.log(f"[+]新连接: {addr[0]}:{addr[1]}")
threading.Thread(
target=self._handle_client,
args=(conn,),
daemon=True
).start()
except Exception as e:
self.log(f"[-]监听错误: {e}")
def _handle_client(self, conn):
"""处理客户端连接"""
with conn:
while True:
try:
data = conn.recv(1024)
if data:
self.log(f"[+]收到数据: {data.decode()}")
else:
break
except:
break
def start_send(self):
"""启动发送线程"""
params = {
'ip': self.send_ip.get(),
'port': self.send_port.get(),
'msg': self.send_msg.get(),
'times': self.send_times.get(),
'parallel': self.send_parallel.get(),
'maxthread': self.send_maxthread.get()
}
threading.Thread(
target=handle_send,
args=(params,),
daemon=True
).start()
self.log("开始发送数据...")
def start_scan(self):
"""启动扫描线程"""
scan_type = self.scan_type.get()
params = {
'type': scan_type,
'ip': self.scan_fields['ip'][1].get() if scan_type == 'port' else '',
'startport': self.scan_fields['startport'][1].get(),
'endport': self.scan_fields['endport'][1].get(),
'parallel': self.scan_fields['parallel'][1].get(),
'maxthread': self.scan_fields['maxthread'][1].get()
}
threading.Thread(
target=handle_scan,
args=(params,),
daemon=True
).start()
self.log(f"开始{scan_type.upper()}扫描...")
# ------------------------- 主程序入口 -------------------------
def main():
if len(sys.argv) < 2:
print_usage()
return
command = sys.argv[1].lower()
params = parse_params(sys.argv[2:])
try:
commands = {
'listen': handle_listen,
'send': handle_send,
'scan': handle_scan,
'gui': lambda _: NetoolGUI().mainloop()
}
if command in commands:
commands[command](params)
else:
print(f"[-]未知命令: {command}")
print_usage()
except Exception as e:
print(f"[-]运行时错误: {e}")
if __name__ == "__main__":
main()
import time
import ctypes
import socket
import struct
import platform
import threading
from tkinter import *
from tkinter import ttk, scrolledtext, messagebox
from concurrent.futures import ThreadPoolExecutor
from uuid import getnode
# ------------------------- 基础工具函数 -------------------------
def get_local_ip():
"""获取本机IPv4地址(排除回环地址)"""
try:
# 通过UDP连接公共DNS服务器获取本机IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception as e:
return "127.0.0.1" # 失败时返回回环地址
# ------------------------- ARP扫描模块 -------------------------
def scan_ips():
"""ARP扫描局域网活动主机"""
base_ip = '.'.join(get_local_ip().split('.')[:-1]) + '.'
print(f"[ARP]开始扫描 {base_ip}0/24...")
local_mac = _get_local_mac()
local_ip = get_local_ip()
try:
# 创建原始套接字(需要管理员权限)
sock = socket.socket(
socket.AF_INET,
socket.SOCK_RAW,
socket.IPPROTO_RAW if platform.system() == 'Windows' else socket.IPPROTO_ARP
)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.settimeout(3)
except Exception as e:
print(f"[-]创建原始套接字失败: {e}(请用管理员权限运行)")
return
# 构建并发送ARP广播包
arp_packet = _build_arp_request(local_mac, local_ip, base_ip + "0")
try:
sock.sendto(arp_packet, (base_ip + '255', 0))
print("[ARP]广播请求已发送")
except Exception as e:
print(f"[-]发送失败: {e}")
sock.close()
return
# 接收响应处理
active_ips = set()
start_time = time.time()
while time.time() - start_time < 3:
try:
packet, addr = sock.recvfrom(2048)
if ip := _parse_arp_response(packet, local_mac):
active_ips.add(ip)
print(f"[+]活动主机: {ip}")
except socket.timeout:
break
except:
continue
sock.close()
# 排序输出结果
for ip in sorted(active_ips, key=lambda x: list(map(int, x.split('.')))):
print(f"[+]活动主机: {ip}")
def _build_arp_request(src_mac, src_ip, target_ip):
"""构造ARP请求包"""
# 以太网帧头
eth_header = (
b'\xff' * 6 + # 目标MAC(广播)
bytes.fromhex(src_mac.replace(':', '')) + # 源MAC
struct.pack('!H', 0x0806) # 协议类型(ARP)
)
# ARP包头
arp_header = struct.pack('!HHBBH',
0x0001, 0x0800, 6, 4, 0x0001) # 硬件类型|协议类型|MAC长度|IP长度|操作码
# 填充地址信息
return eth_header + arp_header + (
bytes.fromhex(src_mac.replace(':', '')) + # 源MAC
socket.inet_aton(src_ip) + # 源IP
b'\x00' * 6 + # 目标MAC(空)
socket.inet_aton(target_ip) # 目标IP
)
def _parse_arp_response(packet, local_mac):
"""解析ARP响应包"""
if len(packet) < 42 or struct.unpack('!H', packet[12:14])[0] != 0x0806:
return None
arp_data = packet[14:42]
src_mac = ':'.join(f"{b:02x}" for b in arp_data[6:12])
src_ip = socket.inet_ntoa(arp_data[14:18])
return src_ip if src_mac.lower() != local_mac.lower() else None
def _get_local_mac():
"""获取本机MAC地址(跨平台)"""
if platform.system() == 'Windows':
return _get_windows_mac()
return ':'.join(("%012X" % getnode())[i:i+2] for i in range(0, 12, 2)).lower()
def _get_windows_mac():
"""Windows系统获取MAC地址"""
class IP_ADAPTER_INFO(ctypes.Structure):
_fields_ = [
("next", ctypes.c_void_p),
("combo_index", ctypes.c_uint),
("adapter_name", ctypes.c_char * 260),
("description", ctypes.c_char * 132),
("address_length", ctypes.c_uint),
("address", ctypes.c_ubyte * 8),
("index", ctypes.c_uint),
]
buffer = ctypes.create_string_buffer(4096)
size = ctypes.c_uint(ctypes.sizeof(buffer))
if ctypes.windll.iphlpapi.GetAdaptersInfo(buffer, ctypes.byref(size)) != 0:
return "00:00:00:00:00:00"
adapter = ctypes.cast(buffer, ctypes.POINTER(IP_ADAPTER_INFO)).contents
return ':'.join(f"{b:02X}" for b in adapter.address[:adapter.address_length]).lower()
# ------------------------- 端口扫描模块 -------------------------
def scan_ports(params):
"""多线程端口扫描"""
required = ['ip', 'startport', 'endport']
if any(r not in params for r in required):
print(f"[-]缺少必要参数: {required}")
return
ip = params['ip']
start = int(params['startport'])
end = int(params['endport'])
parallel = params.get('parallel', 'false').lower() == 'true'
maxthread = min(int(params.get('maxthread', 5000)), 5000)
print(f"[端口]开始扫描 {ip}:{start}-{end}...")
def check_port(port):
"""尝试连接指定端口"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
s.connect((ip, port))
return port, True
except:
return port, False
with ThreadPoolExecutor(max_workers=maxthread if parallel else 1) as executor:
futures = [executor.submit(check_port, p) for p in range(start, end+1)]
for future in futures:
port, status = future.result()
if status:
print(f"[+]端口 {port}")
# ------------------------- 命令行处理模块 -------------------------
def parse_params(args):
"""解析命令行参数"""
return {k:v for k,v in (a.split('=',1) for a in args if '=' in a)}
def print_usage():
"""显示详细的使用说明"""
usage = """
=== 网络工具箱 Netool 使用说明 ===
[命令列表]
1. listen - 启动TCP监听服务
2. send - 发送TCP数据包
3. scan - 执行网络扫描
4. gui - 启动图形界面
=== 详细参数说明 ===
### 1. 监听服务 (listen)
格式:netool listen ip=<IP地址> port=<端口>
参数说明:
- ip : 监听IP地址(默认:0.0.0.0)
- port : 监听端口(必需)
示例:
netool listen ip=0.0.0.0 port=8080
### 2. 发送数据 (send)
格式:netool send ip=<目标IP> port=<目标端口> [可选参数]
参数说明:
- ip : 目标IP地址(必需)
- port : 目标端口(必需)
- msg : 发送消息内容(默认:测试数据)
- times : 发送次数(默认:1)
- parallel : 并行发送(true/false,默认:false)
- maxthread : 最大线程数(并行时有效,默认:100)
示例:
netool send ip=192.168.1.100 port=8080 msg="Hello" times=10 parallel=true
### 3. 网络扫描 (scan)
格式:netool scan type=<扫描类型> [类型参数]
参数说明:
- type : 扫描类型(ip/port,必需)
当 type=ip 时:
(自动扫描本机所在网段)
当 type=port 时:
- ip : 目标IP地址(必需)
- startport : 起始端口(必需)
- endport : 结束端口(必需)
- parallel : 并行扫描(true/false,默认:false)
- maxthread : 最大线程数(默认:500)
示例:
# ARP扫描局域网
netool scan type=ip
# 端口扫描
netool scan type=port ip=192.168.1.1 startport=1 endport=100
### 4. 图形界面 (gui)
格式:netool gui
=== 注意事项 ===
1. ARP扫描需要管理员权限:
Windows:以管理员身份运行cmd(24H2以上可以使用 sudo 命令)
Linux :使用 sudo 执行
2. 端口扫描线程数不宜超过2000
3. 发送模式parallel=true时建议maxthread<=5000
"""
print(usage)
# ------------------------- 网络服务模块 -------------------------
def handle_listen(params):
"""启动TCP监听服务"""
if 'port' not in params:
print("[-]缺少必要参数: port")
return
ip = params.get('ip', '127.0.0.1')
port = int(params['port'])
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, port))
s.listen(5)
print(f"[监听]启动于 {ip}:{port}")
def client_handler(conn, addr):
print(f"[连接]来自 {addr}")
with conn:
while True:
try:
if data := conn.recv(1024):
print(f"[数据]{data.decode()}")
print("--------新数据--------")
else:
break
except:
break
while True:
conn, addr = s.accept()
threading.Thread(target=client_handler, args=(conn, addr), daemon=True).start()
except Exception as e:
print(f"[-]监听失败: {e}")
def handle_send(params):
"""发送TCP数据包"""
required = ['ip', 'port']
if any(r not in params for r in required):
print(f"[-]缺少必要参数: {required}")
return
ip = params['ip']
port = int(params['port'])
msg = params.get('msg', 'Python-Requests')
times = int(params.get('times', 1))
parallel = params.get('parallel', 'false').lower() == 'true'
maxthread = min(int(params.get('maxthread', 500)), 500)
def send_packet():
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2)
s.connect((ip, port))
s.sendall(msg.encode())
print(f"[发送]至{ip}:{port}")
except Exception as e:
print(f"[-]发送失败: {e}")
if parallel:
with ThreadPoolExecutor(max_workers=maxthread) as executor:
[executor.submit(send_packet) for _ in range(times)]
else:
for _ in range(times):
send_packet()
# ------------------------- 扫描命令处理模块 -------------------------
def handle_scan(params):
"""处理扫描命令"""
if 'type' not in params:
print("[-]缺少扫描类型参数")
return
scan_type = params['type']
if scan_type == 'ip':
scan_ips()
elif scan_type == 'port':
scan_ports(params)
else:
print(f"[-]无效的扫描类型: {scan_type}")
# ------------------------- 图形界面模块 -------------------------
class NetoolGUI(Tk):
"""主图形界面"""
def __init__(self):
super().__init__()
self.title("Netool")
self.geometry("800x600")
# 创建选项卡
self.notebook = ttk.Notebook(self)
self._create_listen_tab()
self._create_send_tab()
self._create_scan_tab()
self.notebook.pack(expand=True, fill=BOTH)
# 日志区域
self.log_area = scrolledtext.ScrolledText(self, height=10)
self.log_area.pack(side=BOTTOM, fill=X)
def _create_listen_tab(self):
"""监听选项卡"""
frame = ttk.Frame(self.notebook)
ttk.Label(frame, text="监听IP:").grid(row=0, column=0, padx=5, pady=5)
self.listen_ip = ttk.Entry(frame)
self.listen_ip.insert(0, '0.0.0.0')
self.listen_ip.grid(row=0, column=1)
ttk.Label(frame, text="端口:").grid(row=1, column=0)
self.listen_port = ttk.Entry(frame)
self.listen_port.grid(row=1, column=1)
ttk.Button(frame, text="开始监听", command=self.start_listen)\
.grid(row=2, columnspan=2, pady=10)
self.notebook.add(frame, text="监听")
def _create_send_tab(self):
"""发送选项卡"""
frame = ttk.Frame(self.notebook)
fields = [
('目标IP:', 'ip'), ('目标端口:', 'port'),
('消息内容:', 'msg'), ('发送次数:', 'times'),
('并行发送:', 'parallel'), ('最大线程:', 'maxthread')
]
for i, (label, name) in enumerate(fields):
ttk.Label(frame, text=label).grid(row=i, column=0, padx=5, pady=2, sticky=W)
entry = ttk.Entry(frame)
entry.grid(row=i, column=1, padx=5, pady=2)
setattr(self, f'send_{name}', entry)
self.send_msg.insert(0, '测试数据')
self.send_times.insert(0, '1')
self.send_parallel.insert(0, 'false')
self.send_maxthread.insert(0, '100')
ttk.Button(frame, text="开始发送", command=self.start_send)\
.grid(row=len(fields), columnspan=2)
self.notebook.add(frame, text="发送")
def _create_scan_tab(self):
"""扫描选项卡"""
frame = ttk.Frame(self.notebook)
# 扫描类型选择
ttk.Label(frame, text="扫描类型:").grid(row=0, column=0, padx=5, pady=5)
self.scan_type = ttk.Combobox(frame, values=['ip', 'port'], state='readonly')
self.scan_type.current(0)
self.scan_type.grid(row=0, column=1)
# 动态字段
self.scan_fields = {
'ip': (ttk.Label(frame, text="目标IP:"), ttk.Entry(frame)),
'startport': (ttk.Label(frame, text="起始端口:"), ttk.Entry(frame)),
'endport': (ttk.Label(frame, text="结束端口:"), ttk.Entry(frame)),
'parallel': (ttk.Label(frame, text="并行扫描:"),
ttk.Combobox(frame, values=['true', 'false'], state='readonly')),
'maxthread': (ttk.Label(frame, text="最大线程:"), ttk.Entry(frame))
}
# 初始布局
for i, (key, (label, entry)) in enumerate(self.scan_fields.items(), 1):
label.grid(row=i, column=0, padx=5, pady=2, sticky=W)
entry.grid(row=i, column=1, padx=5, pady=2)
if key == 'parallel':
entry.set('false')
elif key == 'maxthread':
entry.insert(0, '500')
# 扫描按钮
ttk.Button(frame, text="开始扫描", command=self.start_scan)\
.grid(row=6, columnspan=2, pady=10)
# 类型切换事件
self.scan_type.bind('<<ComboboxSelected>>', self._update_scan_ui)
self._update_scan_ui()
self.notebook.add(frame, text="扫描")
def _update_scan_ui(self, event=None):
"""更新扫描界面"""
scan_type = self.scan_type.get()
visible = scan_type == 'port'
for key in ['ip', 'startport', 'endport', 'parallel', 'maxthread']:
self.scan_fields[key][0].grid_remove() if not visible else self.scan_fields[key][0].grid()
self.scan_fields[key][1].grid_remove() if not visible else self.scan_fields[key][1].grid()
def log(self, message):
"""日志输出"""
self.log_area.insert(END, f"{message}\n")
self.log_area.see(END)
def start_listen(self):
"""启动监听线程"""
ip = self.listen_ip.get().strip() or '0.0.0.0'
port = self.listen_port.get().strip()
if not port.isdigit():
messagebox.showerror("错误", "端口号必须为数字")
return
threading.Thread(
target=self._run_listen,
args=(ip, int(port)),
daemon=True
).start()
self.log(f"开始监听 {ip}:{port}")
def _run_listen(self, ip, port):
"""监听服务运行逻辑"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((ip, port))
s.listen(5)
self.log(f"监听服务已启动")
while True:
conn, addr = s.accept()
self.log(f"[+]新连接: {addr[0]}:{addr[1]}")
threading.Thread(
target=self._handle_client,
args=(conn,),
daemon=True
).start()
except Exception as e:
self.log(f"[-]监听错误: {e}")
def _handle_client(self, conn):
"""处理客户端连接"""
with conn:
while True:
try:
data = conn.recv(1024)
if data:
self.log(f"[+]收到数据: {data.decode()}")
else:
break
except:
break
def start_send(self):
"""启动发送线程"""
params = {
'ip': self.send_ip.get(),
'port': self.send_port.get(),
'msg': self.send_msg.get(),
'times': self.send_times.get(),
'parallel': self.send_parallel.get(),
'maxthread': self.send_maxthread.get()
}
threading.Thread(
target=handle_send,
args=(params,),
daemon=True
).start()
self.log("开始发送数据...")
def start_scan(self):
"""启动扫描线程"""
scan_type = self.scan_type.get()
params = {
'type': scan_type,
'ip': self.scan_fields['ip'][1].get() if scan_type == 'port' else '',
'startport': self.scan_fields['startport'][1].get(),
'endport': self.scan_fields['endport'][1].get(),
'parallel': self.scan_fields['parallel'][1].get(),
'maxthread': self.scan_fields['maxthread'][1].get()
}
threading.Thread(
target=handle_scan,
args=(params,),
daemon=True
).start()
self.log(f"开始{scan_type.upper()}扫描...")
# ------------------------- 主程序入口 -------------------------
def main():
if len(sys.argv) < 2:
print_usage()
return
command = sys.argv[1].lower()
params = parse_params(sys.argv[2:])
try:
commands = {
'listen': handle_listen,
'send': handle_send,
'scan': handle_scan,
'gui': lambda _: NetoolGUI().mainloop()
}
if command in commands:
commands[command](params)
else:
print(f"[-]未知命令: {command}")
print_usage()
except Exception as e:
print(f"[-]运行时错误: {e}")
if __name__ == "__main__":
main()