博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python线程池源码分析
阅读量:6606 次
发布时间:2019-06-24

本文共 2593 字,大约阅读时间需要 8 分钟。

对Python线程池的研究是之前对Apshceduler分析的附加工作。

在之前对中,写到调度器将任务放入线程池的函数

def _do_submit_job(self, job, run_times):        def callback(f):            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else                       (f.exception(), getattr(f.exception(), '__traceback__', None)))            if exc:                self._run_job_error(job.id, exc, tb)            else:                self._run_job_success(job.id, f.result())        f = self._pool.submit(_run_job, job, job._jobstore_alias, run_times, self._logger.name)        f.add_done_callback(callback)

这里分析的线程池类是concurrent.futures.ThreadPoolExecutor,也就是上述代码中self._pool所使用的类。先上self._pool.submit函数的代码,再做详细分析

def submit(self, fn, *args, **kwargs):        with self._shutdown_lock:            if self._shutdown:                raise RuntimeError('cannot schedule new futures after shutdown')            f = _base.Future()            w = _WorkItem(f, fn, args, kwargs)            self._work_queue.put(w)            self._adjust_thread_count()            return f

f和w是两个非常重要的变量,f作为submit返回的对象,submit函数的调用者可以对其添加回调,待fn执行完成后,会在当前线程执行,具体是如何实现的,这里先不说,下面再详细分析;w则是封装了线程需要执行的方法和参数,通过self._work_queue.put(w)方法放入一个队列当中。

self._adjust_thread_count()方法则是检查当前线程池的线程数量,如果小于设定的最大值,就开辟一个线程,代码就不上了,直接看这些个线程都是干嘛的

def _worker(executor_reference, work_queue):    try:        while True:            work_item = work_queue.get(block=True)            if work_item is not None:                work_item.run()                # Delete references to object. See issue16284                del work_item                continue            executor = executor_reference()            # Exit if:            #   - The interpreter is shutting down OR            #   - The executor that owns the worker has been collected OR            #   - The executor that owns the worker has been shutdown.            if _shutdown or executor is None or executor._shutdown:                # Notice other workers                work_queue.put(None)                return            del executor    except BaseException:        _base.LOGGER.critical('Exception in worker', exc_info=True)

这些线程就是一个死循环,不断的从任务队列中获取到_WorkItem,然后通过其封装方法,执行我们需要的任务。如果取到的任务为None,就往队列中再放入一个None,以通知其它线程结束,然后结束当前循环。

def run(self):        if not self.future.set_running_or_notify_cancel():            return        try:            result = self.fn(*self.args, **self.kwargs)        except BaseException as e:            self.future.set_exception(e)        else:            self.future.set_result(result)

如果没有异常,执行结束后,会执行之前我们说的回调。在self.future.set_result(result)方法中会执行任务回调,当然了,是在当前线程中。如果需要写入数据库之类的操作,不建议在回调中直接写入。

转载地址:http://zfbso.baihongyu.com/

你可能感兴趣的文章
rpm安装PostgreSQL
查看>>
k sum(lintcode)
查看>>
如何部署 Calico 网络?- 每天5分钟玩转 Docker 容器技术(67)
查看>>
28. extjs中Ext.BLANK_IMAGE_URL的作用
查看>>
如何通过Node.js启动cesium
查看>>
Mac本地环境配置以及安装织梦CMS,增加新的坑解决办法
查看>>
[HTML]html读取本地文件并显示
查看>>
cs331n 线性分类器损失函数与最优化
查看>>
LindDotNetCore~授权中间件的介绍
查看>>
Appium 设置手机连接方式
查看>>
《android开发艺术探索》读书笔记(五)--RemoteViews
查看>>
XPath轴(XPath Axes)总结
查看>>
Linux内核同步 - sleepable RCU的实现
查看>>
文件的读写、二进制文件的读写、文件随机读写
查看>>
[转载]正确解决:坑爹的0xc000007b——应用程序无法正常启动
查看>>
TouchID 指纹解锁
查看>>
swagger常用注解说明
查看>>
Win8 Metro(C#)数字图像处理--2.73一种背景图像融合特效
查看>>
研究生极简手册——学术论文指南
查看>>
Android Studio你必须学会的快捷键(Eclipse转AS必看)
查看>>