This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a1664c899 [VL] Make VeloxResizeBatchesExec inherit from ColumnarToColumnarExec to simplify the code (#10763)
5a1664c899 is described below

commit 5a1664c89972cae074d8a1829480022294a28d97
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 30 16:33:25 2025 +0800

    [VL] Make VeloxResizeBatchesExec inherit from ColumnarToColumnarExec to simplify the code (#10763)
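
    In outline, the change moves VeloxResizeBatchesExec onto the shared
    ColumnarToColumnarExec template: the subclass now supplies only the batch
    transformation, while metrics, timing, and iterator lifecycle stay in the
    base class. A minimal sketch of the resulting shape (simplified parameter
    names, elided mixins and metrics; not the exact file contents, see the
    diff below):

        import org.apache.gluten.backendsapi.velox.VeloxBatchType
        import org.apache.gluten.extension.columnar.transition.Convention
        import org.apache.gluten.utils.VeloxBatchResizer
        import org.apache.spark.sql.execution.SparkPlan
        import org.apache.spark.sql.vectorized.ColumnarBatch
        import scala.collection.JavaConverters._

        abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention.BatchType) {
          // Subclasses transform the incoming batch stream.
          protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch]
          // Optional hooks with no-op defaults, for releasing native resources.
          protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {}
          protected def needRecyclePayload: Boolean = false
        }

        case class VeloxResizeBatchesExec(child: SparkPlan, minSize: Int, maxSize: Int)
          extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {
          override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] =
            VeloxBatchResizer.create(minSize, maxSize, in.asJava).asScala
        }
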
---
 .../apache/gluten/vectorized/BatchIterator.java    |  2 +-
 .../ArrowColumnarToVeloxColumnarExec.scala         |  9 +--
 .../gluten/execution/VeloxResizeBatchesExec.scala  | 87 ++++------------------
 ...AppendBatchResizeForShuffleInputAndOutput.scala |  2 +-
 .../vectorized/ColumnarBatchSerializer.scala       |  2 +-
 .../vectorized/ColumnarBatchOutIterator.java       |  3 +-
 .../apache/gluten/iterator/ClosableIterator.java   |  9 +--
 .../gluten/execution/ColumnarToColumnarExec.scala  | 16 +++-
 .../execution/ColumnarToColumnarTransition.scala   |  4 +-
 .../extension/columnar/transition/package.scala    |  4 +-
 10 files changed, 44 insertions(+), 94 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index 2f7471d837..d73330b9a6 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class BatchIterator extends ClosableIterator {
+public class BatchIterator extends ClosableIterator<ColumnarBatch> {
   private final long handle;
   private final AtomicBoolean cancelled = new AtomicBoolean(false);
 
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 49d5659996..9209886750 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -26,12 +26,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
   extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
   override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
-    in.map {
-      b =>
-        val out = VeloxColumnarBatches.toVeloxBatch(b)
-        out
-    }
+    in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
   }
+
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    ArrowColumnarToVeloxColumnarExec(child = newChild)
+    copy(child = newChild)
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index a8e0863639..a1ec54ffbc 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -16,99 +16,40 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.columnar.transition.Convention
-import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.iterator.ClosableIterator
 import org.apache.gluten.utils.VeloxBatchResizer
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-import java.util.concurrent.atomic.AtomicLong
-
 import scala.collection.JavaConverters._
 
 /**
  * An operator to resize input batches by appending the later batches to the one that comes earlier,
  * or splitting one batch to smaller ones.
- *
- * FIXME: Code duplication with ColumnarToColumnarExec.
  */
 case class VeloxResizeBatchesExec(
     override val child: SparkPlan,
     minOutputBatchSize: Int,
     maxOutputBatchSize: Int)
-  extends GlutenPlan
-  with UnaryExecNode {
-
-  override lazy val metrics: Map[String, SQLMetric] = Map(
-    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
output batches"),
-    "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append 
/ split batches")
-  )
-
-  override def batchType(): Convention.BatchType = BackendsApiManager.getSettings.primaryBatchType
-
-  override def rowType0(): Convention.RowType = Convention.RowType.None
+  extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {
 
-  override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
-
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numInputRows = longMetric("numInputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    val numOutputRows = longMetric("numOutputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    val selfTime = longMetric("selfTime")
-
-    child.executeColumnar().mapPartitions {
-      in =>
-        // Append millis = Out millis - In millis.
-        val appendMillis = new AtomicLong(0L)
-        val appender = VeloxBatchResizer.create(
-          minOutputBatchSize,
-          maxOutputBatchSize,
-          Iterators
-            .wrap(in)
-            .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
-            .create()
-            .map {
-              inBatch =>
-                numInputRows += inBatch.numRows()
-                numInputBatches += 1
-                inBatch
-            }
-            .asJava
-        )
-
-        val out = Iterators
-          .wrap(appender.asScala)
-          .protectInvocationFlow()
-          .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
-          .recyclePayload(_.close())
-          .recycleIterator {
-            appender.close()
-            selfTime += appendMillis.get()
-          }
-          .create()
-          .map {
-            outBatch =>
-              numOutputRows += outBatch.numRows()
-              numOutputBatches += 1
-              outBatch
-          }
+  override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
+    VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize, in.asJava).asScala
+  }
 
-        out
+  override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {
+    out.asJava match {
+      case c: ClosableIterator[ColumnarBatch] => c.close()
+      case _ =>
     }
   }
 
-  override def output: Seq[Attribute] = child.output
+  override protected def needRecyclePayload: Boolean = true
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
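
Note on the closeIterator override above: scala.collection.JavaConverters wraps
rather than copies, and converting a wrapped iterator back unwraps it to the
original underlying object, so the match can recover the ClosableIterator that
VeloxBatchResizer.create returned. A standalone illustration with plain JDK
types (not Gluten classes):

    import scala.collection.JavaConverters._

    val javaIt: java.util.Iterator[Int] = java.util.Arrays.asList(1, 2, 3).iterator()
    val scalaIt = javaIt.asScala      // asScala wraps the Java iterator
    assert(scalaIt.asJava eq javaIt)  // asJava unwraps back to the same instance
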
diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index aea311c01d..ebe55f573c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec}
 
 /**
- * Try to append [[VeloxResizeBatchesExec]] for shuffle input and ouput to make the batch sizes in
+ * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in
  * good shape.
  */
 case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 17eea29238..3b5fce63f8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -145,7 +145,7 @@ private class ColumnarBatchSerializerInstanceImpl(
     with TaskResource {
     private val streamReader = ShuffleStreamReader(streams)
 
-    private val wrappedOut: ClosableIterator = new ColumnarBatchOutIterator(
+    private val wrappedOut: ClosableIterator[ColumnarBatch] = new ColumnarBatchOutIterator(
       runtime,
       jniWrapper
         .read(shuffleReaderHandle, streamReader))
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index 4293b2abf8..f4d2c8e7d1 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -25,7 +25,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 import java.io.IOException;
 
-public class ColumnarBatchOutIterator extends ClosableIterator implements RuntimeAware {
+public class ColumnarBatchOutIterator extends ClosableIterator<ColumnarBatch>
+    implements RuntimeAware {
   private final Runtime runtime;
   private final long iterHandle;
 
diff --git a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
index 6ed9d6c180..7947b09af9 100644
--- a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
+++ b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
@@ -18,14 +18,11 @@ package org.apache.gluten.iterator;
 
 import org.apache.gluten.exception.GlutenException;
 
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public abstract class ClosableIterator
-    implements AutoCloseable, Serializable, Iterator<ColumnarBatch> {
+public abstract class ClosableIterator<T> implements AutoCloseable, Serializable, Iterator<T> {
   protected final AtomicBoolean closed = new AtomicBoolean(false);
 
   public ClosableIterator() {}
@@ -43,7 +40,7 @@ public abstract class ClosableIterator
   }
 
   @Override
-  public final ColumnarBatch next() {
+  public final T next() {
     if (closed.get()) {
       throw new GlutenException("Iterator has been closed.");
     }
@@ -65,5 +62,5 @@ public abstract class ClosableIterator
 
   protected abstract boolean hasNext0() throws Exception;
 
-  protected abstract ColumnarBatch next0() throws Exception;
+  protected abstract T next0() throws Exception;
 }
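
With the type parameter in place, call sites can state their payload type (for
example ClosableIterator[ColumnarBatch] in the serializer change above), and
payload-agnostic helpers become possible. A small hypothetical example; drain
is not part of Gluten:

    import org.apache.gluten.iterator.ClosableIterator

    // Consume every element of a closable iterator, then release it.
    def drain[T](it: ClosableIterator[T])(f: T => Unit): Unit =
      try { while (it.hasNext) f(it.next()) } finally { it.close() }
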
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index ac1f8d6835..f19b898983 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -32,9 +32,16 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention
   extends ColumnarToColumnarTransition
   with GlutenPlan {
 
+  override def isSameConvention: Boolean = from == to
+
   def child: SparkPlan
+
   protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch]
 
+  protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {}
+
+  protected def needRecyclePayload: Boolean = false
+
   override lazy val metrics: Map[String, SQLMetric] =
     Map(
       "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
@@ -77,12 +84,18 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention
               inBatch
           }
         val out = mapIterator(wrappedIn)
-        val wrappedOut = Iterators
+        val builder = Iterators
           .wrap(out)
+          .protectInvocationFlow()
           .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
           .recycleIterator {
+            closeIterator(out)
             selfTime += selfMillis.get()
           }
+        if (needRecyclePayload) {
+          builder.recyclePayload(_.close())
+        }
+        builder
           .create()
           .map {
             outBatch =>
@@ -90,7 +103,6 @@ abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: Convention
               numOutputBatches += 1
               outBatch
           }
-        wrappedOut
     }
 
   }
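
One detail in the hunk above: the return value of recyclePayload is discarded,
which is only safe if the Iterators wrapper builder mutates itself and returns
this; the merged code implies it does. An equivalent form that does not depend
on that behavior, sketched with the same calls used in the diff:

    val base = Iterators
      .wrap(out)
      .protectInvocationFlow()
      .collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
      .recycleIterator {
        closeIterator(out)
        selfTime += selfMillis.get()
      }
    val wrapped = if (needRecyclePayload) base.recyclePayload(_.close()) else base
    wrapped.create()
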
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
index 36c79c3f19..8c4757b45c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
@@ -18,4 +18,6 @@ package org.apache.gluten.execution
 
 import org.apache.spark.sql.execution.UnaryExecNode
 
-trait ColumnarToColumnarTransition extends UnaryExecNode
+trait ColumnarToColumnarTransition extends UnaryExecNode {
+  def isSameConvention: Boolean
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index a10c41cbca..9f309b843f 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -65,11 +65,11 @@ package object transition {
     }
   }
 
-  // Extractor for Gluten's C2C
+  // Extractor for Gluten's C2C with different convention
   object ColumnarToColumnarLike {
     def unapply(plan: SparkPlan): Option[SparkPlan] = {
       plan match {
-        case c2c: ColumnarToColumnarTransition =>
+        case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention =>
           Some(c2c.child)
         case _ => None
       }
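
The isSameConvention guard matters because VeloxResizeBatchesExec is now a
ColumnarToColumnarTransition whose input and output conventions are both
VeloxBatchType; the guard presumably keeps transition planning from treating
it as a removable batch conversion. Illustrative, not actual test code:

    // A same-convention node no longer matches the extractor:
    //   ColumnarToColumnarLike.unapply(veloxResizeBatchesExec) == None
    // while a real conversion still does:
    //   ColumnarToColumnarLike.unapply(arrowColumnarToVeloxColumnarExec) == Some(child)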


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
