weiting-chen commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3246347958


##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
   public native long deserializeDirect(long serializerHandle, long offset, int 
len);
 
   public native void close(long serializerHandle);
+
+  // Capability check for the serializeWithStats JNI extension. A new Gluten 
jar paired with an
+  // older libgluten.so may lack this symbol; the cache write path consults 
this helper and
+  // falls back to the legacy serialize() when false. Probing is lazy and 
one-shot: reflection
+  // on the declared native method proves the JVM side is wired (the cpp 
symbol is verified at
+  // first real invocation; callers must catch UnsatisfiedLinkError there too).
+  private static volatile Boolean supportsStatsExtCached = null;
+
+  public static boolean supportsStatsExt() {

Review Comment:
   **`supportsStatsExt()` reflects on Java declaration, not native symbol**
   
   **Problem:** `getDeclaredMethod("serializeWithStats", long.class)` checks 
the Java method declaration, which will always exist in this class regardless 
of whether the loaded `libgluten.so` contains the corresponding JNI symbol. If 
a newer jar is paired with an older native library, enabling 
`partitionStats.enabled=true` will throw `UnsatisfiedLinkError` on first 
invocation instead of gracefully falling back.
   
   **Evidence:**
   ```java
   public static boolean supportsStatsExt() {
       Boolean cached = supportsStatsExtCached;
       if (cached != null) return cached;
       boolean result;
       try {
         ColumnarBatchSerializerJniWrapper.class.getDeclaredMethod(
             "serializeWithStats", long.class); // always succeeds
         result = true;
       } catch (NoSuchMethodException e) {
         result = false; // never reached
       }
       // ...
   }
   ```
   
   **Suggested Fix:** Wrap the actual `serializeWithStats()` callsite in 
`ColumnarCachedBatchSerializer.scala` with a try/catch for 
`UnsatisfiedLinkError` (one-shot cached), or probe native linkage here by 
invoking the method with an invalid handle and distinguishing `GlutenException` 
(linked) from `UnsatisfiedLinkError` (not linked).



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends 
KryoSerializer[CachedColumnarBat
       length != Kryo.NULL,
       "The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
         s"deserialize using ${this.getClass.getName}")
-    val bytes = new Array[Byte](length - 1) // -1 to restore
+    val bytes = new Array[Byte](length - 1)
     input.readBytes(bytes)
-    CachedColumnarBatch(numRows, sizeInBytes, bytes)
+    val hasStats = input.readBoolean()

Review Comment:
   **Kryo `read()` not backward-compatible with pre-stats cached batches**
   
   **Problem:** `input.readBoolean()` is called unconditionally after the 
`bytes` payload. Previously serialized `CachedColumnarBatch` instances (e.g., 
from `DISK_ONLY` or `MEMORY_AND_DISK` storage levels surviving a rolling 
upgrade) lack the trailing `hasStats`/`hasSchema` booleans, causing 
`KryoException` on deserialization.
   
   **Evidence:**
   ```scala
   val bytes = new Array[Byte](length - 1)
   input.readBytes(bytes)
   val hasStats = input.readBoolean()  // throws if stream ends here (old 
format)
   ```
   
   **Suggested Fix:** Guard with available-byte check to gracefully handle old 
format:
   ```scala
   val hasStats = if (input.available() > 0) input.readBoolean() else false
   val stats: InternalRow = if (hasStats) { /* ... */ } else null
   val hasSchema = if (input.available() > 0) input.readBoolean() else false
   ```



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              
.serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, 
batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is 
available AND the

Review Comment:
   **Config is write-path only — disabling after cache build still prunes**
   
   **Problem:** The config `partitionStats.enabled` only gates the write path 
(stats computation during `serialize`). On the read path, `buildFilter` 
inherits from `SimpleMetricsCachedBatchSerializer` and will apply pruning 
whenever stats are present in the cached batch — regardless of the config 
value. If a user enables the config, caches data, then disables it, pruning 
continues until the cache is invalidated.
   
   This is arguably correct behavior (stats already embedded should be used), 
but it may surprise users who expect the config to be a full kill-switch.
   
   **Investigation Needed:** Consider either:
   1. **Document this** — add a note to the config description: "Controls stats 
computation on the write path. Cached batches that already contain stats will 
continue to use them for pruning."
   2. **Gate read path too** — check the config in the `buildFilter` override 
and return `Seq.empty` when disabled.



##########
cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc:
##########
@@ -95,4 +98,405 @@ std::shared_ptr<ColumnarBatch> 
VeloxColumnarBatchSerializer::deserialize(uint8_t
   return std::make_shared<VeloxColumnarBatch>(result);
 }
 
+namespace {
+
+// Per-type FlatVector min/max scan + NaN guard. Returns false when the column 
must be marked
+// unsupported (any NaN observed for floating-point types -- Spark equality 
NaN != NaN means
+// min/max-based pruning would silently drop matching rows).
+//
+// Floating-point edge cases that DO NOT poison the column:
+// - +/-Infinity: ordered (-Inf < x < +Inf for finite x); participate in 
min/max normally.
+// - +0 and -0: IEEE 754 declares them equal under <, ==; min/max bound is 
correct either way.
+// - subnormal (denormal) values: ordered like normal floats; no special 
handling needed.
+template <typename T>
+bool scanMinMax(const facebook::velox::FlatVector<T>* flat, T& tLo, T& tHi, 
int64_t& nullCnt, bool& seen) {
+  const auto size = flat->size();
+  const uint64_t* nulls = flat->rawNulls();
+  const T* values = flat->rawValues();
+  for (vector_size_t i = 0; i < size; ++i) {
+    if (nulls != nullptr && bits::isBitNull(nulls, i)) {
+      ++nullCnt;
+      continue;
+    }
+    T v = values[i];
+    if constexpr (std::is_floating_point_v<T>) {
+      if (std::isnan(v)) {
+        return false;
+      }
+    }
+    if (!seen) {
+      tLo = v;
+      tHi = v;
+      seen = true;
+    } else {
+      if (v < tLo)
+        tLo = v;
+      if (v > tHi)
+        tHi = v;
+    }
+  }
+  return true;
+}
+
+} // namespace
+
+std::vector<ColumnStats> 
VeloxColumnarBatchSerializer::computeStats(RowVectorPtr rowVector) {
+  std::vector<ColumnStats> result;
+  const auto numCols = rowVector->childrenSize();
+  result.resize(numCols);
+  for (column_index_t col = 0; col < numCols; ++col) {
+    auto& stats = result[col];
+    auto child = rowVector->childAt(col);
+    if (child == nullptr || !child->isFlatEncoding()) {
+      continue;
+    }
+    bool seen = false;
+    int64_t nullCnt = 0;
+    bool supported = false;
+    switch (child->typeKind()) {
+      case TypeKind::BIGINT: {
+        auto* flat = child->asFlatVector<int64_t>();
+        int64_t lo = 0, hi = 0;
+        supported = scanMinMax<int64_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::INTEGER: {
+        auto* flat = child->asFlatVector<int32_t>();
+        int32_t lo = 0, hi = 0;
+        supported = scanMinMax<int32_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::SMALLINT: {
+        auto* flat = child->asFlatVector<int16_t>();
+        int16_t lo = 0, hi = 0;
+        supported = scanMinMax<int16_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::TINYINT: {
+        auto* flat = child->asFlatVector<int8_t>();
+        int8_t lo = 0, hi = 0;
+        supported = scanMinMax<int8_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::REAL: {
+        auto* flat = child->asFlatVector<float>();
+        float lo = 0.f, hi = 0.f;
+        supported = scanMinMax<float>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::DOUBLE: {
+        auto* flat = child->asFlatVector<double>();
+        double lo = 0.0, hi = 0.0;
+        supported = scanMinMax<double>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::BOOLEAN: {
+        auto* flat = child->asFlatVector<bool>();
+        bool lo = false, hi = false;
+        supported = scanMinMax<bool>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::HUGEINT: {
+        // long-Decimal (precision > 18); marshaled as 16B LE int128 
downstream.
+        auto* flat = child->asFlatVector<int128_t>();
+        int128_t lo = 0, hi = 0;
+        supported = scanMinMax<int128_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::TIMESTAMP: {
+        // Velox Timestamp has defaulted operator<=> (Timestamp.h) so 
scanMinMax compiles via the
+        // existing template. Wire emit converts via toMicros() to int64; 
Spark TimestampType /
+        // TimestampNTZType physical = Long microseconds.
+        auto* flat = child->asFlatVector<Timestamp>();
+        Timestamp lo, hi;
+        supported = scanMinMax<Timestamp>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::VARCHAR: {
+        // StringView::operator<=> uses memcmp -> unsigned byte ordering, 
matching Spark
+        // ByteArray.compareBinary. variant(std::string{sv}) heap-copies so 
post-computeStats
+        // lifetime is decoupled from the RowVector buffer.
+        //
+        // Truncate to 256B at the source so the JVM never sees > 256B (single 
source of truth).
+        // Lower bound: prefix is byte-wise <= original. Upper bound: prefix 
+1 carry on the
+        // rightmost byte to ensure encoded >= original; carry overflow on an 
all-0xFF prefix
+        // demotes supported=0. Mirrors the JVM-side encodeStringBounds.
+        constexpr size_t kStatsStringTruncateLen = 256;
+        auto* flat = child->asFlatVector<StringView>();
+        StringView lo, hi;
+        supported = scanMinMax<StringView>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          const size_t loLen = std::min(static_cast<size_t>(lo.size()), 
kStatsStringTruncateLen);
+          std::string loBytes(lo.data(), loLen);
+          const size_t hiSrcLen = static_cast<size_t>(hi.size());
+          std::string hiBytes(hi.data(), std::min(hiSrcLen, 
kStatsStringTruncateLen));
+          bool hiOk = true;
+          if (hiSrcLen > kStatsStringTruncateLen) {
+            bool carryDone = false;
+            for (int i = static_cast<int>(hiBytes.size()) - 1; i >= 0; --i) {
+              uint8_t b = static_cast<uint8_t>(hiBytes[i]) + 1;
+              if (b != 0) {
+                hiBytes[i] = static_cast<char>(b);
+                carryDone = true;
+                break;
+              }
+              hiBytes[i] = 0;
+            }
+            hiOk = carryDone;
+          }
+          if (hiOk) {
+            stats.hasLowerBound = true;
+            stats.hasUpperBound = true;
+            stats.lowerBound = variant(std::move(loBytes));
+            stats.upperBound = variant(std::move(hiBytes));
+          } else {
+            supported = false;
+          }
+        }
+        break;
+      }
+      default:
+        // Unsupported type -> hasLowerBound=hasUpperBound=false -> JVM 
buildFilter pass-through.
+        break;
+    }
+    stats.nullCount = nullCnt;
+  }
+  return result;
+}
+
+std::vector<uint8_t> VeloxColumnarBatchSerializer::framedSerializeWithStats(
+    const std::shared_ptr<ColumnarBatch>& batch) {
+  // Compute stats over the inbound rowVector BEFORE delegating to the append 
path (which may
+  // consume / mutate iterator state on subsequent calls).
+  auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(), 
batch)->getRowVector();
+  const uint32_t numRows = static_cast<uint32_t>(rowVector->size());
+  std::vector<ColumnStats> perCol = computeStats(rowVector);
+  const uint32_t numCols = static_cast<uint32_t>(perCol.size());
+
+  // Marshal statsBlob (LE primitives via lambdas).
+  std::vector<uint8_t> statsBlob;
+  auto pushU8 = [&](uint8_t v) { statsBlob.push_back(v); };
+  auto pushU32 = [&](uint32_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 16) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 24) & 0xFF));
+  };
+  auto pushU64 = [&](uint64_t v) {
+    for (int i = 0; i < 8; ++i) {
+      statsBlob.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xFF));
+    }
+  };
+  auto pushI64LE = [&](int64_t v) { pushU64(static_cast<uint64_t>(v)); };
+  auto pushU16LE = [&](uint16_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+  };
+
+  pushU32(numCols);
+  for (const auto& s : perCol) {
+    auto kind = s.lowerBound.kind();
+    bool emitSupported = s.hasLowerBound && s.hasUpperBound && 
s.lowerBound.kind() == s.upperBound.kind() &&
+        (kind == facebook::velox::TypeKind::BIGINT || kind == 
facebook::velox::TypeKind::INTEGER ||
+         kind == facebook::velox::TypeKind::SMALLINT || kind == 
facebook::velox::TypeKind::TINYINT ||
+         kind == facebook::velox::TypeKind::HUGEINT || kind == 
facebook::velox::TypeKind::REAL ||
+         kind == facebook::velox::TypeKind::DOUBLE || kind == 
facebook::velox::TypeKind::BOOLEAN ||
+         kind == facebook::velox::TypeKind::TIMESTAMP || kind == 
facebook::velox::TypeKind::VARCHAR);
+    pushU8(emitSupported ? 1 : 0);
+    pushU32(static_cast<uint32_t>(s.nullCount));
+    // PartitionStatistics.count = numRows (vanilla gatherNullStats increments 
count for null
+    // rows too; subtracting nullCount inverts the IsNotNull predicate).
+    pushU32(numRows);
+    pushU64(0); // sizeInBytes placeholder
+    if (emitSupported) {
+      switch (kind) {
+        case facebook::velox::TypeKind::BIGINT:
+          pushU32(8);
+          pushI64LE(s.lowerBound.value<int64_t>());
+          pushU32(8);
+          pushI64LE(s.upperBound.value<int64_t>());
+          break;
+        case facebook::velox::TypeKind::INTEGER:
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.lowerBound.value<int32_t>()));
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.upperBound.value<int32_t>()));
+          break;
+        case facebook::velox::TypeKind::SMALLINT:
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.lowerBound.value<int16_t>()));
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.upperBound.value<int16_t>()));
+          break;
+        case facebook::velox::TypeKind::TINYINT:
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.lowerBound.value<int8_t>()));
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.upperBound.value<int8_t>()));
+          break;
+        case facebook::velox::TypeKind::HUGEINT: {
+          // 16 LE bytes: int128 split into low/high uint64 halves, low first. 
JVM reconstructs
+          // via BigInteger from signed two's-complement big-endian byte array 
(reverse on read).
+          auto pushI128LE = [&](int128_t v) {
+            pushU64(static_cast<uint64_t>(v));
+            pushU64(static_cast<uint64_t>(v >> 64));
+          };
+          pushU32(16);
+          pushI128LE(s.lowerBound.value<int128_t>());
+          pushU32(16);
+          pushI128LE(s.upperBound.value<int128_t>());
+          break;
+        }
+        case facebook::velox::TypeKind::REAL: {
+          uint32_t loBits, hiBits;
+          float lo = s.lowerBound.value<float>();
+          float hi = s.upperBound.value<float>();
+          std::memcpy(&loBits, &lo, sizeof(uint32_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint32_t));
+          pushU32(4);
+          pushU32(loBits);
+          pushU32(4);
+          pushU32(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::DOUBLE: {
+          uint64_t loBits, hiBits;
+          double lo = s.lowerBound.value<double>();
+          double hi = s.upperBound.value<double>();
+          std::memcpy(&loBits, &lo, sizeof(uint64_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint64_t));
+          pushU32(8);
+          pushU64(loBits);
+          pushU32(8);
+          pushU64(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::BOOLEAN:
+          pushU32(1);
+          pushU8(s.lowerBound.value<bool>() ? 1 : 0);
+          pushU32(1);
+          pushU8(s.upperBound.value<bool>() ? 1 : 0);
+          break;
+        case facebook::velox::TypeKind::TIMESTAMP: {
+          // Spark Timestamp / TimestampNTZ physical = Long microseconds; 
share the JVM LongType
+          // 8B wire arm. Velox Timestamp::toMicros() floors toward -infinity. 
Floor on lo widens
+          // the prune interval downward (conservative) but floor on hi can 
shrink it and
+          // false-negative drop rows whose true ts has nanos % 1000 != 0. 
Fix: ceil hi by +1us
+          // when there is any sub-microsecond residue.
+          const auto& loTs = s.lowerBound.value<facebook::velox::Timestamp>();
+          const auto& hiTs = s.upperBound.value<facebook::velox::Timestamp>();
+          int64_t loMicros = loTs.toMicros();
+          int64_t hiMicros = hiTs.toMicros();
+          if (hiTs.getNanos() % 1000 != 0) {
+            hiMicros += 1;
+          }
+          pushU32(8);
+          pushI64LE(loMicros);
+          pushU32(8);
+          pushI64LE(hiMicros);
+          break;
+        }
+        case facebook::velox::TypeKind::VARCHAR: {
+          // Truncation already applied by computeStats (256B + carry); emit 
raw bytes with
+          // u32 LE length prefix. variant.value<VARCHAR>() returns owned 
std::string&.
+          const auto& loStr = 
s.lowerBound.value<facebook::velox::TypeKind::VARCHAR>();
+          const auto& hiStr = 
s.upperBound.value<facebook::velox::TypeKind::VARCHAR>();
+          pushU32(static_cast<uint32_t>(loStr.size()));
+          for (auto c : loStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          pushU32(static_cast<uint32_t>(hiStr.size()));
+          for (auto c : hiStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          break;
+        }
+        default:
+          break;
+      }
+    }
+  }
+  const uint32_t statsLen = static_cast<uint32_t>(statsBlob.size());
+
+  // Produce bytesBlob via the existing serializer path.
+  append(batch);
+  const int64_t bytesLen = maxSerializedSize();
+  std::vector<uint8_t> bytesBlob(bytesLen);
+  serializeTo(bytesBlob.data(), bytesLen);
+
+  // Assemble: [magic(4) | statsLen(u32 LE) | statsBlob | bytesLen(u32 LE) | 
bytesBlob].
+  std::vector<uint8_t> framed;
+  framed.reserve(4 + 4 + statsLen + 4 + bytesLen);
+  framed.push_back(0xFE);
+  framed.push_back(0xCA);
+  framed.push_back(0x53);
+  framed.push_back(0x02);
+  framed.push_back(static_cast<uint8_t>(statsLen & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 8) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 16) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 24) & 0xFF));
+  framed.insert(framed.end(), statsBlob.begin(), statsBlob.end());
+  const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);

