Thanks a lot for the reply, Albert. After looking at it and reading about it further, I do see that "AdaptiveSparkPlan isFinalPlan=false" is mentioned.
Could you point me to how I can see the final plan? I couldn't find that in any of the resources I was referring to.
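If I've understood your suggestion correctly, the final plan only exists once the query has actually executed, so my guess (untested, using the same toy data as the reproducible example quoted further down) is something like:

import pyspark

spark = pyspark.sql.SparkSession.builder.getOrCreate()

# same toy data as in the example quoted below
df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
df4 = df2.join(df3, df1['id'] == df2['id'])

# before any action runs, AQE has not executed anything yet,
# so this still prints "AdaptiveSparkPlan isFinalPlan=false"
df4.explain()

# trigger a real execution so AQE can re-optimize the plan
df4.collect()

# my understanding is that explain() on the same DataFrame should now
# print "isFinalPlan=true" (and hopefully show the exchange reuse)
df4.explain()

Is that the intended way to inspect it, or is looking at the query in the SQL tab of the Spark UI the recommended approach?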
On Fri, 7 Jan 2022, 07:25 Albert, <zinki...@gmail.com> wrote:

> I happen to encounter something similar.
>
> it's probably because you are just `explain` it. when you actually `run` it, you will get the final spark plan, in which case the exchange will be reused.
> right, this is different compared with 3.1, probably because of the upgraded aqe.
>
> not sure whether this is expected though.
>
> On Thu, Jan 6, 2022 at 12:11 AM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
>
>> Just thought I'd do a quick bump and add the dev mailing list - in case there is some insight there.
>> Feels like this should be categorized as a bug for spark 3.2.0.
>>
>> On Wed, Dec 29, 2021 at 5:25 PM Abdeali Kothari <abdealikoth...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using pyspark for some projects, and one of the things we are doing is trying to find the tables/columns being used by Spark using the execution plan.
>>>
>>> When we upgrade to spark 3.2, the spark plan seems to be different from previous versions - mainly when we are doing joins.
>>> Below is a reproducible example (you could run the same in versions 2.3 to 3.1 to see the difference).
>>>
>>> My original data frames have the columns: id#0 and id#4.
>>> But after doing the joins we are seeing new columns id#34 and id#19, which are not created from the original dataframes I was working with.
>>> In previous versions of spark, this used to use a ReusedExchange step (shown below).
>>>
>>> I was trying to understand if this is expected in spark 3.2, where the execution plan seems to be creating a new data source which does not originate from df1 and df2 which I provided.
>>>
>>> NOTE: The same happens even if I read from parquet files.
>>>
>>> In spark 3.2:
>>>
>>> In [1]: import pyspark
>>> ...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>>
>>> In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
>>> ...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
>>> ...: df1.explain()
>>> ...: df2.explain()
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#0L,col1#1L]
>>>
>>> == Physical Plan ==
>>> *(1) Scan ExistingRDD[id#4L,col2#5L]
>>>
>>> In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
>>> ...: df4 = df2.join(df3, df1['id'] == df2['id'])
>>> ...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
>>>    :     +- Filter isnotnull(id#4L)
>>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>>    +- Project [id#0L, col1#1L, col2#20L]
>>>       +- SortMergeJoin [id#0L], [id#19L], Inner
>>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
>>>          :     +- Filter isnotnull(id#0L)
>>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>>          +- Sort [id#19L ASC NULLS FIRST], false, 0
>>>             +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
>>>                +- Filter isnotnull(id#19L)
>>>                   +- Scan ExistingRDD[id#19L,col2#20L]
>>>
>>> In [4]: df1.createOrReplaceTempView('df1')
>>> ...: df2.createOrReplaceTempView('df2')
>>> ...: df3 = spark.sql("""
>>> ...: SELECT df1.id, df1.col1, df2.col2
>>> ...: FROM df1 JOIN df2 ON df1.id = df2.id
>>> ...: """)
>>> ...: df3.createOrReplaceTempView('df3')
>>> ...: df4 = spark.sql("""
>>> ...: SELECT df2.*, df3.*
>>> ...: FROM df2 JOIN df3 ON df2.id = df3.id
>>> ...: """)
>>> ...: df4.explain()
>>> == Physical Plan ==
>>> AdaptiveSparkPlan isFinalPlan=false
>>> +- SortMergeJoin [id#4L], [id#0L], Inner
>>>    :- Sort [id#4L ASC NULLS FIRST], false, 0
>>>    :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
>>>    :     +- Filter isnotnull(id#4L)
>>>    :        +- Scan ExistingRDD[id#4L,col2#5L]
>>>    +- Project [id#0L, col1#1L, col2#35L]
>>>       +- SortMergeJoin [id#0L], [id#34L], Inner
>>>          :- Sort [id#0L ASC NULLS FIRST], false, 0
>>>          :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
>>>          :     +- Filter isnotnull(id#0L)
>>>          :        +- Scan ExistingRDD[id#0L,col1#1L]
>>>          +- Sort [id#34L ASC NULLS FIRST], false, 0
>>>             +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
>>>                +- Filter isnotnull(id#34L)
>>>                   +- Scan ExistingRDD[id#34L,col2#35L]
>>>
>>> Doing this in spark 3.1.1 - the plan is:
>>>
>>> *(8) SortMergeJoin [id#4L], [id#0L], Inner
>>> :- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
>>> :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>>> :     +- *(1) Filter isnotnull(id#4L)
>>> :        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
>>> +- *(7) Project [id#0L, col1#1L, col2#20L]
>>>    +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
>>>       :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
>>>       :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
>>>       :     +- *(3) Filter isnotnull(id#0L)
>>>       :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
>>>       +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
>>>          +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
>>>
>
> --
> ~~~~~~~~~~~~~~~
> no mistakes
> ~~~~~~~~~~~~~~~~~~
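PS: In case the context helps - for the table/column extraction we need the plan as text. One way I believe that text can be pulled out of pyspark is via the internal Java handles, roughly as below (these are non-public APIs, so treat this only as a sketch that may break between versions):

# _jdf / queryExecution() / executedPlan() are internal, unsupported APIs;
# df4 here is the joined DataFrame from the example quoted above
plan_text = df4._jdf.queryExecution().executedPlan().toString()
print(plan_text)

# we then scan this text for the Scan nodes and column references
# (e.g. id#0L, col1#1L) to work out which tables/columns are used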