
Tutorial: Several ways to parallelize pandas apply, explained

51zixue.net (51自学网) 2021-10-30 22:55:26

1. pandarallel (pip install )

Take a simple use case: a pandas DataFrame df and a function func to apply to it. After initializing pandarallel, just replace the classic apply with parallel_apply.

from pandarallel import pandarallel

# Initialization
pandarallel.initialize()

# Standard pandas apply
df.apply(func)

# Parallel apply
df.parallel_apply(func)

Note that you can still use the classic apply method whenever you do not want to parallelize a computation.

You can also display one progress bar per worker CPU by passing progress_bar=True to the initialize function.

2. joblib (pip install )

 https://pypi.python.org/pypi/joblib

# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
import time
from math import sqrt

from joblib import Parallel, delayed


def test():
    start = time.time()
    # 10,000 tiny sqrt tasks in a single process
    result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
    end = time.time()
    print(end - start)
    # The same tasks spread over 8 workers
    result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
    end2 = time.time()
    print(end2 - end)


if __name__ == '__main__':
    test()

------- Output (n_jobs=1 first, then n_jobs=8; in seconds) ----------

0.4434356689453125
0.6346755027770996

3. multiprocessing

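import multiprocessing as mp

# f must be a picklable (module-level) function; df is an existing DataFrame
if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()) as pool:
        df['newcol'] = pool.map(f, df['col'])

multiprocessing.cpu_count()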

Returns the number of CPUs in the system.

This number is not the same as the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).

Raises NotImplementedError when the number of CPUs cannot be determined.

See also os.cpu_count().
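The difference between the system CPU count and the CPUs this process may actually run on matters when sizing a Pool, for example when the process is restricted with taskset. The helper below is a minimal sketch; `usable_cpu_count` is a hypothetical name. os.sched_getaffinity exists only on some Unix platforms such as Linux, so the sketch falls back to os.cpu_count() elsewhere. os.cpu_count() can return None, hence the final `or 1`.

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Return how many CPUs this process may run on (hypothetical helper).

    os.sched_getaffinity is only available on some Unix platforms (e.g. Linux);
    elsewhere fall back to os.cpu_count(), which may return None.
    """
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        return os.cpu_count() or 1


if __name__ == "__main__":
    print("CPUs in the system:", mp.cpu_count())
    print("CPUs usable by this process:", usable_cpu_count())
    # A pool could then be sized as mp.Pool(usable_cpu_count())
```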

4. Performance comparison of the methods

(1) Code

import time

import multiprocessing as mp
import pandas as pd
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm


def get_url_len(url):
    url_list = url.split(".")
    time.sleep(0.01)  # sleep 0.01 s to simulate complex logic (remove for the first set of results)
    return len(url_list)


def test1(data):
    """
    No optimization (plain apply).
    """
    start = time.time()
    data['len'] = data['url'].apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("res:{}, cost time:{}".format(res, cost_time))


def test_mp(data):
    """
    Optimized with multiprocessing.
    """
    start = time.time()
    with mp.Pool(mp.cpu_count()) as pool:
        data['len'] = pool.map(get_url_len, data['url'])
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_mp \t res:{}, cost time:{}".format(res, cost_time))


def test_pandarallel(data):
    """
    Optimized with pandarallel.
    """
    start = time.time()
    pandarallel.initialize()
    data['len'] = data['url'].parallel_apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))


def test_delayed(data):
    """
    Optimized with joblib's delayed.
    """
    def key_func(subset):
        subset["len"] = subset["url"].apply(get_url_len)
        return subset

    start = time.time()
    # Grouping by the index yields one group (one task) per row
    data_grouped = data.groupby(data.index)
    # data_grouped is iterable, so tqdm can wrap it to show a progress bar
    results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
    data = pd.concat(results)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))


if __name__ == '__main__':
    columns = ['title', 'url', 'pub_old', 'pub_new']
    temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
    data = temp
    # To enlarge the data, stack 100 copies (DataFrame.append was removed in pandas 2.0):
    # data = pd.concat([temp] * 100)
    print(len(data))
    # test1(data)
    # test_mp(data)
    # test_pandarallel(data)
    test_delayed(data)

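(2) Output, with the sleep line in get_url_len removed (the line without a prefix comes from test1, i.e. plain apply; res is the sum of the lengths, cost time is in seconds)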

1k rows
res:4338, cost time:0.0018074512481689453
test_mp   res:4338, cost time:0.2626469135284424
test_pandarallel   res:4338, cost time:0.3467681407928467
 
10k rows
res:42936, cost time:0.008773326873779297
test_mp   res:42936, cost time:0.26111721992492676
test_pandarallel   res:42936, cost time:0.33237743377685547
 
100k rows
res:426742, cost time:0.07944369316101074
test_mp   res:426742, cost time:0.294996976852417
test_pandarallel   res:426742, cost time:0.39208269119262695
 
1M rows
res:4267420, cost time:0.8074917793273926
test_mp   res:4267420, cost time:0.9741342067718506
test_pandarallel   res:4267420, cost time:0.6779992580413818
 
10M rows
res:42674200, cost time:8.027287006378174
test_mp   res:42674200, cost time:7.751036882400513
test_pandarallel   res:42674200, cost time:4.404983282089233

With the sleep statement added to get_url_len (to simulate complex logic) and 1k rows of data, the results are:

1k rows
res:4338, cost time:10.054503679275513
test_mp   res:4338, cost time:0.35697126388549805
test_pandarallel   res:4338, cost time:0.43415403366088867
test_delayed   res:4338, cost time:2.294757843017578
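With the sleep added, each call spends its time waiting rather than computing. In CPython time.sleep releases the GIL, so for this kind of workload a thread pool can also overlap the waits without starting any processes. The sketch below swaps in concurrent.futures.ThreadPoolExecutor, which is not one of the three methods above. `apply_threaded` is a hypothetical helper, and max_workers=8 is an arbitrary choice. For CPU-bound pure-Python functions, threads do not give this speedup, so processes remain the tool for those.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def get_url_len(url):
    # Same logic as the benchmark function, including the simulated delay
    url_list = url.split(".")
    time.sleep(0.01)
    return len(url_list)


def apply_threaded(values, func, max_workers=8):
    """Apply func to every value on a thread pool; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, values))


if __name__ == "__main__":
    urls = ["www.example.com", "a.b", "localhost"] * 100
    start = time.perf_counter()
    lengths = apply_threaded(urls, get_url_len)
    print(sum(lengths), time.perf_counter() - start)
```

With the benchmark DataFrame this would read data['len'] = apply_threaded(data['url'], get_url_len). Because executor.map yields results in input order, the list lines up with the rows.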

5. Summary

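(1) With small amounts of data, parallel processing is slower than serial execution, because the cost of starting workers and shipping data to them dominates. In the tests above, pandarallel only beat plain apply from 1M rows on, and multiprocessing only at 10M rows.

(2) If the applied function is cheap, parallel processing is also slower than serial execution. Once each call does real work (the 0.01 s sleep), multiprocessing and pandarallel were roughly 23 to 28 times faster than plain apply on 1k rows.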
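Both observations come from fixed per-task overhead: every item sent to a worker process has to be pickled, transferred and unpickled. A common way to amortize that overhead is to give each worker a whole slice of the Series instead of single values. multiprocessing's Pool.map already batches items internally through its chunksize argument. The sketch below makes the batching explicit at the pandas level, so each worker calls Series.apply once per chunk. It is a minimal sketch that assumes pandas is installed. `apply_in_chunks`, `count_dot_parts` and the default of 4 chunks are hypothetical. The executor class is a parameter, so the same code can run on processes (the default) or threads.

```python
import math
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from itertools import repeat

import pandas as pd


def count_dot_parts(url):
    # Cheap per-row function, like get_url_len without the sleep
    return len(url.split("."))


def _apply_chunk(chunk, func):
    # Runs inside a worker: one Series.apply call per chunk
    return chunk.apply(func)


def apply_in_chunks(series, func, n_chunks=4, executor_cls=ProcessPoolExecutor):
    """Split series into up to n_chunks contiguous slices and apply func to them in parallel.

    func must be a module-level function so that worker processes can unpickle it.
    """
    if series.empty:
        return series.apply(func)
    size = math.ceil(len(series) / n_chunks)
    chunks = [series.iloc[i:i + size] for i in range(0, len(series), size)]
    with executor_cls(max_workers=len(chunks)) as executor:
        # executor.map returns results in chunk order
        results = list(executor.map(_apply_chunk, chunks, repeat(func)))
    # Reassemble in the original order; the original index is preserved
    return pd.concat(results)


if __name__ == "__main__":
    s = pd.Series(["www.example.com", "a.b", "localhost"] * 1000)
    print(apply_in_chunks(s, count_dot_parts).sum())  # worker processes
    print(apply_in_chunks(s, count_dot_parts, executor_cls=ThreadPoolExecutor).sum())  # threads
```

Passing executor_cls=ThreadPoolExecutor only pays off when func releases the GIL (as time.sleep does). For cheap CPU-bound functions, the process version with a few large chunks is the variant that addresses the overhead described above.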

6. Problems and solutions

(1) ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

https://www.jianshu.com/p/0be1b4b27bde
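This ImportError is raised when multiprocessing first needs its semaphore-based synchronization primitives, which typically happens while a Pool is being created. If the code must also run on such a platform, one option is to catch the error and fall back to serial execution. The helper below is a hypothetical sketch of that idea, not the fix described in the linked article. The `pool_factory` parameter exists only so the fallback path can be exercised without a broken platform.

```python
import multiprocessing as mp


def map_with_fallback(func, items, pool_factory=mp.Pool, processes=None):
    """Map func over items with a process pool, or serially if sem_open is unavailable.

    Hypothetical helper: on Python builds without a working sem_open, multiprocessing
    raises ImportError when its synchronization primitives are first used.
    """
    items = list(items)
    try:
        pool = pool_factory(processes)
    except ImportError:
        # No working sem_open on this platform: run everything in this process
        return [func(item) for item in items]
    with pool:
        # list() also covers pool objects whose map returns an iterator
        return list(pool.map(func, items))


if __name__ == "__main__":
    print(map_with_fallback(abs, [-3, -2, -1]))
```

The serial fallback gives up the speedup but keeps the results identical, which is usually preferable to failing outright.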

(2) Checking the number of physical CPUs, cores and logical CPUs on Linux

https://lover.blog.csdn.net/article/details/113951192

(3) Using progress bars

https://www.jb51.net/article/206219.htm
