python处理大文件速度慢怎么办?

行业资讯 admin 发布时间:2024-09-13 浏览:0 次

大家好,我是小寒。

在使用 python 处理文件时,有时候会遇到处理超大型文件的情况,这会导致处理时间成倍的增加。今天,我们将学习如何使用 「multiprocessing」「joblib」 和 「tqdm」 Python 包减少大文件的处理时间。

如果觉得不错,点赞、收藏安排起来吧。

数据集

我们将使用来自 Kaggle 的 「美国事故 (2016 - 2021)」 数据集,该数据集包含 280 万条记录和 47 列。# Parallel Computing

import multiprocessing as mp

from joblib import Parallel, delayed

from tqdm.notebook import tqdm

# Data Ingestion 

import pandas as pd

# Text Processing 

import re 

from nltk.corpus import stopwords

import string

在我们开始之前,让我们设置n_workers为2倍的cpu核数。

n_workers = 2 * mp.cpu_count()

print(f"{n_workers} workers are available"

)

8 workers are available

在下一步中,我们将使用 「pandas」

 read_csv 函数提取大型 CSV 文件。然后,打印出 dataframe 的形状、列名和处理时间。%%time

file_name="./US_Accidents_Dec21_updated.csv"

df = pd.read_csv(file_name)

print(f"Shape:{df.shape}\n\nColumn Names:\n{df.columns}\n"

)

输出:

Shape:(2845342, 47)

Column Names:

Index([IDSeverityStart_TimeEnd_TimeStart_LatStart_Lng

,

       End_LatEnd_LngDistance(mi)DescriptionNumberStreet

,

       SideCityCountyStateZipcodeCountryTimezone

,

       Airport_CodeWeather_TimestampTemperature(F)Wind_Chill(F)

,

       Humidity(%)Pressure(in)Visibility(mi)Wind_Direction

,

       Wind_Speed(mph)Precipitation(in)Weather_ConditionAmenity

,

       BumpCrossingGive_WayJunctionNo_ExitRailway

,

       RoundaboutStationStopTraffic_CalmingTraffic_Signal

,

       Turning_LoopSunrise_SunsetCivil_TwilightNautical_Twilight

,

       Astronomical_Twilight

],

      dtype=object

)

CPU times

: total: 17.2 s

Wall time: 17.2 s

数据清洗

首先定义一个用于处理和清理文本的简单函数 clean_text。

我们将使用它从文本行中过滤掉停用词。之后,我们将从句子中删除特殊字符和多余空格。它将是确定「串行」「并行」「批处理」

的处理时间的基线函数。def clean_text(text): 

  # Remove stop words  stops = stopwords.words("english"

)

  text = " ".join([word for word in text.split() if word not in

 stops])

  # Remove Special Characters

  text = text.translate(str.maketrans(, , string.punctuation))

  # removing the extra spaces  text = re.sub( +

,, text)

  return

 text

串行处理

使用 pandas 处理这 280 万条记录,并将结果保存回 “Description”列。%%time

tqdm.pandas()

df[Description] = df[Description

].progress_apply(clean_text)

输出:

「处理 280 万行 需要 9 分 5 秒。」

CPU times

: total: 8 min 14s

Wall time: 9 min 5 s

并行处理

有多种方法可以并行处理文件,我们将了解所有这些方法。

Multiprocessingmultiprocessing 是一个内置的python包,常用于并行处理大文件。 

我们将创建一个具有「8 个 worker 的多处理池,并使用map函数启动进程。并且使用tqdm来显示进度条。」

%%time

p = mp.Pool(n_workers) 

df[Description] = p.map(clean_text,tqdm(df[Description

]))

输出:我们将处理时间缩短了近「3 倍」。处理时间从「9 分 5 秒」下降到「3分51秒」

。100%  2845342/2845342 [02:58<00:00, 135646.12it/s]

CPU times

: user 5.68 s, sys: 1.56 s, total: 7.23 s

