信令服务器搭建

信令服务器搭建需要一个具有公网IP的服务器(VPS),信令服务器的功能包括以下几点:

  1. 节点注册 (/register/{peer_id}): 允许一个 P2P 节点(peer)注册其 ID 和它打算用于 P2P 通信的端口。服务器会自动记录请求来源的公网 IP 地址。
  2. 节点查询 (/query/{peer_id}): 允许一个节点查询另一个已注册节点的信息(公网 IP 和 P2P 端口)。
  3. 列出所有节点 (/list_peers): 显示当前所有已注册的节点信息(主要用于调试)。
  4. 节点注销 (/unregister/{peer_id}): 允许节点从服务器注销。

    from fastapi import FastAPI, Request, HTTPException
    from pydantic import BaseModel # 用于数据验证和序列化
    import uvicorn # ASGI 服务器
    import time # 用于记录时间戳
    from typing import Dict, Any, Optional
    
    # 初始化 FastAPI 应用
    app = FastAPI(
     title="Simple P2P Signaling Server",
     description="A basic signaling server for P2P peer discovery.",
     version="0.1.0"
    )
    
    # 使用 Pydantic 定义请求体模型
    class PeerRegisterInfo(BaseModel):
     p2p_port: int # Peer 宣称其用于 P2P UDP 通信的本地端口
    
    # 使用 Pydantic 定义响应体模型
    class PeerDetailsResponse(BaseModel):
     peer_id: str
     public_ip: str  # 服务器看到的该 Peer 的公网 IP
     p2p_port: int   # Peer 注册的 P2P 端口
     registered_at: float # 注册时的时间戳
    
    # 简单的内存数据库来存储 Peer 信息
    # 格式: { "peer_id": {"public_ip": "x.x.x.x", "p2p_port": 12345, "registered_at": timestamp} }
    peers_db: Dict[str, Dict[str, Any]] = {}
    
    
    @app.get("/")
    async def read_root():
     """
     根路径,确认服务器正在运行。
     """
     return {"message": "P2P Signaling Server is running. Access /docs for API documentation."}
    
    
    @app.post("/register/{peer_id}", status_code=201, response_model=PeerDetailsResponse)
    async def register_peer(peer_id: str, peer_info: PeerRegisterInfo, request: Request):
     """
     注册或更新一个 Peer 的信息。
     - `peer_id`: Peer 的唯一标识符。
     - `peer_info`: 包含 `p2p_port`,即 Peer 用于 P2P 通信的端口。
     服务器会自动从请求中获取 Peer 的公网 IP (`request.client.host`)。
     """
     client_host = request.client.host  # 这是 FastAPI 服务器看到的客户端 IP 地址
                                      # 注意: 如果服务器在反向代理后,需要配置代理以传递正确的客户端IP
    
     current_time = time.time()
     peers_db[peer_id] = {
         "public_ip": client_host,
         "p2p_port": peer_info.p2p_port,
         "registered_at": current_time
     }
     print(f"[INFO] Peer registered/updated: {peer_id} -> IP: {client_host}, P2P Port: {peer_info.p2p_port}")
     
     return PeerDetailsResponse(
         peer_id=peer_id,
         public_ip=client_host,
         p2p_port=peer_info.p2p_port,
         registered_at=current_time
     )
    
    
    @app.get("/query/{peer_id}", response_model=PeerDetailsResponse)
    async def query_peer(peer_id: str):
     """
     查询指定 Peer ID 的注册信息。
     """
     if peer_id not in peers_db:
         print(f"[WARN] Peer query failed: {peer_id} not found.")
         raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found")
     
     peer_data = peers_db[peer_id]
     print(f"[INFO] Peer query successful for: {peer_id} -> {peer_data}")
     return PeerDetailsResponse(
         peer_id=peer_id,
         public_ip=peer_data["public_ip"],
         p2p_port=peer_data["p2p_port"],
         registered_at=peer_data["registered_at"]
     )
    
    
    @app.get("/list_peers")
    async def list_all_peers():
     """
     列出所有当前已注册的 Peers 及其信息。
     主要用于调试。
     """
     if not peers_db:
         return {"message": "No peers currently registered."}
     return peers_db
    
    
    @app.delete("/unregister/{peer_id}", status_code=200)
    async def unregister_peer(peer_id: str):
     """
     从服务器注销一个 Peer。
     """
     if peer_id in peers_db:
         del peers_db[peer_id]
         print(f"[INFO] Peer unregistered: {peer_id}")
         return {"message": "Peer unregistered successfully", "peer_id": peer_id}
     else:
         print(f"[WARN] Attempt to unregister non-existent peer: {peer_id}")
         raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found, cannot unregister")
    
    # --- 如何运行此服务器 ---
    # 1. 将此代码保存为 `signaling_server.py`。
    # 2. 安装必要的库:
    #    pip install fastapi "uvicorn[standard]" pydantic
    # 3. 在终端中运行服务器:
    #    uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload
    #
    #    - `uvicorn signaling_server:app`: 告诉 Uvicorn 运行 `signaling_server.py` 文件中的 `app` FastAPI 实例。
    #    - `--host 0.0.0.0`: 使服务器监听所有网络接口,这样局域网内或公网(如果端口已转发)的其他机器可以访问。
    #    - `--port 8000`: 指定服务器监听的端口。
    #    - `--reload`: (可选,用于开发) 当代码文件发生更改时,服务器会自动重新加载。
    #
    # 你的 P2P 客户端程序随后可以向 `http://<服务器的IP地址>:8000` 发送 HTTP 请求。
    # 例如:
    #   - 注册 peer "alice" 在 P2P 端口 12345:
    #     POST http://<server_ip>:8000/register/alice  (Body: {"p2p_port": 12345})
    #   - 查询 peer "alice" 的信息:
    #     GET http://<server_ip>:8000/query/alice
    
    if __name__ == "__main__":
     # 这部分通常不会直接运行,因为 FastAPI 应用推荐使用 Uvicorn 从命令行启动。
     # 但为了完整性,这里可以添加一行来提示如何运行。
     print("To run this FastAPI application, use the command:")
     print("uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload")
     # uvicorn.run(app, host="0.0.0.0", port=8000) # 也可以取消注释这行来直接运行,但不推荐用于生产

    如何使用信令服务器

    如何使用这个信令服务器 (P2P 客户端的视角):

