This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6341b06f66b [SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account 6341b06f66b is described below commit 6341b06f66bc8f919d086341d5b15157ada3b5e0 Author: Peter Toth <peter.t...@gmail.com> AuthorDate: Tue Jan 31 23:07:57 2023 +0800 [SPARK-40086][SPARK-42049][SQL] Improve AliasAwareOutputPartitioning and AliasAwareQueryOutputOrdering to take all aliases into account ### What changes were proposed in this pull request? Currently `AliasAwareOutputPartitioning` and `AliasAwareQueryOutputOrdering` takes only the last alias by aliased expressions into account. We could avoid some extra shuffles and sorts with better alias handling. ### Why are the changes needed? Performance improvement and this also fix the issue in https://github.com/apache/spark/pull/39475. ### Does this PR introduce _any_ user-facing change? Yes, this PR fixes the issue in https://github.com/apache/spark/pull/39475. ### How was this patch tested? Added new UT. Closes #37525 from peter-toth/SPARK-40086-fix-aliasawareoutputexpression. Lead-authored-by: Peter Toth <peter.t...@gmail.com> Co-authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/expressions/stringExpressions.scala | 24 +++ .../plans/AliasAwareOutputExpression.scala | 120 ++++++++++++++ .../spark/sql/catalyst/plans/QueryPlan.scala | 7 + .../sql/catalyst/plans/logical/LogicalPlan.scala | 13 +- .../plans/logical/basicLogicalOperators.scala | 1 + .../org/apache/spark/sql/internal/SQLConf.scala | 10 ++ .../sql/execution/AliasAwareOutputExpression.scala | 75 +++------ .../org/apache/spark/sql/execution/SparkPlan.scala | 3 - .../execution/aggregate/BaseAggregateExec.scala | 4 +- .../execution/aggregate/SortAggregateExec.scala | 4 +- .../sql/execution/basicPhysicalOperators.scala | 4 +- .../execution/datasources/FileFormatWriter.scala | 3 +- .../spark/sql/execution/datasources/V1Writes.scala | 29 +--- .../org/apache/spark/sql/execution/limit.scala | 2 +- .../scala/org/apache/spark/sql/ExplainSuite.scala | 2 +- .../execution/CoalesceShufflePartitionsSuite.scala | 8 +- .../apache/spark/sql/execution/PlannerSuite.scala | 67 +++++++- .../ProjectedOrderingAndPartitioningSuite.scala | 180 +++++++++++++++++++++ .../datasources/V1WriteCommandSuite.scala | 27 ++-- 19 files changed, 464 insertions(+), 119 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 590582eee07..c1ca86b356e 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -3024,3 +3024,27 @@ case class SplitPart ( partNum = newChildren.apply(2)) } } + +/** + * A internal function that converts the empty string to null for partition values. + * This function should be only used in V1Writes. 
+ */ +case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { + override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v + + override def nullable: Boolean = true + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => { + s"""if ($c.numBytes() == 0) { + | ${ev.isNull} = true; + | ${ev.value} = null; + |} else { + | ${ev.value} = $c; + |}""".stripMargin + }) + } + + override protected def withNewChildInternal(newChild: Expression): Empty2Null = + copy(child = newChild) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala new file mode 100644 index 00000000000..4d9d69d14fe --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Empty2Null, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.internal.SQLConf + +/** + * A trait that provides functionality to handle aliases in the `outputExpressions`. + */ +trait AliasAwareOutputExpression extends SQLConfHelper { + protected val aliasCandidateLimit = conf.getConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT) + protected def outputExpressions: Seq[NamedExpression] + /** + * This method can be used to strip expression which does not affect the result, for example: + * strip the expression which is ordering agnostic for output ordering. + */ + protected def strip(expr: Expression): Expression = expr + + // Build an `Expression` -> `Attribute` alias map. + // There can be multiple alias defined for the same expressions but it doesn't make sense to store + // more than `aliasCandidateLimit` attributes for an expression. In those cases the old logic + // handled only the last alias so we need to make sure that we give precedence to that. + // If the `outputExpressions` contain simple attributes we need to add those too to the map. 
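The practical effect of keeping every alias candidate (capped by `spark.sql.optimizer.expressionProjectionCandidateLimit`, default 100) is easiest to see from the user side. A rough spark-shell style sketch, modelled on the ordering test added in ProjectedOrderingAndPartitioningSuite further down; which alias ends up as the head of the SortOrder is not guaranteed:

```scala
// `id` is aliased three times; after this change the projected output ordering keeps all of
// them: one SortOrder over one of the aliases, with the remaining aliases recorded as
// sameOrderExpressions (up to the candidate limit).
val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z")
df.queryExecution.optimizedPlan.outputOrdering
// expected: a single SortOrder whose sameOrderExpressions contain the other two aliases
```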
+ private lazy val aliasMap = { + val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]() + outputExpressions.reverse.foreach { + case a @ Alias(child, _) => + val buffer = aliases.getOrElseUpdate(strip(child).canonicalized, mutable.ArrayBuffer.empty) + if (buffer.size < aliasCandidateLimit) { + buffer += a.toAttribute + } + case _ => + } + outputExpressions.foreach { + case a: Attribute if aliases.contains(a.canonicalized) => + val buffer = aliases(a.canonicalized) + if (buffer.size < aliasCandidateLimit) { + buffer += a + } + case _ => + } + aliases + } + + protected def hasAlias: Boolean = aliasMap.nonEmpty + + /** + * Return a stream of expressions in which the original expression is projected with `aliasMap`. + */ + protected def projectExpression(expr: Expression): Stream[Expression] = { + val outputSet = AttributeSet(outputExpressions.map(_.toAttribute)) + expr.multiTransformDown { + // Mapping with aliases + case e: Expression if aliasMap.contains(e.canonicalized) => + aliasMap(e.canonicalized).toSeq ++ (if (e.containsChild.nonEmpty) Seq(e) else Seq.empty) + + // Prune if we encounter an attribute that we can't map and it is not in output set. + // This prune will go up to the closest `multiTransformDown()` call and returns `Stream.empty` + // there. + case a: Attribute if !outputSet.contains(a) => Seq.empty + } + } +} + +/** + * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that + * satisfies ordering requirements. + */ +trait AliasAwareQueryOutputOrdering[T <: QueryPlan[T]] + extends AliasAwareOutputExpression { self: QueryPlan[T] => + protected def orderingExpressions: Seq[SortOrder] + + override protected def strip(expr: Expression): Expression = expr match { + case e: Empty2Null => strip(e.child) + case _ => expr + } + + override final def outputOrdering: Seq[SortOrder] = { + if (hasAlias) { + // Take the first `SortOrder`s only until they can be projected. + // E.g. we have child ordering `Seq(SortOrder(a), SortOrder(b))` then + // if only `a AS x` can be projected then we can return Seq(SortOrder(x))` + // but if only `b AS y` can be projected we can't return `Seq(SortOrder(y))`. + orderingExpressions.iterator.map { sortOrder => + val orderingSet = mutable.Set.empty[Expression] + val sameOrderings = sortOrder.children.toStream + .flatMap(projectExpression) + .filter(e => orderingSet.add(e.canonicalized)) + .take(aliasCandidateLimit) + if (sameOrderings.nonEmpty) { + Some(sortOrder.copy(child = sameOrderings.head, + sameOrderExpressions = sameOrderings.tail)) + } else { + None + } + }.takeWhile(_.isDefined).flatten.toSeq + } else { + orderingExpressions + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 0942919b176..90d1bd805cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -53,6 +53,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] @transient lazy val outputSet: AttributeSet = AttributeSet(output) + /** + * Returns the output ordering that this plan generates, although the semantics differ in logical + * and physical plans. In the logical plan it means global ordering of the data while in physical + * it means ordering in each partition. 
+ */ + def outputOrdering: Seq[SortOrder] = Nil + // Override `treePatternBits` to propagate bits for its expressions. override lazy val treePatternBits: BitSet = { val bits: BitSet = getDefaultTreePatternBits diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0ffaa48b497..9a7726f6a03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -141,11 +141,6 @@ abstract class LogicalPlan */ def refresh(): Unit = children.foreach(_.refresh()) - /** - * Returns the output ordering that this plan generates. - */ - def outputOrdering: Seq[SortOrder] = Nil - /** * Returns true iff `other`'s output is semantically the same, i.e.: * - it contains the same number of `Attribute`s; @@ -205,8 +200,10 @@ trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] { */ trait BinaryNode extends LogicalPlan with BinaryLike[LogicalPlan] -abstract class OrderPreservingUnaryNode extends UnaryNode { - override final def outputOrdering: Seq[SortOrder] = child.outputOrdering +trait OrderPreservingUnaryNode extends UnaryNode + with AliasAwareQueryOutputOrdering[LogicalPlan] { + override protected def outputExpressions: Seq[NamedExpression] = child.output + override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering } object LogicalPlanIntegrity { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 343fa3517c6..a8dfb8fbd84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -69,6 +69,7 @@ object Subquery { case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends OrderPreservingUnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) + override protected def outputExpressions: Seq[NamedExpression] = projectList override def maxRows: Option[Long] = child.maxRows override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4f2d5f6c106..f407dda009c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -443,6 +443,16 @@ object SQLConf { .booleanConf .createWithDefault(true) + val EXPRESSION_PROJECTION_CANDIDATE_LIMIT = + 
buildConf("spark.sql.optimizer.expressionProjectionCandidateLimit") + .doc("The maximum number of the candidate of output expressions whose alias are replaced." + + " It can preserve the output partitioning and ordering." + + " Negative value means disable this optimization.") + .internal() + .version("3.4.0") + .intConf + .createWithDefault(100) + val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .doc("When set to true Spark SQL will automatically select a compression codec for each " + "column based on statistics of the data.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index 92e86637eec..a09c719cf84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -16,52 +16,42 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} +import scala.collection.mutable -/** - * A trait that provides functionality to handle aliases in the `outputExpressions`. - */ -trait AliasAwareOutputExpression extends UnaryExecNode { - protected def outputExpressions: Seq[NamedExpression] - - private lazy val aliasMap = outputExpressions.collect { - case a @ Alias(child, _) => child.canonicalized -> a.toAttribute - }.toMap - - protected def hasAlias: Boolean = aliasMap.nonEmpty - - protected def normalizeExpression(exp: Expression): Expression = { - exp.transformDown { - case e: Expression => aliasMap.getOrElse(e.canonicalized, e) - } - } -} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.{AliasAwareOutputExpression, AliasAwareQueryOutputOrdering} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} /** * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that * satisfies distribution requirements. */ -trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { +trait PartitioningPreservingUnaryExecNode extends UnaryExecNode + with AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { - val normalizedOutputPartitioning = if (hasAlias) { - child.outputPartitioning match { + if (hasAlias) { + flattenPartitioning(child.outputPartitioning).flatMap { case e: Expression => - normalizeExpression(e).asInstanceOf[Partitioning] - case other => other + // We need unique partitionings but if the input partitioning is + // `HashPartitioning(Seq(id + id))` and we have `id -> a` and `id -> b` aliases then after + // the projection we have 4 partitionings: + // `HashPartitioning(Seq(a + a))`, `HashPartitioning(Seq(a + b))`, + // `HashPartitioning(Seq(b + a))`, `HashPartitioning(Seq(b + b))`, but + // `HashPartitioning(Seq(a + b))` is the same as `HashPartitioning(Seq(b + a))`. 
+ val partitioningSet = mutable.Set.empty[Expression] + projectExpression(e) + .filter(e => partitioningSet.add(e.canonicalized)) + .take(aliasCandidateLimit) + .asInstanceOf[Stream[Partitioning]] + case o => Seq(o) + } match { + case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) + case Seq(p) => p + case ps => PartitioningCollection(ps) } } else { child.outputPartitioning } - - flattenPartitioning(normalizedOutputPartitioning).filter { - case hashPartitioning: HashPartitioning => hashPartitioning.references.subsetOf(outputSet) - case _ => true - } match { - case Seq() => UnknownPartitioning(child.outputPartitioning.numPartitions) - case Seq(singlePartitioning) => singlePartitioning - case seqWithMultiplePartitionings => PartitioningCollection(seqWithMultiplePartitionings) - } } private def flattenPartitioning(partitioning: Partitioning): Seq[Partitioning] = { @@ -74,18 +64,5 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { } } -/** - * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that - * satisfies ordering requirements. - */ -trait AliasAwareOutputOrdering extends AliasAwareOutputExpression { - protected def orderingExpressions: Seq[SortOrder] - - final override def outputOrdering: Seq[SortOrder] = { - if (hasAlias) { - orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder]) - } else { - orderingExpressions - } - } -} +trait OrderPreservingUnaryExecNode + extends UnaryExecNode with AliasAwareQueryOutputOrdering[SparkPlan] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 5ca36a8a216..bbd74a1fe74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -179,9 +179,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def requiredChildDistribution: Seq[Distribution] = Seq.fill(children.size)(UnspecifiedDistribution) - /** Specifies how data is ordered in each partition. */ - def outputOrdering: Seq[SortOrder] = Nil - /** Specifies sort order for each partition requirements on the input data for this operator. 
*/ def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala index 756b5eb09d0..2427a39751f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} -import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode} +import org.apache.spark.sql.execution.{ExplainUtils, PartitioningPreservingUnaryExecNode, UnaryExecNode} import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning /** * Holds common logic for aggregate operators */ -trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning { +trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryExecNode { def requiredChildDistributionExpressions: Option[Seq[Expression]] def isStreaming: Boolean def numShufflePartitions: Option[Int] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 3cf63a5318d..6042ff7b2ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, SparkPlan} +import org.apache.spark.sql.execution.{OrderPreservingUnaryExecNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf @@ -41,7 +41,7 @@ case class SortAggregateExec( resultExpressions: Seq[NamedExpression], child: SparkPlan) extends AggregateCodegenSupport - with AliasAwareOutputOrdering { + with OrderPreservingUnaryExecNode { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 76a8d3d942f..a7b1207765a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -42,8 +42,8 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport - with AliasAwareOutputPartitioning - with AliasAwareOutputOrdering { + with 
PartitioningPreservingUnaryExecNode + with OrderPreservingUnaryExecNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 5c4d662c145..2491c9d7754 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -155,10 +155,9 @@ object FileFormatWriter extends Logging { } // the sort order doesn't matter - // Use the output ordering from the original plan before adding the empty2null projection. val actualOrdering = writeFilesOpt.map(_.child) .getOrElse(materializeAdaptiveSparkPlan(plan)) - .outputOrdering.map(_.child) + .outputOrdering val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, actualOrdering) SQLExecution.checkSQLExecutionId(sparkSession) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala index d52af645218..b17d72b0f72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder, String2StringExpression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeMap, AttributeSet, BitwiseAnd, Empty2Null, Expression, HiveHash, Literal, NamedExpression, Pmod, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule @@ -29,7 +28,6 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StringType -import org.apache.spark.unsafe.types.UTF8String trait V1WriteCommand extends DataWritingCommand { /** @@ -121,26 +119,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper { } object V1WritesUtils { - - /** A function that converts the empty string to null for partition values. 
*/ - case class Empty2Null(child: Expression) extends UnaryExpression with String2StringExpression { - override def convert(v: UTF8String): UTF8String = if (v.numBytes() == 0) null else v - override def nullable: Boolean = true - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => { - s"""if ($c.numBytes() == 0) { - | ${ev.isNull} = true; - | ${ev.value} = null; - |} else { - | ${ev.value} = $c; - |}""".stripMargin - }) - } - - override protected def withNewChildInternal(newChild: Expression): Empty2Null = - copy(child = newChild) - } - def getWriterBucketSpec( bucketSpec: Option[BucketSpec], dataColumns: Seq[Attribute], @@ -230,12 +208,13 @@ object V1WritesUtils { def isOrderingMatched( requiredOrdering: Seq[Expression], - outputOrdering: Seq[Expression]): Boolean = { + outputOrdering: Seq[SortOrder]): Boolean = { if (requiredOrdering.length > outputOrdering.length) { false } else { requiredOrdering.zip(outputOrdering).forall { - case (requiredOrder, outputOrder) => requiredOrder.semanticEquals(outputOrder) + case (requiredOrder, outputOrder) => + outputOrder.satisfies(outputOrder.copy(child = requiredOrder)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 88e212c53a0..877f6508d96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -276,7 +276,7 @@ case class TakeOrderedAndProjectExec( sortOrder: Seq[SortOrder], projectList: Seq[NamedExpression], child: SparkPlan, - offset: Int = 0) extends AliasAwareOutputOrdering { + offset: Int = 0) extends OrderPreservingUnaryExecNode { override def output: Seq[Attribute] = { projectList.map(_.toAttribute) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 9a75cc5ff8f..a6b295578d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -677,7 +677,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit df.createTempView("df") val sqlText = "EXPLAIN CODEGEN SELECT key, MAX(value) FROM df GROUP BY key" - val expectedCodegenText = "Found 2 WholeStageCodegen subtrees." + val expectedCodegenText = "Found 1 WholeStageCodegen subtrees." val expectedNoCodegenText = "Found 0 WholeStageCodegen subtrees." 
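The reworked `V1WritesUtils.isOrderingMatched` above relies on `SortOrder.satisfies`, which matches the required expression against any of the sort order's children, i.e. the child itself or any of its `sameOrderExpressions`, which is where projected aliases now end up. A minimal catalyst-level sketch; the attribute names are illustrative only:

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.types.IntegerType

val key = AttributeReference("key", IntegerType)()   // column referenced by the required ordering
val k   = AttributeReference("k", IntegerType)()     // alias of `key` in the projected output
val outputOrder = SortOrder(k, Ascending, sameOrderExpressions = Seq(key))

// Before: requiredOrder.semanticEquals(outputOrder.child), which ignored the alias
// information carried in sameOrderExpressions. Now the required expression only has to
// match one of the SortOrder's children:
outputOrder.satisfies(outputOrder.copy(child = key))  // true
```

This is also why FileFormatWriter can now pass the full `SortOrder`s (instead of only their `child` expressions) when checking whether the child plan's ordering already matches the required write ordering.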
withNormalizedExplain(sqlText) { normalizedOutput => assert(normalizedOutput.contains(expectedNoCodegenText)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala index 81777f67f37..24a98dd83f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala @@ -339,12 +339,12 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite { // ShuffleQueryStage 0 // ShuffleQueryStage 2 // ReusedQueryStage 0 - val grouped = df.groupBy("key").agg(max("value").as("value")) + val grouped = df.groupBy((col("key") + 1).as("key")).agg(max("value").as("value")) val resultDf2 = grouped.groupBy(col("key") + 1).max("value") .union(grouped.groupBy(col("key") + 2).max("value")) - QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: Row(3, 1) :: - Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: Row(6, 4) :: Row(6, 5) :: - Row(7, 5) :: Nil) + QueryTest.checkAnswer(resultDf2, Row(2, 0) :: Row(3, 0) :: Row(3, 1) :: Row(4, 1) :: + Row(4, 2) :: Row(5, 2) :: Row(5, 3) :: Row(6, 3) :: Row(6, 4) :: Row(7, 4) :: Row(7, 5) :: + Row(8, 5) :: Nil) val finalPlan2 = resultDf2.queryExecution.executedPlan .asInstanceOf[AdaptiveSparkPlanExec].executedPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 248c68abd1e..e9cb77ec95c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1072,7 +1072,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(projects.exists(_.outputPartitioning match { case PartitioningCollection(Seq(HashPartitioning(Seq(k1: AttributeReference), _), HashPartitioning(Seq(k2: AttributeReference), _))) => - k1.name == "t1id" && k2.name == "t2id" + Set(k1.name, k2.name) == Set("t1id", "t2id") case _ => false })) } @@ -1101,9 +1101,8 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { val projects = collect(planned) { case p: ProjectExec => p } assert(projects.exists(_.outputOrdering match { - case Seq(SortOrder(_, Ascending, NullsFirst, sameOrderExprs)) => - sameOrderExprs.size == 1 && sameOrderExprs.head.isInstanceOf[AttributeReference] && - sameOrderExprs.head.asInstanceOf[AttributeReference].name == "t2id" + case Seq(s @ SortOrder(_, Ascending, NullsFirst, _)) => + s.children.map(_.asInstanceOf[AttributeReference].name).toSet == Set("t2id", "t3id") case _ => false })) } @@ -1249,7 +1248,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(planned.outputPartitioning match { case PartitioningCollection(Seq(HashPartitioning(Seq(k1: AttributeReference), _), HashPartitioning(Seq(k2: AttributeReference), _))) => - k1.name == "t1id" && k2.name == "t2id" + Set(k1.name, k2.name) == Set("t1id", "t2id") }) val planned2 = sql( @@ -1314,6 +1313,64 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { assert(topKs.size == 1) assert(sorts.isEmpty) } + + test("SPARK-40086: an attribute and its aliased version in aggregate expressions should not " + + "introduce extra shuffle") { + withTempView("df") { + spark.range(5).select(col("id"), 
col("id").as("value")).createTempView("df") + val df = sql("SELECT id, max(value) FROM df GROUP BY id") + + val planned = df.queryExecution.executedPlan + + assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty) + + val exchanges = collect(planned) { case s: ShuffleExchangeExec => s } + assert(exchanges.size == 0) + } + } + + test("SPARK-40086: multiple aliases to the same attribute in aggregate expressions should not " + + "introduce extra shuffle") { + withTempView("df") { + spark.range(5).select(col("id").as("key"), col("id").as("value")).createTempView("df") + val df = sql("SELECT key, max(value) FROM df GROUP BY key") + + val planned = df.queryExecution.executedPlan + + assert(collect(planned) { case h: HashAggregateExec => h }.nonEmpty) + + val exchanges = collect(planned) { case s: ShuffleExchangeExec => s } + assert(exchanges.size == 0) + } + } + + test("SPARK-40086: multiple aliases to the same attribute with complex required distribution " + + "should not introduce extra shuffle") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val df = spark.range(5) + val df1 = df.repartition($"id" + $"id") + .select($"id".as("key1"), $"id".as("value1"), ($"id" + $"id").as("idPlusId1")) + val df2 = df.repartition($"id" + $"id") + .select($"id".as("key2"), $"id".as("value2"), ($"id" + $"id").as("idPlusId2")) + val df3 = df1.join(df2, $"key1" + $"value1" === $"idPlusId2") + + val planned = df3.queryExecution.executedPlan + + val numShuffles = collect(planned) { + case e: ShuffleExchangeExec => e + } + // before SPARK-40086: numShuffles is 4 + assert(numShuffles.size == 2) + val numOutputPartitioning = collectFirst(planned) { + case e: SortMergeJoinExec => e.outputPartitioning match { + case PartitioningCollection(Seq(PartitioningCollection(l), PartitioningCollection(r))) => + l ++ r + case _ => Seq.empty + } + }.get + assert(numOutputPartitioning.size == 8) + } + } } // Used for unit-testing EnsureRequirements diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala new file mode 100644 index 00000000000..e4ecdb9c445 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection, UnknownPartitioning} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +class ProjectedOrderingAndPartitioningSuite + extends SharedSparkSession with AdaptiveSparkPlanHelper { + import testImplicits._ + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-alias") { + Seq(0, 1, 2, 5).foreach { limit => + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { + val df = spark.range(2).orderBy($"id").selectExpr("id as x", "id as y", "id as z") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + limit match { + case 5 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 2) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 2 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 1) + assert(outputOrdering.head.sameOrderExpressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 1 => + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sameOrderExpressions.size == 0) + case 0 => + assert(outputOrdering.size == 0) + } + } + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-alias") { + Seq(0, 1, 2, 5).foreach { limit => + withSQLConf(SQLConf.EXPRESSION_PROJECTION_CANDIDATE_LIMIT.key -> limit.toString) { + val df = spark.range(2).repartition($"id").selectExpr("id as x", "id as y", "id as z") + val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning + limit match { + case 5 => + val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + assert(p.size == 3) + assert(p.flatMap(_.asInstanceOf[HashPartitioning].expressions + .map(_.asInstanceOf[Attribute].name)).toSet == Set("x", "y", "z")) + case 2 => + val p = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + assert(p.size == 2) + p.flatMap(_.asInstanceOf[HashPartitioning].expressions + .map(_.asInstanceOf[Attribute].name)).toSet.subsetOf(Set("x", "y", "z")) + case 1 => + val p = outputPartitioning.asInstanceOf[HashPartitioning] + assert(p.expressions.size == 1) + assert(p.expressions.map(_.asInstanceOf[Attribute].name) + .toSet.subsetOf(Set("x", "y", "z"))) + case 0 => + assert(outputPartitioning.isInstanceOf[UnknownPartitioning]) + } + } + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering - multi-references") { + val df = spark.range(2).selectExpr("id as a", "id as b") + .orderBy($"a" + $"b").selectExpr("a as x", "b as y") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sql == "(x + y) ASC NULLS FIRST") + assert(outputOrdering.head.sameOrderExpressions.size == 0) + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - partitioning - multi-references") { + val df = spark.range(2).selectExpr("id as a", "id as b") + .repartition($"a" + $"b").selectExpr("a as x", "b as y") + val outputPartitioning = stripAQEPlan(df.queryExecution.executedPlan).outputPartitioning + // (a + b), (a + y), (x + b) are pruned since their references 
are not the subset of output + outputPartitioning match { + case p: HashPartitioning => assert(p.sql == "hashpartitioning((x + y))") + case _ => fail(s"Unexpected $outputPartitioning") + } + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " + + "expressions") { + val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id + id as a", "id + id as b") + val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning + val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning(b)", "hashpartitioning(a)")) + + val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id + id as a", "id + id as b") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sql == "b ASC NULLS FIRST") + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("a")) + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to children of " + + "complex expressions") { + val df2 = spark.range(2).repartition($"id" + $"id").selectExpr("id as a", "id as b") + val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning + val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + // (a + b) is the same as (b + a) so expect only one + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning((b + b))", "hashpartitioning((a + b))", "hashpartitioning((a + a))")) + + val df = spark.range(2).orderBy($"id" + $"id").selectExpr("id as a", "id as b") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sql == "(b + b) ASC NULLS FIRST") + // (a + b) is the same as (b + a) so expect only one + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == Seq("(a + b)", "(a + a)")) + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - multi-references to complex " + + "expressions and to their children") { + val df2 = spark.range(2).repartition($"id" + $"id") + .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b") + val outputPartitioning = stripAQEPlan(df2.queryExecution.executedPlan).outputPartitioning + val partitionings = outputPartitioning.asInstanceOf[PartitioningCollection].partitionings + // (a + b) is the same as (b + a) so expect only one + assert(partitionings.map { + case p: HashPartitioning => p.sql + case _ => fail(s"Unexpected $outputPartitioning") + } == Seq("hashpartitioning(bb)", "hashpartitioning(aa)", "hashpartitioning((b + b))", + "hashpartitioning((a + b))", "hashpartitioning((a + a))")) + + val df = spark.range(2).orderBy($"id" + $"id") + .selectExpr("id + id as aa", "id + id as bb", "id as a", "id as b") + val outputOrdering = df.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering.size == 1) + assert(outputOrdering.head.sql == "bb ASC NULLS FIRST") + // (a + b) is the same as (b + a) so expect only one + assert(outputOrdering.head.sameOrderExpressions.map(_.sql) == + Seq("aa", "(b + b)", "(a + b)", "(a + a)")) + } + + test("SPARK-42049: Improve AliasAwareOutputExpression - ordering partly projected") { + val df = spark.range(2).orderBy($"id" + 1, $"id" + 2) + + val df1 = df.selectExpr("id 
+ 1 AS a", "id + 2 AS b") + val outputOrdering1 = df1.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering1.size == 2) + assert(outputOrdering1.map(_.sql) == Seq("a ASC NULLS FIRST", "b ASC NULLS FIRST")) + + val df2 = df.selectExpr("id + 1 AS a") + val outputOrdering2 = df2.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering2.size == 1) + assert(outputOrdering2.head.sql == "a ASC NULLS FIRST") + + val df3 = df.selectExpr("id + 2 AS b") + val outputOrdering3 = df3.queryExecution.optimizedPlan.outputOrdering + assert(outputOrdering3.size == 0) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala index 40574a8e73a..20a90cd94b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/V1WriteCommandSuite.scala @@ -90,9 +90,11 @@ trait V1WriteCommandSuiteBase extends SQLTestUtils { sparkContext.listenerBus.waitUntilEmpty() assert(optimizedPlan != null) - // Check whether a logical sort node is at the top of the logical plan of the write query. - assert(optimizedPlan.isInstanceOf[Sort] == hasLogicalSort, - s"Expect hasLogicalSort: $hasLogicalSort, Actual: ${optimizedPlan.isInstanceOf[Sort]}") + // Check whether exists a logical sort node of the write query. + // If user specified sort matches required ordering, the sort node may not at the top of query. + assert(optimizedPlan.exists(_.isInstanceOf[Sort]) == hasLogicalSort, + s"Expect hasLogicalSort: $hasLogicalSort," + + s"Actual: ${optimizedPlan.exists(_.isInstanceOf[Sort])}") // Check empty2null conversion. 
val empty2nullExpr = optimizedPlan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions)) @@ -223,8 +225,8 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write case s: SortExec => s }.exists { case SortExec(Seq( - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) ), false, _, _) => true case _ => false }, plan) @@ -268,16 +270,11 @@ class V1WriteCommandSuite extends QueryTest with SharedSparkSession with V1Write // assert the outer most sort in the executed plan assert(plan.collectFirst { case s: SortExec => s - }.map(s => (enabled, s)).exists { - case (false, SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), - SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) - ), false, _, _)) => true - - // SPARK-40885: this bug removes the in-partition sort, which manifests here - case (true, SortExec(Seq( - SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _) - ), false, _, _)) => true + }.exists { + case SortExec(Seq( + SortOrder(AttributeReference("value", StringType, _, _), Ascending, NullsFirst, _), + SortOrder(AttributeReference("key", IntegerType, _, _), Ascending, NullsFirst, _) + ), false, _, _) => true case _ => false }, plan) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org