This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new be488390d fix: Fall back to Spark for unsupported partition or sort expressions in window aggregates (#1253)
be488390d is described below
commit be488390dcd89b2a5bff6f24b9ae7724383cffde
Author: Andy Grove <[email protected]>
AuthorDate: Thu Jan 9 16:55:52 2025 -0700
fix: Fall back to Spark for unsupported partition or sort expressions in window aggregates (#1253)
---
.../org/apache/comet/serde/QueryPlanSerde.scala | 11 ++++++--
.../apache/comet/exec/CometAggregateSuite.scala | 32 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 7ed3725be..7a69e6309 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -3148,17 +3148,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
orderSpec: Seq[SortOrder],
op: SparkPlan): Boolean = {
if (partitionSpec.length != orderSpec.length) {
- withInfo(op, "Partitioning and sorting specifications do not match")
return false
}
- val partitionColumnNames = partitionSpec.collect { case a: AttributeReference =>
- a.name
+ val partitionColumnNames = partitionSpec.collect {
+ case a: AttributeReference => a.name
+ case other =>
+ withInfo(op, s"Unsupported partition expression: ${other.getClass.getSimpleName}")
+ return false
}
val orderColumnNames = orderSpec.collect { case s: SortOrder =>
s.child match {
case a: AttributeReference => a.name
+ case other =>
+ withInfo(op, s"Unsupported sort expression: ${other.getClass.getSimpleName}")
+ return false
}
}
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 9a642f12f..8170230bc 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
import org.apache.spark.sql.catalyst.optimizer.EliminateSorts
import org.apache.spark.sql.comet.CometHashAggregateExec
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{count_distinct, sum}
import org.apache.spark.sql.internal.SQLConf
@@ -89,6 +90,37 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
+ // based on Spark's SQLWindowFunctionSuite test of the same name
+ test("window function: partition and order expressions") {
+ for (shuffleMode <- Seq("auto", "native", "jvm")) {
+ withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> shuffleMode) {
+ val df =
+ Seq((1, "a", 5), (2, "a", 6), (3, "b", 7), (4, "b", 8), (5, "c", 9), (6, "c", 10)).toDF(
+ "month",
+ "area",
+ "product")
+ df.createOrReplaceTempView("windowData")
+ val df2 = sql("""
+ |select month, area, product, sum(product + 1) over (partition by 1 order by 2)
+ |from windowData
+ """.stripMargin)
+ checkSparkAnswer(df2)
+ val cometShuffles = collect(df2.queryExecution.executedPlan) {
+ case _: CometShuffleExchangeExec => true
+ }
+ if (shuffleMode == "jvm") {
+ assert(cometShuffles.length == 1)
+ } else {
+ // we fall back to Spark for shuffle because we do not support
+ // native shuffle with a LocalTableScan input, and we do not fall
+ // back to Comet columnar shuffle due to
+ // https://github.com/apache/datafusion-comet/issues/1248
+ assert(cometShuffles.isEmpty)
+ }
+ }
+ }
+ }
+
test("multiple column distinct count") {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]