Peer A 启动:
Peer A 决定一个用于 P2P UDP 通信的本地端口 (例如,随机选择一个或用户指定一个,如 55555)。
Peer A 向信令服务器发送一个 POST 请求到 /register/peerA,请求体为 {"p2p_port": 55555}。
信令服务器收到请求,记录下 Peer A 的公网 IP (例如 1.2.3.4,这是服务器看到的 Peer A 的 IP) 和 P2P 端口 55555。

Peer B 启动:
Peer B 执行类似操作,注册其 P2P 端口 (例如 66666) 到 /register/peerB。
信令服务器记录 Peer B 的公网 IP (例如 5.6.7.8) 和 P2P 端口 66666。

Peer A 想要连接 Peer B:
Peer A 向信令服务器发送一个 GET 请求到 /query/peerB。
服务器响应 Peer B 的信息:{"public_ip": "5.6.7.8", "p2p_port": 66666, ...}。

Peer B 想要连接 Peer A:
Peer B 向信令服务器发送一个 GET 请求到 /query/peerA。
服务器响应 Peer A 的信息:{"public_ip": "1.2.3.4", "p2p_port": 55555, ...}。

尝试打洞:
现在 Peer A 知道了 Peer B 的公网端点 (5.6.7.8:66666)。
Peer B 知道了 Peer A 的公网端点 (1.2.3.4:55555)。
双方现在可以尝试向对方的公网端点发送 UDP 包以进行 NAT 打洞。

重要注意事项:
极其简化: 这是一个非常基础的信令服务器。
内存存储: 所有 Peer 数据都存储在内存中,服务器重启后数据会丢失。生产环境需要使用数据库(如 Redis, PostgreSQL, MongoDB 等)。
无身份验证/授权: 任何人都可以注册或查询任何 Peer ID。生产环境需要安全机制。
无错误恢复/重试: 客户端需要自己处理网络请求的失败和重试。
无 Peer 超时: 过期的 Peer 信息会一直保留,除非手动注销。可以添加时间戳并定期清理不活跃的 Peer。
公网 IP 地址 (request.client.host):
request.client.host 获取的是直接连接到 FastAPI 服务器的客户端的 IP 地址。
如果你的 FastAPI 服务器直接暴露在公网,这通常就是 Peer 的公网 IP。
如果 FastAPI 服务器部署在反向代理(如 Nginx、Traefik)之后,你需要确保反向代理正确设置了 X-Forwarded-For 或类似的头部,并且 FastAPI (或其 ASGI 服务器 Uvicorn) 被配置为信任这些头部来获取真实的客户端 IP。否则,request.client.host 可能会是反向代理的 IP 地址。
不处理 NAT 类型: 这个信令服务器不扮演 STUN 服务器的角色,它不帮助客户端检测其 NAT 类型。它只是记录和交换客户端报告的 P2P 端口以及服务器观察到的客户端公网 IP。
HTTP 通信: 信令是通过 HTTP 进行的。P2P 客户端需要 HTTP 请求库(如 Python 的 requests 或 httpx)来与此服务器交互。

客户端服务

客户端需要实现以下基本功能:

  1. 向信令服务器注册自己的 peer_id 和一个宣称的 P2P 通信端口。
  2. 向信令服务器查询其他 peer_id 的网络信息。
  3. 列出所有已注册的节点。
  4. 从信令服务器注销自己。
import requests
import json
import random
import socket
import threading
import time
import queue
import os # 用于文件操作
import math # 用于计算块数

# --- 配置和全局变量 (与之前类似,增加文件传输相关) ---
SIGNALING_SERVER_URL = "http://信令服务器IP地址:8000"
current_peer_id = None
current_p2p_port = None

p2p_udp_socket = None
p2p_target_peer_id = None
p2p_target_reported_addr = None
p2p_target_actual_addr = None # 打洞成功后对方的 (ip, port)
p2p_listener_thread = None
p2p_stop_listener = threading.Event()
received_messages_q = queue.Queue() # 用于文本消息和控制消息

# 文件传输相关状态
CHUNK_SIZE = 8192  # bytes
FILE_TRANSFER_TIMEOUT = 5 # 秒,等待ACK的超时
FILE_TRANSFER_RETRIES = 3 # 重传次数

# 使用字典来管理不同类型事件的Event对象,用于线程同步
ack_events = {} # key: chunk_sq_num, value: threading.Event()
file_offer_response_event = threading.Event() # 用于等待对方接受/拒绝文件
file_offer_accepted = False # 标记对方是否接受文件
file_transfer_active = False # 标记当前是否有文件传输在进行
file_transfer_send_thread = None # 文件发送线程

# --- 信令服务器交互函数 (register_peer, query_peer, list_all_peers, unregister_peer) ---
# (请从您之前的 p2p_client.py 中复制这些函数过来,确保 unregister_peer 会调用 stop_current_p2p_activity)
# <在此处粘贴之前的信令函数>
# --- 信令服务器交互函数 (与之前类似,这里省略,假设它们已存在) ---
# register_peer, query_peer, list_all_peers, unregister_peer
# (请从您之前的 p2p_client.py 中复制这些函数过来)

