This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5a1664c899 [VL] Make VeloxResizeBatchesExec inherit from ColumnarToColumnarExec to simplify the code (#10763)
5a1664c899 is described below
commit 5a1664c89972cae074d8a1829480022294a28d97
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 30 16:33:25 2025 +0800
[VL] Make VeloxResizeBatchesExec inherit from ColumnarToColumnarExec to simplify the code (#10763)
---
.../apache/gluten/vectorized/BatchIterator.java | 2 +-
.../ArrowColumnarToVeloxColumnarExec.scala | 9 +--
.../gluten/execution/VeloxResizeBatchesExec.scala | 87 ++++------------------
...AppendBatchResizeForShuffleInputAndOutput.scala | 2 +-
.../vectorized/ColumnarBatchSerializer.scala | 2 +-
.../vectorized/ColumnarBatchOutIterator.java | 3 +-
.../apache/gluten/iterator/ClosableIterator.java | 9 +--
.../gluten/execution/ColumnarToColumnarExec.scala | 16 +++-
.../execution/ColumnarToColumnarTransition.scala | 4 +-
.../extension/columnar/transition/package.scala | 4 +-
10 files changed, 44 insertions(+), 94 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
index 2f7471d837..d73330b9a6 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BatchIterator.java
@@ -23,7 +23,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.util.concurrent.atomic.AtomicBoolean;
-public class BatchIterator extends ClosableIterator {
+public class BatchIterator extends ClosableIterator<ColumnarBatch> {
private final long handle;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 49d5659996..9209886750 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -26,12 +26,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
- in.map {
- b =>
- val out = VeloxColumnarBatches.toVeloxBatch(b)
- out
- }
+ in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
}
+
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- ArrowColumnarToVeloxColumnarExec(child = newChild)
+ copy(child = newChild)
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index a8e0863639..a1ec54ffbc 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -16,99 +16,40 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.columnar.transition.Convention
-import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
-import java.util.concurrent.atomic.AtomicLong
-
import scala.collection.JavaConverters._
/**
* An operator to resize input batches by appending the later batches to the
one that comes earlier,
* or splitting one batch to smaller ones.
- *
- * FIXME: Code duplication with ColumnarToColumnarExec.
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
minOutputBatchSize: Int,
maxOutputBatchSize: Int)
- extends GlutenPlan
- with UnaryExecNode {
-
- override lazy val metrics: Map[String, SQLMetric] = Map(
- "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
- "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
input batches"),
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
- "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
output batches"),
- "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append
/ split batches")
- )
-
- override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
-
- override def rowType0(): Convention.RowType = Convention.RowType.None
+ extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {
- override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
-
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val numInputRows = longMetric("numInputRows")
- val numInputBatches = longMetric("numInputBatches")
- val numOutputRows = longMetric("numOutputRows")
- val numOutputBatches = longMetric("numOutputBatches")
- val selfTime = longMetric("selfTime")
-
- child.executeColumnar().mapPartitions {
- in =>
- // Append millis = Out millis - In millis.
- val appendMillis = new AtomicLong(0L)
- val appender = VeloxBatchResizer.create(
- minOutputBatchSize,
- maxOutputBatchSize,
- Iterators
- .wrap(in)
- .collectReadMillis(inMillis => appendMillis.getAndAdd(-inMillis))
- .create()
- .map {
- inBatch =>
- numInputRows += inBatch.numRows()
- numInputBatches += 1
- inBatch
- }
- .asJava
- )
-
- val out = Iterators
- .wrap(appender.asScala)
- .protectInvocationFlow()
- .collectReadMillis(outMillis => appendMillis.getAndAdd(outMillis))
- .recyclePayload(_.close())
- .recycleIterator {
- appender.close()
- selfTime += appendMillis.get()
- }
- .create()
- .map {
- outBatch =>
- numOutputRows += outBatch.numRows()
- numOutputBatches += 1
- outBatch
- }
+ override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
+ VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize,
in.asJava).asScala
+ }
- out
+ override protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {
+ out.asJava match {
+ case c: ClosableIterator[ColumnarBatch] => c.close()
+ case _ =>
}
}
- override def output: Seq[Attribute] = child.output
+ override protected def needRecyclePayload: Boolean = true
+
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
index aea311c01d..ebe55f573c 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
ShuffleQueryStageExec}
/**
- * Try to append [[VeloxResizeBatchesExec]] for shuffle input and ouput to
make the batch sizes in
+ * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to
make the batch sizes in
* good shape.
*/
case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan]
{
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
index 17eea29238..3b5fce63f8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala
@@ -145,7 +145,7 @@ private class ColumnarBatchSerializerInstanceImpl(
with TaskResource {
private val streamReader = ShuffleStreamReader(streams)
- private val wrappedOut: ClosableIterator = new ColumnarBatchOutIterator(
+ private val wrappedOut: ClosableIterator[ColumnarBatch] = new
ColumnarBatchOutIterator(
runtime,
jniWrapper
.read(shuffleReaderHandle, streamReader))
diff --git
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index 4293b2abf8..f4d2c8e7d1 100644
---
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -25,7 +25,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.io.IOException;
-public class ColumnarBatchOutIterator extends ClosableIterator implements
RuntimeAware {
+public class ColumnarBatchOutIterator extends ClosableIterator<ColumnarBatch>
+ implements RuntimeAware {
private final Runtime runtime;
private final long iterHandle;
diff --git
a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
index 6ed9d6c180..7947b09af9 100644
--- a/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
+++ b/gluten-core/src/main/java/org/apache/gluten/iterator/ClosableIterator.java
@@ -18,14 +18,11 @@ package org.apache.gluten.iterator;
import org.apache.gluten.exception.GlutenException;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
-public abstract class ClosableIterator
- implements AutoCloseable, Serializable, Iterator<ColumnarBatch> {
+public abstract class ClosableIterator<T> implements AutoCloseable,
Serializable, Iterator<T> {
protected final AtomicBoolean closed = new AtomicBoolean(false);
public ClosableIterator() {}
@@ -43,7 +40,7 @@ public abstract class ClosableIterator
}
@Override
- public final ColumnarBatch next() {
+ public final T next() {
if (closed.get()) {
throw new GlutenException("Iterator has been closed.");
}
@@ -65,5 +62,5 @@ public abstract class ClosableIterator
protected abstract boolean hasNext0() throws Exception;
- protected abstract ColumnarBatch next0() throws Exception;
+ protected abstract T next0() throws Exception;
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index ac1f8d6835..f19b898983 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -32,9 +32,16 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
extends ColumnarToColumnarTransition
with GlutenPlan {
+ override def isSameConvention: Boolean = from == to
+
def child: SparkPlan
+
protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch]
+ protected def closeIterator(out: Iterator[ColumnarBatch]): Unit = {}
+
+ protected def needRecyclePayload: Boolean = false
+
override lazy val metrics: Map[String, SQLMetric] =
Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
@@ -77,12 +84,18 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
inBatch
}
val out = mapIterator(wrappedIn)
- val wrappedOut = Iterators
+ val builder = Iterators
.wrap(out)
+ .protectInvocationFlow()
.collectReadMillis(outMillis => selfMillis.getAndAdd(outMillis))
.recycleIterator {
+ closeIterator(out)
selfTime += selfMillis.get()
}
+ if (needRecyclePayload) {
+ builder.recyclePayload(_.close())
+ }
+ builder
.create()
.map {
outBatch =>
@@ -90,7 +103,6 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
numOutputBatches += 1
outBatch
}
- wrappedOut
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
index 36c79c3f19..8c4757b45c 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
@@ -18,4 +18,6 @@ package org.apache.gluten.execution
import org.apache.spark.sql.execution.UnaryExecNode
-trait ColumnarToColumnarTransition extends UnaryExecNode
+trait ColumnarToColumnarTransition extends UnaryExecNode {
+ def isSameConvention: Boolean
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index a10c41cbca..9f309b843f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -65,11 +65,11 @@ package object transition {
}
}
- // Extractor for Gluten's C2C
+ // Extractor for Gluten's C2C with different convention
object ColumnarToColumnarLike {
def unapply(plan: SparkPlan): Option[SparkPlan] = {
plan match {
- case c2c: ColumnarToColumnarTransition =>
+ case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention =>
Some(c2c.child)
case _ => None
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]