這是我的第64篇原創

數據傾斜是上帝對某個服務器的過于偏愛。
造成數據傾斜的原因上帝太過于偏愛某個服務器,因此給他分配了太多的任務,導致數據都傾斜到這臺服務器了。
在大數據場景中,無論是MapReduce還是Spark,都會因兩階段之間的shuffle導致各個服務器接受到的數據導致處理量失衡的問題。情況嚴重,就變成數據傾斜了。
我們之所以創造分布式環境,就是因為我們將一個巨大的任務拆解成若干個小任務,給不同的服務器執行,這樣總執行時間就會減小至1/n。理想狀態的任務處理情況應該如下圖所示:

原本單機環境需要執行100s的任務,由5臺服務器共同執行,每臺服務器執行20s,最后總時間會遠遠大于單機環境的執行時間。而且我們還可以通過不斷增加服務器,來不斷減少總運行時間。但是往往會出現這種情況:

某4臺服務器很快就執行完了任務,但是其中有一臺服務器的遲遲不能完成,嚴重的時候甚至會OOM(Out Of Memory)。究其原因,其實如同上面說過的,在分布式處理的不同階段之間會有一個混洗(shuffle)的過程:

在Map或者Spark的Stage1階段,由于每個數據塊的大小都是一致的(默認128M),所以在這個階段是不會出現數據傾斜的。但是一旦我們對數據進行Shuffle,比如按照商品品類進行分組之后,在Reduce或Stage2階段,數據將會出現嚴重的傾斜:原本每臺服務器都只需要處理3條數據,Shuffle之后,其中兩臺服務器各只需處理1條,而剩余的那臺服務器則需要執行8條數據。三臺服務器處理的數據量比為7:1:1。數據傾斜至第一臺服務器。任務延遲,甚至OOM。
如何解決數據傾斜呢:
三個層面:
1、預判-原始數據預防,保證原始數據不傾斜;
2、躲閃-規避數據傾斜,盡量規避Shuffle;
3、硬剛-處理數據傾斜,無法規避Shuffle,用各種辦法優化Shuffle過程。

預判雖說HDFS的數據都是128M,不會一開始就出現數據傾斜,但是仍然有以下幾種情況:
1、數據壓縮后,128M文件大小一樣,但是數據量不一樣;
2、存在不可切分的大文件;
3、流式數據。
這幾種情況還是可能會導致程度不一的數據傾斜的。我們需要做一些簡單的處理:
1、數據壓縮后,128M文件大小一樣,但是數據量不一樣;
解決辦法:壓縮前,保證每個文件中的數據量基本一致;
2、存在不可切分的大文件;
解決辦法:生成數據時,盡量減少不可切分的文件,盡量按照HDFS的邏輯,存成可切分的文件;或者保證這些大文件中的數據量基本一致,且單機可處理。
3、流式數據;
解決辦法:Kafka的partition實現建議使用隨機、輪詢等方法,盡量使各topic的各partition的數據盡量平衡。
閃避既然我們知道數據傾斜的主要原因的shuffle導致的,那么我們首要的優化方向就是shuffle,能不用盡量不要用。有以下幾種方法我們可以規避:
4、ETL預處理
在面對無法避免的原始數據傾斜(Hive表中key分布不均勻、kafka中某topic的partititoner含有業務屬性,天然不均勻等),我們可以通過前置ETL過程,進行預處理。
注意:這個方法只是將成本轉嫁,并沒有解決問題。適合削峰填谷類的操作,比如我們將數據預處理好,避免凌晨集中計算的時候處理時間過長,影響其他任務。
5、過濾不必要的key
很多數據分析師在單體數據庫的時候,就有一個不好的習慣:總喜歡select *。在hive、spark等分布式環境中,就吃苦頭了,經常遇到數據傾斜甚至OOM。有經驗的數據分析師在寫sql的時候,通常會先group by一下,看看數據的分布情況,然后再處理。
咱在分布式環境中也可以做類似的事情,就是采樣。
離線環境可以用隨機采樣,實時環境可以用魚塘采樣。采樣能夠快速摸清楚各個key的大致分布。掃一眼數據量大的key,如果跟你的計算沒啥關系,直接過濾就行。
比如上面舉的例子,母嬰品類占絕大多數,但是運營的要求是分析3C產品,那你過濾掉母嬰產品,一則減少計算量,二則規避了數據傾斜的問題。
6、Reducejoin改為Mapjoin
如果是大小表的join,比如訂單表和訂單類型、訂單狀態的join,如果使用reduce join的話,就非常容易在shuflle之后出現數據傾斜。建議的原則:只要一臺服務器的內存能吃下這張小表(主要看服務器內存大小,建議2g以內,再大就影響服務器性能了),就建議用map join。這樣join完之后,每份數據依然是基本均衡的,而且規避了shuffle導致數據傾斜的問題。
硬剛上述幾步,能做的都做了,還是不行,那就只能硬剛了。這時就只能八仙過海各顯神通了。基本的邏輯還是一樣的,就是能拆的盡量拆,不能拆的用空間換時間,或者自定義。
7、通用優化:shuffle并行度
spark的shuffle并行度默認值是200,建議根據服務器的情況進行調整。一般是集群cpu總和的2-3倍。當發生數據傾斜的時候,適當增大并行度,可以讓任務和數據更均勻的分布在整個集群中。但是這個調優方法有些玄學成分在,因為你不知道他是咋分過去的。
并行度調整有三個方法:
●操作函數內設置
testRDD.groupByKey(200)
●代碼中設置“spark.default.parallelism”
conf.set("spark.default.parallelism", 200)
●配置文件中設置“$SPARK_HOME/conf/spark-defaults.conf” 文件
spark.default.parallelism 200
8、拆分超大key
前面說過采樣后過濾。如果采樣之后發現這個key還是你需要的,無法怎么辦?那就把超大數據量的key拆分出來,單獨做成一個任務,這樣超大數據量的key一個任務,其他中小數據量的key一個任務,兩個任務分別做join啊什么的處理,最后把結果合并一下就行了。
為了避免超大數據量的key單獨join的時候還是一個key一個任務,可以在key上加上隨機數取模的前綴,這樣就把數據分成了N份,然后再join。
9、階段拆分-兩階段聚合
對于聚合類的操作,這種方式可以說是數據傾斜的大殺器。簡單來說就是在需要聚合的key前加一個隨機數取模的前綴,這樣就能得到非常均勻的key,然后按這個加工之后的key進行第一次聚合之后,再對聚合的結果,按照原始key進行二次聚合,這樣基本就不可能出現數據傾斜了。示意圖如下:

對比之前的例子中,處理母嬰的服務器和處理3c、圖書的服務器任務量是7:1:1,這個方案的數據就非常均勻了。
10、任務拆分
很多時候數據情況會非常復雜,有null值、有超大數據量的key、還有各種需要過濾的數據,還有各種聚合和join。那這個時候就需要把任務再拆分。一部分用上面的key值過濾,一部分用Map Join,一部分用超大key單獨處理。
11、隨機前綴
前面說過小表join的時候可以用Map join。但是遇到大表join大表咋辦?三個方法:1、大表拆成小表,多次join;2、SortMergeJoin;3、位圖法(詳見《位圖法搞定10億用戶量用戶標簽處理》)。
那大表+中表,該咋處理?可以考慮用隨機前綴+RDD擴容的方法解決join的問題。
如果你將要join的表不大不小,又不適合用上面大大表的處理方法,那就可以用這個通用的join方法。簡單來說,就是對A表中需要join的字段加上n以內的隨機數前綴,然后再把B表中的數據復制N份,join的字段加上1-N的前綴,然后量表再join,就能解決數據傾斜的問題了。示意如下:
原始數據如下:

不經處理直接join是這樣的,part1很明顯比part2要多好幾倍的數據:

我們對A表和B表進行隨機前綴和RDD擴容處理之后:

然后再join,這樣每個part的數據就非常均勻了:

這個方法比較坑的是B表這個RDD需要擴容,要復制N份,對內存要求比較高。但是這個方法可以說是通殺Join的數據傾斜問題。
12、自定義partitioner
上面說改spark的并行數也可以改善數據傾斜,但是有點玄學的意思在里面。其根本原因就是不管你怎么調優,計算引擎的分區都是按照固定的方法進行的,根本不會,也沒辦法考慮數據真實情況。
無論是二階段聚合解決聚合的問題,還是隨機前綴+RDD擴容解決join的問題,都是通用解決辦法,而且還麻煩。其實最好的解決辦法就是根據現在處理的這份數據,單獨寫一個適合的partitioner。比如現在是按省份進行匯總數據,如果只是簡單的按省份去分(這并沒有錯),那么數據肯定會傾斜,因為各省的數據天然不一樣。我們可以通過歷史數據、抽樣數據或者一些常識,對數據進行人工分區,讓數據按照我們自定義的分區規則比較均勻的分配到不同的task中。
常見的分區方式:
隨機分區:每個區域的數據基本均衡,簡單易用,偶爾出現傾斜,但是特征同樣也會隨機打散。
輪詢分區:絕對不會傾斜,但是需要提前預知分成若干份,進行輪詢。
hash散列:可以針對某個特征進行hash散列,保證相同特征的數據在一個區,但是極容易出現數據傾斜。
范圍分區:需要排序,臨近的數據會被分在同一個區,可以控制分區數據均勻。
數據傾斜并不可怕,咱可以糙一些,也可以精致一些。但是建議還是糙一些,這樣簡單粗暴,多節省一些時間干(xue)點(dong)別(xi)的
往期精彩回顧
熱文|當我們在刷抖音的時候,抖音在干什么?
干貨 |架構師帶你細細的捋一遍MapReduce全流程
干貨|bitmap解決超大表join實戰案例
以上就是12中方法,徹底搞定數據傾斜!的全部內容了,希望大家喜歡。