# --- 新增:信令服务器交互函数 (确保它们在这里) ---
def register_peer(peer_id: str, p2p_port: int):
    global current_peer_id, current_p2p_port
    url = f"{SIGNALING_SERVER_URL}/register/{peer_id}"
    payload = {"p2p_port": p2p_port}
    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
        data = response.json()
        print(f"[注册成功] 服务器响应: {data}")
        current_peer_id = peer_id
        current_p2p_port = p2p_port # 保存我们声明的P2P端口
        return data
    except requests.exceptions.RequestException as e:
        print(f"[注册失败] 错误: {e}")
        if hasattr(e, 'response') and e.response is not None:
            try:
                print(f"服务器错误详情: {e.response.json()}")
            except json.JSONDecodeError:
                print(f"服务器错误详情 (非JSON): {e.response.text}")
        return None

def query_peer(peer_id_to_query: str):
    if not peer_id_to_query:
        print("错误: 需要提供要查询的 peer_id。")
        return None
    url = f"{SIGNALING_SERVER_URL}/query/{peer_id_to_query}"
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        # 我们不在这里打印,让调用者决定如何使用
        # print(f"[查询成功] Peer '{peer_id_to_query}' 的信息: {data}")
        return data
    except requests.exceptions.RequestException as e:
        print(f"[查询失败] 错误: {e}")
        if hasattr(e, 'response') and e.response is not None:
            try:
                print(f"服务器错误详情: {e.response.json()}")
            except json.JSONDecodeError:
                print(f"服务器错误详情 (非JSON): {e.response.text}")
        return None

def list_all_peers():
    url = f"{SIGNALING_SERVER_URL}/list_peers"
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()
        print("[列出所有 Peers] 服务器响应:")
        if isinstance(data, dict) and data.get("message") == "No peers currently registered.":
             print("  目前没有节点注册。")
        elif isinstance(data, dict):
            for peer_id, info in data.items():
                print(f"  - {peer_id}: {info}")
        else:
            print(data)
        return data
    except requests.exceptions.RequestException as e:
        print(f"[列出 Peers 失败] 错误: {e}")
        return None

def unregister_peer(peer_id_to_unregister: str):
    global current_peer_id, current_p2p_port
    if not peer_id_to_unregister:
        print("错误: 需要提供要注销的 peer_id。")
        return None
    
    stop_current_p2p_activity() # 注销前停止所有P2P活动

    url = f"{SIGNALING_SERVER_URL}/unregister/{peer_id_to_unregister}"
    try:
        response = requests.delete(url)
        response.raise_for_status()
        data = response.json()
        print(f"[注销成功] 服务器响应: {data}")
        if peer_id_to_unregister == current_peer_id:
            current_peer_id = None
            current_p2p_port = None
        return data
    except requests.exceptions.RequestException as e:
        print(f"[注销失败] 错误: {e}")
        if hasattr(e, 'response') and e.response is not None:
            try:
                print(f"服务器错误详情: {e.response.json()}")
            except json.JSONDecodeError:
                print(f"服务器错误详情 (非JSON): {e.response.text}")
        return None
# --- END 信令函数 ---


def initialize_p2p_socket(local_port: int):
    global p2p_udp_socket
    if p2p_udp_socket:
        try:
            p2p_udp_socket.close()
        except: pass # 忽略关闭时的错误
    
    p2p_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        p2p_udp_socket.bind(('0.0.0.0', local_port))
        p2p_udp_socket.settimeout(1.0) # 短超时用于打洞和ACK等待
        print(f"[P2P] UDP Socket 已在 0.0.0.0:{local_port} 上初始化并绑定。")
        return True
    except OSError as e:
        print(f"[P2P] 错误: 绑定 UDP Socket 到端口 {local_port} 失败: {e}")
        p2p_udp_socket = None
        return False

def send_p2p_control_message(sock, target_addr, message_type, sender_id, payload_dict=None):
    """发送P2P控制消息 (JSON格式)"""
    if not sock: return
    try:
        message = {"type": message_type, "sender_id": sender_id}
        if payload_dict:
            message.update(payload_dict)
        sock.sendto(json.dumps(message).encode('utf-8'), target_addr)
        # print(f"DEBUG: Sent control msg {message_type} to {target_addr}")
    except Exception as e:
        print(f"[P2P] 发送控制消息 {message_type} 失败: {e}")

def send_p2p_data_chunk(sock, target_addr, sender_id, sq_num, data_bytes):
    """发送P2P文件数据块 (头部 + 原始数据)"""
    if not sock: return
    try:
        # 头部格式: "DATA:<sender_id>:<sq_num>:" 注意末尾的冒号作为分隔符
        header = f"DATA:{sender_id}:{sq_num}:".encode('utf-8')
        packet = header + data_bytes
        sock.sendto(packet, target_addr)
        # print(f"DEBUG: Sent DATA chunk {sq_num} ({len(data_bytes)}B) to {target_addr}")
    except Exception as e:
        print(f"[P2P] 发送数据块 {sq_num} 失败: {e}")


