Antoine Wendlinger created SPARK-38000:
------------------------------------------

             Summary: Sort node incorrectly removed from the optimized logical plan
                 Key: SPARK-38000
                 URL: https://issues.apache.org/jira/browse/SPARK-38000
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.0
         Environment: Tested on:

Ubuntu 18.04.2 LTS
OpenJDK 1.8.0_312 64-Bit Server VM (build 25.312-b07, mixed mode)

            Reporter: Antoine Wendlinger


When using a fairly involved combination of joins, window functions, caching and {{orderBy}}, the Sort node disappears from the optimized logical plan and the resulting DataFrame is not sorted.

You can find a reproduction of the bug in [https://github.com/antoinewdg/spark-bug-report].

Use {{sbt run}} to get the results.

The bug is very niche; I chose to report it because it looks like a correctness issue and may be a symptom of a larger one.

The bug affects only 3.2.0: running the same reproduction on 3.1.2 gives a correctly sorted result.

As far as I could tell, all of the following steps in the reproduction are necessary for the bug to appear:
 * the join with an empty dataframe
 * the distinct call on the empty dataframe
 * the window function
 * the cache call on the ordered dataframe
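To check each of the steps above in isolation, the reproduction can be parameterized so that one step at a time is skipped and the order of the collected ids is inspected. This is a hypothetical sketch, not code from the linked repository: the names {{Spark38000Ablation}}, {{build}} and {{isSorted}} are illustrative, the {{Player}} and {{BlacklistEntry}} case classes are assumed to have a single {{id: String}} field (as the plans below suggest), and a local SparkSession is assumed.

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Assumed single-column schemas, matching LocalRelation [id#1] and [id#5] in the plans
case class Player(id: String)
case class BlacklistEntry(id: String)

object Spark38000Ablation {
  // True if the ids are in non-decreasing lexicographic order (ids are strings)
  def isSorted(ids: Seq[String]): Boolean =
    ids.zip(ids.drop(1)).forall { case (a, b) => a <= b }

  // Hypothetical helper: rebuilds the reproduction, optionally skipping one step
  def build(spark: SparkSession, join: Boolean, distinct: Boolean,
            window: Boolean, cache: Boolean): DataFrame = {
    import spark.implicits._
    val players = (10 to 20).map(x => Player(id = x.toString)).toDS().toDF()
    val empty = spark.emptyDataset[BlacklistEntry]
    val blacklist = if (distinct) empty.distinct() else empty
    val joined =
      if (join) players.join(blacklist, Seq("id"), "left_outer") else players
    val windowed =
      if (window) joined.withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
      else joined
    val ordered = windowed.orderBy("id")
    if (cache) ordered.cache() else ordered
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-38000").getOrCreate()
    // (join, distinct, window, cache)
    val variants = Seq(
      "all steps"   -> (true, true, true, true),
      "no join"     -> (false, true, true, true),
      "no distinct" -> (true, false, true, true),
      "no window"   -> (true, true, false, true),
      "no cache"    -> (true, true, true, false)
    )
    for ((name, (j, d, w, c)) <- variants) {
      val df = build(spark, j, d, w, c)
      // Column 0 is "id" in every variant
      val ids = df.collect().map(_.getString(0)).toSeq
      println(s"$name: sorted = ${isSorted(ids)}")
      df.unpersist()
    }
    spark.stop()
  }
}
{code}

Following the list above, only the "all steps" variant would be expected to report an unsorted result on 3.2.0, while all variants would be expected to be sorted on 3.1.2.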

h2. Code
 
{code:scala}
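import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// Each Dataset has a single string column "id" (see LocalRelation [id#1] and [id#5] below)
case class Player(id: String)
case class BlacklistEntry(id: String)

// Inside a method with an existing SparkSession named sparkSession:
  import sparkSession.implicits._

  val players = (10 to 20).map(x => Player(id = x.toString)).toDS
  val blacklist = sparkSession
    .emptyDataset[BlacklistEntry]
    .distinct()
  val result = players
    .join(blacklist, Seq("id"), "left_outer")
    .withColumn("rank", row_number().over(Window.partitionBy("id").orderBy("id")))
    .orderBy("id")
    .cache()
  result.show()
  result.explain(true)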
{code}
 
h2. Output
 
{code:java}
+---+----+
| id|rank|
+---+----+
| 15|   1|
| 11|   1|
| 16|   1|
| 18|   1|
| 17|   1|
| 19|   1|
| 20|   1|
| 10|   1|
| 12|   1|
| 13|   1|
| 14|   1|
+---+----+

== Parsed Logical Plan ==
'Sort ['id ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Analyzed Logical Plan ==
id: string, rank: int
Sort [id#1 ASC NULLS FIRST], true
+- Project [id#1, rank#10]
   +- Project [id#1, rank#10, rank#10]
      +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
         +- Project [id#1]
            +- Project [id#1]
               +- Join LeftOuter, (id#1 = id#5)
                  :- LocalRelation [id#1]
                  +- Deduplicate [id#5]
                     +- LocalRelation <empty>, [id#5]

== Optimized Logical Plan ==
InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
      +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
            +- LocalTableScan [id#1]

== Physical Plan ==
InMemoryTableScan [id#1, rank#10]
   +- InMemoryRelation [id#1, rank#10], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Window [row_number() windowspecdefinition(id#1, id#1 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10], [id#1], [id#1 ASC NULLS FIRST]
            +- *(1) Sort [id#1 ASC NULLS FIRST, id#1 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#1, 200), ENSURE_REQUIREMENTS, [id=#7]
                  +- LocalTableScan [id#1]
{code}
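The missing sort can also be detected programmatically from the physical plan stored in the cached relation. In the output above, the only {{Sort}} under the {{InMemoryRelation}} is the one required by the window, printed with the {{false}} (non-global) flag, whereas {{orderBy}} is normally planned into a global sort. The sketch below is hypothetical ({{CachedPlanSortCheck}} and {{cachedPlanHasGlobalSort}} are illustrative names) and relies on Spark internal classes ({{InMemoryRelation.cachedPlan}}, {{SortExec.global}}) that are not a stable public API and may differ between versions.

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object CachedPlanSortCheck {
  // Hypothetical helper. Returns None when the optimized plan of df does not
  // read from the cache; otherwise returns whether the cached physical plan
  // contains a global SortExec (what a Dataset.orderBy is planned into).
  def cachedPlanHasGlobalSort(df: DataFrame): Option[Boolean] =
    df.queryExecution.optimizedPlan
      .collectFirst { case r: InMemoryRelation => r }
      .map { r =>
        r.cachedPlan.collect { case s: SortExec if s.global => s }.nonEmpty
      }
}
{code}

Called on {{result}} from the reproduction, it should return {{Some(false)}} whenever the cached plan looks like the one printed above, since that plan contains no global sort.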



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
