Re: Optimising multiple hive table join and query in spark

2020-03-16 Thread Manjunath Shetty H
Thanks Georg,

Batch import job frequency is different than the read job. Import job will run 
every 15mins - 1 hour, and Read/Transform job will run once a day.

In this case i think write with sortWithinPartitions doesnt make any difference 
as the combined data stored in HDFS will not be sorted at the end of the day.

Does partition/sort while reading help ?. Tried this out but it still results 
in shuffle during join of multiple tables and generates very complex DAG

-
Manjunath

From: Georg Heiler 
Sent: Monday, March 16, 2020 12:06 PM
To: Manjunath Shetty H 
Subject: Re: Optimising multiple hive table join and query in spark

Hi,

if you only have 1.6, forget bucketing. 
https://databricks.com/session/bucketing-in-spark-sql-2-3 that only works well 
with Hive from 2.3 onwards.
The other thing in your (daily?) batch job

val myData = 
spark.read.<>(/path/to/file).transform(<>)

Now when writing the data:
myData.write.repartition(xxx)
where xxx resembles the number of files you want to have for each period 
(day?). When writing ORC / Parquet make sure to have files of HDFS Block Size 
or more i.e. usually 128MB up to a maximum of 1G.
myData.write.repartition(xxx)).sortWithinPartitions(join_col, join_col)

apply a secondary sort to get ORC Indices.

IF the cardinality of the join_cols is high enough:
myData.write.repartition(xxx, col(join_col), 
col(other_join_col))).sortWithinPartitions(join_col, join_col)

Best,
Georg

Am Mo., 16. März 2020 um 04:27 Uhr schrieb Manjunath Shetty H 
mailto:manjunathshe...@live.com>>:
Hi Georg,

Thanks for the suggestion. Can you please explain bit more about what you meant 
exactly ?

Bdw i am on Spark 1.6


-
Manjunath

From: Georg Heiler mailto:georg.kf.hei...@gmail.com>>
Sent: Monday, March 16, 2020 12:35 AM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Subject: Re: Optimising multiple hive table join and query in spark

To speed things up:
- sortWithinPartitions (i.e. for each day)& potentially repartition
- pre-shuffle the data with bucketing

Am So., 15. März 2020 um 17:07 Uhr schrieb Manjunath Shetty H 
mailto:manjunathshe...@live.com>>:
Only partitioned and Join keys are not sorted coz those are written 
incrementally with batch jobs

From: Georg Heiler mailto:georg.kf.hei...@gmail.com>>
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H 
mailto:manjunathshe...@live.com>>
Cc: ayan guha mailto:guha.a...@gmail.com>>; Magnus Nilsson 
mailto:ma...@kth.se>>; user 
mailto:user@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition or also bucket by the join column? Are ORCI indices 
active i.e. the JOIN keys sorted when writing the files?

Best,
Georg

Am So., 15. März 2020 um 15:52 Uhr schrieb Manjunath Shetty H 
mailto:manjunathshe...@live.com>>:
Mostly the concern is the reshuffle. Even though all the DF's are partitioned 
by same column. During join it does reshuffle, that is the bottleneck as of now 
in our POC implementation.

Is there any way tell spark that keep all partitions with same partition key at 
the same place so that during the join it wont do shuffle again.


-
Manjunath

From: ayan guha mailto:guha.a...@gmail.com>>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson mailto:ma...@kth.se>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where is the most time spend during 
the query. One possibility is it just takes ramp up time for executors to be 
available, if thats the case then maybe a dedicated yarn queue may help, or 
using Spark thriftserver may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson 
mailto:ma...@kth.se>> wrote:
Been a while but I remember reading on Stack Overflow you can use a UDF as a 
join condition to trick catalyst into not reshuffling the partitions, ie use 
regular equality on the column you partitioned or bucketed by and your custom 
comparer for the other columns. Never got around to try it out hough. I really 
would like a native way to tell catalyst not to reshuffle just because you use 
more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi All,

We have 10 tables in data warehouse (hdfs/hive) written using ORC format. We 
are serving a usecase on top of that by joining 4-5 tables using Hive as of 
now. But it is not fast as we wanted it to be, so we are thinking of using 
spark for this use case.

Any suggestion on this ? Is it good idea to use the Spark for this use case ? 
Can we get better performance by using spark ?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as in

Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Manjunath Shetty H
Only partitioned and Join keys are not sorted coz those are written 
incrementally with batch jobs

From: Georg Heiler 
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H 
Cc: ayan guha ; Magnus Nilsson ; user 

Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition or also bucket by the join column? Are ORCI indices 
active i.e. the JOIN keys sorted when writing the files?

Best,
Georg

Am So., 15. März 2020 um 15:52 Uhr schrieb Manjunath Shetty H 
mailto:manjunathshe...@live.com>>:
Mostly the concern is the reshuffle. Even though all the DF's are partitioned 
by same column. During join it does reshuffle, that is the bottleneck as of now 
in our POC implementation.

Is there any way tell spark that keep all partitions with same partition key at 
the same place so that during the join it wont do shuffle again.


