What I can immediately think of is: since you are using IN in the WHERE clause for a series of timestamps, consider breaking the query up and, for each epoch timestamp, loading the results into an intermediate staging table. Then do a final aggregate from that staging table, keeping the GROUP BY the same. Since the measures are SUMs, the aggregation can be done in two steps. HTH
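The two-step approach works because SUM is decomposable: summing per-epoch partial sums gives the same result as one aggregate over all rows. A minimal sketch of that equivalence in plain Python (not Spark/Hive; the row layout, names, and toy data here are made up for illustration):

```python
from collections import defaultdict

# Toy rows: (dimension_key, epoch, measure). The real job has
# 16 dimension columns and 5 SUM measures; one key and one
# measure are enough to show the equivalence.
rows = [
    ("a", 1, 10), ("a", 1, 5),
    ("a", 2, 7),
    ("b", 1, 3), ("b", 2, 4),
]

def aggregate(rows):
    """One-shot GROUP BY dimension_key with SUM(measure)."""
    totals = defaultdict(int)
    for key, _epoch, m in rows:
        totals[key] += m
    return dict(totals)

# Step 1: per-epoch partial aggregates, loaded into a "staging
# table" (here just a list of partial-sum rows).
staging = []
for epoch in {e for _, e, _ in rows}:
    partial = aggregate([r for r in rows if r[1] == epoch])
    staging.extend((key, epoch, m) for key, m in partial.items())

# Step 2: final aggregate over the staging table, same GROUP BY.
final = aggregate(staging)

assert final == aggregate(rows)  # partial sums of sums == one big sum
print(final)
```

The same decomposition should hold for the custom UNION-style aggregates in the query below, provided that operation is also associative (i.e. union of per-epoch unions equals one union over everything); that is an assumption worth verifying against its implementation.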



On Thursday, September 5, 2019, 5:10 AM, Himali Patel 
<himaliben.pa...@thalesgroup.com> wrote:

Hello all,

We have one use-case where we are aggregating billions of rows, and it does a huge shuffle.

Example: as per the 'Jobs' tab on the YARN UI, when the input size is ~350 GB, the shuffle size is >3 TB. This increases non-DFS usage beyond the warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?
 
  
 
1. Our data is high in cardinality:

# input rows are ~15 billion
# output rows are ~13 billion
2. Spark version is 1.6, Hive is 1.1 (it's CDH). We query using a Hive context in the Spark job (YARN is the resource manager).

The Hive context has configs as:

.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
.setConf("hive.exec.dynamic.partition", "true")
.setConf("hive.exec.stagingdir", "/tmp/hive/")
3. Our aggregation is done using a single query as below:

SELECT
  <list of 16 dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
  <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
  <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
  <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
  <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
  <Epochtime1> AS <partition-column>, <Epochtime1> AS <column2>
FROM <table-name>
WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>, <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
GROUP BY <list of 16 dimension columns>
 
  
 
4. Configs are:

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>
spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000

How to tune this job?
        

 
  
 