def udp_listener():
    global p2p_udp_socket, p2p_stop_listener, received_messages_q, ack_events
    global file_offer_response_event, file_offer_accepted, file_transfer_active
    global p2p_target_actual_addr # 监听线程也可能在打洞阶段更新这个

    print("[P2P Listener] 监听线程已启动。")
    if not p2p_udp_socket:
        print("[P2P Listener] 错误: Socket未初始化。")
        return

    active_file_receive_info = {} # 用于存储正在接收的文件的信息

    while not p2p_stop_listener.is_set():
        try:
            # raw_data, addr = p2p_udp_socket.recvfrom(CHUNK_SIZE + 256) # 缓冲区稍大于块大小以容纳头部
            raw_data, addr = p2p_udp_socket.recvfrom(65536) # 确保能接收大的块
            
            # 尝试解析为JSON (控制消息)
            try:
                message_str = raw_data.decode('utf-8')
                msg_data = json.loads(message_str)
                msg_type = msg_data.get("type")
                sender_id = msg_data.get("sender_id")

                # print(f"DEBUG Listener: Received JSON msg: {msg_data} from {addr}")

                if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE": # 来自打洞过程
                     if not p2p_target_actual_addr or p2p_target_actual_addr[0] != addr[0]:
                        print(f"[P2P Listener] 收到来自 {addr} 的打洞相关消息: {msg_type}")
                        p2p_target_actual_addr = addr # 更新对方的实际地址
                        # 如果是SYN,回复ACK_RESPONSE
                        if msg_type == "PUNCH_SYN":
                            send_p2p_control_message(p2p_udp_socket, addr, "PUNCH_ACK_RESPONSE", current_peer_id)
                     # 将打洞消息也放入队列,让主打洞逻辑感知
                     received_messages_q.put((msg_data, addr))


                elif msg_type == "MSG": # 文本消息
                    received_messages_q.put((msg_data, addr))
                
                elif msg_type == "FILE_OFFER":
                    if file_transfer_active:
                        print(f"[P2P Listener] 收到来自 {sender_id} 的文件提议,但当前正忙于另一传输,已拒绝。")
                        send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 
                                                 {"filename": msg_data.get("filename"), "reason": "Receiver busy"})
                        continue
                    
                    print(f"\n[P2P Listener] 收到来自 {sender_id} ({addr}) 的文件提议: ")
                    print(f"  文件名: {msg_data.get('filename')}")
                    print(f"  大小: {msg_data.get('filesize')} bytes")
                    print(f"  块大小: {msg_data.get('chunk_size')}")
                    print(f"  总块数: {msg_data.get('total_chunks')}")
                    
                    user_choice = input("接受此文件吗? (y/N): ").strip().lower()
                    if user_choice == 'y':
                        file_transfer_active = True
                        active_file_receive_info = {
                            "filename": msg_data.get('filename'),
                            "filesize": msg_data.get('filesize'),
                            "chunk_size": msg_data.get('chunk_size'),
                            "total_chunks": msg_data.get('total_chunks'),
                            "sender_id": sender_id,
                            "sender_addr": addr,
                            "output_file": None,
                            "received_chunks": set(),
                            "next_expected_chunk": 0
                        }
                        # 创建接收文件夹
                        receive_dir = "received_files_p2p"
                        if not os.path.exists(receive_dir):
                            os.makedirs(receive_dir)
                        
                        # 构建保存路径,避免路径遍历漏洞 (简单替换)
                        safe_filename = os.path.basename(active_file_receive_info["filename"])
                        filepath = os.path.join(receive_dir, safe_filename)
                        
                        try:
                            active_file_receive_info["output_file"] = open(filepath, "wb")
                            print(f"[P2P Listener] 接受文件提议,将保存到 {filepath}")
                            send_p2p_control_message(p2p_udp_socket, addr, "FILE_ACCEPT", current_peer_id, 
                                                     {"filename": active_file_receive_info["filename"]})
                        except IOError as e:
                            print(f"[P2P Listener] 错误: 无法打开文件 {filepath} 进行写入: {e}")
                            send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 
                                                     {"filename": active_file_receive_info["filename"], "reason": "Cannot write file"})
                            file_transfer_active = False
                            active_file_receive_info = {}

                    else:
                        print(f"[P2P Listener] 拒绝文件提议: {msg_data.get('filename')}")
                        send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id, 
                                                 {"filename": msg_data.get('filename')})

                elif msg_type == "FILE_ACCEPT":
                    if sender_id == p2p_target_peer_id: # 确保是目标peer的响应
                        print(f"[P2P Listener] {sender_id} 接受了文件 '{msg_data.get('filename')}'。")
                        file_offer_accepted = True
                        file_offer_response_event.set() # 通知发送方可以开始传输

                elif msg_type == "FILE_REJECT":
                    if sender_id == p2p_target_peer_id:
                        print(f"[P2P Listener] {sender_id} 拒绝了文件 '{msg_data.get('filename')}'. 原因: {msg_data.get('reason')}")
                        file_offer_accepted = False
                        file_offer_response_event.set()

                elif msg_type == "ACK": # ACK:<receiver_id>:<chunk_sq_num>
                    sq_num = msg_data.get("sq_num")
                    if sq_num is not None and sender_id == p2p_target_peer_id:
                        # print(f"DEBUG Listener: Received ACK for chunk {sq_num} from {sender_id}")
                        if sq_num in ack_events:
                            ack_events[sq_num].set() # 通知发送线程对应的ACK已收到
                        else:
                            # print(f"DEBUG Listener: Received unexpected ACK for chunk {sq_num} or for a non-active transfer.")
                            pass
                
                elif msg_type == "FILE_END":
                    if file_transfer_active and active_file_receive_info and \
                       sender_id == active_file_receive_info["sender_id"] and \
                       msg_data.get("filename") == active_file_receive_info["filename"]:
                        
                        print(f"[P2P Listener] {sender_id} 完成发送文件 '{active_file_receive_info['filename']}'.")
                        if active_file_receive_info["output_file"]:
                            active_file_receive_info["output_file"].close()
                        
                        # 简单校验:块数是否匹配
                        if len(active_file_receive_info["received_chunks"]) == active_file_receive_info["total_chunks"]:
                            print(f"[P2P Listener] 文件 '{active_file_receive_info['filename']}' 接收完毕且块数匹配。")
                            send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], 
                                                     "FILE_VERIFY", current_peer_id, 
                                                     {"filename": active_file_receive_info["filename"], "status": "OK"})
                        else:
                            print(f"[P2P Listener] 警告: 文件 '{active_file_receive_info['filename']}' 接收完毕但块数不匹配! "
                                  f"收到 {len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']}")
                            send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], 
                                                     "FILE_VERIFY", current_peer_id, 
                                                     {"filename": active_file_receive_info["filename"], "status": "ERROR_CHUNK_MISMATCH"})
                        
                        file_transfer_active = False
                        active_file_receive_info = {}
                
                elif msg_type == "FILE_VERIFY":
                    if file_transfer_active and sender_id == p2p_target_peer_id: # 确保是发送方在等待这个
                        print(f"[P2P Listener] 收到来自 {sender_id} 的文件接收确认: '{msg_data.get('filename')}' 状态: {msg_data.get('status')}")
                        # 这里可以设置一个Event来通知发送线程对方已完全接收并校验
                        # 例如: file_final_verify_event.set()
                        # For now, just print.
                        file_transfer_active = False # 发送方也标记传输结束

                # 其他控制消息...

            except (json.JSONDecodeError, UnicodeDecodeError): # 不是JSON,尝试解析为数据块
                header_part = raw_data.split(b':', 3) # DATA:sender:sq_num:actual_data
                if len(header_part) == 4 and header_part[0] == b'DATA':
                    try:
                        data_sender_id = header_part[1].decode('utf-8')
                        data_sq_num = int(header_part[2].decode('utf-8'))
                        actual_data = header_part[3]

                        # print(f"DEBUG Listener: Received DATA chunk {data_sq_num} from {data_sender_id} ({len(actual_data)}B)")

                        if file_transfer_active and active_file_receive_info and \
                           data_sender_id == active_file_receive_info["sender_id"] and \
                           addr[0] == active_file_receive_info["sender_addr"][0]: # 确保来自正确的发送者IP
                            
                            # 简单停止等待:只接受期望的下一个块
                            if data_sq_num == active_file_receive_info["next_expected_chunk"]:
                                if active_file_receive_info["output_file"]:
                                    active_file_receive_info["output_file"].write(actual_data)
                                active_file_receive_info["received_chunks"].add(data_sq_num)
                                active_file_receive_info["next_expected_chunk"] += 1
                                
                                # print(f"  接收并写入块 {data_sq_num}。发送ACK。")
                                send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK", 
                                                         current_peer_id, {"sq_num": data_sq_num})
                                
                                # 打印进度 (简单版)
                                progress = (len(active_file_receive_info["received_chunks"]) / active_file_receive_info["total_chunks"]) * 100
                                print(f"\r  接收进度: {progress:.2f}% ({len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']})", end="")


                            # 如果是重复的已接收块 (对方可能没收到我们的ACK),重发ACK
                            elif data_sq_num < active_file_receive_info["next_expected_chunk"] and \
                                 data_sq_num in active_file_receive_info["received_chunks"]:
                                # print(f"  收到重复块 {data_sq_num}。重发ACK。")
                                send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK", 
                                                         current_peer_id, {"sq_num": data_sq_num})
                            # else: (忽略乱序的块,在停止等待中)
                            #    print(f"  收到乱序块 {data_sq_num} (期望 {active_file_receive_info['next_expected_chunk']})。已忽略。")


                        # else: (不是当前活动传输的数据块,忽略)
                        #    print(f"  收到来自 {data_sender_id} 的数据块,但当前无匹配的接收任务。")

                    except Exception as e_parse:
                        print(f"[P2P Listener] 解析DATA块头部失败: {e_parse}, raw_header: {header_part[:3]}")
                # else: (无法识别的原始数据包,忽略)
                #    print(f"[P2P Listener] 收到来自 {addr} 的无法解析的原始数据: {raw_data[:50]}...")


        except socket.timeout:
            continue # 超时是正常的,继续检查 stop_event
        except OSError as e:
            if not p2p_stop_listener.is_set():
                 print(f"[P2P Listener] Socket错误: {e}")
            break
        except Exception as e:
            if not p2p_stop_listener.is_set():
                print(f"[P2P Listener] 未知错误: {e}")
            break
    print("[P2P Listener] 监听线程已停止。")


