This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 34da0c6a6f [VL] Add convenient C2R API (#10796)
34da0c6a6f is described below
commit 34da0c6a6f32a806e7f8be9f5085e917943933e7
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Sep 25 13:33:46 2025 +0200
[VL] Add convenient C2R API (#10796)
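
This patch folds the JNI handling in toRowIterator into a small Converter
class that owns the native C2R handle, so a caller can convert individual
ColumnarBatches on demand and release the handle explicitly via close().
Below is a minimal usage sketch, not part of the patch: it assumes it runs
inside a Spark task where a Gluten Runtime context is available, and the
helper name, loop and metric are illustrative only.

    import org.apache.gluten.execution.VeloxColumnarToRowExec.Converter
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.metric.SQLMetric
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical helper: convert batches one at a time and count the rows,
    // closing the native converter when done. Closing the batches themselves
    // stays with the caller, as in the patched code.
    def countRows(batches: Iterator[ColumnarBatch]): Long = {
      val converter = new Converter(new SQLMetric("convertTime"))
      try {
        var n = 0L
        batches.foreach { batch =>
          val rows: Iterator[InternalRow] = converter.toRowIterator(batch)
          rows.foreach(_ => n += 1)
        }
        n
      } finally {
        converter.close() // frees the native C2R instance created by the constructor
      }
    }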
---
.../gluten/execution/VeloxColumnarToRowExec.scala | 126 +++++++++++----------
.../execution/ColumnarCachedBatchSerializer.scala | 4 +-
2 files changed, 70 insertions(+), 60 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 31cda32dad..097c7d4897 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.vectorized.{NativeColumnarToRowInfo, NativeColumnarToRo
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.{BroadcastUtils, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
@@ -74,7 +74,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
child.executeColumnar().mapPartitions {
it =>
VeloxColumnarToRowExec
- .toRowIterator(it, output, numOutputRows, numInputBatches, convertTime)
+ .toRowIterator(it, numOutputRows, numInputBatches, convertTime)
}
}
@@ -89,7 +89,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
sparkContext,
mode,
relation,
- VeloxColumnarToRowExec.toRowIterator(_, output, numOutputRows, numInputBatches, convertTime))
+ VeloxColumnarToRowExec.toRowIterator(_, numOutputRows, numInputBatches, convertTime))
}
protected def withNewChildInternal(newChild: SparkPlan): VeloxColumnarToRowExec =
@@ -98,23 +98,20 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas
object VeloxColumnarToRowExec {
- def toRowIterator(
- batches: Iterator[ColumnarBatch],
- output: Seq[Attribute]): Iterator[InternalRow] = {
+ def toRowIterator(batches: Iterator[ColumnarBatch]): Iterator[InternalRow] = {
val numOutputRows = new SQLMetric("numOutputRows")
val numInputBatches = new SQLMetric("numInputBatches")
val convertTime = new SQLMetric("convertTime")
toRowIterator(
batches,
- output,
numOutputRows,
numInputBatches,
convertTime
)
}
+
def toRowIterator(
batches: Iterator[ColumnarBatch],
- output: Seq[Attribute],
numOutputRows: SQLMetric,
numInputBatches: SQLMetric,
convertTime: SQLMetric): Iterator[InternalRow] = {
@@ -122,13 +119,9 @@ object VeloxColumnarToRowExec {
return Iterator.empty
}
- val runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName, "ColumnarToRow")
- // TODO: Pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast.
- val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
- val c2rId = jniWrapper.nativeColumnarToRowInit()
+ val converter = new Converter(convertTime)
val res: Iterator[Iterator[InternalRow]] = new Iterator[Iterator[InternalRow]] {
-
override def hasNext: Boolean = {
batches.hasNext
}
@@ -137,61 +130,80 @@ object VeloxColumnarToRowExec {
val batch = batches.next()
numInputBatches += 1
numOutputRows += batch.numRows()
+ converter.toRowIterator(batch)
+ }
+ }
+ Iterators
+ .wrap(res.flatten)
+ .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which
+ // is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator
+ .recycleIterator {
+ converter.close()
+ }
+ .create()
+ }
- if (batch.numRows == 0) {
- batch.close()
- return Iterator.empty
- }
+ /**
+ * A convenient C2R API to allow caller converts batches on demand without having to pass in an
+ * Iterator[ColumnarBatch].
+ */
+ class Converter(convertTime: SQLMetric) {
+ private val runtime =
+ Runtimes.contextInstance(BackendsApiManager.getBackendName, "VeloxColumnarToRow")
+ // TODO: Pass the jni jniWrapper and arrowSchema and serializeSchema method by broadcast.
+ private val jniWrapper = NativeColumnarToRowJniWrapper.create(runtime)
+ private val c2rId = jniWrapper.nativeColumnarToRowInit()
- if (output.isEmpty) {
- val rows = ColumnarBatches.emptyRowIterator(batch.numRows()).asScala
- batch.close()
- return rows
- }
+ def toRowIterator(batch: ColumnarBatch): Iterator[InternalRow] = {
+ if (batch.numRows() == 0) {
+ return Iterator.empty
+ }
+
+ if (batch.numCols() == 0) {
+ val rows = ColumnarBatches.emptyRowIterator(batch.numRows()).asScala
+ return rows
+ }
- VeloxColumnarBatches.checkVeloxBatch(batch)
+ VeloxColumnarBatches.checkVeloxBatch(batch)
- val cols = batch.numCols()
- val rows = batch.numRows()
- val batchHandle = ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
- var info: NativeColumnarToRowInfo = null
+ new Iterator[InternalRow] {
+ private val cols = batch.numCols()
+ private val rows = batch.numRows()
+ private val batchHandle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
- new Iterator[InternalRow] {
- var rowId = 0
- var baseLength = 0
- val row = new UnsafeRow(cols)
+ // Mutable members.
+ private var rowId = 0
+ private var baseLength = 0
+ private val row = new UnsafeRow(cols)
+ private var info: NativeColumnarToRowInfo = _
- override def hasNext: Boolean = {
- rowId < rows
- }
+ override def hasNext: Boolean = {
+ rowId < rows
+ }
- override def next: UnsafeRow = {
- if (rowId == 0 || rowId == baseLength + info.lengths.length) {
- baseLength = if (info == null) {
- baseLength
- } else {
- baseLength + info.lengths.length
- }
- val before = System.currentTimeMillis()
- info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
- convertTime += (System.currentTimeMillis() - before)
+ override def next(): InternalRow = {
+ if (rowId == 0 || rowId == baseLength + info.lengths.length) {
+ baseLength = if (info == null) {
+ baseLength
+ } else {
+ baseLength + info.lengths.length
}
- val (offset, length) =
- (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
- row.pointTo(null, info.memoryAddress + offset, length)
- rowId += 1
- row
+ val before = System.currentTimeMillis()
+ info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
+ convertTime += (System.currentTimeMillis() - before)
}
+ val (offset, length) =
+ (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
+ row.pointTo(null, info.memoryAddress + offset, length)
+ rowId += 1
+ row
}
}
}
- Iterators
- .wrap(res.flatten)
- .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which
- // is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator
- .recycleIterator {
- jniWrapper.nativeClose(c2rId)
- }
- .create()
+
+ def close(): Unit = {
+ jniWrapper.nativeClose(c2rId)
+ }
}
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index f8d6bd886b..80e5039e76 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -152,9 +152,7 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
val rddColumnarBatch =
convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
- rddColumnarBatch.mapPartitions {
- it => VeloxColumnarToRowExec.toRowIterator(it, selectedAttributes)
- }
+ rddColumnarBatch.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))
}
override def convertColumnarBatchToCachedBatch(
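
For callers that only have an iterator or RDD of batches, the output
attributes no longer need to be threaded through; the zero-column case is
now detected from batch.numCols() inside the converter, and the
single-argument overload creates throwaway SQLMetrics internally. A minimal
sketch of the simplified pattern (the wrapper method is illustrative, not
from this patch):

    import org.apache.gluten.execution.VeloxColumnarToRowExec
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Convert each partition of Velox batches to rows using only the batch
    // iterator; metrics and native resources are managed per partition.
    def toRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
      batches.mapPartitions(it => VeloxColumnarToRowExec.toRowIterator(it))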
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]