Antoine Wendlinger created SPARK-38000: ------------------------------------------
Summary: Sort node incorrectly removed from the optimized logical plan
Key: SPARK-38000
URL: https://issues.apache.org/jira/browse/SPARK-38000
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.2.0
Environment: Tested on: Ubuntu 18.04.2 LTS, OpenJDK 1.8.0_312 64-Bit Server VM (build 25.312-b07, mixed mode)
Reporter: Antoine Wendlinger

When using a particular combination of joins, window functions, {{cache}} and {{orderBy}}, the global Sort node produced by {{orderBy}} disappears from the optimized logical plan, and the resulting DataFrame is not sorted. In the analyzed plan below it appears as {{Sort [id#1 ASC NULLS FIRST], true}}; in the optimized plan, the only remaining sort is the non-global one (flag {{false}}) required by the window function.

A reproduction of the bug is available at [https://github.com/antoinewdg/spark-bug-report|http://example.com/]. Use {{sbt run}} to get the results.

The bug is very niche, but I chose to report it because it looks like a correctness issue and may be a symptom of a larger one.

Of the versions I tested, only 3.2.0 is affected: on 3.1.2 the result is correctly sorted.

As far as I could test, every step in the reproduction is necessary for the bug to happen:
* the left outer join with an empty Dataset
* the {{distinct()}} call on the empty Dataset
* the window function
* the {{cache()}} call after the {{orderBy}}

h2. Code

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Assumed definitions: the plans below show a single string column "id" on both
// sides of the join (the linked repository has the originals).
case class Player(id: String)
case class BlacklistEntry(id: String)

// Assumes an existing SparkSession named sparkSession
import sparkSession.implicits._

val players = (10 to 20).map(x => Player(id = x.toString)).toDS
val blacklist = sparkSession
  .emptyDataset[BlacklistEntry]
  .distinct() // shows up as Deduplicate [id#5] in the plans

val result = players
  .join(blacklist, Seq("id"), "left_outer")
  .withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
  .orderBy("id") // should sort the final result by id
  .cache()

result.show()        // on 3.2.0 the rows come back unsorted
result.explain(true) // parsed, analyzed, optimized and physical plans
{code}

h2. Output

{code:java}
+---+----+
| id|rank|
+---+----+
| 15|   1|
| 11|   1|
| 16|   1|
| 18|   1|
| 17|   1|
| 19|   1|
| 20|   1|
| 10|   1|
| 12|   1|
| 13|   1|
| 14|   1|
+---+----+

== Parsed Logical Plan ==
'Sort ['id ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Analyzed Logical Plan ==
id: string, rank: int
Sort [id#1 ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Optimized Logical Plan ==
InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
      +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
            +- LocalTableScan [id#1]

== Physical Plan ==
InMemoryTableScan [id#1, rank#10]
   +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
            +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
                  +- LocalTableScan [id#1]
{code}
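The symptom is visible in the {{result.show()}} output: the {{id}} column is not in ascending order. A minimal sketch of a programmatic check follows; {{SortCheck}} and {{isSortedAsc}} are hypothetical names, not part of the original reproduction. It is fed the {{id}} values exactly as printed in the report. Since {{id}} is a string column, the comparison is lexicographic, which matches numeric order for the two-digit ids 10 to 20 used here.

```scala
// Hypothetical helper, not part of the original reproduction: checks that a
// sequence of ids is in ascending order, as orderBy("id") should guarantee.
object SortCheck {
  // Compares each element with its successor using String.compareTo
  // (lexicographic order, matching Spark's ordering of a string column).
  def isSortedAsc(ids: Seq[String]): Boolean =
    ids.zip(ids.drop(1)).forall { case (a, b) => a.compareTo(b) <= 0 }

  def main(args: Array[String]): Unit = {
    // id column as printed by result.show() on Spark 3.2.0 in the report
    val observed = Seq("15", "11", "16", "18", "17", "19", "20", "10", "12", "13", "14")
    // what a correctly sorted result (as on 3.1.2) would contain
    val expected = (10 to 20).map(_.toString)
    println(isSortedAsc(observed)) // false
    println(isSortedAsc(expected)) // true
  }
}
```

With a live session, the same check could be applied to {{result.select("id").as[String].collect().toSeq}} (this needs {{sparkSession.implicits._}} in scope for the {{String}} encoder).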