Wall time: 3min 51s

Parallel

这里我们将使用 joblib 的 「Parallel」 和 「delayed」 函数。

Parallel 需要两个参数:n_jobs 和 backend 。然后,我们将「clean_text 添加到 delayed」

 函数 中。创建一个循环,一次提供一个值。def text_parallel_clean(array):

  result = Parallel(n_jobs=n_workers,backend="multiprocessing"

)(

  delayed(clean_text)

  (text) 

  for text in

 tqdm(array)

  )

  return

 result

%%time

df[Description] = text_parallel_clean(df[Description

])

输出:

这里比 Multiprocessing 多花了 13 秒。即使那样,并行处理也比串行处理快 4 分 59 秒。

100%  2845342/2845342 [04:03<00:00, 10514.98it/s]

CPU times

: user 44.2 s, sys: 2.92 s, total: 47.1 s

Wall time: 4min 4s

Parallel Batch Processing

有一种更好的方法来处理大文件,方法是将它们分成批次,然后进行并行处理。

批处理函数def proc_batch(batch):

  return

 [

  clean_text(text)

  for text in

 batch

  ]

将文件切割

下面的函数将根据 n_workers 数量将文件分成多个批次。在我们的例子中,我们得到 8 个批次。

def batch_file(array,n_workers):

  file_len = len(array)

  batch_size = round(file_len / n_workers)

  batches = [

  array[ix:ix+batch_size]

  for ix in

 tqdm(range(0, file_len, batch_size))

  ]

  return

 batches

batches = batch_file(df[Description

],n_workers)

运行并行批处理最后,我们将使用「Parallel」「delayed」

来处理批处理。%%time

batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing"

)(

  delayed(proc_batch)

  (batch) 

  for batch in

 tqdm(batches)

  )

df[Description] = [j for i in batch_output for j in

 i]

输出:

「我们改进了处理时间。该技术以处理复杂数据和训练深度学习模型而闻名。」

100%  8/8 [00:00<00:00, 2.19it/s]

CPU times

: user 3.39 s, sys: 1.42 s, total: 4.81 s

Wall time: 3min 56s

tqdm 并发

tqdm 将多处理提升到一个新的水平。它简单而强大。

我会把它推荐给每一位数据科学家。 

%%time

from tqdm.contrib.concurrent import process_map

batch = round(len(df)/n_workers)

df["Description"

] = process_map(

    clean_text, df["Description"

], max_workers=n_workers, chunksize=batch

)

输出:「通过一行代码,我们得到了最好的结果。」

100%  2845342/2845342 [03:48<00:00, 1426320.93it/s]

CPU times

: user 7.32 s, sys: 1.97 s, total: 9.29 s

Wall time: 3min 51s 你需要找到一个平衡点并选择最适合你情况的技术。「它可以是串行处理、并行处理或批处理。如果你使用的是较小、不太复杂的数据集,则并行处理可能适得其反。」

最后

今天的分享就到这里。如果觉得不错,点赞,转发安排起来吧。接下来我们会分享更多的 「深度学习案例以及python相关的技术」,欢迎大家关注。最后,最近新建了一个 python 学习交流群,会经常分享 「python相关学习资料,也可以问问题,非常棒的一个群」

「进群方式:加我微信,备注 “python”」

往期回顾

Fashion-MNIST 服装图片分类-Pytorch实现

python 探索性数据分析(EDA)案例分享

深度学习案例分享 | 房价预测 - PyTorch 实现

万字长文 |  面试高频算法题之动态规划系列

面试高频算法题之回溯算法(全文六千字)  

如果对本文有疑问可以加作者微信直接交流。进技术交流群的可以加微信拉你进群。

在线咨询

点击这里给我发消息售前咨询专员

点击这里给我发消息售后服务专员

在线咨询

免费通话

24h咨询:400-5026888


如您有问题,可以咨询我们的24H咨询电话!

免费通话

微信扫一扫

微信联系
返回顶部