Yuki Sato created SPARK-53634:
---------------------------------
Summary: Missing Sort Columns in Execution Plan Optimization with
union + withColumn + sortWithinPartition
Key: SPARK-53634
URL: https://issues.apache.org/jira/browse/SPARK-53634
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 4.0.0, 3.5.4
Reporter: Yuki Sato
I am filing this as a bug, but I am not sure whether the behavior is intended,
so I would like to confirm.
*issue*
For certain queries, the optimizer drops some of the columns specified in the
Sort node created by sortWithinPartitions.
The following snippet reproduces the issue:
{code:java}
// Already in scope in spark-shell; needed elsewhere (`spark` is the active SparkSession).
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")
val csvUnionDF = csvOutputDF1.union(csvOutputDF2)

csvUnionDF
  .withColumn("path2", lit("e"))
  .withColumn("path3", lit("f"))
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
  )
  .explain(true) {code}
The execution plan of the above code is as follows:
{code:java}
== Parsed Logical Plan ==
'Sort ['path1 ASC NULLS FIRST, 'path2 ASC NULLS FIRST, 'path3 ASC NULLS FIRST, 'value ASC NULLS FIRST], false
+- Project [value#42, path1#43, path2#59, f AS path3#63]
   +- Project [value#42, path1#43, e AS path2#59]
      +- Union false, false
         :- Project [_1#37 AS value#42, _2#38 AS path1#43]
         :  +- LocalRelation [_1#37, _2#38]
         +- Project [_1#48 AS value#53, _2#49 AS path1#54]
            +- LocalRelation [_1#48, _2#49]
== Analyzed Logical Plan ==
value: int, path1: string, path2: string, path3: string
Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, path3#63 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
+- Project [value#42, path1#43, path2#59, f AS path3#63]
   +- Project [value#42, path1#43, e AS path2#59]
      +- Union false, false
         :- Project [_1#37 AS value#42, _2#38 AS path1#43]
         :  +- LocalRelation [_1#37, _2#38]
         +- Project [_1#48 AS value#53, _2#49 AS path1#54]
            +- LocalRelation [_1#48, _2#49]
== Optimized Logical Plan ==
Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
+- Union false, false
   :- LocalRelation [value#42, path1#43, path2#59, path3#63]
   +- LocalRelation [value#53, path1#54, path2#68, path3#69]
== Physical Plan ==
*(1) Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false, 0
+- Union
   :- LocalTableScan [value#42, path1#43, path2#59, path3#63]
   +- LocalTableScan [value#53, path1#54, path2#68, path3#69] {code}
The "path3" column has already disappeared from the Sort node in the Optimized
Logical Plan.
This becomes a problem when the result is written with partitionBy, as shown
below:
{code:java}
// The functions and implicits imports are already in scope in spark-shell.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

val publishDir = "output/csv_data"
val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
val csvOutputDF1 = sampleData1.toDF("value", "path1")
val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
val csvOutputDF2 = sampleData2.toDF("value", "path1")
val csvUnionDF = csvOutputDF1.union(csvOutputDF2)

csvUnionDF
  .withColumn("path2", lit("e"))
  .withColumn("path3", lit("f"))
  .sortWithinPartitions(
    col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
  )
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("path1", "path2", "path3")
  .csv(publishDir) {code}
With V1Writes, isOrderingMatch compares the outputOrdering produced by the
Sort node from sortWithinPartitions with the requiredOrdering derived from
partitionBy.
Because "path3" is missing from the Sort, the orderings do not match, so the
Sort node from partitionBy is selected and the sort specified by
sortWithinPartitions is ignored. The user-specified sort order is therefore
broken.
Although partitionBy itself does not guarantee that the sort order is
preserved, this behavior appears strange from the user’s perspective.
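The comparison described above can be illustrated with a small, Spark-free sketch. It assumes the check amounts to testing whether the required ordering is a prefix of the existing output ordering. The helper name orderingMatches and the use of plain column names are hypothetical simplifications; the real check operates on Catalyst expressions and also considers sort direction.

```scala
object OrderingMatchSketch {
  // Hypothetical helper: true when `required` is a prefix of `output`.
  // Orderings are plain column names here; sort direction and null
  // ordering are ignored for brevity.
  def orderingMatches(required: Seq[String], output: Seq[String]): Boolean =
    required.length <= output.length &&
      required.zip(output).forall { case (r, o) => r == o }

  // Ordering required by partitionBy("path1", "path2", "path3").
  val requiredOrdering: Seq[String] = Seq("path1", "path2", "path3")

  // Sort keys as written by the user, and as left after optimization.
  val sortAsWritten: Seq[String] = Seq("path1", "path2", "path3", "value")
  val sortAfterOptimization: Seq[String] = Seq("path1", "path2", "value")

  def main(args: Array[String]): Unit = {
    println(orderingMatches(requiredOrdering, sortAsWritten))         // true
    println(orderingMatches(requiredOrdering, sortAfterOptimization)) // false
  }
}
```

Under this simplified model, the sort as written satisfies the partitionBy requirement, while the optimized sort fails at the third key ("value" instead of "path3"), which is the case in which a new Sort replaces the user's.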
*root cause*
The root cause lies in three optimizer rules (PushProjectionThroughUnion,
FoldablePropagation, and EliminateSorts) and the order in which they are
applied.
Currently, the rules are applied once per pass in the order
PushProjectionThroughUnion → FoldablePropagation → EliminateSorts.
As a result, the literal column from the first withColumn on csvUnionDF has
already been pushed below the Union by PushProjectionThroughUnion and is
therefore not subject to FoldablePropagation. The literal column from the
second withColumn is still subject to FoldablePropagation, so its reference in
the Sort is replaced by a literal and then removed by EliminateSorts.
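The sequence described above can be traced with a toy model. This is only a sketch of the explanation in this report, not Spark's implementation: the Plan type and the three functions are hypothetical stand-ins that mimic the named rules on a heavily simplified plan (the Sort keys plus the literal aliases still visible above the Union).

```scala
object RuleOrderSketch {
  // A sort key is either a column reference or an inlined literal.
  sealed trait SortKey
  final case class Col(name: String) extends SortKey
  final case class Lit(value: String) extends SortKey

  // Toy plan: the Sort's keys plus the literal aliases defined by Project
  // nodes that still sit between the Sort and the Union (outermost first).
  final case class Plan(sortKeys: Seq[SortKey], aliases: Seq[(String, String)])

  // Toy PushProjectionThroughUnion: only the Project directly above the
  // Union (the innermost alias) moves into the Union's children.
  def pushProjectionThroughUnion(p: Plan): Plan =
    p.copy(aliases = p.aliases.dropRight(1))

  // Toy FoldablePropagation: sort keys that refer to a still-visible
  // literal alias are replaced by the literal itself.
  def foldablePropagation(p: Plan): Plan = {
    val visible = p.aliases.toMap
    p.copy(sortKeys = p.sortKeys.map {
      case Col(n) if visible.contains(n) => Lit(visible(n))
      case other                         => other
    })
  }

  // Toy EliminateSorts: literal sort keys cannot affect the order, so drop them.
  def eliminateSorts(p: Plan): Plan =
    p.copy(sortKeys = p.sortKeys.filter {
      case _: Lit => false
      case _      => true
    })

  // Analyzed plan from the report: path3 is the outer alias, path2 the inner.
  val analyzed: Plan = Plan(
    sortKeys = Seq(Col("path1"), Col("path2"), Col("path3"), Col("value")),
    aliases = Seq("path3" -> "f", "path2" -> "e")
  )

  // One pass in the order given in the report.
  val optimized: Plan =
    eliminateSorts(foldablePropagation(pushProjectionThroughUnion(analyzed)))

  def main(args: Array[String]): Unit =
    println(optimized.sortKeys)
}
```

In this model the remaining sort keys are path1, path2, and value, which matches the Sort shown in the Optimized Logical Plan: path2 survives because its alias was pushed below the Union before propagation ran, while path3 is replaced by a literal and dropped.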
*affected versions*
I have confirmed this in versions 3.5.4 and 4.0.0, but I assume that all Spark
versions are affected.
However, the behavior differs around version 3.4.0, where changes to V1Writes
were introduced.
- 3.4.0 and later
In the execution plan, the Sort generated by partitionBy is selected, and the
Sort from sortWithinPartitions is ignored.
- Prior to 3.4.0
A Sort generated by partitionBy is internally added to the execution plan.
As a result, in the example code above, both the Sort from
sortWithinPartitions and the Sort from partitionBy are executed redundantly.
I am relatively new to the Spark community, so I cannot determine whether this
is intended behavior or a bug that should be fixed.
If it should be fixed as a bug, I am willing to consider working on it.