Thanks a lot for the reply, Albert.

Looking at it and reading about it further, I do see that
"AdaptiveSparkPlan isFinalPlan=false" is mentioned.

Could you point me to how I can see the final plan? I couldn't find that
in any of the resources I was referring to.
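
Just to check my understanding of what "running" it means here - is the idea
to trigger an action first and then call explain() again? Something like the
below (only a guess on my side, I haven't verified that this is the right way
to see the final plan):

df4.explain()  # prints "AdaptiveSparkPlan isFinalPlan=false" - the initial plan
df4.count()    # actually run the query so AQE can finalize the plan
df4.explain()  # I assume this would now show isFinalPlan=true with the ReusedExchange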

On Fri, 7 Jan 2022, 07:25 Albert, <zinki...@gmail.com> wrote:

> I happened to encounter something similar.
>
> It's probably because you are just `explain`ing it. When you actually `run`
> it, you will get the final Spark plan, in which case the exchange will be
> reused.
> Right, this is different compared with 3.1, probably because of the
> upgraded AQE.
>
> Not sure whether this is expected, though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari <abdealikoth...@gmail.com>
> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list, in case
>> there is some insight there.
>> It feels like this should be categorized as a bug for Spark 3.2.0.
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari <abdealikoth...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am using PySpark for some projects, and one of the things we are doing
>>> is trying to find the tables/columns being used by Spark from the
>>> execution plan.
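>>>
>>> (Roughly, we grab the executed plan as a string and parse it - a simplified
>>> sketch is below; note that queryExecution via _jdf is an internal/unofficial
>>> accessor, shown here only to illustrate the approach:)
>>>
>>> # for any DataFrame `df`, pull out the physical plan text
>>> plan_text = df._jdf.queryExecution().executedPlan().toString()
>>> # then look at the scan nodes to find the tables/columns being read
>>> scan_lines = [ln for ln in plan_text.splitlines() if 'Scan' in ln]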
>>>
>>> When we upgraded to Spark 3.2, the Spark plan seems to be different from
>>> previous versions, mainly when we are doing joins.
>>> Below is a reproducible example (you can run the same in versions 2.3
>>> to 3.1 to see the difference).
>>>
>>> My original DataFrames have the columns id#0 and id#4, but after doing
>>> the joins we are seeing new columns id#34 and id#19, which are not
>>> created from the original DataFrames I was working with.
>>> In previous versions of Spark, this used to use a ReusedExchange step
>>> (shown below).
>>>
>>> I was trying to understand whether this is expected in Spark 3.2, where
>>> the execution plan seems to be creating a new data source that does not
>>> originate from the df1 and df2 I provided.
>>> NOTE: The same happens even if I read from Parquet files.
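>>>
>>> (For reference, the Parquet variant I tried looks roughly like this - the
>>> paths are just placeholders:)
>>>
>>> df1.write.parquet('/tmp/df1_pq')
>>> df2.write.parquet('/tmp/df2_pq')
>>> pdf1 = spark.read.parquet('/tmp/df1_pq')
>>> pdf2 = spark.read.parquet('/tmp/df2_pq')
>>> pdf3 = pdf1.join(pdf2, pdf1['id'] == pdf2['id']).drop(pdf2['id'])
>>> pdf4 = pdf2.join(pdf3, pdf1['id'] == pdf2['id'])
>>> pdf4.explain()  # same pattern: new attribute ids instead of a ReusedExchange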
>>>
>>> In Spark 3.2:
>>> In [1]: import pyspark
>>>    ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>>    ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
>>>    ...: df1.explain()
>>>    ...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>>    ...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>>    ...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
>>>    :     +- Filter isnotnull(id#4L)
>>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>>    +- Project [id#0L, col1#1L, col2#20L]
>>>       +- SortMergeJoin [id#0L], [id#19L], Inner
>>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
>>>          :     +- Filter isnotnull(id#0L)
>>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>>>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>>                +- Filter isnotnull(id#19L)
>>>                   +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>>    ...: df2.createOrReplaceTempView('df2')
>>>    ...: df3 = spark.sql("""
>>>    ...:     SELECT df1.id, df1.col1, df2.col2
>>>    ...:     FROM df1 JOIN df2 ON df1.id = df2.id
>>>    ...: """)
>>>    ...: df3.createOrReplaceTempView('df3')
>>>    ...: df4 = spark.sql("""
>>>    ...:     SELECT df2.*, df3.*
>>>    ...:     FROM df2 JOIN df3 ON df2.id = df3.id
>>>    ...: """)
>>>    ...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
>>>    :     +- Filter isnotnull(id#4L)
>>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>>    +- Project [id#0L, col1#1L, col2#35L]
>>>       +- SortMergeJoin [id#0L], [id#34L], Inner
>>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
>>>          :     +- Filter isnotnull(id#0L)
>>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>>>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>>                +- Filter isnotnull(id#34L)
>>>                   +- Scan ExistingRDD[id#34L,col2#35L]
>>>
>>>
>>> Doing this in Spark 3.1.1, the plan is:
>>>
>>> *(8) SortMergeJoin [id#4L], [id#0L], Inner
>>> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
>>> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>>> :     +- *(1) Filter isnotnull(id#4L)
>>> :        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
>>> +- *(7) Project [id#0L, col1#1L, col2#20L]
>>>    +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>>>       :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>>>       :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
>>>       :     +- *(3) Filter isnotnull(id#0L)
>>>       :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
>>>       +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>>>          +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>>>
>>>
>
> --
> ~~~~~~~~~~~~~~~
> no mistakes
> ~~~~~~~~~~~~~~~~~~
>
