This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b16523a1b7 [GLUTEN-9034][VL] Fix the VeloxResizeBatch not add for 
ReusedExchange (#11069)
b16523a1b7 is described below

commit b16523a1b7fcd3b1baf3c813998150ee01e36c0c
Author: Jin Chengcheng <[email protected]>
AuthorDate: Fri Nov 14 10:10:06 2025 +0000

    [GLUTEN-9034][VL] Fix the VeloxResizeBatch not add for ReusedExchange 
(#11069)
    
    Previously, in TPC-DS Q95, there were 4 AQEShuffleRead nodes, but only one of
    them had the VeloxResizeBatch node added. This change matches the
    ReusedExchangeExec case too.
---
 ...AppendBatchResizeForShuffleInputAndOutput.scala | 31 +++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index 7d6309bf93..92519eecf7 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution.VeloxResizeBatchesExec
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, 
ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 /**
  * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to 
make the batch sizes in
@@ -49,7 +50,16 @@ case class AppendBatchResizeForShuffleInputAndOutput() 
extends Rule[SparkPlan] {
           if resizeBatchesShuffleOutputEnabled &&
             shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(a, range.min, range.max)
-      // Since it's transformed in a bottom to up order, so we may first 
encountered
+      case a @ AQEShuffleReadExec(
+            ShuffleQueryStageExec(
+              _,
+              ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+              _),
+            _)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+        VeloxResizeBatchesExec(a, range.min, range.max)
+      // Since the plan is transformed bottom-up, we may first encounter
       // ShuffeQueryStageExec, which is transformed to 
VeloxResizeBatchesExec(ShuffeQueryStageExec),
       // then we see AQEShuffleReadExec
       case a @ AQEShuffleReadExec(
@@ -61,10 +71,29 @@ case class AppendBatchResizeForShuffleInputAndOutput() 
extends Rule[SparkPlan] {
           if resizeBatchesShuffleOutputEnabled &&
             shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
+      case a @ AQEShuffleReadExec(
+            VeloxResizeBatchesExec(
+              s @ ShuffleQueryStageExec(
+                _,
+                ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+                _),
+              _,
+              _),
+            _)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+        VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max)
       case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, 
_)
           if resizeBatchesShuffleOutputEnabled &&
             shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
         VeloxResizeBatchesExec(s, range.min, range.max)
+      case s @ ShuffleQueryStageExec(
+            _,
+            ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec),
+            _)
+          if resizeBatchesShuffleOutputEnabled &&
+            shuffle.shuffleWriterType.requiresResizingShuffleOutput =>
+        VeloxResizeBatchesExec(s, range.min, range.max)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to