Copilot commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3246359782


##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
   public native long deserializeDirect(long serializerHandle, long offset, int len);
 
   public native void close(long serializerHandle);
+
+  // Capability check for the serializeWithStats JNI extension. A new Gluten jar paired with an
+  // older libgluten.so may lack this symbol; the cache write path consults this helper and
+  // falls back to the legacy serialize() when false. Probing is lazy and one-shot: reflection
+  // on the declared native method proves the JVM side is wired (the cpp symbol is verified at
+  // first real invocation; callers must catch UnsatisfiedLinkError there too).
+  private static volatile Boolean supportsStatsExtCached = null;
+
+  public static boolean supportsStatsExt() {
+    Boolean cached = supportsStatsExtCached;
+    if (cached != null) {
+      return cached;
+    }
+    boolean result;
+    try {
+      ColumnarBatchSerializerJniWrapper.class.getDeclaredMethod("serializeWithStats", long.class);
+      result = true;
+    } catch (NoSuchMethodException e) {
+      result = false;
+    }
+    supportsStatsExtCached = result;
+    return result;

Review Comment:
   `supportsStatsExt()` as implemented will always return `true` in this jar 
because `serializeWithStats(long)` is declared in the same class; reflection 
does not validate that the native symbol is present in the loaded 
`libgluten.so`. With a newer jar + older native library, the first 
`serializeWithStats` call will still throw `UnsatisfiedLinkError`, so this 
capability check currently provides a false sense of safety. Consider either 
(a) removing this helper and relying on a try/catch fallback at the call site, 
or (b) implementing a real native capability probe (e.g., a small JNI method 
that returns whether the symbol/feature is available) and caching that result.
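
To make the point above concrete, here is a minimal, self-contained Java sketch. The class `ReflectionProbeDemo` and its helper methods are hypothetical, and the native method is declared `static` for brevity, which may differ from the PR. It shows that reflection finds a declared native method even when no native library backs it, while the first real call fails to link.

```java
final class ReflectionProbeDemo {
  // Declared native, but no library providing this symbol is ever loaded.
  static native long serializeWithStats(long handle);

  // Mirrors the reflection-based probe: only checks the Java-side declaration.
  static boolean declaredViaReflection() {
    try {
      ReflectionProbeDemo.class.getDeclaredMethod("serializeWithStats", long.class);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // Probes by actually invoking the method; linking is resolved at call time.
  static boolean actuallyLinked() {
    try {
      serializeWithStats(0L);
      return true;
    } catch (UnsatisfiedLinkError e) {
      return false;
    }
  }
}
```

In this sketch `declaredViaReflection()` succeeds while `actuallyLinked()` does not, which is the mismatch described in the comment.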
   



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the
+            // partition-stats conf is enabled. Capability is cached after first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+            // to the original serialize() path and emit stats=null; the buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+              val framed = jni.serializeWithStats(handle)
+              // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+              // dispatch by dataType.
+              val structSchema = StructType(
+                schema.map(
+                  a =>
+                    StructField(a.name, a.dataType, a.nullable)))
+              val (stats, bytesBlob) =
+                CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+              CachedColumnarBatch(batch.numRows(), bytesBlob.length, bytesBlob, stats, structSchema)
+            } else {
+              val unsafeBuffer = jni.serialize(handle)
+              val bytes = unsafeBuffer.toByteArray
+              CachedColumnarBatch(
+                batch.numRows(),
+                bytes.length,
+                bytes,
+                stats = null,
+                schema = null)
+            }

Review Comment:
   The `serializeWithStats` path needs a safe fallback when the native symbol 
isn't available or the backend returns the default empty framing. Today this 
branch calls `jni.serializeWithStats(handle)` and immediately 
`parseFramedBytes(...)` without catching `UnsatisfiedLinkError` (new jar + old 
native lib) or handling `framed.isEmpty` (the default 
`framedSerializeWithStats` returns an empty vector to indicate “not 
supported”). Either condition will currently fail cache materialization instead 
of falling back to `serialize()` and `stats=null` as described in the comments.
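
A minimal sketch of the fallback described above, written in Java with stand-ins for the real calls. `StatsFallbackSketch`, `Result`, and the two `Supplier` parameters are hypothetical; in the PR they would correspond roughly to `jni.serializeWithStats(handle)` and `jni.serialize(handle)`. Treating an empty framing as a permanent "not supported" signal is an assumption of this sketch, not something the PR confirms.

```java
import java.util.function.Supplier;

final class StatsFallbackSketch {
  /** Payload plus a flag saying whether it is the framed (stats-carrying) form. */
  static final class Result {
    final byte[] bytes;
    final boolean framed;

    Result(byte[] bytes, boolean framed) {
      this.bytes = bytes;
      this.framed = framed;
    }
  }

  // Set once the stats call is known to be unusable, so later batches skip it.
  private static volatile boolean statsDisabled = false;

  static Result serialize(Supplier<byte[]> withStats, Supplier<byte[]> legacy) {
    if (!statsDisabled) {
      try {
        byte[] framed = withStats.get();
        if (framed != null && framed.length > 0) {
          return new Result(framed, true);
        }
        // Empty framing: treated here as "backend does not support stats".
        statsDisabled = true;
      } catch (UnsatisfiedLinkError e) {
        // Newer jar paired with an older native library: the symbol is missing.
        statsDisabled = true;
      }
    }
    // Legacy path; the caller would emit stats = null for this batch.
    return new Result(legacy.get(), false);
  }

  // Test-only hook to clear the cached decision.
  static void resetForTest() {
    statsDisabled = false;
  }
}
```

Caching the negative result means a missing symbol costs one failed call per JVM rather than one per batch.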



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
-            val unsafeBuffer = ColumnarBatchSerializerJniWrapper
-              .create(
-                Runtimes.contextInstance(
-                  BackendsApiManager.getBackendName,
-                  "ColumnarCachedBatchSerializer#serialize"))
-              .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
-            val bytes = unsafeBuffer.toByteArray
-            CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+            val jni = ColumnarBatchSerializerJniWrapper.create(
+              Runtimes.contextInstance(
+                BackendsApiManager.getBackendName,
+                "ColumnarCachedBatchSerializer#serialize"))
+            val handle =
+              ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+            // Route through serializeWithStats when the JNI extension is available AND the
+            // partition-stats conf is enabled. Capability is cached after first probe. When
+            // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+            // to the original serialize() path and emit stats=null; the buildFilter wrapper
+            // directs such batches through without pruning.
+            val partitionStatsEnabled =
+              GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+            if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+              val framed = jni.serializeWithStats(handle)
+              // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+              // dispatch by dataType.
+              val structSchema = StructType(
+                schema.map(
+                  a =>
+                    StructField(a.name, a.dataType, a.nullable)))
+              val (stats, bytesBlob) =
+                CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+              CachedColumnarBatch(batch.numRows(), bytesBlob.length, bytesBlob, stats, structSchema)

Review Comment:
   `StructType` for stats dispatch is rebuilt for every cached batch 
(`StructType(schema.map(StructField(...)))`). Since `schema` is constant per 
partition/iterator, this can be hoisted outside `next()` (or cached in a `lazy 
val`) to avoid repeated allocations during cache materialization, especially 
for many small batches.



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
       length != Kryo.NULL,
       "The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
         s"deserialize using ${this.getClass.getName}")
-    val bytes = new Array[Byte](length - 1) // -1 to restore
+    val bytes = new Array[Byte](length - 1)
     input.readBytes(bytes)
-    CachedColumnarBatch(numRows, sizeInBytes, bytes)
+    val hasStats = input.readBoolean()
+    // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
+    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed
+    // pattern match throws MatchError at runtime.
+    val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
+      val statsLen = input.readInt()
+      val statsBytes = new Array[Byte](statsLen)
+      input.readBytes(statsBytes)
+      val sch = readOptionalSchema(input)
+      (CachedColumnarBatchKryoSerializer.deserializeStats(statsBytes, sch), sch)
+    } else {
+      (null, readOptionalSchema(input))
+    }
+    CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, statsAndSchema._2)
+  }
+
+  private def readOptionalSchema(input: Input): StructType = {
+    if (!input.readBoolean()) {
+      null
+    } else {
+      val schemaLen = input.readInt()
+      val schemaBytes = new Array[Byte](schemaLen)
+      input.readBytes(schemaBytes)
+      DataType.fromJson(new String(schemaBytes, UTF_8)).asInstanceOf[StructType]
+    }

Review Comment:
   The Kryo read path trusts `length`, `statsLen`, and `schemaLen` from the 
stream and allocates arrays directly from them. A corrupt / truncated cache 
block can currently trigger `NegativeArraySizeException` or OOM (e.g., negative 
`statsLen`, or a very large `schemaLen`). Add basic bounds checks (>=0 and 
reasonable max) before allocating, and fail with a clear exception so 
corruption doesn't crash the executor.
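
One possible shape for the bounds checks suggested above, sketched in Java over a `java.nio.ByteBuffer` rather than Kryo's `Input` so that it stays self-contained. `BoundedReadSketch`, `readLengthPrefixed`, and the caller-supplied `maxLen` are hypothetical names; a suitable maximum would have to be chosen by the PR author.

```java
import java.nio.ByteBuffer;

final class BoundedReadSketch {
  /**
   * Reads a 4-byte length followed by that many bytes, rejecting lengths that are
   * negative, above maxLen, or larger than what is left in the buffer.
   */
  static byte[] readLengthPrefixed(ByteBuffer in, String field, int maxLen) {
    if (in.remaining() < Integer.BYTES) {
      throw new IllegalStateException("Truncated cache block: missing " + field);
    }
    int len = in.getInt();
    if (len < 0 || len > maxLen) {
      throw new IllegalStateException(
          "Corrupt cache block: " + field + "=" + len + " outside [0, " + maxLen + "]");
    }
    if (len > in.remaining()) {
      throw new IllegalStateException(
          "Truncated cache block: " + field + "=" + len + " but only " + in.remaining()
              + " bytes remain");
    }
    // Allocation happens only after the length has been validated.
    byte[] out = new byte[len];
    in.get(out);
    return out;
  }
}
```

The same three checks (non-negative, below a configured maximum, not beyond the remaining input) would apply to `length`, `statsLen`, and `schemaLen` alike.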



##########
cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc:
##########
@@ -95,4 +98,405 @@ std::shared_ptr<ColumnarBatch> VeloxColumnarBatchSerializer::deserialize(uint8_t
   return std::make_shared<VeloxColumnarBatch>(result);
 }
 
+namespace {
+
+// Per-type FlatVector min/max scan + NaN guard. Returns false when the column must be marked
+// unsupported (any NaN observed for floating-point types -- Spark equality NaN != NaN means
+// min/max-based pruning would silently drop matching rows).
+//
+// Floating-point edge cases that DO NOT poison the column:
+// - +/-Infinity: ordered (-Inf < x < +Inf for finite x); participate in min/max normally.
+// - +0 and -0: IEEE 754 declares them equal under <, ==; min/max bound is correct either way.
+// - subnormal (denormal) values: ordered like normal floats; no special handling needed.
+template <typename T>
+bool scanMinMax(const facebook::velox::FlatVector<T>* flat, T& tLo, T& tHi, int64_t& nullCnt, bool& seen) {
+  const auto size = flat->size();
+  const uint64_t* nulls = flat->rawNulls();
+  const T* values = flat->rawValues();
+  for (vector_size_t i = 0; i < size; ++i) {
+    if (nulls != nullptr && bits::isBitNull(nulls, i)) {
+      ++nullCnt;
+      continue;
+    }
+    T v = values[i];
+    if constexpr (std::is_floating_point_v<T>) {
+      if (std::isnan(v)) {
+        return false;
+      }
+    }
+    if (!seen) {
+      tLo = v;
+      tHi = v;
+      seen = true;
+    } else {
+      if (v < tLo)
+        tLo = v;
+      if (v > tHi)
+        tHi = v;
+    }
+  }
+  return true;
+}
+
+} // namespace
+
+std::vector<ColumnStats> VeloxColumnarBatchSerializer::computeStats(RowVectorPtr rowVector) {
+  std::vector<ColumnStats> result;
+  const auto numCols = rowVector->childrenSize();
+  result.resize(numCols);
+  for (column_index_t col = 0; col < numCols; ++col) {
+    auto& stats = result[col];
+    auto child = rowVector->childAt(col);
+    if (child == nullptr || !child->isFlatEncoding()) {
+      continue;
+    }
+    bool seen = false;
+    int64_t nullCnt = 0;
+    bool supported = false;
+    switch (child->typeKind()) {
+      case TypeKind::BIGINT: {
+        auto* flat = child->asFlatVector<int64_t>();
+        int64_t lo = 0, hi = 0;
+        supported = scanMinMax<int64_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::INTEGER: {
+        auto* flat = child->asFlatVector<int32_t>();
+        int32_t lo = 0, hi = 0;
+        supported = scanMinMax<int32_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::SMALLINT: {
+        auto* flat = child->asFlatVector<int16_t>();
+        int16_t lo = 0, hi = 0;
+        supported = scanMinMax<int16_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::TINYINT: {
+        auto* flat = child->asFlatVector<int8_t>();
+        int8_t lo = 0, hi = 0;
+        supported = scanMinMax<int8_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::REAL: {
+        auto* flat = child->asFlatVector<float>();
+        float lo = 0.f, hi = 0.f;
+        supported = scanMinMax<float>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::DOUBLE: {
+        auto* flat = child->asFlatVector<double>();
+        double lo = 0.0, hi = 0.0;
+        supported = scanMinMax<double>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::BOOLEAN: {
+        auto* flat = child->asFlatVector<bool>();
+        bool lo = false, hi = false;
+        supported = scanMinMax<bool>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::HUGEINT: {
+        // long-Decimal (precision > 18); marshaled as 16B LE int128 downstream.
+        auto* flat = child->asFlatVector<int128_t>();
+        int128_t lo = 0, hi = 0;
+        supported = scanMinMax<int128_t>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::TIMESTAMP: {
+        // Velox Timestamp has defaulted operator<=> (Timestamp.h) so scanMinMax compiles via the
+        // existing template. Wire emit converts via toMicros() to int64; Spark TimestampType /
+        // TimestampNTZType physical = Long microseconds.
+        auto* flat = child->asFlatVector<Timestamp>();
+        Timestamp lo, hi;
+        supported = scanMinMax<Timestamp>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(lo);
+          stats.upperBound = variant(hi);
+        }
+        break;
+      }
+      case TypeKind::VARCHAR: {
+        // StringView::operator<=> uses memcmp -> unsigned byte ordering, matching Spark
+        // ByteArray.compareBinary. variant(std::string{sv}) heap-copies so post-computeStats
+        // lifetime is decoupled from the RowVector buffer.
+        //
+        // Truncate to 256B at the source so the JVM never sees > 256B (single source of truth).
+        // Lower bound: prefix is byte-wise <= original. Upper bound: prefix +1 carry on the
+        // rightmost byte to ensure encoded >= original; carry overflow on an all-0xFF prefix
+        // demotes supported=0. Mirrors the JVM-side encodeStringBounds.
+        constexpr size_t kStatsStringTruncateLen = 256;
+        auto* flat = child->asFlatVector<StringView>();
+        StringView lo, hi;
+        supported = scanMinMax<StringView>(flat, lo, hi, nullCnt, seen);
+        if (supported && seen) {
+          const size_t loLen = std::min(static_cast<size_t>(lo.size()), kStatsStringTruncateLen);
+          std::string loBytes(lo.data(), loLen);
+          const size_t hiSrcLen = static_cast<size_t>(hi.size());
+          std::string hiBytes(hi.data(), std::min(hiSrcLen, kStatsStringTruncateLen));
+          bool hiOk = true;
+          if (hiSrcLen > kStatsStringTruncateLen) {
+            bool carryDone = false;
+            for (int i = static_cast<int>(hiBytes.size()) - 1; i >= 0; --i) {
+              uint8_t b = static_cast<uint8_t>(hiBytes[i]) + 1;
+              if (b != 0) {
+                hiBytes[i] = static_cast<char>(b);
+                carryDone = true;
+                break;
+              }
+              hiBytes[i] = 0;
+            }
+            hiOk = carryDone;
+          }
+          if (hiOk) {
+            stats.hasLowerBound = true;
+            stats.hasUpperBound = true;
+            stats.lowerBound = variant(std::move(loBytes));
+            stats.upperBound = variant(std::move(hiBytes));
+          } else {
+            supported = false;
+          }
+        }
+        break;
+      }
+      default:
+        // Unsupported type -> hasLowerBound=hasUpperBound=false -> JVM buildFilter pass-through.
+        break;
+    }
+    stats.nullCount = nullCnt;
+  }
+  return result;
+}
+
+std::vector<uint8_t> VeloxColumnarBatchSerializer::framedSerializeWithStats(
+    const std::shared_ptr<ColumnarBatch>& batch) {
+  // Compute stats over the inbound rowVector BEFORE delegating to the append path (which may
+  // consume / mutate iterator state on subsequent calls).
+  auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(), batch)->getRowVector();
+  const uint32_t numRows = static_cast<uint32_t>(rowVector->size());
+  std::vector<ColumnStats> perCol = computeStats(rowVector);
+  const uint32_t numCols = static_cast<uint32_t>(perCol.size());
+
+  // Marshal statsBlob (LE primitives via lambdas).
+  std::vector<uint8_t> statsBlob;
+  auto pushU8 = [&](uint8_t v) { statsBlob.push_back(v); };
+  auto pushU32 = [&](uint32_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 16) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 24) & 0xFF));
+  };
+  auto pushU64 = [&](uint64_t v) {
+    for (int i = 0; i < 8; ++i) {
+      statsBlob.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xFF));
+    }
+  };
+  auto pushI64LE = [&](int64_t v) { pushU64(static_cast<uint64_t>(v)); };
+  auto pushU16LE = [&](uint16_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+  };
+
+  pushU32(numCols);
+  for (const auto& s : perCol) {
+    auto kind = s.lowerBound.kind();
+    bool emitSupported = s.hasLowerBound && s.hasUpperBound && s.lowerBound.kind() == s.upperBound.kind() &&
+        (kind == facebook::velox::TypeKind::BIGINT || kind == facebook::velox::TypeKind::INTEGER ||
+         kind == facebook::velox::TypeKind::SMALLINT || kind == facebook::velox::TypeKind::TINYINT ||
+         kind == facebook::velox::TypeKind::HUGEINT || kind == facebook::velox::TypeKind::REAL ||
+         kind == facebook::velox::TypeKind::DOUBLE || kind == facebook::velox::TypeKind::BOOLEAN ||
+         kind == facebook::velox::TypeKind::TIMESTAMP || kind == facebook::velox::TypeKind::VARCHAR);
+    pushU8(emitSupported ? 1 : 0);
+    pushU32(static_cast<uint32_t>(s.nullCount));
+    // PartitionStatistics.count = numRows (vanilla gatherNullStats increments count for null
+    // rows too; subtracting nullCount inverts the IsNotNull predicate).
+    pushU32(numRows);
+    pushU64(0); // sizeInBytes placeholder
+    if (emitSupported) {
+      switch (kind) {
+        case facebook::velox::TypeKind::BIGINT:
+          pushU32(8);
+          pushI64LE(s.lowerBound.value<int64_t>());
+          pushU32(8);
+          pushI64LE(s.upperBound.value<int64_t>());
+          break;
+        case facebook::velox::TypeKind::INTEGER:
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.lowerBound.value<int32_t>()));
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.upperBound.value<int32_t>()));
+          break;
+        case facebook::velox::TypeKind::SMALLINT:
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.lowerBound.value<int16_t>()));
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.upperBound.value<int16_t>()));
+          break;
+        case facebook::velox::TypeKind::TINYINT:
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.lowerBound.value<int8_t>()));
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.upperBound.value<int8_t>()));
+          break;
+        case facebook::velox::TypeKind::HUGEINT: {
+          // 16 LE bytes: int128 split into low/high uint64 halves, low first. JVM reconstructs
+          // via BigInteger from signed two's-complement big-endian byte array (reverse on read).
+          auto pushI128LE = [&](int128_t v) {
+            pushU64(static_cast<uint64_t>(v));
+            pushU64(static_cast<uint64_t>(v >> 64));
+          };
+          pushU32(16);
+          pushI128LE(s.lowerBound.value<int128_t>());
+          pushU32(16);
+          pushI128LE(s.upperBound.value<int128_t>());
+          break;
+        }
+        case facebook::velox::TypeKind::REAL: {
+          uint32_t loBits, hiBits;
+          float lo = s.lowerBound.value<float>();
+          float hi = s.upperBound.value<float>();
+          std::memcpy(&loBits, &lo, sizeof(uint32_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint32_t));
+          pushU32(4);
+          pushU32(loBits);
+          pushU32(4);
+          pushU32(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::DOUBLE: {
+          uint64_t loBits, hiBits;
+          double lo = s.lowerBound.value<double>();
+          double hi = s.upperBound.value<double>();
+          std::memcpy(&loBits, &lo, sizeof(uint64_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint64_t));
+          pushU32(8);
+          pushU64(loBits);
+          pushU32(8);
+          pushU64(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::BOOLEAN:
+          pushU32(1);
+          pushU8(s.lowerBound.value<bool>() ? 1 : 0);
+          pushU32(1);
+          pushU8(s.upperBound.value<bool>() ? 1 : 0);
+          break;
+        case facebook::velox::TypeKind::TIMESTAMP: {
+          // Spark Timestamp / TimestampNTZ physical = Long microseconds; share the JVM LongType
+          // 8B wire arm. Velox Timestamp::toMicros() floors toward -infinity. Floor on lo widens
+          // the prune interval downward (conservative) but floor on hi can shrink it and
+          // false-negative drop rows whose true ts has nanos % 1000 != 0. Fix: ceil hi by +1us
+          // when there is any sub-microsecond residue.
+          const auto& loTs = s.lowerBound.value<facebook::velox::Timestamp>();
+          const auto& hiTs = s.upperBound.value<facebook::velox::Timestamp>();
+          int64_t loMicros = loTs.toMicros();
+          int64_t hiMicros = hiTs.toMicros();
+          if (hiTs.getNanos() % 1000 != 0) {
+            hiMicros += 1;
+          }
+          pushU32(8);
+          pushI64LE(loMicros);
+          pushU32(8);
+          pushI64LE(hiMicros);
+          break;
+        }
+        case facebook::velox::TypeKind::VARCHAR: {
+          // Truncation already applied by computeStats (256B + carry); emit raw bytes with
+          // u32 LE length prefix. variant.value<VARCHAR>() returns owned std::string&.
+          const auto& loStr = s.lowerBound.value<facebook::velox::TypeKind::VARCHAR>();
+          const auto& hiStr = s.upperBound.value<facebook::velox::TypeKind::VARCHAR>();
+          pushU32(static_cast<uint32_t>(loStr.size()));
+          for (auto c : loStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          pushU32(static_cast<uint32_t>(hiStr.size()));
+          for (auto c : hiStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          break;
+        }
+        default:
+          break;
+      }
+    }
+  }
+  const uint32_t statsLen = static_cast<uint32_t>(statsBlob.size());
+
+  // Produce bytesBlob via the existing serializer path.
+  append(batch);
+  const int64_t bytesLen = maxSerializedSize();
+  std::vector<uint8_t> bytesBlob(bytesLen);
+  serializeTo(bytesBlob.data(), bytesLen);
+
+  // Assemble: [magic(4) | statsLen(u32 LE) | statsBlob | bytesLen(u32 LE) | bytesBlob].
+  std::vector<uint8_t> framed;
+  framed.reserve(4 + 4 + statsLen + 4 + bytesLen);
+  framed.push_back(0xFE);
+  framed.push_back(0xCA);
+  framed.push_back(0x53);
+  framed.push_back(0x02);
+  framed.push_back(static_cast<uint8_t>(statsLen & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 8) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 16) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 24) & 0xFF));
+  framed.insert(framed.end(), statsBlob.begin(), statsBlob.end());
+  const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);
+  framed.push_back(static_cast<uint8_t>(bytesLen32 & 0xFF));
+  framed.push_back(static_cast<uint8_t>((bytesLen32 >> 8) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((bytesLen32 >> 16) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((bytesLen32 >> 24) & 0xFF));
+  framed.insert(framed.end(), bytesBlob.begin(), bytesBlob.end());

Review Comment:
   `bytesLen` is an `int64_t` but is written into the framing as a 32-bit 
length (`bytesLen32 = static_cast<uint32_t>(bytesLen)`) with no range check. If 
`maxSerializedSize()` ever exceeds `UINT32_MAX` (or even Java’s `byte[]` 
limit), the framing will truncate and the JVM parser will misinterpret the 
payload. Add an explicit guard (and fail fast) ensuring `bytesLen` fits the 
chosen wire type (and ideally within `jsize`/`Int.MaxValue`) before 
allocating/casting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