def attempt_udp_hole_punch(target_id: str, target_ip: str, target_port: int):
    global p2p_udp_socket, p2p_target_actual_addr, p2p_target_peer_id, p2p_target_reported_addr
    
    if not p2p_udp_socket:
        print("[P2P] 错误: 本地UDP Socket未初始化。")
        return False

    print(f"[P2P] 开始尝试与 {target_id} ({target_ip}:{target_port}) 进行UDP打洞...")
    p2p_target_peer_id = target_id
    p2p_target_reported_addr = (target_ip, target_port)
    p2p_target_actual_addr = None # 重置

    # 发送几次打洞包
    for i in range(3): #减少次数,依赖监听线程收到的PUNCH_SYN和回复的ACK_RESPONSE
        print(f"  发送打洞包 (PUNCH_SYN) 第 {i+1} 次到 {target_ip}:{target_port}")
        send_p2p_control_message(p2p_udp_socket, (target_ip, target_port), "PUNCH_SYN", current_peer_id)
        time.sleep(0.5) 

    print(f"[P2P] 等待来自 {target_id} 的回应 (最多5秒)...")
    start_time = time.time()
    responded_correctly = False
    while time.time() - start_time < 5: 
        try:
            msg_data, addr = received_messages_q.get_nowait() # 从监听线程队列获取
            msg_type = msg_data.get("type")
            sender_id = msg_data.get("sender_id")

            if addr[0] == target_ip and sender_id == target_id: # 确保是目标peer的IP和ID
                if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE":
                    p2p_target_actual_addr = addr # 核心:使用对方实际发包的地址
                    print(f"[P2P] 收到来自 {sender_id} ({addr}) 的 {msg_type}。打洞可能成功!")
                    print(f"      对方实际地址已更新为: {p2p_target_actual_addr}")
                    if msg_type == "PUNCH_SYN": # 如果对方也发了SYN,我们回复ACK_RESPONSE
                         send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "PUNCH_ACK_RESPONSE", current_peer_id)
                    responded_correctly = True
                    break
        except queue.Empty:
            time.sleep(0.2)
            continue
        except Exception as e:
            print(f"  处理打洞回应时出错: {e}")
            time.sleep(0.2)
            continue
            
    if responded_correctly and p2p_target_actual_addr:
        print(f"[P2P] 与 {target_id} 的UDP打洞过程完成。")
        return True
    else:
        print(f"[P2P] 未能从 {target_id} 收到有效回应,打洞可能失败。")
        p2p_target_actual_addr = None
        return False