-
Manjunath

From: ayan guha mailto:guha.a...@gmail.com>>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson mailto:ma...@kth.se>>
Cc: user mailto:user@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where is the most time spend during 
the query. One possibility is it just takes ramp up time for executors to be 
available, if thats the case then maybe a dedicated yarn queue may help, or 
using Spark thriftserver may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson 
mailto:ma...@kth.se>> wrote:
Been a while but I remember reading on Stack Overflow you can use a UDF as a 
join condition to trick catalyst into not reshuffling the partitions, ie use 
regular equality on the column you partitioned or bucketed by and your custom 
comparer for the other columns. Never got around to try it out hough. I really 
would like a native way to tell catalyst not to reshuffle just because you use 
more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi All,

We have 10 tables in data warehouse (hdfs/hive) written using ORC format. We 
are serving a usecase on top of that by joining 4-5 tables using Hive as of 
now. But it is not fast as we wanted it to be, so we are thinking of using 
spark for this use case.

Any suggestion on this ? Is it good idea to use the Spark for this use case ? 
Can we get better performance by using spark ?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with 
other tables.

Approach we thought of as now :

  *   Create dataframe for each table and partition by same column for all 
tables ( Lets say Country as partition column )
  *   Register all tables as temporary tables
  *   Run the sql query with joins

But the problem we are seeing with this approach is , even though we already 
partitioned using country it still does hashParittioning + shuffle during join. 
All the table join contain `Country` column with some extra column based on the 
table.

Is there any way to avoid these shuffles ? and improve performance ?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Georg Heiler
Did you only partition or also bucket by the join column? Are ORCI indices
active i.e. the JOIN keys sorted when writing the files?

Best,
Georg

Am So., 15. März 2020 um 15:52 Uhr schrieb Manjunath Shetty H <
manjunathshe...@live.com>:

> Mostly the concern is the reshuffle. Even though all the DF's are
> partitioned by same column. During join it does reshuffle, that is the
> bottleneck as of now in our POC implementation.
>
> Is there any way tell spark that keep all partitions with same partition
> key at the same place so that during the join it wont do shuffle again.
>
>
> -
> Manjunath
> --
> *From:* ayan guha 
> *Sent:* Sunday, March 15, 2020 5:46 PM
> *To:* Magnus Nilsson 
> *Cc:* user 
> *Subject:* Re: Optimising multiple hive table join and query in spark
>
> Hi
>
> I would first and foremost try to identify where is the most time spend
> during the query. One possibility is it just takes ramp up time for
> executors to be available, if thats the case then maybe a dedicated yarn
> queue may help, or using Spark thriftserver may help.
>
> On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson  wrote:
>
> Been a while but I remember reading on Stack Overflow you can use a UDF as
> a join condition to trick catalyst into not reshuffling the partitions, ie
> use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. Never got around to try it out
> hough. I really would like a native way to tell catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
> Hi All,
>
> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
> We are serving a usecase on top of that by joining 4-5 tables using Hive as
> of now. But it is not fast as we wanted it to be, so we are thinking of
> using spark for this use case.
>
> Any suggestion on this ? Is it good idea to use the Spark for this use
> case ? Can we get better performance by using spark ?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>- Data is partitioned by date (MMdd) as integer.
>- Query will fetch data for last 7 days from some tables while joining
>with other tables.
>
>
> *Approach we thought of as now :*
>
>- Create dataframe for each table and partition by same column for all
>tables ( Lets say Country as partition column )
>- Register all tables as temporary tables
>- Run the sql query with joins
>
> But the problem we are seeing with this approach is , even though we
> already partitioned using country it still does hashParittioning +
> shuffle during join. All the table join contain `Country` column with some
> extra column based on the table.
>
> Is there any way to avoid these shuffles ? and improve performance ?
>
>
> Thanks and regards
> Manjunath
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Manjunath Shetty H
Mostly the concern is the reshuffle. Even though all the DF's are partitioned 
by same column. During join it does reshuffle, that is the bottleneck as of now 
in our POC implementation.

Is there any way tell spark that keep all partitions with same partition key at 
the same place so that during the join it wont do shuffle again.


-
Manjunath

From: ayan guha 
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson 
Cc: user 
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where is the most time spend during 
the query. One possibility is it just takes ramp up time for executors to be 
available, if thats the case then maybe a dedicated yarn queue may help, or 
using Spark thriftserver may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson 
mailto:ma...@kth.se>> wrote:
Been a while but I remember reading on Stack Overflow you can use a UDF as a 
join condition to trick catalyst into not reshuffling the partitions, ie use 
regular equality on the column you partitioned or bucketed by and your custom 
comparer for the other columns. Never got around to try it out hough. I really 
would like a native way to tell catalyst not to reshuffle just because you use 
more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H 
mailto:manjunathshe...@live.com>> wrote:
Hi All,

We have 10 tables in data warehouse (hdfs/hive) written using ORC format. We 
are serving a usecase on top of that by joining 4-5 tables using Hive as of 
now. But it is not fast as we wanted it to be, so we are thinking of using 
spark for this use case.

