信令服务器搭建
信令服务器搭建需要一个具有公网IP的服务器(VPS),信令服务器的功能包括以下几点:
- 节点注册 (/register/{peer_id}): 允许一个 P2P 节点(peer)注册其 ID 和它打算用于 P2P 通信的端口。服务器会自动记录请求来源的公网 IP 地址。
- 节点查询 (/query/{peer_id}): 允许一个节点查询另一个已注册节点的信息(公网 IP 和 P2P 端口)。
- 列出所有节点 (/list_peers): 显示当前所有已注册的节点信息(主要用于调试)。
节点注销 (/unregister/{peer_id}): 允许节点从服务器注销。
from fastapi import FastAPI, Request, HTTPException from pydantic import BaseModel # 用于数据验证和序列化 import uvicorn # ASGI 服务器 import time # 用于记录时间戳 from typing import Dict, Any, Optional # 初始化 FastAPI 应用 app = FastAPI( title="Simple P2P Signaling Server", description="A basic signaling server for P2P peer discovery.", version="0.1.0" ) # 使用 Pydantic 定义请求体模型 class PeerRegisterInfo(BaseModel): p2p_port: int # Peer 宣称其用于 P2P UDP 通信的本地端口 # 使用 Pydantic 定义响应体模型 class PeerDetailsResponse(BaseModel): peer_id: str public_ip: str # 服务器看到的该 Peer 的公网 IP p2p_port: int # Peer 注册的 P2P 端口 registered_at: float # 注册时的时间戳 # 简单的内存数据库来存储 Peer 信息 # 格式: { "peer_id": {"public_ip": "x.x.x.x", "p2p_port": 12345, "registered_at": timestamp} } peers_db: Dict[str, Dict[str, Any]] = {} @app.get("/") async def read_root(): """ 根路径,确认服务器正在运行。 """ return {"message": "P2P Signaling Server is running. Access /docs for API documentation."} @app.post("/register/{peer_id}", status_code=201, response_model=PeerDetailsResponse) async def register_peer(peer_id: str, peer_info: PeerRegisterInfo, request: Request): """ 注册或更新一个 Peer 的信息。 - `peer_id`: Peer 的唯一标识符。 - `peer_info`: 包含 `p2p_port`,即 Peer 用于 P2P 通信的端口。 服务器会自动从请求中获取 Peer 的公网 IP (`request.client.host`)。 """ client_host = request.client.host # 这是 FastAPI 服务器看到的客户端 IP 地址 # 注意: 如果服务器在反向代理后,需要配置代理以传递正确的客户端IP current_time = time.time() peers_db[peer_id] = { "public_ip": client_host, "p2p_port": peer_info.p2p_port, "registered_at": current_time } print(f"[INFO] Peer registered/updated: {peer_id} -> IP: {client_host}, P2P Port: {peer_info.p2p_port}") return PeerDetailsResponse( peer_id=peer_id, public_ip=client_host, p2p_port=peer_info.p2p_port, registered_at=current_time ) @app.get("/query/{peer_id}", response_model=PeerDetailsResponse) async def query_peer(peer_id: str): """ 查询指定 Peer ID 的注册信息。 """ if peer_id not in peers_db: print(f"[WARN] Peer query failed: {peer_id} not found.") raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found") peer_data = peers_db[peer_id] print(f"[INFO] Peer query successful for: {peer_id} -> {peer_data}") return PeerDetailsResponse( peer_id=peer_id, public_ip=peer_data["public_ip"], p2p_port=peer_data["p2p_port"], registered_at=peer_data["registered_at"] ) @app.get("/list_peers") async def list_all_peers(): """ 列出所有当前已注册的 Peers 及其信息。 主要用于调试。 """ if not peers_db: return {"message": "No peers currently registered."} return peers_db @app.delete("/unregister/{peer_id}", status_code=200) async def unregister_peer(peer_id: str): """ 从服务器注销一个 Peer。 """ if peer_id in peers_db: del peers_db[peer_id] print(f"[INFO] Peer unregistered: {peer_id}") return {"message": "Peer unregistered successfully", "peer_id": peer_id} else: print(f"[WARN] Attempt to unregister non-existent peer: {peer_id}") raise HTTPException(status_code=404, detail=f"Peer '{peer_id}' not found, cannot unregister") # --- 如何运行此服务器 --- # 1. 将此代码保存为 `signaling_server.py`。 # 2. 安装必要的库: # pip install fastapi "uvicorn[standard]" pydantic # 3. 在终端中运行服务器: # uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload # # - `uvicorn signaling_server:app`: 告诉 Uvicorn 运行 `signaling_server.py` 文件中的 `app` FastAPI 实例。 # - `--host 0.0.0.0`: 使服务器监听所有网络接口,这样局域网内或公网(如果端口已转发)的其他机器可以访问。 # - `--port 8000`: 指定服务器监听的端口。 # - `--reload`: (可选,用于开发) 当代码文件发生更改时,服务器会自动重新加载。 # # 你的 P2P 客户端程序随后可以向 `http://<服务器的IP地址>:8000` 发送 HTTP 请求。 # 例如: # - 注册 peer "alice" 在 P2P 端口 12345: # POST http://<server_ip>:8000/register/alice (Body: {"p2p_port": 12345}) # - 查询 peer "alice" 的信息: # GET http://<server_ip>:8000/query/alice if __name__ == "__main__": # 这部分通常不会直接运行,因为 FastAPI 应用推荐使用 Uvicorn 从命令行启动。 # 但为了完整性,这里可以添加一行来提示如何运行。 print("To run this FastAPI application, use the command:") print("uvicorn signaling_server:app --host 0.0.0.0 --port 8000 --reload") # uvicorn.run(app, host="0.0.0.0", port=8000) # 也可以取消注释这行来直接运行,但不推荐用于生产
如何使用信令服务器
如何使用这个信令服务器 (P2P 客户端的视角):
Peer A 启动:
Peer A 决定一个用于 P2P UDP 通信的本地端口 (例如,随机选择一个或用户指定一个,如 55555)。
Peer A 向信令服务器发送一个 POST 请求到 /register/peerA,请求体为 {"p2p_port": 55555}。
信令服务器收到请求,记录下 Peer A 的公网 IP (例如 1.2.3.4,这是服务器看到的 Peer A 的 IP) 和 P2P 端口 55555。
Peer B 启动:
Peer B 执行类似操作,注册其 P2P 端口 (例如 66666) 到 /register/peerB。
信令服务器记录 Peer B 的公网 IP (例如 5.6.7.8) 和 P2P 端口 66666。
Peer A 想要连接 Peer B:
Peer A 向信令服务器发送一个 GET 请求到 /query/peerB。
服务器响应 Peer B 的信息:{"public_ip": "5.6.7.8", "p2p_port": 66666, ...}。
Peer B 想要连接 Peer A:
Peer B 向信令服务器发送一个 GET 请求到 /query/peerA。
服务器响应 Peer A 的信息:{"public_ip": "1.2.3.4", "p2p_port": 55555, ...}。
尝试打洞:
现在 Peer A 知道了 Peer B 的公网端点 (5.6.7.8:66666)。
Peer B 知道了 Peer A 的公网端点 (1.2.3.4:55555)。
双方现在可以尝试向对方的公网端点发送 UDP 包以进行 NAT 打洞。
重要注意事项:
极其简化: 这是一个非常基础的信令服务器。
内存存储: 所有 Peer 数据都存储在内存中,服务器重启后数据会丢失。生产环境需要使用数据库(如 Redis, PostgreSQL, MongoDB 等)。
无身份验证/授权: 任何人都可以注册或查询任何 Peer ID。生产环境需要安全机制。
无错误恢复/重试: 客户端需要自己处理网络请求的失败和重试。
无 Peer 超时: 过期的 Peer 信息会一直保留,除非手动注销。可以添加时间戳并定期清理不活跃的 Peer。
公网 IP 地址 (request.client.host):
request.client.host 获取的是直接连接到 FastAPI 服务器的客户端的 IP 地址。
如果你的 FastAPI 服务器直接暴露在公网,这通常就是 Peer 的公网 IP。
如果 FastAPI 服务器部署在反向代理(如 Nginx、Traefik)之后,你需要确保反向代理正确设置了 X-Forwarded-For 或类似的头部,并且 FastAPI (或其 ASGI 服务器 Uvicorn) 被配置为信任这些头部来获取真实的客户端 IP。否则,request.client.host 可能会是反向代理的 IP 地址。
不处理 NAT 类型: 这个信令服务器不扮演 STUN 服务器的角色,它不帮助客户端检测其 NAT 类型。它只是记录和交换客户端报告的 P2P 端口以及服务器观察到的客户端公网 IP。
HTTP 通信: 信令是通过 HTTP 进行的。P2P 客户端需要 HTTP 请求库(如 Python 的 requests 或 httpx)来与此服务器交互。
客户端服务
客户端需要实现以下基本功能:
- 向信令服务器注册自己的 peer_id 和一个宣称的 P2P 通信端口。
- 向信令服务器查询其他 peer_id 的网络信息。
- 列出所有已注册的节点。
- 从信令服务器注销自己。
import requests
import json
import random
import socket
import threading
import time
import queue
import os # 用于文件操作
import math # 用于计算块数
# --- 配置和全局变量 (与之前类似,增加文件传输相关) ---
SIGNALING_SERVER_URL = "http://信令服务器IP地址:8000"
current_peer_id = None
current_p2p_port = None
p2p_udp_socket = None
p2p_target_peer_id = None
p2p_target_reported_addr = None
p2p_target_actual_addr = None # 打洞成功后对方的 (ip, port)
p2p_listener_thread = None
p2p_stop_listener = threading.Event()
received_messages_q = queue.Queue() # 用于文本消息和控制消息
# 文件传输相关状态
CHUNK_SIZE = 8192 # bytes
FILE_TRANSFER_TIMEOUT = 5 # 秒,等待ACK的超时
FILE_TRANSFER_RETRIES = 3 # 重传次数
# 使用字典来管理不同类型事件的Event对象,用于线程同步
ack_events = {} # key: chunk_sq_num, value: threading.Event()
file_offer_response_event = threading.Event() # 用于等待对方接受/拒绝文件
file_offer_accepted = False # 标记对方是否接受文件
file_transfer_active = False # 标记当前是否有文件传输在进行
file_transfer_send_thread = None # 文件发送线程
# --- 信令服务器交互函数 (register_peer, query_peer, list_all_peers, unregister_peer) ---
# (请从您之前的 p2p_client.py 中复制这些函数过来,确保 unregister_peer 会调用 stop_current_p2p_activity)
# <在此处粘贴之前的信令函数>
# --- 信令服务器交互函数 (与之前类似,这里省略,假设它们已存在) ---
# register_peer, query_peer, list_all_peers, unregister_peer
# (请从您之前的 p2p_client.py 中复制这些函数过来)
# --- 新增:信令服务器交互函数 (确保它们在这里) ---
def register_peer(peer_id: str, p2p_port: int):
global current_peer_id, current_p2p_port
url = f"{SIGNALING_SERVER_URL}/register/{peer_id}"
payload = {"p2p_port": p2p_port}
try:
response = requests.post(url, json=payload)
response.raise_for_status()
data = response.json()
print(f"[注册成功] 服务器响应: {data}")
current_peer_id = peer_id
current_p2p_port = p2p_port # 保存我们声明的P2P端口
return data
except requests.exceptions.RequestException as e:
print(f"[注册失败] 错误: {e}")
if hasattr(e, 'response') and e.response is not None:
try:
print(f"服务器错误详情: {e.response.json()}")
except json.JSONDecodeError:
print(f"服务器错误详情 (非JSON): {e.response.text}")
return None
def query_peer(peer_id_to_query: str):
if not peer_id_to_query:
print("错误: 需要提供要查询的 peer_id。")
return None
url = f"{SIGNALING_SERVER_URL}/query/{peer_id_to_query}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
# 我们不在这里打印,让调用者决定如何使用
# print(f"[查询成功] Peer '{peer_id_to_query}' 的信息: {data}")
return data
except requests.exceptions.RequestException as e:
print(f"[查询失败] 错误: {e}")
if hasattr(e, 'response') and e.response is not None:
try:
print(f"服务器错误详情: {e.response.json()}")
except json.JSONDecodeError:
print(f"服务器错误详情 (非JSON): {e.response.text}")
return None
def list_all_peers():
url = f"{SIGNALING_SERVER_URL}/list_peers"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
print("[列出所有 Peers] 服务器响应:")
if isinstance(data, dict) and data.get("message") == "No peers currently registered.":
print(" 目前没有节点注册。")
elif isinstance(data, dict):
for peer_id, info in data.items():
print(f" - {peer_id}: {info}")
else:
print(data)
return data
except requests.exceptions.RequestException as e:
print(f"[列出 Peers 失败] 错误: {e}")
return None
def unregister_peer(peer_id_to_unregister: str):
global current_peer_id, current_p2p_port
if not peer_id_to_unregister:
print("错误: 需要提供要注销的 peer_id。")
return None
stop_current_p2p_activity() # 注销前停止所有P2P活动
url = f"{SIGNALING_SERVER_URL}/unregister/{peer_id_to_unregister}"
try:
response = requests.delete(url)
response.raise_for_status()
data = response.json()
print(f"[注销成功] 服务器响应: {data}")
if peer_id_to_unregister == current_peer_id:
current_peer_id = None
current_p2p_port = None
return data
except requests.exceptions.RequestException as e:
print(f"[注销失败] 错误: {e}")
if hasattr(e, 'response') and e.response is not None:
try:
print(f"服务器错误详情: {e.response.json()}")
except json.JSONDecodeError:
print(f"服务器错误详情 (非JSON): {e.response.text}")
return None
# --- END 信令函数 ---
def initialize_p2p_socket(local_port: int):
global p2p_udp_socket
if p2p_udp_socket:
try:
p2p_udp_socket.close()
except: pass # 忽略关闭时的错误
p2p_udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
p2p_udp_socket.bind(('0.0.0.0', local_port))
p2p_udp_socket.settimeout(1.0) # 短超时用于打洞和ACK等待
print(f"[P2P] UDP Socket 已在 0.0.0.0:{local_port} 上初始化并绑定。")
return True
except OSError as e:
print(f"[P2P] 错误: 绑定 UDP Socket 到端口 {local_port} 失败: {e}")
p2p_udp_socket = None
return False
def send_p2p_control_message(sock, target_addr, message_type, sender_id, payload_dict=None):
"""发送P2P控制消息 (JSON格式)"""
if not sock: return
try:
message = {"type": message_type, "sender_id": sender_id}
if payload_dict:
message.update(payload_dict)
sock.sendto(json.dumps(message).encode('utf-8'), target_addr)
# print(f"DEBUG: Sent control msg {message_type} to {target_addr}")
except Exception as e:
print(f"[P2P] 发送控制消息 {message_type} 失败: {e}")
def send_p2p_data_chunk(sock, target_addr, sender_id, sq_num, data_bytes):
"""发送P2P文件数据块 (头部 + 原始数据)"""
if not sock: return
try:
# 头部格式: "DATA:<sender_id>:<sq_num>:" 注意末尾的冒号作为分隔符
header = f"DATA:{sender_id}:{sq_num}:".encode('utf-8')
packet = header + data_bytes
sock.sendto(packet, target_addr)
# print(f"DEBUG: Sent DATA chunk {sq_num} ({len(data_bytes)}B) to {target_addr}")
except Exception as e:
print(f"[P2P] 发送数据块 {sq_num} 失败: {e}")
def udp_listener():
global p2p_udp_socket, p2p_stop_listener, received_messages_q, ack_events
global file_offer_response_event, file_offer_accepted, file_transfer_active
global p2p_target_actual_addr # 监听线程也可能在打洞阶段更新这个
print("[P2P Listener] 监听线程已启动。")
if not p2p_udp_socket:
print("[P2P Listener] 错误: Socket未初始化。")
return
active_file_receive_info = {} # 用于存储正在接收的文件的信息
while not p2p_stop_listener.is_set():
try:
# raw_data, addr = p2p_udp_socket.recvfrom(CHUNK_SIZE + 256) # 缓冲区稍大于块大小以容纳头部
raw_data, addr = p2p_udp_socket.recvfrom(65536) # 确保能接收大的块
# 尝试解析为JSON (控制消息)
try:
message_str = raw_data.decode('utf-8')
msg_data = json.loads(message_str)
msg_type = msg_data.get("type")
sender_id = msg_data.get("sender_id")
# print(f"DEBUG Listener: Received JSON msg: {msg_data} from {addr}")
if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE": # 来自打洞过程
if not p2p_target_actual_addr or p2p_target_actual_addr[0] != addr[0]:
print(f"[P2P Listener] 收到来自 {addr} 的打洞相关消息: {msg_type}")
p2p_target_actual_addr = addr # 更新对方的实际地址
# 如果是SYN,回复ACK_RESPONSE
if msg_type == "PUNCH_SYN":
send_p2p_control_message(p2p_udp_socket, addr, "PUNCH_ACK_RESPONSE", current_peer_id)
# 将打洞消息也放入队列,让主打洞逻辑感知
received_messages_q.put((msg_data, addr))
elif msg_type == "MSG": # 文本消息
received_messages_q.put((msg_data, addr))
elif msg_type == "FILE_OFFER":
if file_transfer_active:
print(f"[P2P Listener] 收到来自 {sender_id} 的文件提议,但当前正忙于另一传输,已拒绝。")
send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id,
{"filename": msg_data.get("filename"), "reason": "Receiver busy"})
continue
print(f"\n[P2P Listener] 收到来自 {sender_id} ({addr}) 的文件提议: ")
print(f" 文件名: {msg_data.get('filename')}")
print(f" 大小: {msg_data.get('filesize')} bytes")
print(f" 块大小: {msg_data.get('chunk_size')}")
print(f" 总块数: {msg_data.get('total_chunks')}")
user_choice = input("接受此文件吗? (y/N): ").strip().lower()
if user_choice == 'y':
file_transfer_active = True
active_file_receive_info = {
"filename": msg_data.get('filename'),
"filesize": msg_data.get('filesize'),
"chunk_size": msg_data.get('chunk_size'),
"total_chunks": msg_data.get('total_chunks'),
"sender_id": sender_id,
"sender_addr": addr,
"output_file": None,
"received_chunks": set(),
"next_expected_chunk": 0
}
# 创建接收文件夹
receive_dir = "received_files_p2p"
if not os.path.exists(receive_dir):
os.makedirs(receive_dir)
# 构建保存路径,避免路径遍历漏洞 (简单替换)
safe_filename = os.path.basename(active_file_receive_info["filename"])
filepath = os.path.join(receive_dir, safe_filename)
try:
active_file_receive_info["output_file"] = open(filepath, "wb")
print(f"[P2P Listener] 接受文件提议,将保存到 {filepath}")
send_p2p_control_message(p2p_udp_socket, addr, "FILE_ACCEPT", current_peer_id,
{"filename": active_file_receive_info["filename"]})
except IOError as e:
print(f"[P2P Listener] 错误: 无法打开文件 {filepath} 进行写入: {e}")
send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id,
{"filename": active_file_receive_info["filename"], "reason": "Cannot write file"})
file_transfer_active = False
active_file_receive_info = {}
else:
print(f"[P2P Listener] 拒绝文件提议: {msg_data.get('filename')}")
send_p2p_control_message(p2p_udp_socket, addr, "FILE_REJECT", current_peer_id,
{"filename": msg_data.get('filename')})
elif msg_type == "FILE_ACCEPT":
if sender_id == p2p_target_peer_id: # 确保是目标peer的响应
print(f"[P2P Listener] {sender_id} 接受了文件 '{msg_data.get('filename')}'。")
file_offer_accepted = True
file_offer_response_event.set() # 通知发送方可以开始传输
elif msg_type == "FILE_REJECT":
if sender_id == p2p_target_peer_id:
print(f"[P2P Listener] {sender_id} 拒绝了文件 '{msg_data.get('filename')}'. 原因: {msg_data.get('reason')}")
file_offer_accepted = False
file_offer_response_event.set()
elif msg_type == "ACK": # ACK:<receiver_id>:<chunk_sq_num>
sq_num = msg_data.get("sq_num")
if sq_num is not None and sender_id == p2p_target_peer_id:
# print(f"DEBUG Listener: Received ACK for chunk {sq_num} from {sender_id}")
if sq_num in ack_events:
ack_events[sq_num].set() # 通知发送线程对应的ACK已收到
else:
# print(f"DEBUG Listener: Received unexpected ACK for chunk {sq_num} or for a non-active transfer.")
pass
elif msg_type == "FILE_END":
if file_transfer_active and active_file_receive_info and \
sender_id == active_file_receive_info["sender_id"] and \
msg_data.get("filename") == active_file_receive_info["filename"]:
print(f"[P2P Listener] {sender_id} 完成发送文件 '{active_file_receive_info['filename']}'.")
if active_file_receive_info["output_file"]:
active_file_receive_info["output_file"].close()
# 简单校验:块数是否匹配
if len(active_file_receive_info["received_chunks"]) == active_file_receive_info["total_chunks"]:
print(f"[P2P Listener] 文件 '{active_file_receive_info['filename']}' 接收完毕且块数匹配。")
send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"],
"FILE_VERIFY", current_peer_id,
{"filename": active_file_receive_info["filename"], "status": "OK"})
else:
print(f"[P2P Listener] 警告: 文件 '{active_file_receive_info['filename']}' 接收完毕但块数不匹配! "
f"收到 {len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']}")
send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"],
"FILE_VERIFY", current_peer_id,
{"filename": active_file_receive_info["filename"], "status": "ERROR_CHUNK_MISMATCH"})
file_transfer_active = False
active_file_receive_info = {}
elif msg_type == "FILE_VERIFY":
if file_transfer_active and sender_id == p2p_target_peer_id: # 确保是发送方在等待这个
print(f"[P2P Listener] 收到来自 {sender_id} 的文件接收确认: '{msg_data.get('filename')}' 状态: {msg_data.get('status')}")
# 这里可以设置一个Event来通知发送线程对方已完全接收并校验
# 例如: file_final_verify_event.set()
# For now, just print.
file_transfer_active = False # 发送方也标记传输结束
# 其他控制消息...
except (json.JSONDecodeError, UnicodeDecodeError): # 不是JSON,尝试解析为数据块
header_part = raw_data.split(b':', 3) # DATA:sender:sq_num:actual_data
if len(header_part) == 4 and header_part[0] == b'DATA':
try:
data_sender_id = header_part[1].decode('utf-8')
data_sq_num = int(header_part[2].decode('utf-8'))
actual_data = header_part[3]
# print(f"DEBUG Listener: Received DATA chunk {data_sq_num} from {data_sender_id} ({len(actual_data)}B)")
if file_transfer_active and active_file_receive_info and \
data_sender_id == active_file_receive_info["sender_id"] and \
addr[0] == active_file_receive_info["sender_addr"][0]: # 确保来自正确的发送者IP
# 简单停止等待:只接受期望的下一个块
if data_sq_num == active_file_receive_info["next_expected_chunk"]:
if active_file_receive_info["output_file"]:
active_file_receive_info["output_file"].write(actual_data)
active_file_receive_info["received_chunks"].add(data_sq_num)
active_file_receive_info["next_expected_chunk"] += 1
# print(f" 接收并写入块 {data_sq_num}。发送ACK。")
send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK",
current_peer_id, {"sq_num": data_sq_num})
# 打印进度 (简单版)
progress = (len(active_file_receive_info["received_chunks"]) / active_file_receive_info["total_chunks"]) * 100
print(f"\r 接收进度: {progress:.2f}% ({len(active_file_receive_info['received_chunks'])}/{active_file_receive_info['total_chunks']})", end="")
# 如果是重复的已接收块 (对方可能没收到我们的ACK),重发ACK
elif data_sq_num < active_file_receive_info["next_expected_chunk"] and \
data_sq_num in active_file_receive_info["received_chunks"]:
# print(f" 收到重复块 {data_sq_num}。重发ACK。")
send_p2p_control_message(p2p_udp_socket, active_file_receive_info["sender_addr"], "ACK",
current_peer_id, {"sq_num": data_sq_num})
# else: (忽略乱序的块,在停止等待中)
# print(f" 收到乱序块 {data_sq_num} (期望 {active_file_receive_info['next_expected_chunk']})。已忽略。")
# else: (不是当前活动传输的数据块,忽略)
# print(f" 收到来自 {data_sender_id} 的数据块,但当前无匹配的接收任务。")
except Exception as e_parse:
print(f"[P2P Listener] 解析DATA块头部失败: {e_parse}, raw_header: {header_part[:3]}")
# else: (无法识别的原始数据包,忽略)
# print(f"[P2P Listener] 收到来自 {addr} 的无法解析的原始数据: {raw_data[:50]}...")
except socket.timeout:
continue # 超时是正常的,继续检查 stop_event
except OSError as e:
if not p2p_stop_listener.is_set():
print(f"[P2P Listener] Socket错误: {e}")
break
except Exception as e:
if not p2p_stop_listener.is_set():
print(f"[P2P Listener] 未知错误: {e}")
break
print("[P2P Listener] 监听线程已停止。")
def attempt_udp_hole_punch(target_id: str, target_ip: str, target_port: int):
global p2p_udp_socket, p2p_target_actual_addr, p2p_target_peer_id, p2p_target_reported_addr
if not p2p_udp_socket:
print("[P2P] 错误: 本地UDP Socket未初始化。")
return False
print(f"[P2P] 开始尝试与 {target_id} ({target_ip}:{target_port}) 进行UDP打洞...")
p2p_target_peer_id = target_id
p2p_target_reported_addr = (target_ip, target_port)
p2p_target_actual_addr = None # 重置
# 发送几次打洞包
for i in range(3): #减少次数,依赖监听线程收到的PUNCH_SYN和回复的ACK_RESPONSE
print(f" 发送打洞包 (PUNCH_SYN) 第 {i+1} 次到 {target_ip}:{target_port}")
send_p2p_control_message(p2p_udp_socket, (target_ip, target_port), "PUNCH_SYN", current_peer_id)
time.sleep(0.5)
print(f"[P2P] 等待来自 {target_id} 的回应 (最多5秒)...")
start_time = time.time()
responded_correctly = False
while time.time() - start_time < 5:
try:
msg_data, addr = received_messages_q.get_nowait() # 从监听线程队列获取
msg_type = msg_data.get("type")
sender_id = msg_data.get("sender_id")
if addr[0] == target_ip and sender_id == target_id: # 确保是目标peer的IP和ID
if msg_type == "PUNCH_SYN" or msg_type == "PUNCH_ACK_RESPONSE":
p2p_target_actual_addr = addr # 核心:使用对方实际发包的地址
print(f"[P2P] 收到来自 {sender_id} ({addr}) 的 {msg_type}。打洞可能成功!")
print(f" 对方实际地址已更新为: {p2p_target_actual_addr}")
if msg_type == "PUNCH_SYN": # 如果对方也发了SYN,我们回复ACK_RESPONSE
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "PUNCH_ACK_RESPONSE", current_peer_id)
responded_correctly = True
break
except queue.Empty:
time.sleep(0.2)
continue
except Exception as e:
print(f" 处理打洞回应时出错: {e}")
time.sleep(0.2)
continue
if responded_correctly and p2p_target_actual_addr:
print(f"[P2P] 与 {target_id} 的UDP打洞过程完成。")
return True
else:
print(f"[P2P] 未能从 {target_id} 收到有效回应,打洞可能失败。")
p2p_target_actual_addr = None
return False
def _send_file_in_thread(filepath, filesize, chunk_size, total_chunks, target_addr, target_id_for_ack):
global p2p_udp_socket, ack_events, file_transfer_active
print(f"[文件发送线程] 开始发送 '{os.path.basename(filepath)}' ({total_chunks} 块)...")
try:
with open(filepath, "rb") as f:
for i in range(total_chunks):
if p2p_stop_listener.is_set(): # 检查是否被外部停止
print("[文件发送线程] 发送被中止。")
break
chunk_data = f.read(chunk_size)
if not chunk_data: # 文件提前结束?不太可能如果filesize正确
print(f"[文件发送线程] 错误: 读取文件块 {i} 时数据为空。")
break
ack_events[i] = threading.Event() # 为当前块创建ACK事件
retries_left = FILE_TRANSFER_RETRIES
chunk_sent_successfully = False
while retries_left > 0:
if p2p_stop_listener.is_set(): break
# print(f" 发送块 {i}/{total_chunks-1} (尝试 {FILE_TRANSFER_RETRIES - retries_left + 1})...")
send_p2p_data_chunk(p2p_udp_socket, target_addr, current_peer_id, i, chunk_data)
# 等待ACK
ack_received = ack_events[i].wait(timeout=FILE_TRANSFER_TIMEOUT)
if ack_received:
# print(f" 收到块 {i} 的ACK。")
chunk_sent_successfully = True
progress = ((i + 1) / total_chunks) * 100
print(f"\r 发送进度: {progress:.2f}% ({i+1}/{total_chunks})", end="")
break
else:
print(f"\n 块 {i} ACK超时。剩余尝试 {retries_left - 1} 次。")
retries_left -= 1
del ack_events[i] # 清理事件
if not chunk_sent_successfully:
print(f"\n[文件发送线程] 块 {i} 发送失败,已达最大重试次数。中止传输。")
file_transfer_active = False # 标记传输失败
return # 直接从线程退出
if chunk_sent_successfully: # 只有所有块都成功才发送FILE_END
print(f"\n[文件发送线程] 所有 {total_chunks} 块已发送并确认。发送FILE_END。")
send_p2p_control_message(p2p_udp_socket, target_addr, "FILE_END", current_peer_id,
{"filename": os.path.basename(filepath)})
# 此处可以等待对方的 FILE_VERIFY 消息,但为简化,我们只打印
except IOError as e:
print(f"[文件发送线程] 文件读写错误: {e}")
except Exception as e:
print(f"[文件发送线程] 未知错误: {e}")
finally:
print(f"[文件发送线程] '{os.path.basename(filepath)}' 发送处理完毕。")
# file_transfer_active = False # 应该在收到FILE_VERIFY或超时后设置
def initiate_file_transfer():
global p2p_target_actual_addr, p2p_target_peer_id, p2p_udp_socket
global file_offer_response_event, file_offer_accepted, file_transfer_active, file_transfer_send_thread
if not p2p_target_actual_addr:
print("错误: 未建立P2P连接,或对方实际地址未知。请先进行打洞。")
return
if file_transfer_active:
print("错误: 当前已有文件传输正在进行中。")
return
filepath = input("请输入要发送的完整文件路径: ").strip()
if not os.path.exists(filepath) or not os.path.isfile(filepath):
print(f"错误: 文件 '{filepath}' 不存在或不是一个文件。")
return
try:
filesize = os.path.getsize(filepath)
filename = os.path.basename(filepath)
total_chunks = math.ceil(filesize / CHUNK_SIZE)
except Exception as e:
print(f"获取文件信息失败: {e}")
return
print(f"准备发送文件: '{filename}', 大小: {filesize} bytes, 分为 {total_chunks} 块 (每块 {CHUNK_SIZE} B)。")
# 重置事件和状态
file_offer_response_event.clear()
file_offer_accepted = False
# 发送FILE_OFFER
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "FILE_OFFER", current_peer_id,
{"filename": filename, "filesize": filesize,
"chunk_size": CHUNK_SIZE, "total_chunks": total_chunks})
print(f"已向 {p2p_target_peer_id} 发送文件提议,等待响应 (最多15秒)...")
# 等待对方通过监听线程设置 file_offer_response_event
offer_responded = file_offer_response_event.wait(timeout=15.0)
if not offer_responded:
print("对方未在规定时间内响应提议。")
return
if file_offer_accepted:
print(f"{p2p_target_peer_id} 已接受文件。开始传输...")
file_transfer_active = True # 标记传输开始
# 创建并启动文件发送线程
file_transfer_send_thread = threading.Thread(target=_send_file_in_thread,
args=(filepath, filesize, CHUNK_SIZE, total_chunks,
p2p_target_actual_addr, p2p_target_peer_id),
daemon=True)
file_transfer_send_thread.start()
# 主线程返回菜单,发送在后台进行
print("文件正在后台发送。您可以在主菜单进行其他操作,或等待发送完成提示。")
else:
print(f"{p2p_target_peer_id} 拒绝了文件提议或发生错误。")
def start_p2p_chat_and_file_menu():
global p2p_target_peer_id, p2p_target_reported_addr, p2p_target_actual_addr
global p2p_listener_thread, p2p_stop_listener, received_messages_q, file_transfer_active
if not current_peer_id or not current_p2p_port:
print("请先注册您的 Peer ID 和 P2P 端口。")
return
# 如果当前已有P2P会话,先询问是否结束
if p2p_target_peer_id:
print(f"当前已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}).")
choice = input("是否要结束当前P2P会话并尝试连接新的Peer? (y/N): ").strip().lower()
if choice == 'y':
stop_current_p2p_activity()
else: # 进入当前会话的菜单
pass # 下面的逻辑会处理
# 如果没有活动P2P目标,或者用户选择连接新的,则进行查询和打洞
if not p2p_target_actual_addr: # p2p_target_actual_addr 是打洞成功的标志
peer_to_connect = input("请输入您想连接的 Peer ID: ").strip()
if not peer_to_connect: return
if peer_to_connect == current_peer_id:
print("不能连接自己。"); return
print(f"正在查询 {peer_to_connect} 的信息...")
peer_info = query_peer(peer_to_connect)
if not peer_info:
print(f"未能获取 {peer_to_connect} 的信息。"); return
print(f"获取到 {peer_to_connect} 的信息: IP={peer_info['public_ip']}, P2P Port={peer_info['p2p_port']}")
stop_current_p2p_activity() # 确保旧的已停止
p2p_stop_listener = threading.Event() # 重置
received_messages_q = queue.Queue() # 重置
if not initialize_p2p_socket(current_p2p_port): return
p2p_listener_thread = threading.Thread(target=udp_listener, daemon=True)
p2p_listener_thread.start()
time.sleep(0.1)
if not attempt_udp_hole_punch(peer_to_connect, peer_info['public_ip'], peer_info['p2p_port']):
print(f"无法与 {peer_to_connect} 建立P2P连接。")
stop_current_p2p_activity()
return
# --- P2P 会话菜单 ---
print(f"\n--- P2P 会话已连接到 {p2p_target_peer_id} ({p2p_target_actual_addr}) ---")
while p2p_target_actual_addr and not p2p_stop_listener.is_set(): # 只要连接有效
print("\nP2P 操作:")
print("1. 发送文本消息")
print("2. 发送文件")
print("3. 返回主菜单 (结束当前P2P会话)")
# 检查接收到的消息 (只显示文本消息,文件消息由listener处理)
try:
msg_data, addr = received_messages_q.get_nowait()
if p2p_target_actual_addr and addr[0] == p2p_target_actual_addr[0]:
msg_type = msg_data.get("type")
sender_id = msg_data.get("sender_id")
if msg_type == "MSG":
text_msg = msg_data.get("text")
print(f"\r[{time.strftime('%H:%M:%S')}] {sender_id}: {text_msg}\n> ", end="")
# 其他类型的消息(如打洞消息)已在listener或打洞逻辑中处理,或在此忽略
else: # 来自未知源
print(f"\r收到来自未知源 {addr} 的P2P消息: {msg_data}\n> ", end="")
except queue.Empty:
pass
except Exception as e:
print(f"处理消息队列时出错: {e}")
try:
p2p_choice = input("> ").strip()
if p2p_choice == '1':
message_text = input("输入消息: ").strip()
if message_text:
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
current_peer_id, {"text": message_text})
print(f"[{time.strftime('%H:%M:%S')}] 我: {message_text}")
elif p2p_choice == '2':
initiate_file_transfer()
elif p2p_choice == '3':
print("结束当前P2P会话...")
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
current_peer_id, {"text": "/quit_chat"}) # 通知对方
stop_current_p2p_activity()
break # 退出P2P会话循环
else:
if p2p_choice: # 如果输入了东西但不是有效选项
print("无效的P2P操作选项。")
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,结束P2P会话...")
if p2p_target_actual_addr:
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
current_peer_id, {"text": "/quit_chat_ctrl_c"})
stop_current_p2p_activity()
break
except EOFError:
print("\n输入流结束,结束P2P会话...")
if p2p_target_actual_addr:
send_p2p_control_message(p2p_udp_socket, p2p_target_actual_addr, "MSG",
current_peer_id, {"text": "/quit_chat_eof"})
stop_current_p2p_activity()
break
# 短暂休眠,让监听线程有机会处理消息并打印
# 避免input()完全阻塞消息的显示 (虽然不是完美的解决方案)
time.sleep(0.1)
def stop_current_p2p_activity():
global p2p_listener_thread, p2p_stop_listener, p2p_udp_socket
global p2p_target_peer_id, p2p_target_actual_addr, p2p_target_reported_addr
global file_transfer_active, file_transfer_send_thread, ack_events
print("[系统] 正在停止所有P2P活动...")
file_transfer_active = False # 停止任何进行中的文件传输标记
if p2p_stop_listener:
p2p_stop_listener.set() # 发送停止信号
if file_transfer_send_thread and file_transfer_send_thread.is_alive():
print(" 等待文件发送线程结束...")
file_transfer_send_thread.join(timeout=2.0)
if file_transfer_send_thread.is_alive():
print(" 警告: 文件发送线程未能正常停止。")
file_transfer_send_thread = None
ack_events.clear() # 清空ACK事件
if p2p_listener_thread and p2p_listener_thread.is_alive():
print(" 等待监听线程结束...")
p2p_listener_thread.join(timeout=2.0)
if p2p_listener_thread.is_alive():
print(" 警告: 监听线程未能正常停止。")
p2p_listener_thread = None
if p2p_udp_socket:
print(" 关闭UDP Socket。")
try:
p2p_udp_socket.close()
except Exception as e_close:
print(f" 关闭socket时出错: {e_close}")
p2p_udp_socket = None
# 清理队列中的剩余消息 (可选)
while not received_messages_q.empty():
try:
received_messages_q.get_nowait()
except queue.Empty:
break
print("[系统] P2P活动已清理。")
p2p_target_peer_id = None
p2p_target_actual_addr = None
p2p_target_reported_addr = None
def main_menu():
# ... (主菜单逻辑与上一个版本类似,但选项3会调用 start_p2p_chat_and_file_menu) ...
# 确保注销和退出时调用 stop_current_p2p_activity()
global current_peer_id, current_p2p_port
if not current_peer_id:
# ... (注册逻辑,与之前相同) ...
print("\n--- P2P 客户端 (未注册) ---")
my_id = input("请输入你的 Peer ID (例如: alice, bob): ").strip()
if not my_id:
print("Peer ID 不能为空。")
return True
try:
my_port_str = input(f"请输入你用于P2P的UDP端口 (50000-65000, 回车随机): ").strip()
if not my_port_str:
my_port = random.randint(50000, 65000)
print(f"已随机生成P2P端口: {my_port}")
else:
my_port = int(my_port_str)
if not (1024 < my_port < 65536):
print("端口号不合法,将使用随机端口。")
my_port = random.randint(50000, 65000)
print(f"将使用P2P端口: {my_port}")
except ValueError:
print("无效的端口输入,将使用随机端口。")
my_port = random.randint(50000, 65000)
print(f"已随机生成P2P端口: {my_port}")
if not register_peer(my_id, my_port):
return True
print(f"\n--- P2P 客户端 (ID: {current_peer_id}, P2P端口: {current_p2p_port}) ---")
print("选择操作:")
print("1. [信令] 查询其他 Peer 的信息")
print("2. [信令] 列出所有已注册的 Peers")
print("3. [P2P] 开始/管理 P2P 会话 (聊天/文件)")
print("4. [信令] 注销我的 Peer ID")
print("5. [信令] 重新注册 (使用新的ID或端口)")
print("6. 退出")
choice = input("请输入选项 (1-6): ").strip()
if choice == '1':
peer_to_query = input("请输入要查询的 Peer ID: ").strip()
info = query_peer(peer_to_query)
if info:
print(f"Peer '{peer_to_query}' 的信息: {info}")
elif choice == '2':
list_all_peers()
elif choice == '3':
start_p2p_chat_and_file_menu() # 进入P2P会话管理
elif choice == '4':
if current_peer_id:
confirm = input(f"确定要注销 Peer ID '{current_peer_id}'吗? (y/N): ").strip().lower()
if confirm == 'y':
unregister_peer(current_peer_id)
else:
print("你还没有注册任何 Peer ID。")
elif choice == '5':
if current_peer_id:
print(f"你当前注册为 '{current_peer_id}'。重新注册会使用新的ID和端口。")
unregister_peer(current_peer_id)
current_peer_id = None
current_p2p_port = None
elif choice == '6':
print("正在退出...")
if current_peer_id:
unregister_peer(current_peer_id)
return False
else:
print("无效选项,请重试。")
return True
if __name__ == "__main__":
print("欢迎使用 P2P 客户端 Demo (包含P2P文本聊天和文件传输)!")
# ... (与之前相同的服务器连接检查) ...
print(f"将连接到信令服务器: {SIGNALING_SERVER_URL}")
print("请确保信令服务器正在运行。")
print("------------------------------------")
try:
ping_response = requests.get(SIGNALING_SERVER_URL, timeout=3)
if ping_response.status_code == 200:
print("成功连接到信令服务器。")
else:
print(f"警告: 信令服务器响应状态码 {ping_response.status_code}。")
except requests.exceptions.ConnectionError:
print(f"错误: 无法连接到信令服务器 {SIGNALING_SERVER_URL}。请检查服务器。")
exit()
except requests.exceptions.Timeout:
print(f"错误: 连接信令服务器超时。")
exit()
try:
while True:
if not main_menu():
break
except KeyboardInterrupt:
print("\n检测到 Ctrl+C,正在退出程序...")
finally:
stop_current_p2p_activity() # 确保所有P2P资源被清理
print("客户端已退出。")
其中SIGNALING_SERVER_URL全局变量需要修改为自己的信令服务器地址,端口默认为8000。
没有评论