读了 concurrent.futures 源码,记录一下实现原理。 主要包括三个文件:_base.py、thread.py 和 process.py,_base.py 主要是 Future 本身的内容,thread.py 和 process.py 是 Future 的执行器。 _base.py 主要定了三部分内容: 1). waiter 类。 class _Waiter(object): class _AsCompletedWaiter(_Waiter): class _FirstCompletedWaiter(_Waiter): class _AllCompletedWaiter(_Waiter): waiter 类用来等待 Future 执行完,_Waiter 里定义了 threading.Event(),_AsCompletedWaiter 每个 Future 完成都会触发 event.set(),_FirstCompletedWaiter 每个 Future 完成也会触发,_AllCompletedWaiter 会等所有 Future 完成才触发 event.set()。 另外,_AsCompletedWaiter 和 _AllCompletedWaiter 还有把锁 threading.Lock()。 2). 辅助函数。 def _create_and_install_waiters(fs, return_when): def as_completed(fs, timeout=None): def wait(fs, timeout=None, return_when=ALL_COMPLETED): _create_and_install_waiters 是对 Future 列表 fs 创建和安装 waiter,创建好响应的 waiter 之后,会对 fs 中的每一个 Future 增加此 waiter (Future 有个列表变量 _waiters,加入即可),并且返回此 waiter; as_completed 是一个生成器,配合 for 使用可以循环得到已经完成的 Future,as_completed 使用了 _create_and_install_waiters; wait 用于等待 Future 列表依次完成。 3). Future 类和 Executor 类。 Future 类的成员变量: self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None self._traceback = None self._waiters = [] self._done_callbacks = [] _condition 用于控制 Future 内部的条件,比如 result() 要得到值,如果没有完成就要_condition.wait,直到 set_result() 触发 _condition.notify_all(),当然,cancel() 也可以触发 _condition.notify_all()。 Future 支持 callback,记录在 _done_callbacks,Future 完成后会执行这些 callback。 Executor 类供继承,需要实现 submit 方法,thread.py 中的 ThreadPoolExecutor 和 process.py 中的 ProcessPoolExecutor 都实现了此类。 thread.py 主要包括三部分: class _WorkItem(object): def _worker(executor_reference, work_queue): class ThreadPoolExecutor(_base.Executor): _WorkItem 是 Future 的包装,变量有 self.future、self.fn (执行的函数)、self.args 和 self.kwargs,里面的 run() 函数用来执行 Future 并设置 Future 信息,_state 会被设置成 FINISHED,如果是无异常会设置 _result,否则设置 _exception 和 _traceback。 _worker 不断从 _work_queue 队列中取 Future 并执行。 ThreadPoolExecutor 实现 _base.Executor,主要变量是 _max_workers 和 _work_queue,_max_workers 是最大线程数,_work_queue 是 queue.Queue(),当调用 submit 的时候会把 Future 包装成 _WorkItem,放入 _work_queue,然后开启最多 _max_workers 的线程去执行 _worker (不断读取队列并执行 )。 process.py (责任编辑:好模板) |