This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression 7ed4d448daa is described below commit 7ed4d448daab372b2c5cc846f1f66f70f7fd574c Author: dengziming <dengzim...@bytedance.com> AuthorDate: Wed Dec 21 08:56:03 2022 +0800 [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression ### What changes were proposed in this pull request? In #39090 we moved the `SortOrder` proto from relations to expressions; in this PR we add logic to handle it. ### Why are the changes needed? #39090 moved the `SortOrder` proto message from relations to expressions, so the planner needs corresponding handling; without this follow-up, a plan containing a `SortOrder` expression cannot be transformed (see https://github.com/dengziming/spark/pull/new/SortOrder). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A unit test and existing tests. Closes #39138 from dengziming/SortOrder. Authored-by: dengziming <dengzim...@bytedance.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../sql/connect/planner/SparkConnectPlanner.scala | 5 ++- .../connect/planner/SparkConnectPlannerSuite.scala | 46 +++++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 3bdf3654c68..9fe9acd354d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -480,6 +480,7 @@ class SparkConnectPlanner(session: SparkSession) { case proto.Expression.ExprTypeCase.CAST => transformCast(exp.getCast) case proto.Expression.ExprTypeCase.UNRESOLVED_REGEX => transformUnresolvedRegex(exp.getUnresolvedRegex) + case proto.Expression.ExprTypeCase.SORT_ORDER => transformSortOrder(exp.getSortOrder) case _ => throw InvalidPlanInput( s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not supported") @@ 
-699,10 +700,10 @@ class SparkConnectPlanner(session: SparkSession) { logical.Sort( child = transformRelation(sort.getInput), global = sort.getIsGlobal, - order = sort.getOrderList.asScala.toSeq.map(transformSortOrderExpression)) + order = sort.getOrderList.asScala.toSeq.map(transformSortOrder)) } - private def transformSortOrderExpression(order: proto.Expression.SortOrder) = { + private def transformSortOrder(order: proto.Expression.SortOrder) = { expressions.SortOrder( child = transformExpression(order.getChild), direction = order.getDirection match { diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 2e0aa018467..93cb97b4421 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -24,7 +24,7 @@ import com.google.protobuf.ByteString import org.apache.spark.SparkFunSuite import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Expression.{Alias, ExpressionString, UnresolvedStar} -import org.apache.spark.sql.{AnalysisException, Dataset} +import org.apache.spark.sql.{AnalysisException, Dataset, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical @@ -661,4 +661,48 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { .build()) intercept[AnalysisException](Dataset.ofRows(spark, logical)) } + + test("transform SortOrder") { + val input = proto.Relation + .newBuilder() + .setSql( + proto.SQL + .newBuilder() + .setQuery("SELECT id FROM VALUES (5),(1),(2),(6),(4),(3),(7),(9),(8),(null) AS tab(id)") + .build()) + + val 
relation = proto.Relation + .newBuilder() + .setSort( + proto.Sort + .newBuilder() + .setInput(input) + .setIsGlobal(false) + .addOrder( + proto.Expression.SortOrder + .newBuilder() + .setDirectionValue( + proto.Expression.SortOrder.SortDirection.SORT_DIRECTION_ASCENDING_VALUE) + .setNullOrdering(proto.Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST) + .setChild(proto.Expression + .newBuilder() + .setExpressionString( + proto.Expression.ExpressionString.newBuilder().setExpression("id"))))) + .build() + val df = Dataset.ofRows(spark, transform(relation)) + df.foreachPartition { p: Iterator[Row] => + var previousValue: Int = -1 + p.foreach { r => + val v = r.getAs[Int](0) + // null will be converted to 0 + if (v == 0) { + assert(previousValue == -1, "null should be first") + } + if (previousValue != -1) { + assert(v > previousValue, "Partition is not ordered.") + } + previousValue = v + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org