Skip to content
text
# Related Code
main.py:16-22                           - 多进程初始化
src/unified_downloader.py:426-777       - UnifiedDownloadWorker
src/unified_downloader.py:156-198       - SafePipeWriter
src/unified_downloader.py:200-385       - 下载函数

多进程架构

为什么需要多进程

方案问题
单线程下载时 UI 完全冻结
多线程Python GIL 限制,SDK 内部状态不线程安全
多进程完全隔离,各进程独立运行

架构设计

关键组件

  1. UnifiedDownloadWorker: QThread 子类,桥接 UI 和下载进程
  2. multiprocessing.Process: 执行实际下载
  3. multiprocessing.Pipe: 进程间通信

Spawn vs Fork

python
# main.py:16
MULTIPROCESSING_START_METHOD = "spawn"

为什么选择 Spawn

方法行为问题
fork复制父进程内存PyQt 不安全,可能 crash
forkserver单独服务器进程增加复杂度
spawn启动全新解释器安全,PyInstaller 兼容

PyInstaller 兼容性

python
# main.py:28-37
def setup_multiprocessing():
    if getattr(sys, "frozen", False):
        # PyInstaller 环境必须设置
        multiprocessing.set_start_method(MULTIPROCESSING_START_METHOD, force=True)

为什么 frozen 检查?

PyInstaller 打包后,子进程启动方式不同。必须显式设置 spawn 方法。

进程间通信

Pipe 通信协议

消息格式

python
# 进度更新
{"type": "status", "message": "Downloading file.safetensors"}

# 日志消息
{"type": "log", "message": "Downloaded 50%"}

# 完成
{"type": "finished"}

# 错误
{"type": "error", "message": "Network timeout"}

SafePipeWriter

python
# src/unified_downloader.py:156-198
class SafePipeWriter:
    """进程安全的管道写入"""

    def __init__(self, pipe_conn):
        self._pipe = pipe_conn
        self._lock = multiprocessing.Lock()

    def send(self, data):
        with self._lock:  # 保护并发写入
            self._pipe.send(data)

为什么需要锁?

SDK 的进度回调可能在多个线程中触发,需要保护 Pipe 写入。

下载进程入口

python
# src/unified_downloader.py:200-385
def unified_download_model(
    platform: str,
    model_id: str,
    save_path: str,
    token: str,
    repo_type: str,
    endpoint: str,
    pipe_writer: SafePipeWriter,
):
    """在独立进程中执行下载"""

环境隔离

python
# 设置环境变量(不影响主进程)
os.environ[config["token_env"]] = token
os.environ[config["endpoint_env"]] = endpoint

进度回调

python
# HuggingFace 使用 tqdm 进度条
class UnifiedProgressBar(tqdm):
    def update(self, n=1):
        super().update(n)
        self.pipe_writer.send({
            "type": "status",
            "message": f"Progress: {self.n}/{self.total}"
        })

取消机制

实现细节

python
# src/unified_downloader.py:530-579
def cancel_download(self):
    self._cancel_event.set()

    if self._download_process and self._download_process.is_alive():
        self._download_process.terminate()
        self._download_process.join(timeout=5)

        if self._download_process.is_alive():
            self._download_process.kill()  # 强制终止

为什么两步终止?

  1. terminate(): 发送 SIGTERM,允许优雅退出
  2. kill(): 发送 SIGKILL,强制终止(超时后)

资源清理

python
# src/unified_downloader.py:691-777
def cleanup(self):
    """清理所有资源"""
    # 1. 停止输出处理线程
    # 2. 关闭 Pipe
    # 3. 终止下载进程
    # 4. 清理日志处理器
    # 5. 清理锁文件

清理顺序

为什么顺序重要?

  • 先停线程再关 Pipe,避免读取已关闭的连接
  • 先 join 进程再清理资源,确保进程完全退出