因为著名的GIL问题,Python提供了一个标准库multiprocessing,并建议用它编写多进程并发程序,以解决CPU密集型程序的运行效率问题。这个库内部主要是在处理各种跨平台问题,使用起来比较繁琐。不过其中提供了一个进程池的实现,善加利用的话倒是能满足许多场景的需求。

虽然自Python 3.2起又有一个concurrent库实现了进程/线程池并提供了更简便的接口,但从几个角度来讲我更倾向于使用multiprocessing的进程池:

  • concurrent极大的限制了用户的控制权,难以应付复杂的应用场景;
  • 很难保证永远不会用到Python 2,所以在有可能的情况下最好是编写Python 2/3兼容的代码;

关于multiprocessing.Pool的使用介绍目前已经遍地都是,我就来对它的内部机制做一点简单的分析。

以下分析均基于Python 3.5.1的源码。

多进程通信的限制

所有多进程协作的程序都会涉及进程间的通信问题,因此在谈论进程池的设计前,关于进程间通信的限制就不可不知。

与多线程模型相比,由于每个进程的地址空间是独立的,因此进程间传递数据就必须依托于操作系统提供的机制。这些机制可能是共享内存/消息队列/信号量或Domain socket等,但不管具体是哪一种,都很可能存在两个方面的限制:

  • 为防止所有资源耗尽,通信通道的容量大小必须有限制;
  • 不是所有操作都提供非阻塞的调用接口,不是所有提供非阻塞调用接口的资源都能使用select/poll/epoll;

为了应对通信通道的容量限制问题,应用设计者要么选择协调数据生产与消费的速度,即主动降速或干脆在通信容量不足的时候阻塞,要么就得自己维护一个内部缓存,以防止数据丢失;而为了避免阻塞调用影响其它操作或CPU开销过高,往往就要使用单独的线程来执行阻塞调用。

multiprocessing库的选择是创建了一个内部缓存,具体是怎么做的,下面再说。

基本结构

额外说明一下,进程池的实现在multiprocessing/pool.py文件中,multiprocessing.Pool实际上只是一个工厂函数,在此不再赘述(源码在multiprocessing/context.py)。

为了避免太多细节的干扰,先让我们聚焦到下面三个对象:

  • worker函数:进程池中每个工作进程都在运行的函数;
  • ApplyResult类:主要用于存放执行结果,必要的话可以在任务结束后执行回调函数;
  • Pool类:创建进程池,并发送任务到进程池及取回执行结果;

worker函数

worker函数是进程池中工作进程的主体,做为预生成的工作进程,它最重要的任务显然就不断的接受任务,执行任务并返回结果。事实正是如此,worker简化后的代码可以看做这样一个简单的循环:

completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
    task = inqueue.get()
    job, i, func, args, kwds = task
    result = (True, func(*args, **kwds))
    outqueue.put((job, i, result))
    completed += 1

当然,这里省略了异常处理,因为无论任务成功与否,调用者都希望得到结果,因此worker函数将所有func执行时的异常都捕捉并包装到了result里。这也可以避免不断产生新的工作进程带来的开销。

inqueue/outqueue就是工作进程与进程池的主控进程(Pool类)通信的管道,worker从inqueue读取需要它执行的任务,然后向outqueue放入任务执行的结果。在不同进程间通信显然是需要操作系统支持的,在Python程序员的角度来看它们就是队列,至于在操作系统层面看这两个队列到底是什么,我们后面再讲。

一个比较有趣的地方是传入的maxtasks参数。如上面代码所示,当maxtasks有值,且执行任务次数达到这个值时,这个工作进程就会终止。先不谈减少的工作进程由谁来补充,这个机制在某些场合是是必不可少的。比如你必须调用某个会造成内存泄露或不关闭文件句柄的库,而且由于客观原因不能修改这个库时,这就是防止操作系统资源被耗尽的救命稻草。

另一个可以提一下的参数是initializer,它会在工作进程启动时被调用一次。但由于工作进程对它的返回值并不做任何处理,我个人觉得很鸡肋,也许就是在配合全局变量或单例/Borg模式的情况下有点用?

ApplyResult类

ApplyResult类是Pool类的apply_async方法(用于添加任务)返回的对象。我把multiprocessing.pool所有添加任务的方法返回的对象统称为result对象,ApplyResult则是其中结构最简单的一个特例。

可以将ApplyResult对象理解为一个容器,它的作用主要是存放与任务及其执行结果相关的所有属性,并为进程池的调用方提供获取结果及判断任务是否完成的方法。具体来说,它的内部属性主要有:

  • _event:一个threading.Event对象,由此对象来判定任务是否完成,在无竞争的情况下类似于一个flag变量。不过调用者可以通过_event.wait方法来实现阻塞式的获取结果;
  • _job:全局计数器为其分配的任务ID,执行结果返回时需要它来保证任务与结果的对应关系;
  • _value:任务执行结果或任务执行中抛出的异常;
  • _callback:任务执行成功后需要运行的回调函数;
  • _error_callback:任务执行失败后需要运行的回调函数;

最后还有一个_cache,指向Pool类内部的一个列表,用于存放所有未完成的Result对象。由于ApplyResult类包含对这个数据结构的引用,当任务完成时就可以直接利用这个引用从列表中把已完成任务的Result对象清除了。