def _send_file_in_thread(filepath, filesize, chunk_size, total_chunks, target_addr, target_id_for_ack):
    global p2p_udp_socket, ack_events, file_transfer_active
    print(f"[文件发送线程] 开始发送 '{os.path.basename(filepath)}' ({total_chunks} 块)...")
    
    try:
        with open(filepath, "rb") as f:
            for i in range(total_chunks):
                if p2p_stop_listener.is_set(): # 检查是否被外部停止
                    print("[文件发送线程] 发送被中止。")
                    break

                chunk_data = f.read(chunk_size)
                if not chunk_data: # 文件提前结束?不太可能如果filesize正确
                    print(f"[文件发送线程] 错误: 读取文件块 {i} 时数据为空。")
                    break
                
                ack_events[i] = threading.Event() # 为当前块创建ACK事件
                
                retries_left = FILE_TRANSFER_RETRIES
                chunk_sent_successfully = False
                while retries_left > 0:
                    if p2p_stop_listener.is_set(): break
                    
                    # print(f"  发送块 {i}/{total_chunks-1} (尝试 {FILE_TRANSFER_RETRIES - retries_left + 1})...")
                    send_p2p_data_chunk(p2p_udp_socket, target_addr, current_peer_id, i, chunk_data)
                    
                    # 等待ACK
                    ack_received = ack_events[i].wait(timeout=FILE_TRANSFER_TIMEOUT)
                    
                    if ack_received:
                        # print(f"  收到块 {i} 的ACK。")
                        chunk_sent_successfully = True
                        progress = ((i + 1) / total_chunks) * 100
                        print(f"\r  发送进度: {progress:.2f}% ({i+1}/{total_chunks})", end="")
                        break 
                    else:
                        print(f"\n  块 {i} ACK超时。剩余尝试 {retries_left - 1} 次。")
                        retries_left -= 1
                
                del ack_events[i] # 清理事件

                if not chunk_sent_successfully:
                    print(f"\n[文件发送线程] 块 {i} 发送失败,已达最大重试次数。中止传输。")
                    file_transfer_active = False # 标记传输失败
                    return # 直接从线程退出

            if chunk_sent_successfully: # 只有所有块都成功才发送FILE_END
                print(f"\n[文件发送线程] 所有 {total_chunks} 块已发送并确认。发送FILE_END。")
                send_p2p_control_message(p2p_udp_socket, target_addr, "FILE_END", current_peer_id, 
                                         {"filename": os.path.basename(filepath)})
                # 此处可以等待对方的 FILE_VERIFY 消息,但为简化,我们只打印
            
    except IOError as e:
        print(f"[文件发送线程] 文件读写错误: {e}")
    except Exception as e:
        print(f"[文件发送线程] 未知错误: {e}")
    finally:
        print(f"[文件发送线程] '{os.path.basename(filepath)}' 发送处理完毕。")
        # file_transfer_active = False # 应该在收到FILE_VERIFY或超时后设置

def initiate_file_transfer():
    global p2p_target_actual_addr, p2p_target_peer_id, p2p_udp_socket
    global file_offer_response_event, file_offer_accepted, file_transfer_active, file_transfer_send_thread

    if not p2p_target_actual_addr:
        print("错误: 未建立P2P连接,或对方实际地址未知。请先进行打洞。")
        return
    if file_transfer_active:
        print("错误: 当前已有文件传输正在进行中。")
        return

    filepath = input("请输入要发送的完整文件路径: ").strip()
    if not os.path.exists(filepath) or not os.path.isfile(filepath):
        print(f"错误: 文件 '{filepath}' 不存在或不是一个文件。")
        return

    try:
        filesize = os.path.getsize(filepath)
        filename = os.path.basename(filepath)
        total_chunks = math.ceil(filesize / CHUNK_SIZE)
    except Exception as e:
        print(f"获取文件信息失败: {e}")
        return

    print(f"准备发送文件: '{filename}', 大小: {filesize} bytes, 分为 {total_chunks} 块 (每块 {CHUNK_SIZE} B)。")

    # 重置事件和状态
    file_offer_response_event.clear()
    file_offer_accepted = False
    
    # 发送FILE_OFFER
    send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "FILE_OFFER", current_peer_id,
                             {"filename": filename, "filesize": filesize, 
                              "chunk_size": CHUNK_SIZE, "total_chunks": total_chunks})
    
    print(f"已向 {p2p_target_peer_id} 发送文件提议,等待响应 (最多15秒)...")
    
    # 等待对方通过监听线程设置 file_offer_response_event
    offer_responded = file_offer_response_event.wait(timeout=15.0)

    if not offer_responded:
        print("对方未在规定时间内响应提议。")
        return
    
    if file_offer_accepted:
        print(f"{p2p_target_peer_id} 已接受文件。开始传输...")
        file_transfer_active = True # 标记传输开始
        # 创建并启动文件发送线程
        file_transfer_send_thread = threading.Thread(target=_send_file_in_thread, 
                                                     args=(filepath, filesize, CHUNK_SIZE, total_chunks, 
                                                           p2p_target_actual_addr, p2p_target_peer_id),
                                                     daemon=True)
        file_transfer_send_thread.start()
        # 主线程返回菜单,发送在后台进行
        print("文件正在后台发送。您可以在主菜单进行其他操作,或等待发送完成提示。")
    else:
        print(f"{p2p_target_peer_id} 拒绝了文件提议或发生错误。")