Review Comment:
   **`uint32_t` truncation on serialized batch length — missing bounds check**
   
   **Problem:** `maxSerializedSize()` returns `int64_t` but is silently cast to 
`uint32_t` for the framed wire format. If a single batch exceeds 4GB 
(pathological but possible with very wide schemas or huge string columns), the 
length wraps around silently, producing a corrupt frame that the JVM will 
mis-parse.
   
   **Evidence:**
   ```cpp
   const int64_t bytesLen = maxSerializedSize();
   std::vector<uint8_t> bytesBlob(bytesLen);
   // ...
   const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);  // silent 
truncation
   ```
   
   **Suggested Fix:**
   ```cpp
   GLUTEN_CHECK(
       bytesLen >= 0 && bytesLen <= std::numeric_limits<uint32_t>::max(),
       "Serialized batch size (" + std::to_string(bytesLen) +
       ") exceeds u32 framing limit");
   const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);
   ```



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends 
CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              
.serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, 
batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is 
available AND the
+            // partition-stats conf is enabled. Capability is cached after 
first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf 
left off) we fall back
+            // to the original serialize() path and emit stats=null; the 
buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              
GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && 
ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {

Review Comment:
   **Config-gate negative test gap**
   
   **Problem:** All E2E tests in `ColumnarCachedBatchE2ESuite` set 
`partitionStats.enabled=true`. No integration test verifies that when the 
config is `false` (the production default), stats are NOT computed and pruning 
does NOT occur. A bug in the gating logic could silently activate stats for all 
users.
   
   **Evidence:** The gate check is here:
   ```scala
   if (partitionStatsEnabled && 
ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
     val framed = jni.serializeWithStats(handle)
   ```
   But this path is never exercised with `partitionStatsEnabled=false` in E2E 
tests.
   
   **Suggested Fix:** Add a test to `ColumnarCachedBatchE2ESuite`:
   ```scala
   test("config disabled: no stats in cached batch, no pruning") {
     withSQLConf(
       GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false"
     ) {
       val df = spark.range(1000).selectExpr("id as c1").repartition(4).cache()
       try {
         df.count() // materialize
         val filtered = df.filter("c1 = 500")
         checkAnswer(filtered, Row(500))
         // Verify full scan (no partition pruning)
       } finally df.unpersist()
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to