博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python之multiprocessing创建进程
阅读量:6941 次
发布时间:2019-06-27

本文共 5105 字,大约阅读时间需要 17 分钟。

python的multiprocessing模块是用来创建多进程的,下面对multiprocessing总结一下使用记录。

fork()

import ospid = os.fork() # 创建一个子进程if pid == 0:    print('这是子进程')    print(os.getpid(),os.getppid())else:    print('这是父进程')    print(os.getpid())os.wait() # 等待子进程结束释放资源
  • fork函数被调用后会返回两次,pid为0的代表子进程,其他返回子进程的id号表示父进程。

  • getpid和getppid函数可以获取本进程和父进程的id号;

fork方式的缺点:

  1. 兼容性差,只能在类linux系统下使用,windows系统不可使用;

  2. 扩展性差,当需要多条进程的时候,进程管理变得很复杂;

  3. 会产生“孤儿”进程和“僵尸”进程,需要手动回收资源。

优点:

是系统自带的接近低层的创建方式,运行效率高。

Process创建进程

  • 创建方式一:

from multiprocessing import Queue, Processimport osdef test():    time.sleep(2)    print('this is process {}'.format(os.getpid()))if __name__ == '__main__':    p = Process(target=test)    p.start() # 子进程 开始执行    p.join() # 等待子进程结束    print('ths peocess is ended')
  • 创建方式二:

from multiprocessing import Queue, Processimport osclass MyProcess(Process):    def run(self):        time.sleep(2)        print('this is process {}'.format(os.getpid()))    def __del__(self):        print('del the process {}'.format(os.getpid()))if __name__ == '__main__':    p = MyProcess()    p.start()    print('ths process is ended')# 结果:ths process is endedthis is process 7600del the process 7600del the process 12304

说明:

  • Process对象可以创建进程,但Process对象不是进程,其删除与否与系统资源是否被回收没有直接的关系。

  • 上例看到del方法被调用了两次,Process进程创建时,子进程会将主进程的Process对象完全复制一份,这样在主进程和子进程各有一个Process对象,但是p1.start()启动的是子进程,主进程中的Process对象作为一个静态对象存在。

  • 主进程执行完毕后会默认等待子进程结束后回收资源,不需要手动回收资源;

  • join()函数用来控制子进程结束的顺序,主进程会阻塞等待子进程结束,其内部也有一个清除僵尸进程的函数,可以回收资源;

  • 当子进程执行完毕后,会产生一个僵尸进程,其会被join函数回收,或者再有一条进程开启,start函数也会回收僵尸进程,所以不一定需要写join函数。

  • windows系统在子进程结束后会立即自动清除子进程的Process对象,而linux系统子进程的Process对象如果没有join函数和start函数的话会在主进程结束后统一清除。

Process对象分析

class Process(object):    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):        pass# Process对象是python用来创建进程的类group:扩展保留字段;target:目标代码,一般是我们需要创建进程执行的目标函数。name:进程的名字,如果不指定会自动分配一个;args:目标函数的普通参数;kwargs:目标函数的键值对参数;# 方法start():创建一个子进程并执行,该方法一个Process实例只能执行一次,其会创建一个进程执行该类的run方法。run():子进程需要执行的代码;join():主进程阻塞等待子进程直到子进程结束才继续执行,可以设置等待超时时间timeout.terminate():使活着的进程终止;is_alive():判断子进程是否还活着。

进程池Pool

如果需要创建大量的进程,就需要使用Pool了。

from multiprocessing import Queue, Process, Poolimport osdef test():    time.sleep(2)    print('this is process {}'.format(os.getpid()))def get_pool(n=5):    p = Pool(n) # 设置进程池的大小    for i in range(10):        p.apply_async(test)    p.close() # 关闭进程池    p.join()if __name__ == '__main__':    get_pool()    print('ths process is ended')

分析:

  • 如上,进程池Pool被创建出来后,即使实际需要创建的进程数远远大于进程池的最大上限,p1.apply_async(test)代码依旧会不停的执行,并不会停下等待;相当于向进程池提交了10个请求,会被放到一个队列中;

  • 当执行完p1 = Pool(5)这条代码后,5条进程已经被创建出来了,只是还没有为他们各自分配任务,也就是说,无论有多少任务,实际的进程数只有5条,计算机每次最多5条进程并行。

  • 当Pool中有进程任务执行完毕后,这条进程资源会被释放,pool会按先进先出的原则取出一个新的请求给空闲的进程继续执行;

  • 当Pool所有的进程任务完成后,会产生5个僵尸进程,如果主线程不结束,系统不会自动回收资源,需要调用join函数去回收。

  • join函数是主进程等待子进程结束回收系统资源的,如果没有join,主程序退出后不管子进程有没有结束都会被强制杀死;

  • 创建Pool池时,如果不指定进程最大数量,默认创建的进程数为系统的内核数量.