def start_p2p_chat_and_file_menu():
    global p2p_target_peer_id, p2p_target_reported_addr, p2p_target_actual_addr
    global p2p_listener_thread, p2p_stop_listener, received_messages_q, file_transfer_active

    if not current_peer_id or not current_p2p_port:
        print("请先注册您的 Peer ID 和 P2P 端口。")
        return

    # 如果当前已有P2P会话,先询问是否结束
    if p2p_target_peer_id:
        print(f"当前已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}).")
        choice = input("是否要结束当前P2P会话并尝试连接新的Peer? (y/N): ").strip().lower()
        if choice == 'y':
            stop_current_p2p_activity()
        else: # 进入当前会话的菜单
            pass # 下面的逻辑会处理

    # 如果没有活动P2P目标,或者用户选择连接新的,则进行查询和打洞
    if not p2p_target_actual_addr: # p2p_target_actual_addr 是打洞成功的标志
        peer_to_connect = input("请输入您想连接的 Peer ID: ").strip()
        if not peer_to_connect: return
        if peer_to_connect == current_peer_id:
            print("不能连接自己。"); return

        print(f"正在查询 {peer_to_connect} 的信息...")
        peer_info = query_peer(peer_to_connect)
        if not peer_info:
            print(f"未能获取 {peer_to_connect} 的信息。"); return
        
        print(f"获取到 {peer_to_connect} 的信息: IP={peer_info['public_ip']}, P2P Port={peer_info['p2p_port']}")
        
        stop_current_p2p_activity() # 确保旧的已停止

        p2p_stop_listener = threading.Event() # 重置
        received_messages_q = queue.Queue() # 重置

        if not initialize_p2p_socket(current_p2p_port): return

        p2p_listener_thread = threading.Thread(target=udp_listener, daemon=True)
        p2p_listener_thread.start()
        time.sleep(0.1)

        if not attempt_udp_hole_punch(peer_to_connect, peer_info['public_ip'], peer_info['p2p_port']):
            print(f"无法与 {peer_to_connect} 建立P2P连接。")
            stop_current_p2p_activity()
            return
    
    # --- P2P 会话菜单 ---
    print(f"\n--- P2P 会话已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}) ---")
    while p2p_target_actual_addr and not p2p_stop_listener.is_set(): # 只要连接有效
        print("\nP2P 操作:")
        print("1. 发送文本消息")
        print("2. 发送文件")
        print("3. 返回主菜单 (结束当前P2P会话)")
        
        # 检查接收到的消息 (只显示文本消息,文件消息由listener处理)
        try:
            msg_data, addr = received_messages_q.get_nowait()
            if p2p_target_actual_addr and addr[0] == p2p_target_actual_addr[0]:
                msg_type = msg_data.get("type")
                sender_id = msg_data.get("sender_id")
                if msg_type == "MSG":
                    text_msg = msg_data.get("text")
                    print(f"\r[{time.strftime('%H:%M:%S')}] {sender_id}: {text_msg}\n> ", end="")
                # 其他类型的消息(如打洞消息)已在listener或打洞逻辑中处理,或在此忽略
            else: # 来自未知源
                 print(f"\r收到来自未知源 {addr} 的P2P消息: {msg_data}\n> ", end="")
        except queue.Empty:
            pass
        except Exception as e:
            print(f"处理消息队列时出错: {e}")


        try:
            p2p_choice = input("> ").strip()
            if p2p_choice == '1':
                message_text = input("输入消息: ").strip()
                if message_text:
                    send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG", 
                                             current_peer_id, {"text": message_text})
                    print(f"[{time.strftime('%H:%M:%S')}] 我: {message_text}")
            elif p2p_choice == '2':
                initiate_file_transfer()
            elif p2p_choice == '3':
                print("结束当前P2P会话...")
                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
                                         current_peer_id, {"text": "/quit_chat"}) # 通知对方
                stop_current_p2p_activity()
                break # 退出P2P会话循环
            else:
                if p2p_choice: # 如果输入了东西但不是有效选项
                    print("无效的P2P操作选项。")
        
        except KeyboardInterrupt:
             print("\n检测到 Ctrl+C,结束P2P会话...")
             if p2p_target_actual_addr:
                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
                                         current_peer_id, {"text": "/quit_chat_ctrl_c"})
             stop_current_p2p_activity()
             break
        except EOFError:
             print("\n输入流结束,结束P2P会话...")
             if p2p_target_actual_addr:
                send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
                                         current_peer_id, {"text": "/quit_chat_eof"})
             stop_current_p2p_activity()
             break
        
        # 短暂休眠,让监听线程有机会处理消息并打印
        # 避免input()完全阻塞消息的显示 (虽然不是完美的解决方案)
        time.sleep(0.1)