Any suggestion on this ? Is it good idea to use the Spark for this use case ? 
Can we get better performance by using spark ?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (MMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with 
other tables.

Approach we thought of as now :

  *   Create dataframe for each table and partition by same column for all 
tables ( Lets say Country as partition column )
  *   Register all tables as temporary tables
  *   Run the sql query with joins

But the problem we are seeing with this approach is , even though we already 
partitioned using country it still does hashParittioning + shuffle during join. 
All the table join contain `Country` column with some extra column based on the 
table.

Is there any way to avoid these shuffles ? and improve performance ?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread ayan guha
Hi

I would first and foremost try to identify where is the most time spend
during the query. One possibility is it just takes ramp up time for
executors to be available, if thats the case then maybe a dedicated yarn
queue may help, or using Spark thriftserver may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson  wrote:

> Been a while but I remember reading on Stack Overflow you can use a UDF as
> a join condition to trick catalyst into not reshuffling the partitions, ie
> use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. Never got around to try it out
> hough. I really would like a native way to tell catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
>> Hi All,
>>
>> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
>> We are serving a usecase on top of that by joining 4-5 tables using Hive as
>> of now. But it is not fast as we wanted it to be, so we are thinking of
>> using spark for this use case.
>>
>> Any suggestion on this ? Is it good idea to use the Spark for this use
>> case ? Can we get better performance by using spark ?
>>
>> Any pointers would be helpful.
>>
>> *Notes*:
>>
>>- Data is partitioned by date (MMdd) as integer.
>>- Query will fetch data for last 7 days from some tables while
>>joining with other tables.
>>
>>
>> *Approach we thought of as now :*
>>
>>- Create dataframe for each table and partition by same column for
>>all tables ( Lets say Country as partition column )
>>- Register all tables as temporary tables
>>- Run the sql query with joins
>>
>> But the problem we are seeing with this approach is , even though we
>> already partitioned using country it still does hashParittioning +
>> shuffle during join. All the table join contain `Country` column with some
>> extra column based on the table.
>>
>> Is there any way to avoid these shuffles ? and improve performance ?
>>
>>
>> Thanks and regards
>> Manjunath
>>
>

-- 
Best Regards,
Ayan Guha


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Magnus Nilsson
Been a while but I remember reading on Stack Overflow you can use a UDF as
a join condition to trick catalyst into not reshuffling the partitions, ie
use regular equality on the column you partitioned or bucketed by and your
custom comparer for the other columns. Never got around to try it out
hough. I really would like a native way to tell catalyst not to reshuffle
just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H 
wrote:

> Hi All,
>
> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
> We are serving a usecase on top of that by joining 4-5 tables using Hive as
> of now. But it is not fast as we wanted it to be, so we are thinking of
> using spark for this use case.
>
> Any suggestion on this ? Is it good idea to use the Spark for this use
> case ? Can we get better performance by using spark ?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>- Data is partitioned by date (MMdd) as integer.
>- Query will fetch data for last 7 days from some tables while joining
>with other tables.
>
>
> *Approach we thought of as now :*
>
>- Create dataframe for each table and partition by same column for all
>tables ( Lets say Country as partition column )
>- Register all tables as temporary tables
>- Run the sql query with joins
>
> But the problem we are seeing with this approach is , even though we
> already partitioned using country it still does hashParittioning +
> shuffle during join. All the table join contain `Country` column with some
> extra column based on the table.
>
> Is there any way to avoid these shuffles ? and improve performance ?
>
>
> Thanks and regards
> Manjunath
>


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Dennis Suhari
Hi,

I am also using Spark on Hive Metastore. The performance is much more better 
esp. for larger datasets. I have the feeling that the performance is better if 
I load the data into dataframes and do a join instead of doing direct join 
within SparkSQL. But i can’t explain yet. 

Any body experiences in that ?

Br,

Dennis

Von meinem iPhone gesendet

> Am 15.03.2020 um 06:04 schrieb Manjunath Shetty H :
> 
> 
> Hi All,
> 
> We have 10 tables in data warehouse (hdfs/hive) written using ORC format. We 
> are serving a usecase on top of that by joining 4-5 tables using Hive as of 
> now. But it is not fast as we wanted it to be, so we are thinking of using 
> spark for this use case.
> 
> Any suggestion on this ? Is it good idea to use the Spark for this use case ? 
> Can we get better performance by using spark ?
> 
> Any pointers would be helpful.
> 
> Notes: 
> Data is partitioned by date (MMdd) as integer.
> Query will fetch data for last 7 days from some tables while joining with 
> other tables.
> 
> Approach we thought of as now :
> Create dataframe for each table and partition by same column for all tables ( 
> Lets say Country as partition column )
> Register all tables as temporary tables 
> Run the sql query with joins
> But the problem we are seeing with this approach is , even though we already 
> partitioned using country it still does hashParittioning + shuffle during 
> join. All the table join contain `Country` column with some extra column 
> based on the table.
> 
> Is there any way to avoid these shuffles ? and improve performance ?
> 
> 
> Thanks and regards 
> Manjunath