This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression
7ed4d448daa is described below

commit 7ed4d448daab372b2c5cc846f1f66f70f7fd574c
Author: dengziming <dengzim...@bytedance.com>
AuthorDate: Wed Dec 21 08:56:03 2022 +0800

    [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression
    
    ### What changes were proposed in this pull request?
    In #39090 we moved the `SortOrder` proto from relations to expressions; in this PR we add the logic to handle it.
    
    ### Why are the changes needed?
    The `SortOrder` proto was moved from relations to expressions in #39090, so the planner must handle it as an expression type; without this change, a `SortOrder` expression falls through to the default case and raises `InvalidPlanInput`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    A unit test and existing tests.
    
    Closes #39138 from dengziming/SortOrder.
    
    Authored-by: dengziming <dengzim...@bytedance.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  5 ++-
 .../connect/planner/SparkConnectPlannerSuite.scala | 46 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3bdf3654c68..9fe9acd354d 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -480,6 +480,7 @@ class SparkConnectPlanner(session: SparkSession) {
       case proto.Expression.ExprTypeCase.CAST => transformCast(exp.getCast)
       case proto.Expression.ExprTypeCase.UNRESOLVED_REGEX =>
         transformUnresolvedRegex(exp.getUnresolvedRegex)
+      case proto.Expression.ExprTypeCase.SORT_ORDER => transformSortOrder(exp.getSortOrder)
       case _ =>
         throw InvalidPlanInput(
           s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not supported")
@@ -699,10 +700,10 @@ class SparkConnectPlanner(session: SparkSession) {
     logical.Sort(
       child = transformRelation(sort.getInput),
       global = sort.getIsGlobal,
-      order = sort.getOrderList.asScala.toSeq.map(transformSortOrderExpression))
+      order = sort.getOrderList.asScala.toSeq.map(transformSortOrder))
   }
 
-  private def transformSortOrderExpression(order: proto.Expression.SortOrder) = {
+  private def transformSortOrder(order: proto.Expression.SortOrder) = {
     expressions.SortOrder(
       child = transformExpression(order.getChild),
       direction = order.getDirection match {
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 2e0aa018467..93cb97b4421 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -24,7 +24,7 @@ import com.google.protobuf.ByteString
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Expression.{Alias, ExpressionString, UnresolvedStar}
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -661,4 +661,48 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
         .build())
     intercept[AnalysisException](Dataset.ofRows(spark, logical))
   }
+
+  test("transform SortOrder") {
+    val input = proto.Relation
+      .newBuilder()
+      .setSql(
+        proto.SQL
+          .newBuilder()
+          .setQuery("SELECT id FROM VALUES (5),(1),(2),(6),(4),(3),(7),(9),(8),(null) AS tab(id)")
+          .build())
+
+    val relation = proto.Relation
+      .newBuilder()
+      .setSort(
+        proto.Sort
+          .newBuilder()
+          .setInput(input)
+          .setIsGlobal(false)
+          .addOrder(
+            proto.Expression.SortOrder
+              .newBuilder()
+              .setDirectionValue(
+                proto.Expression.SortOrder.SortDirection.SORT_DIRECTION_ASCENDING_VALUE)
+              .setNullOrdering(proto.Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST)
+              .setChild(proto.Expression
+                .newBuilder()
+                .setExpressionString(
+                  proto.Expression.ExpressionString.newBuilder().setExpression("id")))))
+      .build()
+    val df = Dataset.ofRows(spark, transform(relation))
+    df.foreachPartition { p: Iterator[Row] =>
+      var previousValue: Int = -1
+      p.foreach { r =>
+        val v = r.getAs[Int](0)
+        // null will be converted to 0
+        if (v == 0) {
+          assert(previousValue == -1, "null should be first")
+        }
+        if (previousValue != -1) {
+          assert(v > previousValue, "Partition is not ordered.")
+        }
+        previousValue = v
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to