This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 39455e3d49 [GLUTEN-11088][VL] Fix GlutenStreamingQuerySuite (#11223)
39455e3d49 is described below

commit 39455e3d490473aa6d3da346cea3bda290f3588d
Author: Rong Ma <[email protected]>
AuthorDate: Thu Dec 4 00:46:07 2025 +0800

    [GLUTEN-11088][VL] Fix GlutenStreamingQuerySuite (#11223)
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |  4 +--
 .../sql/execution/GlutenStreamingQuerySuite.scala  | 29 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 18024f0d0e..75c70fbd6f 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -956,10 +956,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("detect escaped path and report the migration guide")
     .exclude("ignore the escaped path check when the flag is off")
     .excludeByPrefix("SPARK-51187")
-    // TODO: fix in Spark-4.0
+    // Rewrite for the query plan check
     .excludeByPrefix("SPARK-49905")
-    .excludeByPrefix("SPARK-41199")
-    .excludeByPrefix("SPARK-41198")
   enableSuite[GlutenQueryExecutionSuite]
     // Rewritten to set root logger level to INFO so that logs can be parsed
     .exclude("Logging plan changes for execution")
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
index d09576908f..bda9c97eb5 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenStreamingQuerySuite.scala
@@ -17,6 +17,33 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.GlutenSQLTestsTrait
+import org.apache.spark.sql.execution.exchange.REQUIRED_BY_STATEFUL_OPERATOR
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming._
 
-class GlutenStreamingQuerySuite extends StreamingQuerySuite with GlutenSQLTestsTrait {}
+class GlutenStreamingQuerySuite extends StreamingQuerySuite with GlutenSQLTestsTrait {
+
+  import testImplicits._
+
+  testGluten("SPARK-49905") {
+    val inputData = MemoryStream[Int]
+
+    // Use the streaming aggregation as an example - all stateful operators are using the same
+    // distribution, named `StatefulOpClusteredDistribution`.
+    val df = inputData.toDF().groupBy("value").count()
+
+    testStream(df, OutputMode.Update())(
+      AddData(inputData, 1, 2, 3, 1, 2, 3),
+      CheckAnswer((1, 2), (2, 2), (3, 2)),
+      Execute {
+        qe =>
+          val shuffleOpt = qe.lastExecution.executedPlan.collect {
+            case s: ColumnarShuffleExchangeExec => s
+          }
+
+          assert(shuffleOpt.nonEmpty, "No shuffle exchange found in the query plan")
+          assert(shuffleOpt.head.shuffleOrigin === REQUIRED_BY_STATEFUL_OPERATOR)
+      }
+    )
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to