Re: Check if shuffle is caused for repartitioned pyspark dataframes

2022-12-26 Thread Shivam Verma
I tried sorting the repartitioned dataframes on the partition key before
saving them as parquet files. However, when I read those
repartitioned-and-sorted dataframes back and join them on the partition key,
the Spark plan still shows an `Exchange hashpartitioning` step, which I want
to avoid:

== Physical Plan ==
*(5) HashAggregate(keys=[id#15373L], functions=[sum(col_20#15393), sum(col_40#15413), max(col_60#15433)])
+- *(5) HashAggregate(keys=[id#15373L], functions=[partial_sum(col_20#15393), partial_sum(col_40#15413), partial_max(col_60#15433)])
   +- *(5) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
      +- *(5) SortMergeJoin [id#15373L], [id#15171L], Inner
         :- *(2) Sort [id#15373L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#15373L, 4)   <= Want to avoid this
         :     +- *(1) Project [id#15373L, col_20#15393, col_40#15413, col_60#15433]
         :        +- *(1) Filter isnotnull(id#15373L)
         :           +- *(1) FileScan parquet [id#15373L,col_20#15393,col_40#15413,col_60#15433] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df2..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct
         +- *(4) Sort [id#15171L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#15171L, 4)   <= Want to avoid this
               +- *(3) Project [id#15171L]
                  +- *(3) Filter isnotnull(id#15171L)
                     +- *(3) FileScan parquet [id#15171L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[s3a://performance/co_partition_test/repartitioned_df1..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct
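
For context, the write path that produced these repartitioned files looks
roughly like this (a sketch; the source paths and exact schema are
placeholders, and the partition count of 4 matches the plan above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Illustrative source paths; the real inputs differ.
    df1 = spark.read.parquet("s3a://performance/co_partition_test/df1")
    df2 = spark.read.parquet("s3a://performance/co_partition_test/df2")

    # Repartition on the join key and sort within partitions before writing.
    for df, name in [(df1, "repartitioned_df1"), (df2, "repartitioned_df2")]:
        (df.repartition(4, "id")
           .sortWithinPartitions("id")
           .write.mode("overwrite")
           .parquet(f"s3a://performance/co_partition_test/{name}"))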

How do I ensure that this *Exchange hashpartitioning* step is skipped? Or is
the Exchange present in the plan but effectively a no-op, since the data is
already partitioned as required, so no repartitioning overhead is actually
incurred?
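
One alternative I'm considering, in case plain parquet files simply cannot
carry partitioning metadata across a write/read cycle: saving the two
dataframes as bucketed tables instead (a sketch; the bucket count and table
names are illustrative, with df1/df2/spark as in the snippet above):

    # Bucketing metadata is recorded in the catalog, unlike repartitioning,
    # so Spark can plan the sort-merge join without an Exchange on either side.
    df1.write.bucketBy(4, "id").sortBy("id").mode("overwrite").saveAsTable("bucketed_df1")
    df2.write.bucketBy(4, "id").sortBy("id").mode("overwrite").saveAsTable("bucketed_df2")

    joined = spark.table("bucketed_df1").join(spark.table("bucketed_df2"), "id")
    joined.explain()  # no Exchange expected under the join (a Sort may remain)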

On Fri, Dec 23, 2022 at 10:08 PM Russell Jurney 
wrote:

> This may not be good advice but... could you sort by the partition key to
> ensure the partitions match up? Thinking of olden times :)
>
> On Fri, Dec 23, 2022 at 4:42 AM Shivam Verma 
> wrote:
>
>> Hi Gurunandan,
>>
>> Thanks for the reply!
>>
>> I do see the Exchange operator in the SQL tab, but I can see it in both
>> experiments:
>> 1. Using repartitioned dataframes
>> 2. Using initial dataframes
>>
>> Does that mean that the repartitioned dataframes are not actually
>> "co-partitioned"?
>> If that's the case, I have two more questions:
>>
>> 1. Why is the job with the repartitioned dataframes at least 3x faster
>> than the job using the initial dataframes?
>> 2. How do I ensure co-partitioning for pyspark dataframes?
>>
>> Thanks,
>> Shivam
>>
>>
>>
>> On Wed, Dec 14, 2022 at 5:58 PM Gurunandan 
>> wrote:
>>
>>> Hi,
>>> One option for validation is to open the `SQL` tab in the Spark UI and
>>> click on the query of interest to view its detailed information. We need
>>> to check whether an Exchange operator is present, indicating a shuffle,
>>> as shown in the attachment.
>>>
>>> Alternatively, we can print the executed plan and look for an Exchange
>>> operator in the physical plan.
>>>
>>> On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma 
>>> wrote:
>>> >
>>> > Hello folks,
>>> >
>>> > I have a use case where I save two pyspark dataframes as parquet files
>>> and then use them later to join with each other or with other tables and
>>> perform multiple aggregations.
>>> >
>>> > Since I know the column being used in the downstream joins and
>>> groupby, I was hoping I could use co-partitioning for the two dataframes
>>> when saving them and avoid a shuffle later.
>>> >
>>> > I repartitioned the two dataframes (providing the same number of
>>> partitions and the same repartitioning column).
>>> >
>>> > While I'm seeing an improvement in execution time with the above
>>> approach, how do I confirm that a shuffle is actually NOT happening (maybe
>>> through the Spark UI)?
>>> > The Spark plan and shuffle read/write metrics are the same in the two
>>> scenarios:
>>> > 1. Using the repartitioned dataframes to perform join+aggregation
>>> > 2. Using the base dataframes (without explicit repartitioning) to
>>> perform join+aggregation
>>> >
>>> > I have a StackOverflow post with more details regarding the same:
>>> > https://stackoverflow.com/q/74771971/14741697
>>> >
>>> > Thanks in advance, appreciate your help!
>>> >
>>> > Regards,
>>> > Shivam
>>> >
>>>
>>
>> --
>
> Thanks,
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com Book a time on Calendly
> <https://calendly.com/rjurney_personal/30min>
>


Re: Check if shuffle is caused for repartitioned pyspark dataframes

2022-12-23 Thread Shivam Verma
Hi Gurunandan,

Thanks for the reply!

I do see the Exchange operator in the SQL tab, but I can see it in both
experiments:
1. Using repartitioned dataframes
2. Using initial dataframes

Does that mean that the repartitioned dataframes are not actually
"co-partitioned"?
If that's the case, I have two more questions:

1. Why is the job with the repartitioned dataframes at least 3x faster than
the job using the initial dataframes?
2. How do I ensure co-partitioning for pyspark dataframes?
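
(For reference on question 2, the repartitioning I'm currently doing is just
the following; a sketch, with the partition count and paths illustrative and
spark being the active SparkSession:)

    n = 4  # same partition count for both dataframes
    df1 = df1.repartition(n, "id")
    df2 = df2.repartition(n, "id")
    df1.write.mode("overwrite").parquet("s3a://performance/co_partition_test/repartitioned_df1")
    df2.write.mode("overwrite").parquet("s3a://performance/co_partition_test/repartitioned_df2")

    # Note: reading the parquet back gives the planner no partitioning
    # guarantee, which may be why the Exchange still appears:
    check = spark.read.parquet("s3a://performance/co_partition_test/repartitioned_df1")
    print(check.rdd.getNumPartitions())  # driven by file splits, not the repartition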

Thanks,
Shivam



On Wed, Dec 14, 2022 at 5:58 PM Gurunandan  wrote:

> Hi,
> One option for validation is to open the `SQL` tab in the Spark UI and
> click on the query of interest to view its detailed information. We need
> to check whether an Exchange operator is present, indicating a shuffle,
> as shown in the attachment.
>
> Alternatively, we can print the executed plan and look for an Exchange
> operator in the physical plan.
>
> On Wed, Dec 14, 2022 at 10:56 AM Shivam Verma 
> wrote:
> >
> > Hello folks,
> >
> > I have a use case where I save two pyspark dataframes as parquet files
> and then use them later to join with each other or with other tables and
> perform multiple aggregations.
> >
> > Since I know the column being used in the downstream joins and groupby,
> I was hoping I could use co-partitioning for the two dataframes when saving
> them and avoid a shuffle later.
> >
> > I repartitioned the two dataframes (providing the same number of
> partitions and the same repartitioning column).
> >
> > While I'm seeing an improvement in execution time with the above
> approach, how do I confirm that a shuffle is actually NOT happening (maybe
> through the Spark UI)?
> > The Spark plan and shuffle read/write metrics are the same in the two scenarios:
> > 1. Using the repartitioned dataframes to perform join+aggregation
> > 2. Using the base dataframes (without explicit repartitioning) to
> perform join+aggregation
> >
> > I have a StackOverflow post with more details regarding the same:
> > https://stackoverflow.com/q/74771971/14741697
> >
> > Thanks in advance, appreciate your help!
> >
> > Regards,
> > Shivam
> >
>


Check if shuffle is caused for repartitioned pyspark dataframes

2022-12-13 Thread Shivam Verma
Hello folks,

I have a use case where I save two pyspark dataframes as parquet files and
then use them later to join with each other or with other tables and
perform multiple aggregations.

Since I know the column being used in the downstream joins and groupby, I
was hoping I could use co-partitioning for the two dataframes when saving
them and avoid a shuffle later.

I repartitioned the two dataframes (providing the same number of partitions
and the same repartitioning column).

While I'm seeing an *improvement in execution time* with the above
approach, how do I confirm that a shuffle is actually NOT happening (maybe
through the Spark UI)?
The Spark plan and shuffle read/write metrics are the same in the two scenarios:
1. Using the repartitioned dataframes to perform join+aggregation
2. Using the base dataframes (without explicit repartitioning) to perform
join+aggregation
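
For reference, this is roughly how I compare the two scenarios (a sketch;
the dataframe names and aggregation details are illustrative):

    # Same join + aggregation in both scenarios; inspect the plan for Exchange.
    result = df1.join(df2, "id").groupBy("id").sum()
    result.explain()  # a shuffle shows up as "Exchange hashpartitioning(...)"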

I have a StackOverflow post with more details regarding the same:
https://stackoverflow.com/q/74771971/14741697

Thanks in advance, appreciate your help!

Regards,
Shivam