Pool对象分析

class Pool(object):    def __init__(self, processes=None, initializer=None, initargs=(),                 maxtasksperchild=None, context=None):        pass# 初始化参数processes:进程池的大小,默认cpu内核的数量initializer:创建进程执行的目标函数,其会按照进程池的大小创建相应个数的进程;initargs:目标函数的参数context:代码的上下文# 方法apply():使用阻塞方式调用func;apply_async():使用非阻塞方式条用func;close():关闭Pool,使其不再接受新的任务;terminate():不管任务是否完成,立即终止;join():主进程阻塞,等待子进程的退出,必须在close()后面使用;map(self, func, iterable, chunksize=None):多进程执行一个函数,传入不同的参数;starmap(self, func, iterable, chunksize=None):和map类似,但iterable参数可解压缩;starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):使用异步的方式的starmap,callback为返回后的处理函数map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):异步方式的map
  • 实例

from multiprocessing import Poolimport osdef test(n):    time.sleep(1)    print('this is process {}'.format(os.getpid()))    return ndef test1(n, m):    print(n, m)    print('this is process {}'.format(os.getpid()))def back_func(values): # 多进程执行完毕会返回所有的结果的列表    print(values)def back_func_err(values): # 多进程执行完毕会返回所有错误的列表    print(values)def get_pool(n=5):    p = Pool(n)    # p.map(test, (i for i in range(10))) # 阻塞式多进程执行    # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多进程执行多参数函数    # 异步多进程执行函数    p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err)    # 异步多进程执行多参数函数    p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err)    print('-----')    p.close()    p.join()if __name__ == '__main__':    get_pool()    print('ths process is ended')

进程锁

进程虽然不像线程那样共享内存的数据,而是每个进程有单独的内存,但多进程也是共享文件系统的,即硬盘系统;当多进程同时写入文件操作时,可能造成数据的破坏,因此进程也存在同步锁。

from multiprocessing import Pool, Lockmuex = Lock()def test():    if muex.acquire():        f = open('./test_pro.txt', 'r+', encoding='utf-8')        x = f.read()        if not x:            f.write('0')        else:            f.seek(0)            f.write(str(int(x)+1))        f.close()        muex.release()if __name__ == '__main__':    p = Pool(5)    for i in range(10):        p.apply_async(test)    p.close()    p.join()    with open('./test_pro.txt', 'r+', encoding='utf-8') as f:        print(f.read())

进程锁可以保证文件系统的安全,但是它使得并行变成了串行,效率下降了,也可能造成死锁问题,一般避免用锁机制。

  • 参考

转载于:https://www.cnblogs.com/xyou/p/9577030.html

你可能感兴趣的文章
以P2P网贷为例互联网金融产品如何利用大数据做风控?
查看>>
Polymer初探
查看>>
zprofiler三板斧解决cpu占用率过高问题(转载)
查看>>
深入浅出NIO Socket实现机制
查看>>
bzoj 1930: [Shoi2003]pacman 吃豆豆 [费用流]
查看>>
(数字IC)低功耗设计入门(三)——系统与架构级低功耗设计
查看>>
Dynamics CRM2016 新功能之从CRM APP中导出数据至EXCEL
查看>>
Android——推断Service是否已经启动
查看>>
subprocess模块
查看>>
大数据入门基础系列之初步认识大数据生态系统圈(博主推荐)
查看>>
linux下命令行的查找顺序
查看>>
基于HTML5 Canvas 点击添加 2D 3D 机柜模型
查看>>
详述 SQL 中的 distinct 和 row_number() over() 的区别及用法
查看>>
xshell 登陆堡垒机实现自动跳转
查看>>
Hexo-设置阅读全文
查看>>
实模式与保护模式
查看>>
分布式ID生成器解决方案
查看>>
ResolveUrl in external JavaScript file in asp.net project
查看>>
EL表达式JSON应用
查看>>
人民邮电出版社图灵公司征求《Windows Communication Foundation Unleashed》译者
查看>>