yaooqinn commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3247815459


##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the
+            // partition-stats conf is enabled. Capability is cached after first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+            // to the original serialize() path and emit stats=null; the buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {

Review Comment:
   Addressed in 3f9c923644. Added a new E2E test, `partitionStats.enabled=false: 
legacy serialize() path correctness preserved`, to `ColumnarCachedBatchE2ESuite`. 
It wraps `cacheRange()` in a `withSQLConf` block that sets the conf to its 
production default (`false`), materializes the cache, and verifies the equality 
filter result. This exercises the disabled branch of the gate end-to-end, so any 
regression that silently activates stats or breaks the stats=null read path 
surfaces here. The suite passes 12/12 locally (11 pre-existing + 1 new). Thanks 
for catching the gap.
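
   For readers following the thread, the gate that the new test exercises can be 
modeled as a pure function. This is only a minimal sketch: `SerializePath`, 
`WithStats`, `Legacy`, and `GateSketch.choosePath` are hypothetical names made up 
for illustration and are not part of Gluten; the two boolean inputs stand in for 
the conf value and the cached capability probe shown in the diff.

```scala
// Hypothetical model of the write-path gate; none of these names are Gluten APIs.
sealed trait SerializePath
case object WithStats extends SerializePath
case object Legacy extends SerializePath

object GateSketch {
  // Stats are emitted only when the conf is on AND the native library
  // exposes the stats extension; every other combination falls back.
  def choosePath(partitionStatsEnabled: Boolean, supportsStatsExt: Boolean): SerializePath =
    if (partitionStatsEnabled && supportsStatsExt) WithStats else Legacy
}
```

With the conf at its default of `false`, the sketch always picks `Legacy`, 
which corresponds to the branch the new test covers.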



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the

Review Comment:
   Thanks -- this is intentional and won't be addressed in this PR. The config 
is scoped to the write path by design (see design D-A4 trade-off in the PR 
description): stats already embedded in cached batches should be honored, 
otherwise toggling the config mid-session would leave behind un-prunable cached 
data with no way to drop the stats blob. The existing  is the user-facing way 
to drop cached stats. The behavior is correct (using available stats is never 
wrong), and no current user can be relying on a gated read path because the 
feature is new. Adding a read-path kill switch is tracked as a future 
possibility (it would be covered by the same on/off SQLConf if demand emerges).
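
   To illustrate why honoring embedded stats on the read path is safe, here is a 
minimal sketch under simplifying assumptions. `MinMax`, `Batch`, and 
`ReadPathSketch` are hypothetical types invented for this example (a single 
integer column with min/max stats); they do not reflect the actual stats layout 
or the `buildFilter` implementation in this PR.

```scala
// Hypothetical min/max stats for one integer column; not the real stats format.
final case class MinMax(min: Long, max: Long)
final case class Batch(id: Int, stats: Option[MinMax])

object ReadPathSketch {
  // An equality filter may skip a batch only when its stats prove the value
  // is out of range. A batch without stats (the stats=null case) is always kept.
  def mightContain(batch: Batch, value: Long): Boolean =
    batch.stats.forall(s => s.min <= value && value <= s.max)

  // Note that no config is consulted here: pruning depends only on whether
  // the cached batch carries stats.
  def prune(batches: Seq[Batch], value: Long): Seq[Batch] =
    batches.filter(mightContain(_, value))
}
```

In this model a batch is dropped only when its stats rule the value out, so 
batches written with and without stats can coexist in the same cache.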



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the
+            // partition-stats conf is enabled. Capability is cached after first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+            // to the original serialize() path and emit stats=null; the buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+              val framed = jni.serializeWithStats(handle)
+              // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+              // dispatch by dataType.
+              val structSchema = StructType(
+                schema.map(
+                  a =>
+                    StructField(a.name, a.dataType, a.nullable)))
+              val (stats, bytesBlob) =
+                CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+              CachedColumnarBatch(batch.numRows(), bytesBlob.length, bytesBlob, stats, structSchema)

Review Comment:
   Addressed in 3f9c923644. Hoisted the 
`StructType(schema.map(StructField(...)))` allocation out of the per-batch 
`Iterator.next()` body into a single iterator-scoped val. The schema is 
constant for the lifetime of a partition iterator, so rebuilding it for every 
batch only created garbage for the GC, especially in the many-small-batch case 
you noted. Thank you!
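
   The shape of the hoist can be sketched roughly as follows. This is a 
simplified model, not the code from the commit: `Field`, `Schema`, `Cached`, and 
`HoistSketch` are hypothetical stand-ins for Spark's `StructField`/`StructType` 
and the cached batch class, and the build counter exists only to make the 
saving observable.

```scala
// Hypothetical stand-ins for Spark's StructField/StructType; not the real classes.
final case class Field(name: String, dataType: String, nullable: Boolean)
final case class Schema(fields: Seq[Field])
final case class Cached(numRows: Int, schema: Schema)

object HoistSketch {
  // Counts schema constructions so the effect of hoisting can be checked.
  var schemaBuilds = 0

  def buildSchema(attrs: Seq[Field]): Schema = {
    schemaBuilds += 1
    Schema(attrs.map(a => Field(a.name, a.dataType, a.nullable)))
  }

  // Each Int in `batches` stands for the row count of one incoming batch.
  def cachedBatches(attrs: Seq[Field], batches: Iterator[Int]): Iterator[Cached] =
    new Iterator[Cached] {
      // Built once per iterator: the schema does not change within a partition.
      private val structSchema = buildSchema(attrs)
      override def hasNext: Boolean = batches.hasNext
      // next() reuses the shared schema instead of rebuilding it per batch.
      override def next(): Cached = Cached(batches.next(), structSchema)
    }
}
```

Every element produced by the iterator shares the same schema instance, so the 
number of schema allocations no longer grows with the number of batches.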



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

