最開(kāi)始接觸Spark是我剛來(lái)摩拜實(shí)習(xí)的時(shí)候,組里有一個(gè)架構(gòu)師(ccmeng1886)為了找工作把Spark的源碼通讀了三遍ORZ,還一直給我們灌輸學(xué)好Spark就能拿高工資的思想。正好年末不是很忙,就接了一個(gè)非常簡(jiǎn)單的項(xiàng)目,順便學(xué)習(xí)了一下Spark,全程用Pyspark實(shí)現(xiàn)了一下,感覺(jué)非常地爽。
為什么標(biāo)題叫《Spark基礎(chǔ)性能優(yōu)化》,因?yàn)楸疚膬H僅梳理最基本的Spark性能優(yōu)化方案,對(duì)于較深層次細(xì)節(jié)調(diào)優(yōu)暫時(shí)還沒(méi)有精力去研究。對(duì)于算法工程師來(lái)說(shuō),能夠遵循一些最基本的調(diào)優(yōu)原則,不求甚解這就夠了。
偶然間發(fā)現(xiàn)美團(tuán)點(diǎn)評(píng)技術(shù)團(tuán)隊(duì)的博客(首頁(yè) - 美團(tuán)點(diǎn)評(píng)技術(shù)團(tuán)隊(duì))已經(jīng)有人分享過(guò)Spark性能優(yōu)化指南,解決了我很多疑惑,避免遺忘對(duì)它進(jìn)行整理(發(fā)現(xiàn)美團(tuán)點(diǎn)評(píng)技術(shù)團(tuán)隊(duì)的博客寫(xiě)得真是非常地接地氣,尤其來(lái)說(shuō),簡(jiǎn)直是無(wú)法多得的寶藏),總的來(lái)說(shuō)優(yōu)化方案主要分為開(kāi)發(fā)調(diào)優(yōu)、資源調(diào)優(yōu)、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)四個(gè)部分。
在項(xiàng)目的開(kāi)發(fā)中,最開(kāi)始我把所有任務(wù)丟在一個(gè)進(jìn)程里,需要花兩個(gè)多小時(shí)才能跑完;后來(lái)在Hive建了一張中間表,把預(yù)處理的數(shù)據(jù)存到Hive里,再?gòu)腍ive讀取數(shù)據(jù)做預(yù)測(cè),這樣只需要20min就能跑完,性能大約提高了6倍。后來(lái)想了一下,我的腳本之所以執(zhí)行得慢有兩個(gè)重要的原因:
使用了太多shuffle類(lèi)算子,如join、group by等;沒(méi)有把預(yù)處理的數(shù)據(jù)緩存到內(nèi)存中,導(dǎo)致后面的action算子都會(huì)重新計(jì)算那個(gè)RDD。這就是為什么我僅僅建了一張中間表就使速度提高了6倍,其實(shí)有更好的做法,只要cache()一下就好了。開(kāi)發(fā)調(diào)優(yōu)
最基本的Spark性能優(yōu)化,就是要優(yōu)化你的代碼。Spark中rdd內(nèi)部的轉(zhuǎn)換關(guān)系是一個(gè)DAG(有向無(wú)環(huán)圖),只有出發(fā)了action 算子才開(kāi)始計(jì)算。開(kāi)始可以畫(huà)出計(jì)算pipeline,寫(xiě)得多了腦子自然會(huì)形成計(jì)算的pipeline,在開(kāi)發(fā)過(guò)程中,時(shí)時(shí)刻刻都要注意一些性能優(yōu)化的基本原則。
原則一:避免創(chuàng)建重復(fù)的RDD,盡可能復(fù)用同一個(gè)RDD
對(duì)于同一份數(shù)據(jù)不要?jiǎng)?chuàng)建多個(gè)RDD,對(duì)不同的數(shù)據(jù)執(zhí)行算子操作時(shí)要盡可能地復(fù)用一個(gè)RDD。
原則二:對(duì)多次使用的RDD進(jìn)行持久化
前面已經(jīng)提到Spark中rdd內(nèi)部的轉(zhuǎn)換關(guān)系是一個(gè)DAG,因此對(duì)于一個(gè)RDD執(zhí)行多次算子時(shí),都會(huì)重新從源頭處計(jì)算一遍,這種方式的性能是很差的。如下圖所示,其中D和E代表action算子,在計(jì)算D和E時(shí)要分別從A開(kāi)始計(jì)算。
有向無(wú)環(huán)圖(DAG)最好的方法就是對(duì)C進(jìn)行持久化,此時(shí)Spark就會(huì)將數(shù)據(jù)保存到內(nèi)存或者磁盤(pán)中,以后每次對(duì)C這個(gè)RDD進(jìn)行算子操作時(shí),都會(huì)直接從內(nèi)存或磁盤(pán)中提取持久化的RDD數(shù)據(jù),不會(huì)從源頭處重新計(jì)算一遍。
原則三:盡量避免使用shuffle類(lèi)算子
Spark作業(yè)運(yùn)行過(guò)程中,最消耗性能的地方就是shuffle過(guò)程。shuffle過(guò)程就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行g(shù)roupby或join等操作,reduceByKey、join、distinct、repartition等都屬于shuffle算子。
至于什么是shuffle,引用Spark核心設(shè)計(jì)思想的經(jīng)典論文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”可以解釋。
narrow依賴(lài)和wide依賴(lài)的例子。圖中每個(gè)無(wú)陰影藍(lán)色方框(大方框)表示一個(gè)RDD,有陰影的藍(lán)色方框(小方框)表示RDD分區(qū)RDD就是一個(gè)不可變的帶分區(qū)的記錄 ** ,Spark提供了RDD上的兩類(lèi)操作,轉(zhuǎn)換和動(dòng)作。轉(zhuǎn)換是用來(lái)定義一個(gè)新的RDD,包括 ** p, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, ** pValues等,動(dòng)作是返回一個(gè)結(jié)果,包括collect, reduce, count, save, lookupKey。
shuffle顧名思義就是被打散,是否被shuffle就看計(jì)算后對(duì)應(yīng)多少分區(qū),那么:
如果一個(gè)RDD的依賴(lài)的每個(gè)分區(qū)只依賴(lài)另一個(gè)RDD的同一個(gè)分區(qū),就是narow,如圖上的 ** p、filter、union等,這樣就不需要進(jìn)行shuffle,同時(shí)還可以按照流水線的方式,把一個(gè)分區(qū)上的多個(gè)操作放在一個(gè)Task里進(jìn)行。如果一個(gè)RDD的每個(gè)分區(qū)需要依賴(lài)另一個(gè)RDD的所有分區(qū),就是wide,如圖上的groupbykey,這樣的依賴(lài)需要進(jìn)行shuffle,運(yùn)算量倍增。原則四:使用預(yù)聚合的shuffle操作
如果有些時(shí)候?qū)嵲跓o(wú)法避免使用shuffle操作,那么盡量使用可以預(yù)聚合的算子。預(yù)聚合就是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作,多條相同的key被聚合起來(lái)后,那么其他節(jié)點(diǎn)再拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少磁盤(pán)IO以及網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)。下圖所示,每個(gè)節(jié)點(diǎn)本地首先對(duì)于相同key進(jìn)行了聚合。
預(yù)聚合的shuffle操作原則五:使用高性能的算子
除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則,不一一陳述。
資源調(diào)優(yōu)
在spark-submit時(shí)可以為作業(yè)配置合適的資源,理論上來(lái)說(shuō)資源給的越多任務(wù)執(zhí)行得越快,但集群又不是你家的,侵占太多資源可能會(huì)被kill掉。摩拜的Spark部署在集群上,公司內(nèi)部共享這些資源,Spark的SparkContext先把任務(wù)提交到Y(jié)ARN上,再由Application Master創(chuàng)建應(yīng)用程序,然后為它向Resource Manager申請(qǐng)資源,并啟動(dòng)Executor來(lái)運(yùn)行任務(wù)集,同時(shí)監(jiān)控它的整個(gè)運(yùn)行過(guò)程,直到運(yùn)行完成。
Yarn-Cluster運(yùn)行模式執(zhí)行過(guò)程資源相關(guān)的參數(shù)
num-executors
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來(lái)執(zhí)行。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照你的設(shè)置來(lái)在集群的各個(gè)工作節(jié)點(diǎn)上,啟動(dòng)相應(yīng)數(shù)量的Executor進(jìn)程。這個(gè)參數(shù)非常之重要,如果不設(shè)置的話,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的。參數(shù)調(diào)優(yōu)建議:每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少,無(wú)法充分利用集群資源;設(shè)置的太多的話,大部分隊(duì)列可能無(wú)法給予充分的資源。executor-memory
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能,而且跟常見(jiàn)的JVM OOM異常,也有直接的關(guān)聯(lián)。參數(shù)調(diào)優(yōu)建議:每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。但是這只是一個(gè)參考值,具體的設(shè)置還是得根據(jù)不同部門(mén)的資源隊(duì)列來(lái)定??梢钥纯醋约簣F(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少,num-executors乘以executor-memory,是不能超過(guò)隊(duì)列的最大內(nèi)存量的。此外,如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列,那么申請(qǐng)的內(nèi)存量最好不要超過(guò)資源隊(duì)列最大總內(nèi)存的1/3~1/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源,導(dǎo)致別的同學(xué)的作業(yè)無(wú)法運(yùn)行。executor-cores
參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。因?yàn)槊總€(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task線程,因此每個(gè)Executor進(jìn)程的CPU core數(shù)量越多,越能夠快速地執(zhí)行完分配給自己的所有task線程。參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適。同樣得根據(jù)不同部門(mén)的資源隊(duì)列來(lái)定,可以看看自己的資源隊(duì)列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量,來(lái)決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU core。同樣建議,如果是跟他人共享這個(gè)隊(duì)列,那么num-executors * executor-cores不要超過(guò)隊(duì)列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學(xué)的作業(yè)運(yùn)行。給出本次項(xiàng)目執(zhí)行時(shí)所給的資源,還是比較省的,這個(gè)可以根據(jù)集群的情況進(jìn)行調(diào)整。
spark-submit -- ** ster yarn --executor-memory 8G --num-executors 16 --executor-cores 2數(shù)據(jù)傾斜調(diào)優(yōu)
數(shù)據(jù)傾斜就是大部分執(zhí)行得很快,個(gè)別任務(wù)執(zhí)行得很慢。比如進(jìn)行g(shù)roupby的時(shí)候,某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大,就會(huì)發(fā)生數(shù)據(jù)傾斜。
shuffle過(guò)程會(huì)導(dǎo)致數(shù)據(jù)傾斜,可能會(huì)觸發(fā)數(shù)據(jù)傾斜的shuffle算子包括distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,因此能很快定位導(dǎo)致數(shù)據(jù)傾斜的代碼。
遇到數(shù)據(jù)傾斜時(shí)通常會(huì)使用repartition這個(gè)轉(zhuǎn)換操作對(duì)RDD進(jìn)行重新分區(qū),重新分區(qū)后數(shù)據(jù)會(huì)均勻分布在不同的分區(qū)中,避免了數(shù)據(jù)傾斜。
參考資料
Spark性能優(yōu)化指南——基礎(chǔ)篇Spark性能優(yōu)化指南——高級(jí)篇 >usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdfSpark中的narrow/wide dependency如何理解,有什么作用?Spark技術(shù)在京東智能供應(yīng)鏈預(yù)測(cè)的應(yīng)用推薦閱讀:
>zhuanlan.zhihu.com/p/31084018