This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new be488390d fix: Fall back to Spark for unsupported partition or sort 
expressions in window aggregates (#1253)
be488390d is described below

commit be488390dcd89b2a5bff6f24b9ae7724383cffde
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jan 9 16:55:52 2025 -0700

    fix: Fall back to Spark for unsupported partition or sort expressions in 
window aggregates (#1253)
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 11 ++++++--
 .../apache/comet/exec/CometAggregateSuite.scala    | 32 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7ed3725be..7a69e6309 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3148,17 +3148,22 @@ object QueryPlanSerde extends Logging with 
ShimQueryPlanSerde with CometExprShim
       orderSpec: Seq[SortOrder],
       op: SparkPlan): Boolean = {
     if (partitionSpec.length != orderSpec.length) {
-      withInfo(op, "Partitioning and sorting specifications do not match")
       return false
     }
 
-    val partitionColumnNames = partitionSpec.collect { case a: 
AttributeReference =>
-      a.name
+    val partitionColumnNames = partitionSpec.collect {
+      case a: AttributeReference => a.name
+      case other =>
+        withInfo(op, s"Unsupported partition expression: 
${other.getClass.getSimpleName}")
+        return false
     }
 
     val orderColumnNames = orderSpec.collect { case s: SortOrder =>
       s.child match {
         case a: AttributeReference => a.name
+        case other =>
+          withInfo(op, s"Unsupported sort expression: 
${other.getClass.getSimpleName}")
+          return false
       }
     }
 
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 9a642f12f..8170230bc 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
 import org.apache.spark.sql.comet.CometHashAggregateExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.{count_distinct, sum}
 import org.apache.spark.sql.internal.SQLConf
@@ -89,6 +90,37 @@ class CometAggregateSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
     }
   }
 
+  // based on Spark's SQLWindowFunctionSuite test of the same name
+  test("window function: partition and order expressions") {
+    for (shuffleMode <- Seq("auto", "native", "jvm")) {
+      withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+        val df =
+          Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), 
(6, "c", 10)).toDF(
+            "month",
+            "area",
+            "product")
+        df.createOrReplaceTempView("windowData")
+        val df2 = sql("""
+            |select month, area, product, sum(product + 1) over (partition by 
1 order by 2)
+            |from windowData
+          """.stripMargin)
+        checkSparkAnswer(df2)
+        val cometShuffles = collect(df2.queryExecution.executedPlan) {
+          case _: CometShuffleExchangeExec => true
+        }
+        if (shuffleMode == "jvm") {
+          assert(cometShuffles.length == 1)
+        } else {
+          // we fall back to Spark for shuffle because we do not support
+          // native shuffle with a LocalTableScan input, and we do not fall
+          // back to Comet columnar shuffle due to
+          // https://github.com/apache/datafusion-comet/issues/1248
+          assert(cometShuffles.isEmpty)
+        }
+      }
+    }
+  }
+
   test("multiple column distinct count") {
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to