基于Ray在LSF上多节点并行的python数据处理

考虑到未来肯定大概率还会在超算上处理东西,使用一种快速简单上手的方式使python并行是十分必要的,记录下后续备用。

LSF作业提交

构建详细参数文件run.lsf如下

#!/bin/sh
#BSUB -q mpi
#BSUB -n 48
#BSUB -o ./output.out
#BSUB -e ./error.err
My command here

使用bsub < run.lsf提交,bjobs查看。
下面几段便于随时查看节点使用状况,(经常可能出现只使用了一个节点的尴尬情况)

bjobs = os.popen('bjobs')
bjobs = ''.join(bjobs.readlines())
pattern = re.compile('24\*'+'(.*?)'+'\s') #换成适当的检测字符,此处默认24核MPI节点
host_list = pattern.findall(bjobs)

lsload = os.popen('lsload')
for line in lsload.readlines():
    for h in host_list:
        if h in line:
            print(line[:-1])

基于Ray自动并行处理函数任务

Ray官方手册:https://docs.ray.io/en/latest/index.html
Ray对常用的cluster有支持,例如对LSF的如下,来源:https://github.com/IBMSpectrumComputing/ray-integration ,脚本命名为ray_launch_cluster.sh

bash ray_launch_cluster.sh -c "python -u data_process.py" -n "py36" -m 4000000000(object_store_mem大小,默认4G)

将上述放至run.lsf即可,其中-n表示conda环境名字,-c表示在ray环境下执行的脚本命令。
ray.util.multiprocessing完全够使用,如下

from ray.util.multiprocessing import Pool

def prepare(single_task):
    pass
       
task_list = [single_task_1, single_task_2, ...]

pool = Pool(ray_address='auto')
for _ in pool.map(prepare, task_list):
    pass

到此是最简单实现多节点自动并行方法。

更新时间:2022-08-21 14:42:27

本文由 KoN 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
原文链接:/archives/基于ray在lsf上多节点并行的python数据处理
最后更新:2022-08-21 14:42:27

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×