This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 53dc27d7d2 [VL] Hoist per-partition constants out of ColumnarCachedBatchSerializer.serialize hot path (#12166)
53dc27d7d2 is described below
commit 53dc27d7d2fa19704c314c24ef3821d0a67e101a
Author: Kent Yao <[email protected]>
AuthorDate: Thu May 28 21:46:58 2026 +0800
[VL] Hoist per-partition constants out of ColumnarCachedBatchSerializer.serialize hot path (#12166)
In convertInternalRowToCachedBatch, three values that are constant for
the lifetime of the per-partition Iterator[CachedBatch] were being
re-evaluated on every next() call:
1. BackendsApiManager.getBackendName (twice per batch)
2. GlutenConfig.get.getConf(COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
-- GlutenConfig.get allocates a fresh GlutenConfig(SQLConf.get) on
every call (GlutenConfig.scala L584-L586)
3. ColumnarBatchSerializerJniWrapper.create(Runtimes.contextInstance(...))
Hoist all three out of next() into the mapPartitions body, alongside
the structSchema value that the same block already hoists for the same
many-small-batch GC-pressure reason. Only the per-batch handle remains
inside next() since it depends on the batch.
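The hoisting pattern described above can be sketched in isolation. This is a minimal illustration, not the Gluten code: HoistSketch, expensiveLookup, perElement and hoisted are hypothetical names, and the counter exists only to make the number of evaluations visible.

```scala
// Hypothetical stand-ins for illustration only; none of these are Gluten APIs.
object HoistSketch {
  // Counts how many times the "expensive" lookup is evaluated.
  var lookups = 0
  def expensiveLookup(): String = { lookups += 1; "velox" }

  // Before: the lookup is re-evaluated on every next() call.
  def perElement(in: Iterator[Int]): Iterator[String] =
    new Iterator[String] {
      override def hasNext: Boolean = in.hasNext
      override def next(): String = {
        val name = expensiveLookup()
        s"$name-${in.next()}"
      }
    }

  // After: the lookup is evaluated once, when the iterator is constructed,
  // and the anonymous Iterator closes over the resulting value.
  def hoisted(in: Iterator[Int]): Iterator[String] = {
    val name = expensiveLookup()
    new Iterator[String] {
      override def hasNext: Boolean = in.hasNext
      override def next(): String = s"$name-${in.next()}"
    }
  }
}
```

Both variants yield the same elements; only the number of lookups differs. One trade-off of this shape is that the hoisted value is computed when the iterator is built, even if next() is never called.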
Wire format is byte-identical. Pure refactor with no new test file;
behavior fully covered by ColumnarCachedBatchKryoSuite and
ColumnarCachedBatchKryoBoundaryProbeBugSuite (7 tests, all green
locally on -Pspark-4.1 -Pscala-2.13).
refs: todos/features/gluten-ccbs-iterator-hoist/docs/0002-decision.md
refs: todos/features/gluten-ccbs-iterator-hoist/docs/0003-implementation-plan.md
---
.../execution/ColumnarCachedBatchSerializer.scala | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index bb548b2f9c..18e2d56bd1 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -740,30 +740,32 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
if heavy batch is encountered */
batch => VeloxColumnarBatches.ensureVeloxBatch(batch)
}
- // Hoist the per-partition StructType out of the per-batch hot path: schema is constant
- // for the lifetime of this iterator, so allocating one StructType per CachedBatch wastes
- // GC for the many-small-batch case.
+ // Hoist per-partition-iterator constants out of the per-batch hot path:
+ // schema, backend name, partition-stats conf, and the JNI wrapper are all
+ // fixed for the lifetime of this iterator. Allocating them per CachedBatch
+ // wastes GC in the many-small-batch case; GlutenConfig.get in particular
+ // allocates a fresh GlutenConfig(SQLConf.get) on every call.
val structSchema = StructType(
schema.map(a => StructField(a.name, a.dataType, a.nullable)))
+ val backendName = BackendsApiManager.getBackendName
+ val partitionStatsEnabled =
+ GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ backendName,
+ "ColumnarCachedBatchSerializer#serialize"))
new Iterator[CachedBatch] {
override def hasNext: Boolean = veloxBatches.hasNext
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val jni = ColumnarBatchSerializerJniWrapper.create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- val handle =
- ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ val handle = ColumnarBatches.getNativeHandle(backendName, batch)
// Route through serializeWithStats when the partition-stats conf is enabled and the
// JNI extension is linked in libgluten.so. Capability is detected lazily at the
// call site: a new Gluten jar paired with an older native library will throw
// UnsatisfiedLinkError on the first invocation; we catch it once, cache the
// result, and fall back to the legacy serialize() path emitting stats=null. The
// buildFilter wrapper directs such batches through without pruning.
- val partitionStatsEnabled =
- GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
try {
val framed = jni.serializeWithStats(handle)