Hi,
I am using PySpark for some projects, and one of the things we do is identify
the tables/columns being used by Spark by inspecting the execution plan.
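For context, the extraction we do looks roughly like this (a minimal sketch; the
helper name and regex are just illustrative, not our exact code):

import re
from contextlib import redirect_stdout
from io import StringIO

def plan_column_refs(df):
    # Capture the text printed by df.explain() and collect attribute
    # references of the form name#exprId, e.g. id#0L or col2#5L.
    buf = StringIO()
    with redirect_stdout(buf):
        df.explain()
    return set(re.findall(r'\w+#\d+L?', buf.getvalue()))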
After upgrading to Spark 3.2, the physical plan is different from previous
versions, mainly when we are doing joins.
Below is a reproducible example (you can run the same code in versions 2.3
through 3.1 to see the difference).
My original DataFrames have the columns id#0 and id#4, but after doing the
joins we see new column references id#34 and id#19 which are not created from
the original DataFrames I was working with.
In previous versions of Spark, this used a ReusedExchange step instead
(shown below).
I am trying to understand whether this is expected in Spark 3.2, since the
execution plan seems to create a new data source that does not originate from
the df1 and df2 I provided.
NOTE: The same happens even when I read from Parquet files.
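With the helper sketched above and the example below, the mismatch we hit looks
roughly like this (illustrative only; the actual references are in the plans
further down):

base_refs = plan_column_refs(df1) | plan_column_refs(df2)
extra_refs = plan_column_refs(df4) - base_refs
# In 3.2, extra_refs contains references such as id#19L/col2#20L (or
# id#34L/col2#35L in the SQL variant) that are fed by a brand-new
# Scan ExistingRDD node rather than by df1 or df2.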
In Spark 3.2:
In [1]: import pyspark
...: spark = pyspark.sql.SparkSession.builder.getOrCreate()
In [2]: df1 = spark.createDataFrame([[1, 10], [2, 20]], ['id', 'col1'])
...: df2 = spark.createDataFrame([[1, 11], [2, 22], [2, 222]], ['id', 'col2'])
...: df1.explain()
...: df2.explain()
== Physical Plan ==
*(1) Scan ExistingRDD[id#0L,col1#1L]
== Physical Plan ==
*(1) Scan ExistingRDD[id#4L,col2#5L]
In [3]: df3 = df1.join(df2, df1['id'] == df2['id']).drop(df2['id'])
...: df4 = df2.join(df3, df1['id'] == df2['id'])
...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#53]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#20L]
      +- SortMergeJoin [id#0L], [id#19L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#45]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#19L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#19L, 200), ENSURE_REQUIREMENTS, [id=#46]
               +- Filter isnotnull(id#19L)
                  +- Scan ExistingRDD[id#19L,col2#20L]
In [4]: df1.createOrReplaceTempView('df1')
...: df2.createOrReplaceTempView('df2')
...: df3 = spark.sql("""
...: SELECT df1.id, df1.col1, df2.col2
...: FROM df1 JOIN df2 ON df1.id = df2.id
...: """)
...: df3.createOrReplaceTempView('df3')
...: df4 = spark.sql("""
...: SELECT df2.*, df3.*
...: FROM df2 JOIN df3 ON df2.id = df3.id
...: """)
...: df4.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#4L], [id#0L], Inner
   :- Sort [id#4L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#110]
   :     +- Filter isnotnull(id#4L)
   :        +- Scan ExistingRDD[id#4L,col2#5L]
   +- Project [id#0L, col1#1L, col2#35L]
      +- SortMergeJoin [id#0L], [id#34L], Inner
         :- Sort [id#0L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#102]
         :     +- Filter isnotnull(id#0L)
         :        +- Scan ExistingRDD[id#0L,col1#1L]
         +- Sort [id#34L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#34L, 200), ENSURE_REQUIREMENTS, [id=#103]
               +- Filter isnotnull(id#34L)
                  +- Scan ExistingRDD[id#34L,col2#35L]
Doing the same in Spark 3.1.1, the plan is:
*(8) SortMergeJoin [id#4L], [id#0L], Inner
:- *(2) Sort [id#4L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]
:     +- *(1) Filter isnotnull(id#4L)
:        +- *(1) Scan ExistingRDD[id#4L,col2#5L]
+- *(7) Project [id#0L, col1#1L, col2#20L]
   +- *(7) SortMergeJoin [id#0L], [id#19L], Inner
      :- *(4) Sort [id#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#62]
      :     +- *(3) Filter isnotnull(id#0L)
      :        +- *(3) Scan ExistingRDD[id#0L,col1#1L]
      +- *(6) Sort [id#19L ASC NULLS FIRST], false, 0
         +- ReusedExchange [id#19L, col2#20L], Exchange hashpartitioning(id#4L, 200), ENSURE_REQUIREMENTS, [id=#56]