This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 99d65db03b [VL] Support Velox's `preferred_output_batch_bytes` config (#10661)
99d65db03b is described below
commit 99d65db03b74383c3871ba7616e2393ea21da543
Author: kevinwilfong <[email protected]>
AuthorDate: Mon Sep 15 09:42:18 2025 -0700
[VL] Support Velox's `preferred_output_batch_bytes` config (#10661)
---
.../org/apache/gluten/config/VeloxConfig.scala | 8 +++++
.../gluten/datasource/ArrowCSVFileFormat.scala | 4 ++-
.../gluten/execution/RowToVeloxColumnarExec.scala | 42 ++++++++++++++++------
.../execution/ColumnarCachedBatchSerializer.scala | 9 +++--
.../gluten/columnarbatch/ColumnarBatchTest.java | 3 +-
.../gluten/execution/MiscOperatorSuite.scala | 23 ++++++++++++
cpp/velox/compute/WholeStageResultIterator.cc | 2 ++
cpp/velox/config/VeloxConfig.h | 3 ++
.../org/apache/gluten/config/GlutenConfig.scala | 3 +-
9 files changed, 81 insertions(+), 16 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b821d2d35a..461e6f3543 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -80,6 +80,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enableEnhancedFeatures(): Boolean =
ConfigJniWrapper.isEnhancedFeaturesEnabled &&
getConf(ENABLE_ENHANCED_FEATURES)
+
+ def veloxPreferredBatchBytes: Long =
getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES)
}
object VeloxConfig {
@@ -654,4 +656,10 @@ object VeloxConfig {
.doc("Enable some features including iceberg native write and other features.")
.booleanConf
.createWithDefault(true)
+
+ val COLUMNAR_VELOX_PREFERRED_BATCH_BYTES =
+ buildConf("spark.gluten.sql.columnar.backend.velox.preferredBatchBytes")
+ .internal()
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("10MB")
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index 52f7756381..10b7e7801f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.datasource
import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.config.VeloxConfig
import org.apache.gluten.exception.SchemaMismatchException
import org.apache.gluten.execution.RowToVeloxColumnarExec
import org.apache.gluten.iterator.Iterators
@@ -312,7 +313,8 @@ object ArrowCSVFileFormat {
val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
it,
schema,
- batchSize
+ batchSize,
+ VeloxConfig.get.veloxPreferredBatchBytes
)
veloxBatch
.map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), v))
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index adb4aeca2a..f11de5894a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
@@ -49,6 +49,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
val numRows = GlutenConfig.get.maxBatchSize
+ val numBytes = VeloxConfig.get.veloxPreferredBatchBytes
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = schema
@@ -60,7 +61,8 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
numInputRows,
numOutputBatches,
convertTime,
- numRows)
+ numRows,
+ numBytes)
}
}
@@ -69,6 +71,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
val numRows = GlutenConfig.get.maxBatchSize
+ val numBytes = VeloxConfig.get.veloxPreferredBatchBytes
val mode = BroadcastUtils.getBroadcastMode(outputPartitioning)
val relation = child.executeBroadcast()
BroadcastUtils.sparkToVeloxUnsafe(
@@ -83,7 +86,9 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBas
numInputRows,
numOutputBatches,
convertTime,
- numRows))
+ numRows,
+ numBytes)
+ )
}
// For spark 3.2.
@@ -96,7 +101,8 @@ object RowToVeloxColumnarExec {
def toColumnarBatchIterator(
it: Iterator[InternalRow],
schema: StructType,
- columnBatchSize: Int): Iterator[ColumnarBatch] = {
+ columnBatchSize: Int,
+ columnBatchBytes: Long): Iterator[ColumnarBatch] = {
val numInputRows = new SQLMetric("numInputRows")
val numOutputBatches = new SQLMetric("numOutputBatches")
val convertTime = new SQLMetric("convertTime")
@@ -106,7 +112,8 @@ object RowToVeloxColumnarExec {
numInputRows,
numOutputBatches,
convertTime,
- columnBatchSize)
+ columnBatchSize,
+ columnBatchBytes)
}
def toColumnarBatchIterator(
@@ -115,7 +122,8 @@ object RowToVeloxColumnarExec {
numInputRows: SQLMetric,
numOutputBatches: SQLMetric,
convertTime: SQLMetric,
- columnBatchSize: Int): Iterator[ColumnarBatch] = {
+ columnBatchSize: Int,
+ columnBatchBytes: Long): Iterator[ColumnarBatch] = {
if (it.isEmpty) {
return Iterator.empty
}
@@ -165,7 +173,7 @@ object RowToVeloxColumnarExec {
val rowLength = new ListBuffer[Long]()
var rowCount = 0
var offset = 0L
- while (rowCount < columnBatchSize && !finished) {
+ while (rowCount < columnBatchSize && offset < columnBatchBytes && !finished) {
if (!it.hasNext) {
finished = true
} else {
@@ -180,14 +188,26 @@ object RowToVeloxColumnarExec {
// maybe we should optimize to list ArrayBuf to native to avoid buf close and allocate
// 31760L origins from BaseVariableWidthVector.lastValueAllocationSizeInBytes
// experimental value
- val estimatedBufSize = Math.max(
- Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L * columnBatchSize),
- sizeInBytes.toDouble * 10)
+ val estimatedBufSize = Math.min(
+ Math.max(
+ Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L * columnBatchSize),
+ sizeInBytes.toDouble * 10),
+ // Limit the size of the buffer to columnBatchBytes or the size of the first row,
+ // whichever is greater so we always have enough space for the first row.
+ Math.max(columnBatchBytes, sizeInBytes)
+ )
arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
}
if ((offset + sizeInBytes) > arrowBuf.capacity()) {
- val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
+ val bufSize = if (offset + sizeInBytes > columnBatchBytes) {
+ // If adding the current row causes the batch size to exceed columnBatchBytes, add
+ // just enough space to add the current row.
+ offset + sizeInBytes
+ } else {
+ Math.min((offset + sizeInBytes * 2), columnBatchBytes)
+ }
+ val tmpBuf = arrowAllocator.buffer(bufSize)
tmpBuf.setBytes(0, arrowBuf, 0, offset)
arrowBuf.close()
arrowBuf = tmpBuf
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index b5533695a0..f8d6bd886b 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.execution.{RowToVeloxColumnarExec,
VeloxColumnarToRowExec}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -125,7 +125,12 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with Logging {
val numRows = conf.columnBatchSize
val rddColumnarBatch = input.mapPartitions {
- it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, numRows)
+ it =>
+ RowToVeloxColumnarExec.toColumnarBatchIterator(
+ it,
+ localSchema,
+ numRows,
+ VeloxConfig.get.veloxPreferredBatchBytes)
}
convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, conf)
}
diff --git
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index f02caf8f2d..fba8458a4b 100644
---
a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++
b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -229,7 +229,8 @@ public class ColumnarBatchTest extends VeloxBackendTestBase
{
RowToVeloxColumnarExec.toColumnarBatchIterator(
JavaConverters.<InternalRow>asScalaIterator(batch.rowIterator()),
structType,
- numRows)
+ numRows,
+ Integer.MAX_VALUE)
.next();
Assert.assertEquals("[true,15]\n[false,14]", ColumnarBatches.toString(veloxBatch, 0, 2));
Assert.assertEquals(
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index e14dd1fcb5..58a81af4a9 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -2158,4 +2158,27 @@ class MiscOperatorSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSpa
}
})
}
+
+ test("RowToVeloxColumnar preferredBatchBytes") {
+ Seq("1", "80", "100000000").foreach(
+ preferredBatchBytes => {
+ withSQLConf(
VeloxConfig.COLUMNAR_VELOX_PREFERRED_BATCH_BYTES.key -> preferredBatchBytes
+ ) {
+ val df = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("Col").select($"Col".plus(1))
+ assert(df.collect().length == 10)
+ val ops = collect(df.queryExecution.executedPlan) { case p: RowToVeloxColumnarExec => p }
+ assert(ops.size == 1)
+ val op = ops.head
+ val metrics = op.metrics
+ // Each row consumes 16 bytes as an UnsafeRow.
+ val expectedNumBatches = preferredBatchBytes match {
+ case "1" => 10
+ case "80" => 2
+ case _ => 1
+ }
+ assert(metrics("numOutputBatches").value == expectedNumBatches)
+ }
+ })
+ }
}
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc
b/cpp/velox/compute/WholeStageResultIterator.cc
index 9b07e4f826..7e3af2a3fe 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -486,6 +486,8 @@ std::unordered_map<std::string, std::string>
WholeStageResultIterator::getQueryC
std::to_string(veloxCfg_->get<uint32_t>(kSparkBatchSize, 4096));
configs[velox::core::QueryConfig::kMaxOutputBatchRows] =
std::to_string(veloxCfg_->get<uint32_t>(kSparkBatchSize, 4096));
+ configs[velox::core::QueryConfig::kPreferredOutputBatchBytes] =
+ std::to_string(veloxCfg_->get<uint64_t>(kVeloxPreferredBatchBytes, 10L << 20));
try {
configs[velox::core::QueryConfig::kSparkAnsiEnabled] =
veloxCfg_->get<std::string>(kAnsiEnabled, "false");
configs[velox::core::QueryConfig::kSessionTimezone] =
veloxCfg_->get<std::string>(kSessionTimezone, "");
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index ca85cc65ec..e37c99987e 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -177,4 +177,7 @@ const std::string kCudfMemoryResourceDefault =
const std::string kCudfMemoryPercent =
"spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent";
const int32_t kCudfMemoryPercentDefault = 50;
+/// Preferred size of batches in bytes to be returned by operators.
+const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes";
+
} // namespace gluten
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index c2339dda43..3b8e7c6568 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -486,7 +486,8 @@ object GlutenConfig {
"spark.gluten.sql.columnar.backend.velox.enableSystemExceptionStacktrace",
"spark.gluten.sql.columnar.backend.velox.memoryUseHugePages",
"spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
- "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks"
+ "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
+ "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"
)
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]