[ https://issues.apache.org/jira/browse/SPARK-31078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Kim updated SPARK-31078:
------------------------------
    Description: 
Currently, `outputOrdering` doesn't respect aliases. Thus, joining a bucketed, sorted table with an aliased projection of itself produces an unnecessary sort node:

{code:java}
// Force a sort-merge join by disabling broadcast joins.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
  val df = (0 until 20).toDF("i").as("df")
  // Write a table that is bucketed and sorted by column `i`.
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  // `ii` is just an alias of the sorted column `i`.
  val t2 = t1.selectExpr("i as ii")
  // Neither side of the sort-merge join should need an extra sort.
  t1.join(t2, t1("i") === t2("ii")).explain
}
{code}

The resulting physical plan contains an unnecessary `Sort` on `ii`, even though `ii` is just an alias of the column `i` that the table is already bucketed and sorted by:
{code:java}
== Physical Plan ==
*(3) SortMergeJoin [i#8], [ii#10], Inner
:- *(1) Project [i#8]
:  +- *(1) Filter isnotnull(i#8)
:     +- *(1) ColumnarToRow
:        +- FileScan parquet default.t[i#8] Batched: true, DataFilters: 
[isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., 
PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, 
SelectedBucketsCount: 8 out of 8
+- *(2) Sort [ii#10 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#8 AS ii#10]
      +- *(2) Filter isnotnull(i#8)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#8] Batched: true, DataFilters: 
[isnotnull(i#8)], Format: Parquet, Location: InMemoryFileIndex[file:/..., 
PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int>, 
SelectedBucketsCount: 8 out of 8
{code}
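
One possible direction (just a sketch, not the actual fix) is to have projection-style operators rewrite the child's `outputOrdering` through the aliases in their project list, so that an ordering on `i` is also reported as an ordering on the alias `ii`. Roughly, using Catalyst's `Alias`, `SortOrder`, and `NamedExpression` classes (the method name and placement below are hypothetical):

{code:java}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression, SortOrder}

// Sketch only: rewrite a child's sort orders through the aliases exposed by a
// project list, so that e.g. an ordering on `i` becomes an ordering on `ii`.
def aliasAwareOutputOrdering(
    projectList: Seq[NamedExpression],
    childOrdering: Seq[SortOrder]): Seq[SortOrder] = {
  // Map each aliased child expression (e.g. `i`) to the attribute it is
  // exposed as (e.g. `ii`).
  val aliasMap: Map[Expression, Attribute] = projectList.collect {
    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
  }.toMap

  // Replace aliased ordering expressions with the corresponding output
  // attribute; leave other orderings untouched.
  childOrdering.map { order =>
    aliasMap.get(order.child.canonicalized) match {
      case Some(attr) => order.copy(child = attr)
      case None => order
    }
  }
}
{code}

If the Project node reported its ordering along these lines, the ordering on `ii` would already satisfy the sort-merge join's requirement and the extra `Sort` node above would not be planned.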


> outputOrdering should handle aliases correctly
> ----------------------------------------------
>
>                 Key: SPARK-31078
>                 URL: https://issues.apache.org/jira/browse/SPARK-31078
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Terry Kim
>            Priority: Major
>


