This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6341b06f66b [SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account
6341b06f66b is described below

commit 6341b06f66bc8f919d086341d5b15157ada3b5e0
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Tue Jan 31 23:07:57 2023 +0800

    [SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account
    
    ### What changes were proposed in this pull request?
    Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering` take only the last alias of each aliased expression into account. With better alias handling we can avoid some extra shuffles and sorts.
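    
    For example (a sketch adapted from the new `PlannerSuite` tests added below; the temp view name is only illustrative), grouping by one alias of an attribute while another alias of the same attribute is aggregated no longer introduces an extra shuffle:
    
    ```scala
    // Both `key` and `value` are aliases of the same underlying `id` column.
    spark.range(5)
      .selectExpr("id AS key", "id AS value")
      .createTempView("df")
    // Previously only the last alias of `id` was taken into account, so this query
    // could plan an extra ShuffleExchange for the aggregation; now all aliases are
    // considered and the extra shuffle can be avoided.
    spark.sql("SELECT key, max(value) FROM df GROUP BY key").explain()
    ```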
    
    ### Why are the changes needed?
    Performance improvement, and this also fixes the issue described in https://github.com/apache/spark/pull/39475.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this PR fixes the issue in https://github.com/apache/spark/pull/39475.
    
    ### How was this patch tested?
    Added new UT.
    
    Closes #37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression.
    
    Lead-authored-by: Peter Toth <peter.t...@gmail.com>
    Co-authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/expressions/stringExpressions.scala   |  24 +++
 .../plans/AliasAwareOutputExpression.scala         | 120 ++++++++++++++
 .../spark/sql/catalyst/plans/QueryPlan.scala       |   7 +
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  13 +-
 .../plans/logical/basicLogicalOperators.scala      |   1 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  10 ++
 .../sql/execution/AliasAwareOutputExpression.scala |  75 +++------
 .../org/apache/spark/sql/execution/SparkPlan.scala |   3 -
 .../execution/aggregate/BaseAggregateExec.scala    |   4 +-
 .../execution/aggregate/SortAggregateExec.scala    |   4 +-
 .../sql/execution/basicPhysicalOperators.scala     |   4 +-
 .../execution/datasources/FileFormatWriter.scala   |   3 +-
 .../spark/sql/execution/datasources/V1Writes.scala |  29 +---
 .../org/apache/spark/sql/execution/limit.scala     |   2 +-
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |   2 +-
 .../execution/CoalesceShufflePartitionsSuite.scala |   8 +-
 .../apache/spark/sql/execution/PlannerSuite.scala  |  67 +++++++-
 .../ProjectedOrderingAndPartitioningSuite.scala    | 180 +++++++++++++++++++++
 .../datasources/V1WriteCommandSuite.scala          |  27 ++--
 19 files changed, 464 insertions(+), 119 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 590582eee07..c1ca86b356e 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -3024,3 +3024,27 @@ case class SplitPart (
       partNum = newChildren.apply(2))
   }
 }
+
+/**
+ * An internal function that converts the empty string to null for partition values.
+ * This function should only be used in V1Writes.
+ */
+case class Empty2Null(child: Expression) extends UnaryExpression with 
String2StringExpression {
+  override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) 
null else v
+
+  override def nullable: Boolean = true
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      s"""if ($c.numBytes() == 0) {
+         |  ${ev.isNull} = true;
+         |  ${ev.value} = null;
+         |} else {
+         |  ${ev.value} = $c;
+         |}""".stripMargin
+    })
+  }
+
+  override protected def withNewChildInternal(newChild: Expression): 
Empty2Null =
+    copy(child = newChild)
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
new file mode 100644
index 00000000000..4d9d69d14fe
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A trait that provides functionality to handle aliases in the `outputExpressions`.
+ */
+trait AliasAwareOutputExpression extends SQLConfHelper {
+  protected val aliasCandidateLimit = 
conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT)
+  protected def outputExpressions: Seq[NamedExpression]
+  /**
+   * This method can be used to strip an expression which does not affect the result, for
+   * example: strip an expression which is ordering agnostic for output ordering.
+   */
+  protected def strip(expr: Expression): Expression = expr
+
+  // Build an `Expression` -> `Attribute` alias map.
+  // There can be multiple aliases defined for the same expression, but it doesn't make sense
+  // to store more than `aliasCandidateLimit` attributes for an expression. In those cases the
+  // old logic handled only the last alias, so we need to make sure that we give precedence to
+  // that.
+  // If the `outputExpressions` contain simple attributes we need to add those to the map too.
+  private lazy val aliasMap = {
+    val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
+    outputExpressions.reverse.foreach {
+      case a @ Alias(child, _) =>
+        val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, 
mutable.ArrayBuffer.empty)
+        if (buffer.size < aliasCandidateLimit) {
+          buffer += a.toAttribute
+        }
+      case _ =>
+    }
+    outputExpressions.foreach {
+      case a: Attribute if aliases.contains(a.canonicalized) =>
+        val buffer = aliases(a.canonicalized)
+        if (buffer.size < aliasCandidateLimit) {
+          buffer += a
+        }
+      case _ =>
+    }
+    aliases
+  }
+
+  protected def hasAlias: Boolean = aliasMap.nonEmpty
+
+  /**
+   * Return a stream of expressions in which the original expression is projected with `aliasMap`.
+   */
+  protected def projectExpression(expr: Expression): Stream[Expression] = {
+    val outputSet = AttributeSet(outputExpressions.map(_.toAttribute))
+    expr.multiTransformDown {
+      // Mapping with aliases
+      case e: Expression if aliasMap.contains(e.canonicalized) =>
+        aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) 
Seq(e) else Seq.empty)
+
+      // Prune if we encounter an attribute that we can't map and it is not in the output set.
+      // This prune will go up to the closest `multiTransformDown()` call and return
+      // `Stream.empty` there.
+      case a: Attribute if !outputSet.contains(a) => Seq.empty
+    }
+  }
+}
+
+/**
+ * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that
+ * satisfies ordering requirements.
+ */
+trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]]
+  extends AliasAwareOutputExpression { self: QueryPlan[T] =>
+  protected def orderingExpressions: Seq[SortOrder]
+
+  override protected def strip(expr: Expression): Expression = expr match {
+    case e: Empty2Null => strip(e.child)
+    case _ => expr
+  }
+
+  override final def outputOrdering: Seq[SortOrder] = {
+    if (hasAlias) {
+      // Take the first `SortOrder`s only as long as they can be projected.
+      // E.g. if we have child ordering `Seq(SortOrder(a), SortOrder(b))` then
+      // if only `a AS x` can be projected we can return `Seq(SortOrder(x))`,
+      // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`.
+      orderingExpressions.iterator.map { sortOrder =>
+        val orderingSet = mutable.Set.empty[Expression]
+        val sameOrderings = sortOrder.children.toStream
+          .flatMap(projectExpression)
+          .filter(e => orderingSet.add(e.canonicalized))
+          .take(aliasCandidateLimit)
+        if (sameOrderings.nonEmpty) {
+          Some(sortOrder.copy(child = sameOrderings.head,
+            sameOrderExpressions = sameOrderings.tail))
+        } else {
+          None
+        }
+      }.takeWhile(_.isDefined).flatten.toSeq
+    } else {
+      orderingExpressions
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 0942919b176..90d1bd805cb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -53,6 +53,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
   @transient
   lazy val outputSet: AttributeSet = AttributeSet(output)
 
+  /**
+   * Returns the output ordering that this plan generates, although the semantics differ in
+   * logical and physical plans. In a logical plan it means the global ordering of the data,
+   * while in a physical plan it means the ordering within each partition.
+   */
+  def outputOrdering: Seq[SortOrder] = Nil
+
   // Override `treePatternBits` to propagate bits for its expressions.
   override lazy val treePatternBits: BitSet = {
     val bits: BitSet = getDefaultTreePatternBits
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 0ffaa48b497..9a7726f6a03 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, 
QueryPlan}
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
@@ -141,11 +141,6 @@ abstract class LogicalPlan
    */
   def refresh(): Unit = children.foreach(_.refresh())
 
-  /**
-   * Returns the output ordering that this plan generates.
-   */
-  def outputOrdering: Seq[SortOrder] = Nil
-
   /**
    * Returns true iff `other`'s output is semantically the same, i.e.:
    *  - it contains the same number of `Attribute`s;
@@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with 
UnaryLike[LogicalPlan] {
  */
 trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan]
 
-abstract class OrderPreservingUnaryNode extends UnaryNode {
-  override final def outputOrdering: Seq[SortOrder] = child.outputOrdering
+trait OrderPreservingUnaryNode extends UnaryNode
+  with AliasAwareQueryOutputOrdering[LogicalPlan] {
+  override protected def outputExpressions: Seq[NamedExpression] = child.output
+  override protected def orderingExpressions: Seq[SortOrder] = 
child.outputOrdering
 }
 
 object LogicalPlanIntegrity {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 343fa3517c6..a8dfb8fbd84 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -69,6 +69,7 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
     extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override protected def outputExpressions: Seq[NamedExpression] = projectList
   override def maxRows: Option[Long] = child.maxRows
   override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4f2d5f6c106..f407dda009c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -443,6 +443,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val EXPRESSION_PROJECTION_CANDIDATE_LIMIT =
+    buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit")
+      .doc("The maximum number of the candidate of output expressions whose 
alias are replaced." +
+        " It can preserve the output partitioning and ordering." +
+        " Negative value means disable this optimization.")
+      .internal()
+      .version("3.4.0")
+      .intConf
+      .createWithDefault(100)
+
   val COMPRESS_CACHED = 
buildConf("spark.sql.inMemoryColumnarStorage.compressed")
     .doc("When set to true Spark SQL will automatically select a compression 
codec for each " +
       "column based on statistics of the data.")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
index 92e86637eec..a09c719cf84 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala
@@ -16,52 +16,42 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
NamedExpression, SortOrder}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, PartitioningCollection, UnknownPartitioning}
+import scala.collection.mutable
 
-/**
- * A trait that provides functionality to handle aliases in the 
`outputExpressions`.
- */
-trait AliasAwareOutputExpression extends UnaryExecNode {
-  protected def outputExpressions: Seq[NamedExpression]
-
-  private lazy val aliasMap = outputExpressions.collect {
-    case a @ Alias(child, _) => child.canonicalized -> a.toAttribute
-  }.toMap
-
-  protected def hasAlias: Boolean = aliasMap.nonEmpty
-
-  protected def normalizeExpression(exp: Expression): Expression = {
-    exp.transformDown {
-      case e: Expression => aliasMap.getOrElse(e.canonicalized, e)
-    }
-  }
-}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, 
AliasAwareQueryOutputOrdering}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
PartitioningCollection, UnknownPartitioning}
 
 /**
  * A trait that handles aliases in the `outputExpressions` to produce 
`outputPartitioning` that
  * satisfies distribution requirements.
  */
-trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
+trait PartitioningPreservingUnaryExecNode extends UnaryExecNode
+  with AliasAwareOutputExpression {
   final override def outputPartitioning: Partitioning = {
-    val normalizedOutputPartitioning = if (hasAlias) {
-      child.outputPartitioning match {
+    if (hasAlias) {
+      flattenPartitioning(child.outputPartitioning).flatMap {
         case e: Expression =>
-          normalizeExpression(e).asInstanceOf[Partitioning]
-        case other => other
+          // We need unique partitionings but if the input partitioning is
+          // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then
+          // after the projection we have 4 partitionings:
+          // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`,
+          // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but
+          // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`.
+          val partitioningSet = mutable.Set.empty[Expression]
+          projectExpression(e)
+            .filter(e => partitioningSet.add(e.canonicalized))
+            .take(aliasCandidateLimit)
+            .asInstanceOf[Stream[Partitioning]]
+        case o => Seq(o)
+      } match {
+        case Seq() => 
UnknownPartitioning(child.outputPartitioning.numPartitions)
+        case Seq(p) => p
+        case ps => PartitioningCollection(ps)
       }
     } else {
       child.outputPartitioning
     }
-
-    flattenPartitioning(normalizedOutputPartitioning).filter {
-      case hashPartitioning: HashPartitioning => 
hashPartitioning.references.subsetOf(outputSet)
-      case _ => true
-    } match {
-      case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions)
-      case Seq(singlePartitioning) => singlePartitioning
-      case seqWithMultiplePartitionings => 
PartitioningCollection(seqWithMultiplePartitionings)
-    }
   }
 
   private def flattenPartitioning(partitioning: Partitioning): 
Seq[Partitioning] = {
@@ -74,18 +64,5 @@ trait AliasAwareOutputPartitioning extends 
AliasAwareOutputExpression {
   }
 }
 
-/**
- * A trait that handles aliases in the `orderingExpressions` to produce 
`outputOrdering` that
- * satisfies ordering requirements.
- */
-trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {
-  protected def orderingExpressions: Seq[SortOrder]
-
-  final override def outputOrdering: Seq[SortOrder] = {
-    if (hasAlias) {
-      orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
-    } else {
-      orderingExpressions
-    }
-  }
-}
+trait OrderPreservingUnaryExecNode
+  extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan]
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 5ca36a8a216..bbd74a1fe74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)
 
-  /** Specifies how data is ordered in each partition. */
-  def outputOrdering: Seq[SortOrder] = Nil
-
   /** Specifies sort order for each partition requirements on the input data 
for this operator. */
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
index 756b5eb09d0..2427a39751f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
@@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, NamedExpression}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Final, PartialMerge}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, 
ExplainUtils, UnaryExecNode}
+import org.apache.spark.sql.execution.{ExplainUtils, 
PartitioningPreservingUnaryExecNode, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning
 
 /**
  * Holds common logic for aggregate operators
  */
-trait BaseAggregateExec extends UnaryExecNode with 
AliasAwareOutputPartitioning {
+trait BaseAggregateExec extends UnaryExecNode with 
PartitioningPreservingUnaryExecNode {
   def requiredChildDistributionExpressions: Option[Seq[Expression]]
   def isStreaming: Boolean
   def numShufflePartitions: Option[Int]
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 3cf63a5318d..6042ff7b2ca 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, SparkPlan}
+import org.apache.spark.sql.execution.{OrderPreservingUnaryExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 
@@ -41,7 +41,7 @@ case class SortAggregateExec(
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
   extends AggregateCodegenSupport
-  with AliasAwareOutputOrdering {
+  with OrderPreservingUnaryExecNode {
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 76a8d3d942f..a7b1207765a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -42,8 +42,8 @@ import org.apache.spark.util.random.{BernoulliCellSampler, 
PoissonSampler}
 case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   extends UnaryExecNode
     with CodegenSupport
-    with AliasAwareOutputPartitioning
-    with AliasAwareOutputOrdering {
+    with PartitioningPreservingUnaryExecNode
+    with OrderPreservingUnaryExecNode {
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 5c4d662c145..2491c9d7754 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -155,10 +155,9 @@ object FileFormatWriter extends Logging {
     }
 
     // the sort order doesn't matter
-    // Use the output ordering from the original plan before adding the 
empty2null projection.
     val actualOrdering = writeFilesOpt.map(_.child)
       .getOrElse(materializeAdaptiveSparkPlan(plan))
-      .outputOrdering.map(_.child)
+      .outputOrdering
     val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, 
actualOrdering)
 
     SQLExecution.checkSQLExecutionId(sparkSession)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
index d52af645218..b17d72b0f72 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, 
NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression}
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, 
AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, 
Literal, NamedExpression, Pmod, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -29,7 +28,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
-import org.apache.spark.unsafe.types.UTF8String
 
 trait V1WriteCommand extends DataWritingCommand {
   /**
@@ -121,26 +119,6 @@ object V1Writes extends Rule[LogicalPlan] with 
SQLConfHelper {
 }
 
 object V1WritesUtils {
-
-  /** A function that converts the empty string to null for partition values. 
*/
-  case class Empty2Null(child: Expression) extends UnaryExpression with 
String2StringExpression {
-    override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) 
null else v
-    override def nullable: Boolean = true
-    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-      nullSafeCodeGen(ctx, ev, c => {
-        s"""if ($c.numBytes() == 0) {
-           |  ${ev.isNull} = true;
-           |  ${ev.value} = null;
-           |} else {
-           |  ${ev.value} = $c;
-           |}""".stripMargin
-      })
-    }
-
-    override protected def withNewChildInternal(newChild: Expression): 
Empty2Null =
-      copy(child = newChild)
-  }
-
   def getWriterBucketSpec(
       bucketSpec: Option[BucketSpec],
       dataColumns: Seq[Attribute],
@@ -230,12 +208,13 @@ object V1WritesUtils {
 
   def isOrderingMatched(
       requiredOrdering: Seq[Expression],
-      outputOrdering: Seq[Expression]): Boolean = {
+      outputOrdering: Seq[SortOrder]): Boolean = {
     if (requiredOrdering.length > outputOrdering.length) {
       false
     } else {
       requiredOrdering.zip(outputOrdering).forall {
-        case (requiredOrder, outputOrder) => 
requiredOrder.semanticEquals(outputOrder)
+        case (requiredOrder, outputOrder) =>
+          outputOrder.satisfies(outputOrder.copy(child = requiredOrder))
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 88e212c53a0..877f6508d96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -276,7 +276,7 @@ case class TakeOrderedAndProjectExec(
     sortOrder: Seq[SortOrder],
     projectList: Seq[NamedExpression],
     child: SparkPlan,
-    offset: Int = 0) extends AliasAwareOutputOrdering {
+    offset: Int = 0) extends OrderPreservingUnaryExecNode {
 
   override def output: Seq[Attribute] = {
     projectList.map(_.toAttribute)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 9a75cc5ff8f..a6b295578d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -677,7 +677,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuit
         df.createTempView("df")
 
         val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY 
key"
-        val expectedCodegenText = "Found 2 WholeStageCodegen subtrees."
+        val expectedCodegenText = "Found 1 WholeStageCodegen subtrees."
         val expectedNoCodegenText = "Found 0 WholeStageCodegen subtrees."
         withNormalizedExplain(sqlText) { normalizedOutput =>
           assert(normalizedOutput.contains(expectedNoCodegenText))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 81777f67f37..24a98dd83f3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends 
SparkFunSuite {
       //     ShuffleQueryStage 0
       //   ShuffleQueryStage 2
       //     ReusedQueryStage 0
-      val grouped = df.groupBy("key").agg(max("value").as("value"))
+      val grouped = df.groupBy((col("key") + 
1).as("key")).agg(max("value").as("value"))
       val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
         .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: 
Row(3, 1) ::
-        Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: 
Row(6, 4) :: Row(6, 5) ::
-        Row(7, 5) :: Nil)
+      QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: 
Row(4, 1) ::
+        Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: 
Row(7, 4) :: Row(7, 5) ::
+        Row(8, 5) :: Nil)
 
       val finalPlan2 = resultDf2.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 248c68abd1e..e9cb77ec95c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -1072,7 +1072,7 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
         assert(projects.exists(_.outputPartitioning match {
           case PartitioningCollection(Seq(HashPartitioning(Seq(k1: 
AttributeReference), _),
           HashPartitioning(Seq(k2: AttributeReference), _))) =>
-            k1.name == "t1id" && k2.name == "t2id"
+            Set(k1.name, k2.name) == Set("t1id", "t2id")
           case _ => false
         }))
       }
@@ -1101,9 +1101,8 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
 
         val projects = collect(planned) { case p: ProjectExec => p }
         assert(projects.exists(_.outputOrdering match {
-          case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) =>
-            sameOrderExprs.size == 1 && 
sameOrderExprs.head.isInstanceOf[AttributeReference] &&
-              sameOrderExprs.head.asInstanceOf[AttributeReference].name == 
"t2id"
+          case Seq(s @ SortOrder(_, Ascending, NullsFirst, _)) =>
+            s.children.map(_.asInstanceOf[AttributeReference].name).toSet == 
Set("t2id", "t3id")
           case _ => false
         }))
       }
@@ -1249,7 +1248,7 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
         assert(planned.outputPartitioning match {
           case PartitioningCollection(Seq(HashPartitioning(Seq(k1: 
AttributeReference), _),
           HashPartitioning(Seq(k2: AttributeReference), _))) =>
-            k1.name == "t1id" && k2.name == "t2id"
+            Set(k1.name, k2.name) == Set("t1id", "t2id")
         })
 
         val planned2 = sql(
@@ -1314,6 +1313,64 @@ class PlannerSuite extends SharedSparkSession with 
AdaptiveSparkPlanHelper {
     assert(topKs.size == 1)
     assert(sorts.isEmpty)
   }
+
+  test("SPARK-40086: an attribute and its aliased version in aggregate 
expressions should not " +
+    "introduce extra shuffle") {
+    withTempView("df") {
+      spark.range(5).select(col("id"), 
col("id").as("value")).createTempView("df")
+      val df = sql("SELECT id, max(value) FROM df GROUP BY id")
+
+      val planned = df.queryExecution.executedPlan
+
+      assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty)
+
+      val exchanges = collect(planned) { case s: ShuffleExchangeExec => s }
+      assert(exchanges.size == 0)
+    }
+  }
+
+  test("SPARK-40086: multiple aliases to the same attribute in aggregate 
expressions should not " +
+    "introduce extra shuffle") {
+    withTempView("df") {
+      spark.range(5).select(col("id").as("key"), 
col("id").as("value")).createTempView("df")
+      val df = sql("SELECT key, max(value) FROM df GROUP BY key")
+
+      val planned = df.queryExecution.executedPlan
+
+      assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty)
+
+      val exchanges = collect(planned) { case s: ShuffleExchangeExec => s }
+      assert(exchanges.size == 0)
+    }
+  }
+
+  test("SPARK-40086: multiple aliases to the same attribute with complex 
required distribution " +
+    "should not introduce extra shuffle") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df = spark.range(5)
+      val df1 = df.repartition($"id" + $"id")
+        .select($"id".as("key1"), $"id".as("value1"), ($"id" + 
$"id").as("idPlusId1"))
+      val df2 = df.repartition($"id" + $"id")
+        .select($"id".as("key2"), $"id".as("value2"), ($"id" + 
$"id").as("idPlusId2"))
+      val df3 = df1.join(df2, $"key1" + $"value1" === $"idPlusId2")
+
+      val planned = df3.queryExecution.executedPlan
+
+      val numShuffles = collect(planned) {
+        case e: ShuffleExchangeExec => e
+      }
+      // before SPARK-40086: numShuffles is 4
+      assert(numShuffles.size == 2)
+      val numOutputPartitioning = collectFirst(planned) {
+        case e: SortMergeJoinExec => e.outputPartitioning match {
+          case PartitioningCollection(Seq(PartitioningCollection(l), 
PartitioningCollection(r))) =>
+            l ++ r
+          case _ => Seq.empty
+        }
+      }.get
+      assert(numOutputPartitioning.size == 8)
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
new file mode 100644
index 00000000000..e4ecdb9c445
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
PartitioningCollection, UnknownPartitioning}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ProjectedOrderingAndPartitioningSuite
+  extends SharedSparkSession with AdaptiveSparkPlanHelper {
+  import testImplicits._
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - 
multi-alias") {
+    Seq(0, 1, 2, 5).foreach { limit =>
+      withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> 
limit.toString) {
+        val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as 
y", "id as z")
+        val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+        limit match {
+          case 5 =>
+            assert(outputOrdering.size == 1)
+            assert(outputOrdering.head.sameOrderExpressions.size == 2)
+            
assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name)
+              .toSet.subsetOf(Set("x", "y", "z")))
+          case 2 =>
+            assert(outputOrdering.size == 1)
+            assert(outputOrdering.head.sameOrderExpressions.size == 1)
+            
assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name)
+              .toSet.subsetOf(Set("x", "y", "z")))
+          case 1 =>
+            assert(outputOrdering.size == 1)
+            assert(outputOrdering.head.sameOrderExpressions.size == 0)
+          case 0 =>
+            assert(outputOrdering.size == 0)
+        }
+      }
+    }
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - 
multi-alias") {
+    Seq(0, 1, 2, 5).foreach { limit =>
+      withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> 
limit.toString) {
+        val df = spark.range(2).repartition($"id").selectExpr("id as x", "id 
as y", "id as z")
+        val outputPartitioning = 
stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning
+        limit match {
+          case 5 =>
+            val p = 
outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+            assert(p.size == 3)
+            assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions
+              .map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", 
"z"))
+          case 2 =>
+            val p = 
outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+            assert(p.size == 2)
+            p.flatMap(_.asInstanceOf[HashPartitioning].expressions
+              .map(_.asInstanceOf[Attribute].name)).toSet.subsetOf(Set("x", 
"y", "z"))
+          case 1 =>
+            val p = outputPartitioning.asInstanceOf[HashPartitioning]
+            assert(p.expressions.size == 1)
+            assert(p.expressions.map(_.asInstanceOf[Attribute].name)
+              .toSet.subsetOf(Set("x", "y", "z")))
+          case 0 =>
+            assert(outputPartitioning.isInstanceOf[UnknownPartitioning])
+        }
+      }
+    }
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - 
multi-references") {
+    val df = spark.range(2).selectExpr("id as a", "id as b")
+      .orderBy($"a" + $"b").selectExpr("a as x", "b as y")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.sql == "(x + y) ASC NULLS FIRST")
+    assert(outputOrdering.head.sameOrderExpressions.size == 0)
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - 
multi-references") {
+    val df = spark.range(2).selectExpr("id as a", "id as b")
+      .repartition($"a" + $"b").selectExpr("a as x", "b as y")
+    val outputPartitioning = 
stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning
+    // (a + b), (a + y), (x + b) are pruned since their references are not a subset of the output
+    outputPartitioning match {
+      case p: HashPartitioning => assert(p.sql == "hashpartitioning((x + y))")
+      case _ => fail(s"Unexpected $outputPartitioning")
+    }
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to 
complex " +
+    "expressions") {
+    val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as 
a", "id + id as b")
+    val outputPartitioning = 
stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
+    val partitionings = 
outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+    assert(partitionings.map {
+      case p: HashPartitioning => p.sql
+      case _ => fail(s"Unexpected $outputPartitioning")
+    } == Seq("hashpartitioning(b)", "hashpartitioning(a)"))
+
+    val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id + id as a", 
"id + id as b")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.sql == "b ASC NULLS FIRST")
+    assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("a"))
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to 
children of " +
+    "complex expressions") {
+    val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id as a", 
"id as b")
+    val outputPartitioning = 
stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
+    val partitionings = 
outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+    // (a + b) is the same as (b + a) so expect only one
+    assert(partitionings.map {
+      case p: HashPartitioning => p.sql
+      case _ => fail(s"Unexpected $outputPartitioning")
+    } == Seq("hashpartitioning((b + b))", "hashpartitioning((a + b))", 
"hashpartitioning((a + a))"))
+
+    val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id as a", "id 
as b")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.sql == "(b + b) ASC NULLS FIRST")
+    // (a + b) is the same as (b + a) so expect only one
+    assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("(a + 
b)", "(a + a)"))
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to 
complex " +
+    "expressions and to their children") {
+    val df2 = spark.range(2).repartition($"id" + $"id")
+      .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b")
+    val outputPartitioning = 
stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning
+    val partitionings = 
outputPartitioning.asInstanceOf[PartitioningCollection].partitionings
+    // (a + b) is the same as (b + a) so expect only one
+    assert(partitionings.map {
+      case p: HashPartitioning => p.sql
+      case _ => fail(s"Unexpected $outputPartitioning")
+    } == Seq("hashpartitioning(bb)", "hashpartitioning(aa)", 
"hashpartitioning((b + b))",
+      "hashpartitioning((a + b))", "hashpartitioning((a + a))"))
+
+    val df = spark.range(2).orderBy($"id" + $"id")
+      .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b")
+    val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering.size == 1)
+    assert(outputOrdering.head.sql == "bb ASC NULLS FIRST")
+    // (a + b) is the same as (b + a) so expect only one
+    assert(outputOrdering.head.sameOrderExpressions.map(_.sql) ==
+      Seq("aa", "(b + b)", "(a + b)", "(a + a)"))
+  }
+
+  test("SPARK-42049: Improve AliasAwareOutputExpression - ordering partly 
projected") {
+    val df = spark.range(2).orderBy($"id" + 1, $"id" + 2)
+
+    val df1 = df.selectExpr("id + 1 AS a", "id + 2 AS b")
+    val outputOrdering1 = df1.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering1.size == 2)
+    assert(outputOrdering1.map(_.sql) == Seq("a ASC NULLS FIRST", "b ASC NULLS 
FIRST"))
+
+    val df2 = df.selectExpr("id + 1 AS a")
+    val outputOrdering2 = df2.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering2.size == 1)
+    assert(outputOrdering2.head.sql == "a ASC NULLS FIRST")
+
+    val df3 = df.selectExpr("id + 2 AS b")
+    val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering
+    assert(outputOrdering3.size == 0)
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
index 40574a8e73a..20a90cd94b6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala
@@ -90,9 +90,11 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils {
     sparkContext.listenerBus.waitUntilEmpty()
 
     assert(optimizedPlan != null)
-    // Check whether a logical sort node is at the top of the logical plan of 
the write query.
-    assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort,
-      s"Expect hasLogicalSort: $hasLogicalSort, Actual: 
${optimizedPlan.isInstanceOf[Sort]}")
+    // Check whether a logical sort node exists in the write query.
+    // If the user-specified sort matches the required ordering, the sort node may not be at
+    // the top of the query.
+    assert(optimizedPlan.exists(_.isInstanceOf[Sort]) == hasLogicalSort,
+      s"Expect hasLogicalSort: $hasLogicalSort," +
+        s"Actual: ${optimizedPlan.exists(_.isInstanceOf[Sort])}")
 
     // Check empty2null conversion.
     val empty2nullExpr = optimizedPlan.exists(p => 
V1WritesUtils.hasEmptyToNull(p.expressions))
@@ -223,8 +225,8 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
             case s: SortExec => s
           }.exists {
             case SortExec(Seq(
-            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _),
-            SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
+              SortOrder(AttributeReference("key", IntegerType, _, _), 
Ascending, NullsFirst, _),
+              SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _)
             ), false, _, _) => true
             case _ => false
           }, plan)
@@ -268,16 +270,11 @@ class V1WriteCommandSuite extends QueryTest with 
SharedSparkSession with V1Write
         // assert the outer most sort in the executed plan
         assert(plan.collectFirst {
           case s: SortExec => s
-        }.map(s => (enabled, s)).exists {
-          case (false, SortExec(Seq(
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, 
NullsFirst, _),
-          SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _)
-          ), false, _, _)) => true
-
-          // SPARK-40885: this bug removes the in-partition sort, which 
manifests here
-          case (true, SortExec(Seq(
-          SortOrder(AttributeReference("value", StringType, _, _), Ascending, 
NullsFirst, _)
-          ), false, _, _)) => true
+        }.exists {
+          case SortExec(Seq(
+            SortOrder(AttributeReference("value", StringType, _, _), 
Ascending, NullsFirst, _),
+            SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, 
NullsFirst, _)
+          ), false, _, _) => true
           case _ => false
         }, plan)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

