Hello all,

We have a use-case where we aggregate billions of rows, and it produces a huge shuffle. For example, per the 'Job' tab in the YARN UI, when the input size is ~350 GB the shuffle size exceeds 3 TB. This pushes non-DFS usage beyond the warning limit and thus affects the entire cluster.
It seems we need to tune our query and/or resources. Any suggestions?

1. Our data is high-cardinality: input is ~15 billion rows, output is ~13 billion rows.

2. Spark is 1.6 and Hive is 1.1 (CDH), with YARN as the resource manager. We query through a Hive context in the Spark job, configured as:

    .setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    .setConf("hive.exec.dynamic.partition", "true")
    .setConf("hive.exec.stagingdir", "/tmp/hive/")

3. The aggregation is done with a single query, as below (a sketch of the driver code is at the end of this mail):

    SELECT <list of 16 dimension columns>,
           SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
           <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
           <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
           <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
           <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
           <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
           <Epochtime1> AS <partition-column>,
           <Epochtime1> AS <column2>
    FROM <table-name>
    WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>,
                                 <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
    GROUP BY <list of 16 dimension columns>

4. Configs are:

    spark.master=yarn-client
    spark.yarn.queue=default
    spark.executor.instances=52
    spark.executor.cores=4
    spark.executor.memory=30g
    spark.driver.memory=25g
    spark.memory.fraction=0.8
    spark.memory.storageFraction=0.1
    spark.yarn.executor.memoryOverhead=9500
    spark.yarn.driver.memoryOverhead=5120
    spark.core.connection.ack.wait.timeout=1000
    spark.eventLog.enabled=true
    spark.eventLog.dir=<>
    spark.eventLog.overwrite=true
    spark.sql.shuffle.partitions=1000

How do we tune this job?
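For context, here is a minimal sketch of how the driver is wired up (Spark 1.6 Scala API). Table names, column names, and epoch values are placeholders; the query is abbreviated to two dimensions and two measures (the real one groups by 16 dimensions and calls our custom UDAF on five columns), and the final insertInto is illustrative of our dynamic-partition write:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext

    object AggregationJob {
      def main(args: Array[String]): Unit = {
        // Executor/driver sizing comes from spark-submit / spark-defaults (item 4).
        val sc = new SparkContext(new SparkConf().setAppName("billion-row-aggregation"))
        val hc = new HiveContext(sc)

        // Hive-context settings from item 2; dynamic partitioning lets a
        // single insert fan out across the epoch-time partition column.
        hc.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
        hc.setConf("hive.exec.dynamic.partition", "true")
        hc.setConf("hive.exec.stagingdir", "/tmp/hive/")

        // Abbreviated form of the query in item 3. The GROUP BY over many
        // high-cardinality dimensions is what drives the >3 TB shuffle.
        val aggregated = hc.sql(
          """SELECT dim1, dim2,
            |       SUM(m1) AS m1, SUM(m2) AS m2,
            |       1451606400 AS partition_col
            |  FROM source_table
            | WHERE partition_col IN (1451606400, 1451692800)
            | GROUP BY dim1, dim2
          """.stripMargin)

        // Dynamic-partition overwrite into a pre-created, partitioned output table.
        aggregated.write.mode(SaveMode.Overwrite).insertInto("output_table")
      }
    }

We launch it with spark-submit in yarn-client mode using the flags listed in item 4.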