泌陽(yáng)縣網(wǎng)站建設(shè)千鋒教育的官網(wǎng)
鶴壁市浩天電氣有限公司
2026/01/24 14:01:39
泌陽(yáng)縣網(wǎng)站建設(shè),千鋒教育的官網(wǎng),貴州省城鄉(xiāng)住房建設(shè)廳網(wǎng)站,用html做簡(jiǎn)單網(wǎng)站PySpark實(shí)戰(zhàn)#xff1a;用Python玩轉(zhuǎn)Hadoop大數(shù)據(jù)處理
一、引言#xff1a;Python開(kāi)發(fā)者的大數(shù)據(jù)困境與解決方案
1.1 一個(gè)真實(shí)的痛點(diǎn)場(chǎng)景
作為Python開(kāi)發(fā)者#xff0c;你是否遇到過(guò)這樣的問(wèn)題#xff1f;
用Pandas處理1GB的CSV文件很輕松#xff0c;但面對(duì)100GB的用戶(hù)…PySpark實(shí)戰(zhàn)用Python玩轉(zhuǎn)Hadoop大數(shù)據(jù)處理一、引言Python開(kāi)發(fā)者的大數(shù)據(jù)困境與解決方案1.1 一個(gè)真實(shí)的痛點(diǎn)場(chǎng)景作為Python開(kāi)發(fā)者你是否遇到過(guò)這樣的問(wèn)題用Pandas處理1GB的CSV文件很輕松但面對(duì)100GB的用戶(hù)行為日志時(shí)電腦直接“卡死”想做個(gè)簡(jiǎn)單的詞頻統(tǒng)計(jì)循環(huán)遍歷文件的方式跑了3小時(shí)還沒(méi)結(jié)束聽(tīng)說(shuō)Hadoop能處理大數(shù)據(jù)但Java代碼寫(xiě)起來(lái)太麻煩不想放棄Python的易用性。這不是你的問(wèn)題——傳統(tǒng)Python工具如Pandas、NumPy是單機(jī)級(jí)別的無(wú)法應(yīng)對(duì)TB級(jí)以上的分布式數(shù)據(jù)處理需求。而Hadoop生態(tài)雖然強(qiáng)大但陡峭的學(xué)習(xí)曲線(xiàn)Java/Scala讓很多Python開(kāi)發(fā)者望而卻步。1.2 為什么選擇PySparkPySpark的出現(xiàn)完美解決了這個(gè)矛盾底層依托SparkSpark是Hadoop生態(tài)中最快的分布式計(jì)算引擎比MapReduce快100倍支持內(nèi)存計(jì)算、迭代計(jì)算上層用Python封裝保留了Python的簡(jiǎn)潔語(yǔ)法讓開(kāi)發(fā)者用熟悉的def、for、if就能寫(xiě)分布式程序無(wú)縫集成Hadoop直接讀取HDFS中的數(shù)據(jù)依托YARN進(jìn)行資源管理完美融入現(xiàn)有大數(shù)據(jù)架構(gòu)。簡(jiǎn)單來(lái)說(shuō)PySpark就是**“Python開(kāi)發(fā)者的大數(shù)據(jù)瑞士軍刀”**——用你最熟悉的語(yǔ)言處理最龐大的數(shù)據(jù)。1.3 本文能給你帶來(lái)什么讀完本文你將掌握PySpark的核心概念RDD、DataFrame、Dataset從數(shù)據(jù)讀取到存儲(chǔ)的完整分布式處理流程電商用戶(hù)行為分析的實(shí)戰(zhàn)案例附可運(yùn)行代碼10個(gè)PySpark性能優(yōu)化技巧解決數(shù)據(jù)傾斜、提升運(yùn)行速度。無(wú)論你是Python新手還是有經(jīng)驗(yàn)的開(kāi)發(fā)者都能從本文中找到適合自己的學(xué)習(xí)路徑。二、PySpark基礎(chǔ)從0到1搭建環(huán)境2.1 先決條件在開(kāi)始之前你需要準(zhǔn)備硬件至少2臺(tái)電腦或虛擬機(jī)組成的Hadoop集群?jiǎn)喂?jié)點(diǎn)也可測(cè)試但無(wú)法體驗(yàn)分布式軟件Hadoop 3.x安裝教程參考官方文檔Spark 3.x選擇“Pre-built for Apache Hadoop”版本下載地址Python 3.7推薦用Anaconda管理環(huán)境知識(shí)Python基礎(chǔ)函數(shù)、列表推導(dǎo)式、Hadoop基本概念HDFS、MapReduce。2.2 安裝與配置PySpark安裝PySparkpipinstallpyspark配置環(huán)境變量以Linux為例exportSPARK_HOME/opt/spark-3.3.0exportPATH$PATH:$SPARK_HOME/binexportPYSPARK_PYTHONpython3# 指定Python解釋器驗(yàn)證安裝運(yùn)行pyspark命令若出現(xiàn)以下界面則表示安裝成功Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ / __/ _/ /__ / .__/\_,_/_/ /_/\_ version 3.3.0 /_/ Using Python version 3.8.10 (default, Jun 22 2022 20:18:18)2.3 第一個(gè)PySpark程序Word Count讓我們用經(jīng)典的“詞頻統(tǒng)計(jì)”來(lái)入門(mén)PySpark。假設(shè)我們有一個(gè)文本文件data.txt內(nèi)容如下Hello PySpark Hello Hadoop PySpark is easy步驟1初始化SparkSessionSparkSession是PySpark的入口負(fù)責(zé)管理集群資源frompyspark.sqlimportSparkSession# 初始化SparkSessionsparkSparkSession.builder .appName(WordCount).master(local[*])# 本地運(yùn)行使用所有CPU核心.getOrCreate()步驟2讀取數(shù)據(jù)用spark.read.text()讀取文本文件返回一個(gè)DataFramedfspark.read.text(data.txt)df.show()# 顯示前5行數(shù)據(jù)輸出------------------ | value| ------------------ | Hello PySpark| | Hello Hadoop| |PySpark is easy| ------------------步驟3處理數(shù)據(jù)用split()分割單詞explode()將數(shù)組展開(kāi)為多行再groupBy()統(tǒng)計(jì)詞頻frompyspark.sql.functionsimportsplit,explode,count# 分割單詞將每一行拆分為單詞數(shù)組words_dfdf.select(explode(split(df.value, )).alias(word))# 統(tǒng)計(jì)詞頻按單詞分組計(jì)數(shù)word_count_dfwords_df.groupBy(word).agg(count(*).alias(count))# 排序并顯示結(jié)果word_count_df.orderBy(count,ascendingFalse).show()步驟4輸出結(jié)果------------ | word|count| ------------ | Hello| 2| |PySpark| 2| | Hadoop| 1| | is| 1| | easy| 1| ------------步驟5停止SparkSessionspark.stop()2.4 關(guān)鍵概念解釋SparkSessionPySpark 2.0的核心入口替代了舊版的SparkContext和SQLContextDataFrame結(jié)構(gòu)化數(shù)據(jù)的分布式集合類(lèi)似Excel表格包含行和列支持SQL查詢(xún)Transformation轉(zhuǎn)換操作如split、groupBy延遲執(zhí)行Lazy Evaluation直到遇到Action才會(huì)運(yùn)行Action動(dòng)作操作如show、count觸發(fā)實(shí)際的計(jì)算。三、PySpark核心組件RDD、DataFrame、Dataset3.1 三者的關(guān)系PySpark有三個(gè)核心數(shù)據(jù)結(jié)構(gòu)它們的關(guān)系可以用一句話(huà)概括RDD是底層基礎(chǔ)DataFrame是結(jié)構(gòu)化升級(jí)Dataset是類(lèi)型安全的擴(kuò)展Python中不常用因?yàn)镻ython是動(dòng)態(tài)類(lèi)型。特性RDDDataFrameDatasetScala/Java數(shù)據(jù)結(jié)構(gòu)無(wú)結(jié)構(gòu)任意對(duì)象結(jié)構(gòu)化列名類(lèi)型結(jié)構(gòu)化類(lèi)型安全性能低無(wú)優(yōu)化高Catalyst優(yōu)化器高類(lèi)型檢查易用性低需要手動(dòng)優(yōu)化高SQL-like語(yǔ)法中需要定義樣例類(lèi)適用場(chǎng)景復(fù)雜數(shù)據(jù)處理如機(jī)器學(xué)習(xí)特征工程結(jié)構(gòu)化數(shù)據(jù)處理如日志分析強(qiáng)類(lèi)型需求如金融數(shù)據(jù)3.2 RDD分布式彈性數(shù)據(jù)集RDDResilient Distributed Dataset是Spark的核心抽象代表分布式、不可變、可分區(qū)的數(shù)據(jù)集。3.2.1 RDD的創(chuàng)建從本地集合創(chuàng)建sc.parallelize([1,2,3,4])從文件系統(tǒng)創(chuàng)建sc.textFile(hdfs://node1:9000/data.txt)從其他RDD轉(zhuǎn)換rdd.map(lambda x: x*2)。3.2.2 RDD的操作Transformationmap映射、filter過(guò)濾、reduceByKey按鍵歸約Actioncollect收集所有數(shù)據(jù)到Driver、count計(jì)數(shù)、saveAsTextFile保存到文件。示例用RDD實(shí)現(xiàn)Word CountfrompysparkimportSparkContext scSparkContext(appNameWordCountRDD)# 讀取文件為RDDrddsc.textFile(data.txt)# 處理數(shù)據(jù)分割單詞→過(guò)濾空值→統(tǒng)計(jì)詞頻word_count_rddrdd.flatMap(lambdaline:line.split( )).filter(lambdaword:word!).map(lambdaword:(word,1)).reduceByKey(lambdaa,b:ab)# 輸出結(jié)果print(word_count_rdd.collect())# [(Hello, 2), (PySpark, 2), (Hadoop, 1), (is, 1), (easy, 1)]sc.stop()3.3 DataFrame結(jié)構(gòu)化數(shù)據(jù)的“超級(jí)表格”DataFrame是Spark 1.3引入的結(jié)構(gòu)化數(shù)據(jù)抽象相當(dāng)于分布式的Pandas DataFrame但性能更強(qiáng)支持內(nèi)存緩存、查詢(xún)優(yōu)化。3.3.1 DataFrame的創(chuàng)建從RDD轉(zhuǎn)換rdd.toDF([column1, column2])從文件讀取spark.read.csv(data.csv, headerTrue)從數(shù)據(jù)庫(kù)讀取spark.read.jdbc(url, table, properties)。3.3.2 DataFrame的操作SQL風(fēng)格df.select(name, age).filter(df.age 18)方法鏈風(fēng)格df.groupBy(gender).agg({salary: avg})混合風(fēng)格df.createOrReplaceTempView(user)然后用spark.sql(SELECT * FROM user WHERE age 18)。示例用DataFrame處理用戶(hù)數(shù)據(jù)假設(shè)我們有一個(gè)users.csv文件name,age,gender,salary Alice,25,Female,5000 Bob,30,Male,8000 Charlie,28,Male,6000 David,35,Male,10000 Eve,22,Female,4000frompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportavg,maxsparkSparkSession.builder.appName(UserAnalysis).getOrCreate()# 讀取CSV文件帶表頭dfspark.read.csv(users.csv,headerTrue,inferSchemaTrue)# inferSchema自動(dòng)推斷列類(lèi)型df.show()輸出---------------------- | name|age|gender|salary| ---------------------- | Alice| 25|Female| 5000| | Bob| 30| Male| 8000| |Charlie| 28| Male| 6000| | David| 35| Male|10000| | Eve| 22|Female| 4000| ----------------------統(tǒng)計(jì)分析# 計(jì)算不同性別的平均工資和最高工資gender_salary_dfdf.groupBy(gender).agg(avg(salary).alias(avg_salary),max(salary).alias(max_salary))gender_salary_df.show()輸出-------------------------- |gender|avg_salary|max_salary| -------------------------- |Female| 4500.0| 5000| | Male| 8000.0| 10000| --------------------------3.4 如何選擇如果你需要處理結(jié)構(gòu)化數(shù)據(jù)如CSV、JSON、數(shù)據(jù)庫(kù)表優(yōu)先用DataFrame如果你需要處理非結(jié)構(gòu)化數(shù)據(jù)如文本、圖像或復(fù)雜的轉(zhuǎn)換邏輯如機(jī)器學(xué)習(xí)中的特征工程用RDD如果你是Python開(kāi)發(fā)者幾乎不用考慮Dataset因?yàn)镻ython不支持類(lèi)型檢查。四、PySpark實(shí)戰(zhàn)電商用戶(hù)行為分析4.1 項(xiàng)目背景假設(shè)你是某電商平臺(tái)的大數(shù)據(jù)分析師需要分析用戶(hù)的行為數(shù)據(jù)如點(diǎn)擊、收藏、購(gòu)買(mǎi)以?xún)?yōu)化推薦系統(tǒng)。數(shù)據(jù)存儲(chǔ)在HDFS上格式為JSON每天產(chǎn)生100GB包含以下字段user_id用戶(hù)IDitem_id商品IDaction行為類(lèi)型click/collect/buytimestamp時(shí)間戳。4.2 目標(biāo)統(tǒng)計(jì)每日Top 10熱門(mén)商品點(diǎn)擊量最多計(jì)算用戶(hù)轉(zhuǎn)化率從點(diǎn)擊到購(gòu)買(mǎi)的比例分析用戶(hù)行為的時(shí)間分布哪個(gè)時(shí)間段最活躍。4.3 數(shù)據(jù)準(zhǔn)備上傳數(shù)據(jù)到HDFShdfs dfs -mkdir /user/behavior hdfs dfs -put user_behavior.json /user/behavior/查看數(shù)據(jù)hdfs dfs -cat /user/behavior/user_behavior.json|head-n2輸出{user_id:1001,item_id:2001,action:click,timestamp:1678900000}{user_id:1001,item_id:2002,action:collect,timestamp:1678900100}4.4 代碼實(shí)現(xiàn)4.4.1 步驟1初始化SparkSessionfrompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimportcol,count,window,from_unixtime,expr sparkSparkSession.builder .appName(UserBehaviorAnalysis).master(yarn)# 提交到Y(jié)ARN集群運(yùn)行.config(spark.executor.memory,4g)# 每個(gè) executor 分配4GB內(nèi)存.config(spark.executor.cores,2)# 每個(gè) executor 分配2個(gè)核心.getOrCreate()4.4.2 步驟2讀取HDFS中的JSON數(shù)據(jù)# 讀取JSON文件自動(dòng)推斷 schemadfspark.read.json(hdfs://node1:9000/user/behavior/user_behavior.json)# 查看 schemadf.printSchema()輸出root |-- action: string (nullable true) |-- item_id: string (nullable true) |-- timestamp: long (nullable true) |-- user_id: string (nullable true)4.4.3 步驟3數(shù)據(jù)清洗過(guò)濾無(wú)效數(shù)據(jù)action為空或timestamp為0將時(shí)間戳轉(zhuǎn)換為可讀格式y(tǒng)yyy-MM-dd HH:mm:ss。# 過(guò)濾無(wú)效數(shù)據(jù)cleaned_dfdf.filter(col(action).isNotNull()(col(timestamp)!0))# 轉(zhuǎn)換時(shí)間戳from_unixtime將 Unix 時(shí)間戳轉(zhuǎn)換為字符串to_timestamp轉(zhuǎn)換為時(shí)間類(lèi)型cleaned_dfcleaned_df.withColumn(datetime,from_unixtime(col(timestamp),yyyy-MM-dd HH:mm:ss)).withColumn(datetime,col(datetime).cast(timestamp))# 轉(zhuǎn)換為 timestamp 類(lèi)型cleaned_df.show(5)輸出------------------------------------------------- |action|item_id| timestamp|user_id| datetime| ------------------------------------------------- | click| 2001|1678900000| 1001|2023-03-15 10:26:40| |collect| 2002|1678900100| 1001|2023-03-15 10:28:20| | buy| 2003|1678900200| 1002|2023-03-15 10:30:00| | click| 2004|1678900300| 1003|2023-03-15 10:31:40| |collect| 2005|1678900400| 1003|2023-03-15 10:33:20| -------------------------------------------------4.4.4 步驟4統(tǒng)計(jì)Top 10熱門(mén)商品點(diǎn)擊量# 過(guò)濾點(diǎn)擊行為click_dfcleaned_df.filter(col(action)click)# 按商品ID分組統(tǒng)計(jì)點(diǎn)擊量item_click_dfclick_df.groupBy(item_id).agg(count(*).alias(click_count)).orderBy(col(click_count).desc()).limit(10)# 取前10item_click_df.show()輸出------------------ |item_id|click_count| ------------------ | 2010| 1234| | 2005| 987| | 2008| 856| | 2002| 745| | 2001| 634| | 2003| 523| | 2007| 412| | 2004| 301| | 2006| 290| | 2009| 180| ------------------4.4.5 步驟5計(jì)算用戶(hù)轉(zhuǎn)化率點(diǎn)擊→購(gòu)買(mǎi)轉(zhuǎn)化率公式購(gòu)買(mǎi)用戶(hù)數(shù) / 點(diǎn)擊用戶(hù)數(shù) × 100%# 統(tǒng)計(jì)點(diǎn)擊用戶(hù)數(shù)去重click_usersclick_df.select(user_id).distinct().count()# 統(tǒng)計(jì)購(gòu)買(mǎi)用戶(hù)數(shù)去重buy_dfcleaned_df.filter(col(action)buy)buy_usersbuy_df.select(user_id).distinct().count()# 計(jì)算轉(zhuǎn)化率conversion_rate(buy_users/click_users)*100ifclick_users!0else0print(f用戶(hù)轉(zhuǎn)化率{conversion_rate:.2f}%)輸出用戶(hù)轉(zhuǎn)化率15.67%4.4.6 步驟6分析用戶(hù)行為時(shí)間分布用window函數(shù)按小時(shí)分組統(tǒng)計(jì)每個(gè)小時(shí)的行為次數(shù)。# 按小時(shí)窗口分組統(tǒng)計(jì)行為次數(shù)time_distribution_dfcleaned_df.groupBy(window(col(datetime),1 hour)).agg(count(*).alias(action_count)).orderBy(col(window.start))# 按時(shí)間排序# 提取窗口的開(kāi)始時(shí)間簡(jiǎn)化輸出time_distribution_dftime_distribution_df.withColumn(hour,expr(date_format(window.start, HH:mm))).select(hour,action_count)time_distribution_df.show(5)輸出------------------- | hour|action_count| ------------------- |00:00| 123| |01:00| 89| |02:00| 56| |03:00| 34| |04:00| 21| -------------------4.4.7 步驟7保存結(jié)果到HDFS將Top 10熱門(mén)商品和時(shí)間分布結(jié)果保存為Parquet格式比CSV更高效支持壓縮。# 保存Top 10熱門(mén)商品item_click_df.write.parquet(hdfs://node1:9000/user/behavior/top10_items.parquet,modeoverwrite)# 保存時(shí)間分布time_distribution_df.write.parquet(hdfs://node1:9000/user/behavior/time_distribution.parquet,modeoverwrite)4.5 結(jié)果可視化可選用Matplotlib將時(shí)間分布結(jié)果可視化importmatplotlib.pyplotasplt# 將DataFrame轉(zhuǎn)換為Pandas注意數(shù)據(jù)量小的時(shí)候用否則會(huì)OOMpdftime_distribution_df.toPandas()# 繪制柱狀圖plt.figure(figsize(12,6))plt.bar(pdf[hour],pdf[action_count])plt.xlabel(Hour of Day)plt.ylabel(Action Count)plt.title(User Behavior Time Distribution)plt.xticks(rotation45)plt.show()注圖片顯示晚8點(diǎn)到10點(diǎn)是用戶(hù)最活躍的時(shí)間段4.6 經(jīng)驗(yàn)總結(jié)數(shù)據(jù)清洗是關(guān)鍵原始數(shù)據(jù)中可能有大量無(wú)效值必須先過(guò)濾否則會(huì)影響分析結(jié)果使用Parquet格式Parquet是列式存儲(chǔ)支持壓縮如Snappy比CSV節(jié)省存儲(chǔ)空間且查詢(xún)更快合理分配資源通過(guò)spark.executor.memory和spark.executor.cores調(diào)整 executor 的資源避免內(nèi)存不足或資源浪費(fèi)。五、PySpark性能優(yōu)化10個(gè)必知技巧5.1 避免使用collect()用take()或show()替代collect()會(huì)將所有數(shù)據(jù)從集群拉取到Driver節(jié)點(diǎn)容易導(dǎo)致OOM內(nèi)存溢出。如果需要查看部分?jǐn)?shù)據(jù)用take(10)取前10條或show(10)顯示前10條。5.2 使用persist()或cache()緩存數(shù)據(jù)如果某個(gè)RDD或DataFrame需要多次使用如多次查詢(xún)用persist()或cache()將其緩存到內(nèi)存中避免重復(fù)計(jì)算。# 緩存DataFrame到內(nèi)存默認(rèn)是MEMORY_ONLYdf.persist()# 或者用cache()等價(jià)于persist(MEMORY_ONLY)df.cache()5.3 選擇合適的分區(qū)數(shù)分區(qū)數(shù)太少會(huì)導(dǎo)致每個(gè)任務(wù)處理的數(shù)據(jù)量太大運(yùn)行慢分區(qū)數(shù)太多會(huì)導(dǎo)致任務(wù)調(diào)度開(kāi)銷(xiāo)大運(yùn)行慢。推薦公式分區(qū)數(shù) 集群總核心數(shù) × 2 ~ 4# 重新分區(qū)會(huì)觸發(fā)shuffledfdf.repartition(100)# 設(shè)置為100個(gè)分區(qū)# 合并分區(qū)不會(huì)觸發(fā)shuffledfdf.coalesce(50)# 合并為50個(gè)分區(qū)5.4 避免數(shù)據(jù)傾斜數(shù)據(jù)傾斜是指某個(gè)分區(qū)的數(shù)據(jù)量遠(yuǎn)大于其他分區(qū)導(dǎo)致該分區(qū)的任務(wù)運(yùn)行很慢。常見(jiàn)解決方法加鹽法給傾斜的鍵加隨機(jī)前綴將其分散到多個(gè)分區(qū)過(guò)濾法過(guò)濾掉傾斜的鍵如果不影響分析結(jié)果自定義分區(qū)器根據(jù)數(shù)據(jù)分布自定義分區(qū)邏輯。示例加鹽法解決數(shù)據(jù)傾斜假設(shè)item_id2010的數(shù)據(jù)量很大導(dǎo)致傾斜frompyspark.sql.functionsimportrand,concat,lit# 給item_id加隨機(jī)前綴0-9salted_dfclick_df.withColumn(salt,concat(col(item_id),lit(_),rand(seed42).cast(int)))# 按salt分區(qū)統(tǒng)計(jì)然后去掉前綴result_dfsalted_df.groupBy(salt).agg(count(*).alias(click_count)).withColumn(item_id,split(col(salt),_).getItem(0)).groupBy(item_id).agg(sum(click_count).alias(click_count)).orderBy(col(click_count).desc())5.5 使用廣播變量Broadcast Variable當(dāng)需要將一個(gè)大的只讀變量如字典、 lookup表傳遞給每個(gè)任務(wù)時(shí)用廣播變量可以減少網(wǎng)絡(luò)傳輸只傳輸一次到每個(gè) executor而不是每個(gè)任務(wù)。frompyspark.sql.functionsimportbroadcast# 加載大的 lookup 表如商品分類(lèi)item_category_dfspark.read.csv(item_category.csv,headerTrue)# 廣播 lookup 表減少join時(shí)的數(shù)據(jù)傳輸broadcast_dfbroadcast(item_category_df)# 關(guān)聯(lián)用戶(hù)行為數(shù)據(jù)和商品分類(lèi)joined_dfclick_df.join(broadcast_df,onitem_id,howleft)5.6 使用累加器Accumulator累加器是一種分布式的變量用于高效統(tǒng)計(jì)如計(jì)數(shù)、求和。與reduce()相比累加器不需要將數(shù)據(jù)拉取到Driver節(jié)點(diǎn)性能更好。# 初始化累加器計(jì)數(shù)無(wú)效數(shù)據(jù)invalid_countspark.sparkContext.accumulator(0)# 定義函數(shù)如果action為空累加器加1defcount_invalid(row):ifrow.actionisNone:invalid_count.add(1)returnrow# 應(yīng)用函數(shù)map操作cleaned_dfdf.rdd.map(count_invalid).toDF()# 輸出無(wú)效數(shù)據(jù)量print(f無(wú)效數(shù)據(jù)量{invalid_count.value})5.7 使用DataFrame替代RDDDataFrame的Catalyst優(yōu)化器會(huì)自動(dòng)優(yōu)化查詢(xún)計(jì)劃如 predicate pushdown、column pruning比RDD更高效。例如過(guò)濾操作會(huì)下推到數(shù)據(jù)源如HDFS減少讀取的數(shù)據(jù)量。5.8 避免shuffle操作shuffle是Spark中最昂貴的操作需要在節(jié)點(diǎn)間傳輸數(shù)據(jù)盡量避免或減少。常見(jiàn)的shuffle操作有g(shù)roupByKey、reduceByKey、join、repartition。優(yōu)化方法用reduceByKey替代groupByKeyreduceByKey會(huì)在map端先合并減少shuffle的數(shù)據(jù)量用broadcast join替代shuffle join當(dāng)其中一個(gè)表很小的時(shí)候。5.9 調(diào)整spark.sql.shuffle.partitionsspark.sql.shuffle.partitions是控制shuffle操作分區(qū)數(shù)的參數(shù)默認(rèn)是200。如果數(shù)據(jù)量很大需要增大該參數(shù)如設(shè)置為1000避免每個(gè)分區(qū)的數(shù)據(jù)量太大。# 在SparkSession中設(shè)置sparkSparkSession.builder .appName(OptimizationExample).config(spark.sql.shuffle.partitions,1000).getOrCreate()5.10 使用Tungsten執(zhí)行引擎Tungsten是Spark的執(zhí)行引擎支持內(nèi)存管理和代碼生成Code Generation比傳統(tǒng)引擎更高效。PySpark默認(rèn)啟用Tungsten不需要額外配置。六、結(jié)論P(yáng)ySpark是Python開(kāi)發(fā)者的大數(shù)據(jù)通行證6.1 核心要點(diǎn)總結(jié)PySpark是Python與Spark的結(jié)合讓Python開(kāi)發(fā)者能處理TB級(jí)以上的大數(shù)據(jù)核心數(shù)據(jù)結(jié)構(gòu)RDD底層基礎(chǔ)、DataFrame結(jié)構(gòu)化數(shù)據(jù)首選實(shí)戰(zhàn)流程數(shù)據(jù)讀取→清洗→轉(zhuǎn)換→分析→保存性能優(yōu)化避免collect()、使用緩存、解決數(shù)據(jù)傾斜、減少shuffle。6.2 為什么PySpark值得學(xué)習(xí)市場(chǎng)需求大大數(shù)據(jù)工程師是當(dāng)前最熱門(mén)的崗位之一PySpark是必備技能易用性高用Python寫(xiě)分布式程序比Java/Scala更簡(jiǎn)單生態(tài)完善無(wú)縫集成Hadoop、Hive、HBase等組件覆蓋大數(shù)據(jù)處理的全流程。6.3 行動(dòng)號(hào)召下載Spark搭建本地環(huán)境運(yùn)行本文中的Word Count示例找一個(gè)公開(kāi)的大數(shù)據(jù)集如Kaggle的電商數(shù)據(jù)用PySpark做分析在評(píng)論區(qū)分享你的PySpark學(xué)習(xí)經(jīng)驗(yàn)或提出問(wèn)題我們一起討論。6.4 未來(lái)展望PySpark的未來(lái)發(fā)展方向?qū)崟r(shí)處理結(jié)合Spark Streaming或Structured Streaming處理實(shí)時(shí)數(shù)據(jù)如直播彈幕、物聯(lián)網(wǎng)傳感器數(shù)據(jù)機(jī)器學(xué)習(xí)用MLlib庫(kù)做分布式機(jī)器學(xué)習(xí)如分類(lèi)、聚類(lèi)、推薦系統(tǒng)云原生與AWS、Azure、阿里云等云平臺(tái)深度集成支持Serverless Spark如AWS Glue、Databricks。七、附加部分7.1 參考文獻(xiàn)《Spark快速大數(shù)據(jù)分析》第2版Spark官方推薦書(shū)籍詳細(xì)講解Spark的核心概念和實(shí)戰(zhàn)Spark官方文檔https://spark.apache.org/docs/latest/PySpark官方文檔https://spark.apache.org/docs/latest/api/python/。7.2 致謝感謝Apache Spark社區(qū)的開(kāi)發(fā)者他們用Python封裝了Spark的強(qiáng)大功能讓我們能更輕松地處理大數(shù)據(jù)感謝我的同事們他們?cè)趯?shí)戰(zhàn)中給了我很多寶貴的建議。7.3 作者簡(jiǎn)介我是張三一名資深大數(shù)據(jù)工程師擁有5年Spark開(kāi)發(fā)經(jīng)驗(yàn)擅長(zhǎng)用PySpark處理電商、金融等領(lǐng)域的大數(shù)據(jù)。我的博客專(zhuān)注于大數(shù)據(jù)技術(shù)分享歡迎關(guān)注我的公眾號(hào)“大數(shù)據(jù)筆記”獲取更多實(shí)戰(zhàn)教程。附錄本文代碼倉(cāng)庫(kù)https://github.com/zhangsan/pyspark-tutorial包含所有示例代碼和數(shù)據(jù)聲明本文為原創(chuàng)內(nèi)容未經(jīng)許可不得轉(zhuǎn)載。如需引用請(qǐng)注明出處。