[
https://issues.apache.org/jira/browse/SPARK-53634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuki Sato updated SPARK-53634:
------------------------------
Priority: Minor (was: Major)
> Missing Sort Columns in Execution Plan Optimization with union + withColumn +
> sortWithinPartitions
> -------------------------------------------------------------------------------------------------
>
> Key: SPARK-53634
> URL: https://issues.apache.org/jira/browse/SPARK-53634
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 3.5.4, 4.0.0
> Reporter: Yuki Sato
> Priority: Minor
>
> I am reporting this as a bug, but since I cannot determine it for certain, I
> would like to confirm whether this behavior is intended or a bug.
> *issue*
> During execution plan optimization, some of the columns specified in the Sort
> node created by sortWithinPartitions go missing.
> The following code snippet reproduces the issue:
> {code:java}
> import org.apache.spark.sql.functions.{col, lit}
>
> // assumes an active SparkSession named `spark`
> import spark.implicits._
>
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
> csvUnionDF
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .explain(true)
> {code}
>
> The execution plan of the above code is as follows:
> {code:java}
> == Parsed Logical Plan ==
> 'Sort ['path1 ASC NULLS FIRST, 'path2 ASC NULLS FIRST, 'path3 ASC NULLS FIRST, 'value ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
>
> == Analyzed Logical Plan ==
> value: int, path1: string, path2: string, path3: string
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, path3#63 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Project [value#42, path1#43, path2#59, f AS path3#63]
>    +- Project [value#42, path1#43, e AS path2#59]
>       +- Union false, false
>          :- Project [_1#37 AS value#42, _2#38 AS path1#43]
>          :  +- LocalRelation [_1#37, _2#38]
>          +- Project [_1#48 AS value#53, _2#49 AS path1#54]
>             +- LocalRelation [_1#48, _2#49]
>
> == Optimized Logical Plan ==
> Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false
> +- Union false, false
>    :- LocalRelation [value#42, path1#43, path2#59, path3#63]
>    +- LocalRelation [value#53, path1#54, path2#68, path3#69]
>
> == Physical Plan ==
> *(1) Sort [path1#43 ASC NULLS FIRST, path2#59 ASC NULLS FIRST, value#42 ASC NULLS FIRST], false, 0
> +- Union
>    :- LocalTableScan [value#42, path1#43, path2#59, path3#63]
>    +- LocalTableScan [value#53, path1#54, path2#68, path3#69]
> {code}
> It can be seen that the column "path3" has already disappeared from the Sort
> node by the Optimized Logical Plan stage.
> This becomes a problem when partitionBy is used, as shown below:
> {code:java}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.{col, lit}
>
> // assumes an active SparkSession named `spark`
> import spark.implicits._
>
> val publishDir = "output/csv_data"
> val sampleData1 = Seq((1, "a"), (3, "a"), (2, "b"))
> val csvOutputDF1 = sampleData1.toDF("value", "path1")
> val sampleData2 = Seq((2, "b"), (3, "c"), (5, "b"))
> val csvOutputDF2 = sampleData2.toDF("value", "path1")
> val csvUnionDF = csvOutputDF1.union(csvOutputDF2)
> csvUnionDF
>   .withColumn("path2", lit("e"))
>   .withColumn("path3", lit("f"))
>   .sortWithinPartitions(
>     col("path1").asc, col("path2").asc, col("path3").asc, col("value").asc
>   )
>   .write
>   .mode(SaveMode.Overwrite)
>   .partitionBy("path1", "path2", "path3")
>   .csv(publishDir)
> {code}
> When V1Writes is used, isOrderingMatched compares the outputOrdering produced
> by the Sort node from sortWithinPartitions with the requiredOrdering derived
> from partitionBy.
> However, since "path3" is missing, the match fails: the Sort node generated
> for partitionBy is selected instead, the sort specified by
> sortWithinPartitions is ignored, and the user-specified sort order is broken.
> Although partitionBy itself does not guarantee that the sort order will be
> preserved, from the user's perspective this behavior appears strange.
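> To illustrate, the ordering comparison is essentially a prefix check. Below is
> a small self-contained sketch (a hypothetical simplification for illustration,
> not Spark's actual V1Writes code, which compares SortOrder expressions
> semantically):
> {code:java}
> // Hypothetical simplification of the ordering match on the V1 write path:
> // the write is satisfied only if the required ordering from partitionBy is a
> // prefix of the ordering the child plan already provides.
> object OrderingMatchSketch {
>   def isOrderingMatched(required: Seq[String], actual: Seq[String]): Boolean =
>     required.length <= actual.length &&
>       required.zip(actual).forall { case (r, a) => r == a }
>
>   def main(args: Array[String]): Unit = {
>     val required       = Seq("path1", "path2", "path3")          // from partitionBy
>     val afterOptimizer = Seq("path1", "path2", "value")          // "path3" was dropped
>     val asWritten      = Seq("path1", "path2", "path3", "value") // as the user specified
>
>     println(isOrderingMatched(required, afterOptimizer)) // false: user sort is discarded
>     println(isOrderingMatched(required, asWritten))      // true: user sort would be reused
>   }
> }
> {code}
> With the optimized ordering missing "path3", the check fails and Spark falls
> back to its own Sort for partitionBy.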
> *root cause*
> The root cause lies in three optimization rules (PushProjectionThroughUnion /
> FoldablePropagation / EliminateSorts) and the order in which they are applied.
> Currently, within each optimizer iteration the rules run once each, in the
> order PushProjectionThroughUnion → FoldablePropagation → EliminateSorts.
> As a result, the literal column added by the first withColumn ("path2") has
> already been pushed below the Union by PushProjectionThroughUnion before
> FoldablePropagation runs, so it is not propagated as a foldable literal. The
> literal column added by the second withColumn ("path3"), however, is still
> visible to FoldablePropagation, is interpreted as a literal, and is then
> removed from the sort keys by EliminateSorts.
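> The interaction can be sketched with a toy model (this is not Catalyst code;
> the rule names and the "visible literal" set are simplifications of the
> behavior described above):
> {code:java}
> // Toy model of why rule order matters here: path2's literal projection was
> // already pushed below the Union, so FoldablePropagation never marks it
> // foldable; path3's projection is still above the Union, gets folded to the
> // literal "f", and EliminateSorts then drops it from the sort keys.
> object RuleOrderSketch {
>   case class SortKey(name: String, foldable: Boolean)
>
>   // FoldablePropagation (simplified): mark keys whose literal-defining
>   // projection is still visible above the Union as foldable.
>   def foldablePropagation(keys: Seq[SortKey], visibleLiterals: Set[String]): Seq[SortKey] =
>     keys.map(k => if (visibleLiterals(k.name)) k.copy(foldable = true) else k)
>
>   // EliminateSorts (simplified): foldable keys cannot affect ordering, so
>   // they are removed.
>   def eliminateSorts(keys: Seq[SortKey]): Seq[SortKey] = keys.filterNot(_.foldable)
>
>   def main(args: Array[String]): Unit = {
>     val keys = Seq(SortKey("path1", false), SortKey("path2", false),
>                    SortKey("path3", false), SortKey("value", false))
>     // After PushProjectionThroughUnion, only path3's literal is still visible.
>     val folded = foldablePropagation(keys, visibleLiterals = Set("path3"))
>     println(eliminateSorts(folded).map(_.name)) // List(path1, path2, value)
>   }
> }
> {code}
> The surviving keys match the Sort node seen in the Optimized Logical Plan.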
> *affected versions*
> I have confirmed this on versions 3.5.4 and 4.0.0, but presumably all Spark
> versions are affected.
> However, the behavior differs from version 3.4.0 onward, where changes to
> V1Writes were introduced.
> - 3.4.0 and later (default: spark.sql.optimizer.plannedWrite.enabled=true)
> In the execution plan, the Sort generated by partitionBy is selected, and
> the Sort from sortWithinPartitions is ignored.
> - Prior to 3.4.0 (equivalent to spark.sql.optimizer.plannedWrite.enabled=false
> in 3.4.0 and later)
> A Sort generated by partitionBy is added internally to the execution plan.
> As a result, in the example code above, both the Sort from
> sortWithinPartitions and the Sort from partitionBy are executed redundantly.
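> For reference, the two behaviors can be compared at runtime by toggling the
> planned-write flag mentioned above (a sketch assuming an active SparkSession
> named spark):
> {code:java}
> // Switches 3.4.0+ back to the pre-3.4.0 write path: Spark then appends its
> // own Sort for partitionBy instead of matching against the child's ordering.
> spark.conf.set("spark.sql.optimizer.plannedWrite.enabled", "false")
> {code}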
> I am relatively new to the Spark community, so I cannot determine whether this
> is intended behavior or a bug that should be fixed.
> If it is to be fixed as a bug, I am willing to work on it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]