说了这么多,其实基本都可以在ApplyResult类的get和_set方法中一览无余:

def get(self, timeout=None):
    self.wait(timeout)
    if not self.ready():
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

def _set(self, i, obj):
    self._success, self._value = obj
    if self._callback and self._success:
        self._callback(self._value)
    if self._error_callback and not self._success:
        self._error_callback(self._value)
    self._event.set()
    del self._cache[self._job]

Pool类

Pool类是整个进程池实现中最关键也是最复杂的一环,但愿我能讲解清楚。

通信队列

前面说道为了解决进程间通信通道容量限制带来的问题,multiprocessing.pool库创建了一个内部缓存来存放任务;此外进程池需要与外部交换任务本身及其结果(也就是worker函数的inqueue/outqueue参数),这就是Pool类内的三个队列:

  • _taskqueue:queue.Queue实例,它既是线程间通信队列,也充当Pool类内部的缓存(内部是collections.deque对象),用于存放来不及送到进程池执行的任务对象;
  • _inqueue:multiprocessing.SimpleQueue实例,用于传送任务对象到进程池;
  • _outqueue:multiprocessing.SimpleQueue实例,用于从进程池传送result对象到外部;

multiprocessing.SimpleQueue是用于进程间通信的队列,因此在操作系统层面观察到它的存在。简单来说,它(在非windows操作系统上)就是os.pipe返回的对象。用pipe在进程间传送数据,这就是multiprocessing库中队列的真相。

pipe的缓冲区在Linux操作系统上默认为64K,可见向其写入内容的进程有很大可能会频繁阻塞,这一点决定了Pool的内部设计。

线程分工

在讲线程分工之前必须介绍Pool类两个重要的内部数据结构:

  • _pool:列表,存放所有活动的工作进程;
  • _cache:列表,存放所有未得到执行结果的result对象;

不难发现在Pool类的__init__方法中创建了如下三个线程:

  • _worker_handler:对应静态方法_handle_workers,主要负责维护进程池;
  • _task_handler:对应静态方法_handle_tasks,主要负责传送任务给工作进程;
  • _result_handler:对应静态方法_handle_results,主要负责接收任务执行结果;

_worker_handler线程做的事情其实很简单:每隔0.1秒检查一次_pool中每个工作进程的exitcode,如果exitcode不为None就代表该工作进程已经终止。这样的话_worker_handler线程就要执行清理工作并将工作进程补足。

_task_handler线程的作用主要是扮演主线程的分身:

  • 从_taskqueue读取(主线程写入的)消息,转发到_inqueue(这个动作很可能阻塞);
  • 如果是批量加入的任务,将其拆解为单个任务再转发;

_result_handler线程的角色比较简单,就是不断的从队列中取得worker返回的结果,并执行result对象的_set方法来触发回调函数并设定任务执行结果。

但是它还有一项特殊的任务,就是只要进程池不是强制关闭或操作系统出现错误,就必须保证收回所有任务的执行结果。这部分代码简化后如下:

while cache and thread._state != TERMINATE:
    try:
        task = get()
    except (OSError, EOFError):
        return

    if task is None:
        continue
    job, i, obj = task
    try:
        cache[job]._set(i, obj)
    except KeyError:
        pass

最后看看主线程的一个重要任务:添加任务。我们选取Pool类一系列用于添加任务的接口中最简单的一个:

def apply_async(self, func, args=(), kwds={}, callback=None,
        error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

可以看到Pool将任务填入_taskqueue队列之后,将包装好的ApplyResult对象返回给了调用者。构造ApplyResult对象时将Pool的内部数据结构_cache包装了进去,这样_result_handler线程要从_cache中清除已完成任务就很简单了。

数据流向

说到这里数据流向就比较清楚了:

task的流向:

main_thread -> _taskqueue -> _task_handler => _inqueue => workers

result的流向:

workers => _outqueue => _result_handler

在这里,"->"表示进程内的数据流,"=>"表示进程间的数据流。

但是除此以外,还有进程池关闭时的控制流。

为了优雅的关闭进程池,必须保证所有线程/进程都是正常完成所有处理之后自行退出。所以Pool类的close方法仅仅设置了主线程以及_worker_handler线程的状态为CLOSE就结束了,剩下的工作需要由各线程依次通过队列发送一个特殊的消息来通知所有线程/进程(这里选择的特殊消息是None)。

这个发送None消息的动作由_worker_handler发起通知到_task_handler,但其他线程/进程的通知消息则全部由_task_handler发送:

_worker_handler -> _taskqueue -> _task_handler

_task_handler -> _outqueue -> _result_handler
_task_handler(循环N次) => _inqueue  => workers

其它

task与result的对应

从上面的分析可以看到task与result是各自独立的对象,Pool类如何找到它们的对应关系在worker函数的这两行代码中可见端倪:

job, i, func, args, kwds = task

put((job, i, result))

其中job就是result对象内部的_job,即全局计数器生成的任务ID;i则是一个task在它对应的批量任务内部的编号;

参考

由于graphviz学艺不精,我没有画图来帮助说明。不过前面提到的concurrent实现的进程池源代码中有一个数据流图颇为相近,可以参考。源文件为python安装目录中的concurrent/futures/process.py。


Comments

comments powered by Disqus