Hello all,

We have a use case where we are aggregating billions of rows, and it produces a
huge shuffle.
Example:
As per the ‘Job’ tab on the YARN UI, when the input size is around 350 GB, the
shuffle size is over 3 TB. This increases non-DFS usage beyond the warning limit
and thus affects the entire cluster.

It seems we need to tune our query/resources. Any suggestions?

1.
Our data has high cardinality:
# input rows: ~15 billion
# output rows: ~13 billion

2.
Spark version is 1.6, Hive is 1.1, and it's CDH.
We query using a HiveContext in the Spark job (YARN is the resource manager).
The HiveContext has the following configs:

.setConf("hive.exec.dynamic.partition.mode","nonstrict")

.setConf("hive.exec.dynamic.partition","true")

.setConf("hive.exec.stagingdir","/tmp/hive/")

3.
Our aggregation is done using a single query, as below:
SELECT
  <list of 16 dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
  <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
  <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
  <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
  <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
  <Epochtime1> AS <partition-column>, <Epochtime1> AS <column2>
FROM <table-name>
WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>,
                             <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
GROUP BY <list of 16 dimension columns>
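
We submit it through the HiveContext roughly like this (a simplified sketch only; the query string is elided and names such as aggregationQuery / target_table are placeholders, not our actual code):

// The SELECT from point 3 is kept in a string and run via the HiveContext.
val aggregationQuery = """SELECT ... FROM <table-name> WHERE ... GROUP BY ...""" // elided, see point 3
val result = hiveContext.sql(aggregationQuery)

// Illustrative write step; the dynamic-partition configs from point 2 apply
// when inserting into a partitioned Hive table.
result.write.mode("append").insertInto("target_table") // "target_table" is a placeholder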

4.
Configs are:

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=true
spark.eventLog.dir=<>
spark.eventLog.overwrite=true
spark.sql.shuffle.partitions=1000
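
For reference, rough arithmetic on the memory these settings request from YARN (derived only from the values above, nothing measured):

// Rough memory footprint implied by the configs above.
object ResourceFootprint {
  def main(args: Array[String]): Unit = {
    val executorMemoryGb   = 30.0            // spark.executor.memory
    val executorOverheadGb = 9500.0 / 1024   // spark.yarn.executor.memoryOverhead (MB -> GB)
    val executorInstances  = 52              // spark.executor.instances

    val containerGb = executorMemoryGb + executorOverheadGb
    val totalGb     = containerGb * executorInstances
    // Prints: per-executor container ~ 39.3 GB, total ~ 2042 GB across 52 executors
    println(f"per-executor container ~ $containerGb%.1f GB, total ~ $totalGb%.0f GB across $executorInstances executors")
  }
}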

How can we tune this job?