Hi Sathi,

Thanks for the quick reply. The list of epoch times in the IN clause is already part of a 30-day aggregation. Given our input-to-output aggregation ratio, the cardinality is very high, so what we need is query tuning, as we can't assign additional resources to this job.


From: Sathi Chowdhury <sath...@yahoo.com.INVALID>
Date: Thursday, 5 September 2019 at 8:10 PM
To: Himali Patel <himali.pa...@guavus.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: Tune hive query launched thru spark-yarn job.

What I can immediately think of is this: since you are using IN in the WHERE clause over a series of timestamps, consider breaking them up. For each epoch timestamp, load the results into an intermediate staging table, and then do a final aggregate from that table, keeping the GROUP BY the same. Since the measures are SUMs, the aggregation can be done in two steps; see the sketch below.
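
A minimal sketch of that idea, assuming the HiveContext from your job and a hypothetical staging table named staging_agg; the <...> tokens stand for the same columns and values as in your original query, and the custom-aggregate columns can only be handled this way if their 'UNION' mode merges partial results:

// Step 1: aggregate each epoch partition on its own and append the result
// to an intermediate staging table (staging_agg is a placeholder name).
val epochs = Seq("<Epochtime1>", "<Epochtime2>" /* ... through <Epochtime8> */)
epochs.foreach { epoch =>
  hiveContext.sql(s"""
    INSERT INTO TABLE staging_agg
    SELECT <list of 16 dimension columns>,
           SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
           $epoch AS <partition-column>
    FROM <table-name>
    WHERE <partition-column> = $epoch
    GROUP BY <list of 16 dimension columns>""")
}

// Step 2: final roll-up over the pre-aggregated staging table, same GROUP BY.
// SUM of per-partition SUMs equals the overall SUM, so the result is unchanged.
val result = hiveContext.sql("""
  SELECT <list of 16 dimension columns>,
         SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5
  FROM staging_agg
  GROUP BY <list of 16 dimension columns>""")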
hth






On Thursday, September 5, 2019, 5:10 AM, Himali Patel 
<himaliben.pa...@thalesgroup.com> wrote:

Hello all,



We have a use case where we aggregate billions of rows, and it produces a huge shuffle.

Example (as per the 'Job' tab in the YARN UI): for an input size of about 350 GB, the shuffle size is more than 3 TB. This pushes non-DFS usage beyond the warning limit and so affects the entire cluster.



It seems we need to tune our query and/or resources. Any suggestions?



1.

Our data has very high cardinality:

# input rows are ~15 billion

# output rows are ~13 billion
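
(For reference: ~13 billion output rows / ~15 billion input rows ≈ 0.87, i.e. the GROUP BY collapses only about 13% of the rows, so map-side combining barely reduces the data before the shuffle.)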



2.

Spark version is 1.6, Hive is 1.1 (CDH distribution).

We query using a HiveContext in the Spark job (YARN is the resource manager).

The HiveContext is configured as follows (a sketch of how these are applied is shown after the settings):

.setConf("hive.exec.dynamic.partition.mode","nonstrict")

.setConf("hive.exec.dynamic.partition","true")

.setConf("hive.exec.stagingdir","/tmp/hive/")
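
For context, a minimal sketch of how these settings are typically applied to a Spark 1.6 HiveContext; the app name is just a placeholder:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// One SparkContext and one HiveContext on top of it (Spark 1.6 style).
val sc = new SparkContext(new SparkConf().setAppName("aggregation-job"))  // placeholder name
val hiveContext = new HiveContext(sc)

// Enable dynamic partitioning for the partitioned output and set the staging dir.
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.stagingdir", "/tmp/hive/")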



3.

Our aggregation is done using a single query, as below:

SELECT
  <list of 16 dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
  <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
  <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
  <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
  <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
  <Epochtime1> AS <partition-column>, <Epochtime1> AS <column2>
FROM <table-name>
WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>,
                             <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
GROUP BY <list of 16 dimension columns>



4.

The Spark configs are as follows:

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=True
spark.eventLog.dir=<>

spark.eventLog.overwrite=True
spark.sql.shuffle.partitions=1000
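
(Back-of-the-envelope, ignoring YARN's allocation rounding: each executor container requests about 30 GB + 9500 MB overhead ≈ 39 GB, so 52 executors take roughly 52 × 39 GB ≈ 2 TB of cluster memory and 52 × 4 = 208 cores, plus about 30 GB for the driver.)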





How can we tune this job?