def stop_current_p2p_activity():
    global p2p_listener_thread, p2p_stop_listener, p2p_udp_socket
    global p2p_target_peer_id, p2p_target_actual_addr, p2p_target_reported_addr
    global file_transfer_active, file_transfer_send_thread, ack_events

    print("[系统] 正在停止所有P2P活动...")
    file_transfer_active = False # 停止任何进行中的文件传输标记

    if p2p_stop_listener:
        p2p_stop_listener.set() # 发送停止信号

    if file_transfer_send_thread and file_transfer_send_thread.is_alive():
        print("  等待文件发送线程结束...")
        file_transfer_send_thread.join(timeout=2.0)
        if file_transfer_send_thread.is_alive():
            print("  警告: 文件发送线程未能正常停止。")
    file_transfer_send_thread = None
    ack_events.clear() # 清空ACK事件

    if p2p_listener_thread and p2p_listener_thread.is_alive():
        print("  等待监听线程结束...")
        p2p_listener_thread.join(timeout=2.0) 
        if p2p_listener_thread.is_alive():
            print("  警告: 监听线程未能正常停止。")
    p2p_listener_thread = None
    
    if p2p_udp_socket:
        print("  关闭UDP Socket。")
        try:
            p2p_udp_socket.close()
        except Exception as e_close:
            print(f"  关闭socket时出错: {e_close}")
        p2p_udp_socket = None
    
    # 清理队列中的剩余消息 (可选)
    while not received_messages_q.empty():
        try:
            received_messages_q.get_nowait()
        except queue.Empty:
            break
    
    print("[系统] P2P活动已清理。")
    p2p_target_peer_id = None
    p2p_target_actual_addr = None
    p2p_target_reported_addr = None

def main_menu():
    # ... (主菜单逻辑与上一个版本类似,但选项3会调用 start_p2p_chat_and_file_menu) ...
    # 确保注销和退出时调用 stop_current_p2p_activity()
    global current_peer_id, current_p2p_port

    if not current_peer_id:
        # ... (注册逻辑,与之前相同) ...
        print("\n--- P2P 客户端 (未注册) ---")
        my_id = input("请输入你的 Peer ID (例如: alice, bob): ").strip()
        if not my_id:
            print("Peer ID 不能为空。")
            return True 

        try:
            my_port_str = input(f"请输入你用于P2P的UDP端口 (50000-65000, 回车随机): ").strip()
            if not my_port_str:
                my_port = random.randint(50000, 65000)
                print(f"已随机生成P2P端口: {my_port}")
            else:
                my_port = int(my_port_str)
                if not (1024 < my_port < 65536):
                    print("端口号不合法,将使用随机端口。")
                    my_port = random.randint(50000, 65000)
            print(f"将使用P2P端口: {my_port}")
        except ValueError:
            print("无效的端口输入,将使用随机端口。")
            my_port = random.randint(50000, 65000)
            print(f"已随机生成P2P端口: {my_port}")

        if not register_peer(my_id, my_port):
            return True 
    
    print(f"\n--- P2P 客户端 (ID: {current_peer_id}, P2P端口: {current_p2p_port}) ---")
    print("选择操作:")
    print("1. [信令] 查询其他 Peer 的信息")
    print("2. [信令] 列出所有已注册的 Peers")
    print("3. [P2P]  开始/管理 P2P 会话 (聊天/文件)")
    print("4. [信令] 注销我的 Peer ID")
    print("5. [信令] 重新注册 (使用新的ID或端口)")
    print("6. 退出")

    choice = input("请输入选项 (1-6): ").strip()

    if choice == '1':
        peer_to_query = input("请输入要查询的 Peer ID: ").strip()
        info = query_peer(peer_to_query)
        if info:
             print(f"Peer '{peer_to_query}' 的信息: {info}")
    elif choice == '2':
        list_all_peers()
    elif choice == '3':
        start_p2p_chat_and_file_menu() # 进入P2P会话管理
    elif choice == '4':
        if current_peer_id:
            confirm = input(f"确定要注销 Peer ID '{current_peer_id}'吗? (y/N): ").strip().lower()
            if confirm == 'y':
                unregister_peer(current_peer_id) 
        else:
            print("你还没有注册任何 Peer ID。")
    elif choice == '5':
        if current_peer_id:
             print(f"你当前注册为 '{current_peer_id}'。重新注册会使用新的ID和端口。")
             unregister_peer(current_peer_id) 
        current_peer_id = None 
        current_p2p_port = None
    elif choice == '6':
        print("正在退出...")
        if current_peer_id:
            unregister_peer(current_peer_id)
        return False 
    else:
        print("无效选项,请重试。")
    return True

if __name__ == "__main__":
    print("欢迎使用 P2P 客户端 Demo (包含P2P文本聊天和文件传输)!")
    # ... (与之前相同的服务器连接检查) ...
    print(f"将连接到信令服务器: {SIGNALING_SERVER_URL}")
    print("请确保信令服务器正在运行。")
    print("------------------------------------")

    try:
        ping_response = requests.get(SIGNALING_SERVER_URL, timeout=3)
        if ping_response.status_code == 200:
            print("成功连接到信令服务器。")
        else:
            print(f"警告: 信令服务器响应状态码 {ping_response.status_code}。")
    except requests.exceptions.ConnectionError:
        print(f"错误: 无法连接到信令服务器 {SIGNALING_SERVER_URL}。请检查服务器。")
        exit()
    except requests.exceptions.Timeout:
        print(f"错误: 连接信令服务器超时。")
        exit()

    try:
        while True:
            if not main_menu():
                break
    except KeyboardInterrupt:
        print("\n检测到 Ctrl+C,正在退出程序...")
    finally:
        stop_current_p2p_activity() # 确保所有P2P资源被清理
        print("客户端已退出。")

其中SIGNALING_SERVER_URL全局变量需要修改为自己的信令服务器地址,端口默认为8000。