This is an automated email from the ASF dual-hosted git repository.

yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 53dc27d7d2 [VL] Hoist per-partition constants out of ColumnarCachedBatchSerializer.serialize hot path (#12166)
53dc27d7d2 is described below

commit 53dc27d7d2fa19704c314c24ef3821d0a67e101a
Author: Kent Yao <[email protected]>
AuthorDate: Thu May 28 21:46:58 2026 +0800

    [VL] Hoist per-partition constants out of ColumnarCachedBatchSerializer.serialize hot path (#12166)
    
    In convertInternalRowToCachedBatch, three values that are constant for
    the lifetime of the per-partition Iterator[CachedBatch] were being
    re-evaluated on every next() call:
    
    1. BackendsApiManager.getBackendName (twice per batch)
    2. GlutenConfig.get.getConf(COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
       -- GlutenConfig.get allocates a fresh GlutenConfig(SQLConf.get) on
       every call (GlutenConfig.scala L584-L586)
    3. ColumnarBatchSerializerJniWrapper.create(Runtimes.contextInstance(...))
    
    Hoist all three out of next() into the mapPartitions body, alongside
    the structSchema value that the same block already hoists for the same
    many-small-batch GC-pressure reason. Only the per-batch handle remains
    inside next() since it depends on the batch.
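    
    The effect of the hoist can be illustrated with a minimal, self-contained
    sketch. Everything in it is hypothetical: HoistSketch, expensiveLookup,
    perBatch and hoisted are stand-ins invented for illustration, not Gluten
    APIs, and the counter exists only to make the number of evaluations
    visible.

```scala
// Hypothetical stand-ins for illustration; none of these names are Gluten APIs.
object HoistSketch {
  // Counts how often the "expensive" lookup runs.
  var lookups = 0

  // Stand-in for a call that does fresh work on every invocation,
  // as the commit message describes for GlutenConfig.get.
  def expensiveLookup(): Boolean = { lookups += 1; true }

  // Before: the lookup is evaluated inside next(), once per batch.
  def perBatch(batches: Iterator[Int]): Iterator[String] =
    new Iterator[String] {
      override def hasNext: Boolean = batches.hasNext
      override def next(): String = {
        val statsEnabled = expensiveLookup()
        s"${batches.next()}:$statsEnabled"
      }
    }

  // After: the lookup is evaluated once, when the iterator is built.
  def hoisted(batches: Iterator[Int]): Iterator[String] = {
    val statsEnabled = expensiveLookup()
    new Iterator[String] {
      override def hasNext: Boolean = batches.hasNext
      override def next(): String = s"${batches.next()}:$statsEnabled"
    }
  }

  def main(args: Array[String]): Unit = {
    lookups = 0
    val before = perBatch(Iterator(1, 2, 3)).toList
    val perBatchLookups = lookups

    lookups = 0
    val after = hoisted(Iterator(1, 2, 3)).toList
    val hoistedLookups = lookups

    // prints "same output: true"
    println(s"same output: ${before == after}")
    // prints "lookups per-batch: 3, hoisted: 1"
    println(s"lookups per-batch: $perBatchLookups, hoisted: $hoistedLookups")
  }
}
```

    The sketch is only valid under the same condition the commit relies on:
    the hoisted value must not change while the iterator is being consumed,
    because it is captured once at construction time.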
    
    Wire format is byte-identical. Pure refactor with no new test file;
    behavior fully covered by ColumnarCachedBatchKryoSuite and
    ColumnarCachedBatchKryoBoundaryProbeBugSuite (7 tests, all green
    locally on -Pspark-4.1 -Pscala-2.13).
    
    refs: todos/features/gluten-ccbs-iterator-hoist/docs/0002-decision.md
    refs: todos/features/gluten-ccbs-iterator-hoist/docs/0003-implementation-plan.md
---
 .../execution/ColumnarCachedBatchSerializer.scala  | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index bb548b2f9c..18e2d56bd1 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -740,30 +740,32 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
              if heavy batch is encountered */
           batch => VeloxColumnarBatches.ensureVeloxBatch(batch)
         }
-        // Hoist the per-partition StructType out of the per-batch hot path: schema is constant
-        // for the lifetime of this iterator, so allocating one StructType per CachedBatch wastes
-        // GC for the many-small-batch case.
+        // Hoist per-partition-iterator constants out of the per-batch hot path:
+        // schema, backend name, partition-stats conf, and the JNI wrapper are all
+        // fixed for the lifetime of this iterator. Allocating them per CachedBatch
+        // wastes GC in the many-small-batch case; GlutenConfig.get in particular
+        // allocates a fresh GlutenConfig(SQLConf.get) on every call.
         val structSchema = StructType(
           schema.map(a => StructField(a.name, a.dataType, a.nullable)))
+        val backendName = BackendsApiManager.getBackendName
+        val partitionStatsEnabled =
+          GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+        val jni = ColumnarBatchSerializerJniWrapper.create(
+          Runtimes.contextInstance(
+            backendName,
+            "ColumnarCachedBatchSerializer#serialize"))
         new Iterator[CachedBatch] {
           override def hasNext: Boolean = veloxBatches.hasNext
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val jni = ColumnarBatchSerializerJniWrapper.create(
-              Runtimes.contextInstance(
-                BackendsApiManager.getBackendName,
-                "ColumnarCachedBatchSerializer#serialize"))
-            val handle =
-              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            val handle = ColumnarBatches.getNativeHandle(backendName, batch)
             // Route through serializeWithStats when the partition-stats conf is enabled and the
             // JNI extension is linked in libgluten.so. Capability is detected lazily at the
             // call site: a new Gluten jar paired with an older native library will throw
             // UnsatisfiedLinkError on the first invocation; we catch it once, cache the
             // result, and fall back to the legacy serialize() path emitting stats=null. The
             // buildFilter wrapper directs such batches through without pruning.
-            val partitionStatsEnabled =
-              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
             if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
               try {
                 val framed = jni.serializeWithStats(handle)

