weiting-chen commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3246347958
##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
public native long deserializeDirect(long serializerHandle, long offset, int len);
public native void close(long serializerHandle);
+
+ // Capability check for the serializeWithStats JNI extension. A new Gluten jar paired with an
+ // older libgluten.so may lack this symbol; the cache write path consults this helper and
+ // falls back to the legacy serialize() when false. Probing is lazy and one-shot: reflection
+ // on the declared native method proves the JVM side is wired (the cpp symbol is verified at
+ // first real invocation; callers must catch UnsatisfiedLinkError there too).
+ private static volatile Boolean supportsStatsExtCached = null;
+
+ public static boolean supportsStatsExt() {
Review Comment:
**`supportsStatsExt()` reflects on Java declaration, not native symbol**
**Problem:** `getDeclaredMethod("serializeWithStats", long.class)` only
inspects the Java method declaration, which always exists in this class whether
or not the loaded `libgluten.so` exports the corresponding JNI symbol. If a
newer jar is paired with an older native library, setting
`partitionStats.enabled=true` throws `UnsatisfiedLinkError` on the first
invocation instead of falling back gracefully.
**Evidence:**
```java
public static boolean supportsStatsExt() {
  Boolean cached = supportsStatsExtCached;
  if (cached != null) return cached;
  boolean result;
  try {
    ColumnarBatchSerializerJniWrapper.class.getDeclaredMethod(
        "serializeWithStats", long.class); // always succeeds
    result = true;
  } catch (NoSuchMethodException e) {
    result = false; // never reached
  }
  // ...
}
```
**Suggested Fix:** Either wrap the actual `serializeWithStats()` call site in
`ColumnarCachedBatchSerializer.scala` in a try/catch for `UnsatisfiedLinkError`
and cache the outcome so the fallback is decided only once, or probe native
linkage here by invoking the method with an invalid handle and distinguishing
`GlutenException` (linked) from `UnsatisfiedLinkError` (not linked).
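The probing option can be sketched in isolation. The class below is a
hypothetical stand-in for `ColumnarBatchSerializerJniWrapper`: the name
`StatsExtProbe`, the `byte[]` return type, and the use of handle `0` as an
invalid value are assumptions for illustration, not the PR's code. Its native
method is deliberately never linked, which mimics a new jar running against an
older `libgluten.so`. The sketch also assumes the real native implementation
rejects an invalid handle with a `RuntimeException` subclass rather than
crashing; if that does not hold, catching `UnsatisfiedLinkError` at the real
call site is the safer variant.

```java
// Hypothetical stand-in for ColumnarBatchSerializerJniWrapper; all names are illustrative.
final class StatsExtProbe {
  // Declared in Java but never linked in this sketch (no System.loadLibrary call).
  private static native byte[] serializeWithStats(long handle);

  // null = not probed yet; otherwise the cached one-shot result.
  private static volatile Boolean cached = null;

  // What a reflection-based check sees: the Java declaration only.
  static boolean declaredInJava() {
    try {
      StatsExtProbe.class.getDeclaredMethod("serializeWithStats", long.class);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  // What actually matters: whether the JVM can resolve the native symbol.
  static boolean supportsStatsExt() {
    Boolean c = cached;
    if (c != null) {
      return c;
    }
    boolean result;
    try {
      serializeWithStats(0L); // assumption: 0 is an invalid handle
      result = true;
    } catch (UnsatisfiedLinkError e) {
      result = false; // symbol missing from the loaded native library
    } catch (RuntimeException e) {
      result = true; // native code ran and rejected the handle, so it is linked
    }
    cached = result;
    return result;
  }

  public static void main(String[] args) {
    System.out.println("declared in Java: " + declaredInJava());
    System.out.println("linked natively: " + supportsStatsExt());
  }
}
```

With no native library loaded, the reflection check reports the method as
present while the invocation-based probe reports it as unlinked, which is
exactly the mismatch described above.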
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends KryoSerializer[CachedColumnarBat
length != Kryo.NULL,
"The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
s"deserialize using ${this.getClass.getName}")
- val bytes = new Array[Byte](length - 1) // -1 to restore
+ val bytes = new Array[Byte](length - 1)
input.readBytes(bytes)
- CachedColumnarBatch(numRows, sizeInBytes, bytes)
+ val hasStats = input.readBoolean()
Review Comment:
**Kryo `read()` not backward-compatible with pre-stats cached batches**
**Problem:** `input.readBoolean()` is called unconditionally after the
`bytes` payload. Previously serialized `CachedColumnarBatch` instances (e.g.,
from `DISK_ONLY` or `MEMORY_AND_DISK` storage levels surviving a rolling
upgrade) lack the trailing `hasStats`/`hasSchema` booleans, causing
`KryoException` on deserialization.
**Evidence:**
```scala
val bytes = new Array[Byte](length - 1)
input.readBytes(bytes)
val hasStats = input.readBoolean() // throws if stream ends here (old format)
```
**Suggested Fix:** Guard each read with an available-byte check so the old
format is handled gracefully:
```scala
val hasStats = if (input.available() > 0) input.readBoolean() else false
val stats: InternalRow = if (hasStats) { /* ... */ } else null
val hasSchema = if (input.available() > 0) input.readBoolean() else false
```
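The available-byte guard can be tried out without Kryo. The sketch below uses
plain `java.io` streams and a hypothetical record layout (`int` length,
payload, optional trailing `hasStats` byte); `TrailingFlagCompat` and its
methods are illustrative names, not Gluten code. One caveat that any
stream-level check of this kind shares: the guard is only reliable when the
record is the last thing in the buffer. If several records are written back to
back, `available() > 0` also holds for an old-format record that is followed
by another record, and the next record's first byte would be misread as the
flag.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record layout: [int length][payload][optional boolean hasStats].
final class TrailingFlagCompat {
  // Writes a record; hasStats == null produces the old format without the trailing flag.
  static byte[] write(byte[] payload, Boolean hasStats) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeInt(payload.length);
      out.write(payload);
      if (hasStats != null) {
        out.writeBoolean(hasStats);
      }
      out.flush();
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the flag if present; an old-format record is treated as "no stats".
  static boolean readHasStats(byte[] record) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      // Only touch the flag when bytes remain after the payload.
      return in.available() > 0 && in.readBoolean();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3};
    System.out.println("old format: " + readHasStats(write(payload, null)));
    System.out.println("new format: " + readHasStats(write(payload, true)));
  }
}
```

An explicit format marker written ahead of the payload would avoid the
back-to-back ambiguity, at the cost of changing the layout for new records.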
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
Review Comment:
**Config is write-path only — disabling after cache build still prunes**
**Problem:** The config `partitionStats.enabled` only gates the write path
(stats computation during `serialize`). On the read path, `buildFilter`
inherits from `SimpleMetricsCachedBatchSerializer` and will apply pruning
whenever stats are present in the cached batch — regardless of the config
value. If a user enables the config, caches data, then disables it, pruning
continues until the cache is invalidated.
This is arguably correct behavior (stats already embedded should be used),
but it may surprise users who expect the config to be a full kill-switch.
**Investigation Needed:** Consider either:
1. **Document this** — add a note to the config description: "Controls stats
computation on the write path. Cached batches that already contain stats will
continue to use them for pruning."
2. **Gate read path too** — check the config in the `buildFilter` override
and return `Seq.empty` when disabled.
##########
cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc:
##########
@@ -95,4 +98,405 @@ std::shared_ptr<ColumnarBatch> VeloxColumnarBatchSerializer::deserialize(uint8_t
return std::make_shared<VeloxColumnarBatch>(result);
}
+namespace {
+
+// Per-type FlatVector min/max scan + NaN guard. Returns false when the column must be marked
+// unsupported (any NaN observed for floating-point types -- Spark equality NaN != NaN means
+// min/max-based pruning would silently drop matching rows).
+//
+// Floating-point edge cases that DO NOT poison the column:
+// - +/-Infinity: ordered (-Inf < x < +Inf for finite x); participate in min/max normally.
+// - +0 and -0: IEEE 754 declares them equal under <, ==; min/max bound is correct either way.
+// - subnormal (denormal) values: ordered like normal floats; no special handling needed.
+template <typename T>
+bool scanMinMax(const facebook::velox::FlatVector<T>* flat, T& tLo, T& tHi, int64_t& nullCnt, bool& seen) {
+ const auto size = flat->size();
+ const uint64_t* nulls = flat->rawNulls();
+ const T* values = flat->rawValues();
+ for (vector_size_t i = 0; i < size; ++i) {
+ if (nulls != nullptr && bits::isBitNull(nulls, i)) {
+ ++nullCnt;
+ continue;
+ }
+ T v = values[i];
+ if constexpr (std::is_floating_point_v<T>) {
+ if (std::isnan(v)) {
+ return false;
+ }
+ }
+ if (!seen) {
+ tLo = v;
+ tHi = v;
+ seen = true;
+ } else {
+ if (v < tLo)
+ tLo = v;
+ if (v > tHi)
+ tHi = v;
+ }
+ }
+ return true;
+}
+
+} // namespace
+
+std::vector<ColumnStats> VeloxColumnarBatchSerializer::computeStats(RowVectorPtr rowVector) {
+ std::vector<ColumnStats> result;
+ const auto numCols = rowVector->childrenSize();
+ result.resize(numCols);
+ for (column_index_t col = 0; col < numCols; ++col) {
+ auto& stats = result[col];
+ auto child = rowVector->childAt(col);
+ if (child == nullptr || !child->isFlatEncoding()) {
+ continue;
+ }
+ bool seen = false;
+ int64_t nullCnt = 0;
+ bool supported = false;
+ switch (child->typeKind()) {
+ case TypeKind::BIGINT: {
+ auto* flat = child->asFlatVector<int64_t>();
+ int64_t lo = 0, hi = 0;
+ supported = scanMinMax<int64_t>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::INTEGER: {
+ auto* flat = child->asFlatVector<int32_t>();
+ int32_t lo = 0, hi = 0;
+ supported = scanMinMax<int32_t>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::SMALLINT: {
+ auto* flat = child->asFlatVector<int16_t>();
+ int16_t lo = 0, hi = 0;
+ supported = scanMinMax<int16_t>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::TINYINT: {
+ auto* flat = child->asFlatVector<int8_t>();
+ int8_t lo = 0, hi = 0;
+ supported = scanMinMax<int8_t>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::REAL: {
+ auto* flat = child->asFlatVector<float>();
+ float lo = 0.f, hi = 0.f;
+ supported = scanMinMax<float>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::DOUBLE: {
+ auto* flat = child->asFlatVector<double>();
+ double lo = 0.0, hi = 0.0;
+ supported = scanMinMax<double>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::BOOLEAN: {
+ auto* flat = child->asFlatVector<bool>();
+ bool lo = false, hi = false;
+ supported = scanMinMax<bool>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::HUGEINT: {
+ // long-Decimal (precision > 18); marshaled as 16B LE int128 downstream.
+ auto* flat = child->asFlatVector<int128_t>();
+ int128_t lo = 0, hi = 0;
+ supported = scanMinMax<int128_t>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::TIMESTAMP: {
+ // Velox Timestamp has defaulted operator<=> (Timestamp.h) so scanMinMax compiles via the
+ // existing template. Wire emit converts via toMicros() to int64; Spark TimestampType /
+ // TimestampNTZType physical = Long microseconds.
+ auto* flat = child->asFlatVector<Timestamp>();
+ Timestamp lo, hi;
+ supported = scanMinMax<Timestamp>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(lo);
+ stats.upperBound = variant(hi);
+ }
+ break;
+ }
+ case TypeKind::VARCHAR: {
+ // StringView::operator<=> uses memcmp -> unsigned byte ordering, matching Spark
+ // ByteArray.compareBinary. variant(std::string{sv}) heap-copies so post-computeStats
+ // lifetime is decoupled from the RowVector buffer.
+ //
+ // Truncate to 256B at the source so the JVM never sees > 256B (single source of truth).
+ // Lower bound: prefix is byte-wise <= original. Upper bound: prefix +1 carry on the
+ // rightmost byte to ensure encoded >= original; carry overflow on an all-0xFF prefix
+ // demotes supported=0. Mirrors the JVM-side encodeStringBounds.
+ constexpr size_t kStatsStringTruncateLen = 256;
+ auto* flat = child->asFlatVector<StringView>();
+ StringView lo, hi;
+ supported = scanMinMax<StringView>(flat, lo, hi, nullCnt, seen);
+ if (supported && seen) {
+ const size_t loLen = std::min(static_cast<size_t>(lo.size()), kStatsStringTruncateLen);
+ std::string loBytes(lo.data(), loLen);
+ const size_t hiSrcLen = static_cast<size_t>(hi.size());
+ std::string hiBytes(hi.data(), std::min(hiSrcLen, kStatsStringTruncateLen));
+ bool hiOk = true;
+ if (hiSrcLen > kStatsStringTruncateLen) {
+ bool carryDone = false;
+ for (int i = static_cast<int>(hiBytes.size()) - 1; i >= 0; --i) {
+ uint8_t b = static_cast<uint8_t>(hiBytes[i]) + 1;
+ if (b != 0) {
+ hiBytes[i] = static_cast<char>(b);
+ carryDone = true;
+ break;
+ }
+ hiBytes[i] = 0;
+ }
+ hiOk = carryDone;
+ }
+ if (hiOk) {
+ stats.hasLowerBound = true;
+ stats.hasUpperBound = true;
+ stats.lowerBound = variant(std::move(loBytes));
+ stats.upperBound = variant(std::move(hiBytes));
+ } else {
+ supported = false;
+ }
+ }
+ break;
+ }
+ default:
+ // Unsupported type -> hasLowerBound=hasUpperBound=false -> JVM buildFilter pass-through.
+ break;
+ }
+ stats.nullCount = nullCnt;
+ }
+ return result;
+}
+
+std::vector<uint8_t> VeloxColumnarBatchSerializer::framedSerializeWithStats(
+ const std::shared_ptr<ColumnarBatch>& batch) {
+ // Compute stats over the inbound rowVector BEFORE delegating to the append path (which may
+ // consume / mutate iterator state on subsequent calls).
+ auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(), batch)->getRowVector();
+ const uint32_t numRows = static_cast<uint32_t>(rowVector->size());
+ std::vector<ColumnStats> perCol = computeStats(rowVector);
+ const uint32_t numCols = static_cast<uint32_t>(perCol.size());
+
+ // Marshal statsBlob (LE primitives via lambdas).
+ std::vector<uint8_t> statsBlob;
+ auto pushU8 = [&](uint8_t v) { statsBlob.push_back(v); };
+ auto pushU32 = [&](uint32_t v) {
+ statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+ statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+ statsBlob.push_back(static_cast<uint8_t>((v >> 16) & 0xFF));
+ statsBlob.push_back(static_cast<uint8_t>((v >> 24) & 0xFF));
+ };
+ auto pushU64 = [&](uint64_t v) {
+ for (int i = 0; i < 8; ++i) {
+ statsBlob.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xFF));
+ }
+ };
+ auto pushI64LE = [&](int64_t v) { pushU64(static_cast<uint64_t>(v)); };
+ auto pushU16LE = [&](uint16_t v) {
+ statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+ statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+ };
+
+ pushU32(numCols);
+ for (const auto& s : perCol) {
+ auto kind = s.lowerBound.kind();
+ bool emitSupported = s.hasLowerBound && s.hasUpperBound && s.lowerBound.kind() == s.upperBound.kind() &&
+ (kind == facebook::velox::TypeKind::BIGINT || kind == facebook::velox::TypeKind::INTEGER ||
+ kind == facebook::velox::TypeKind::SMALLINT || kind == facebook::velox::TypeKind::TINYINT ||
+ kind == facebook::velox::TypeKind::HUGEINT || kind == facebook::velox::TypeKind::REAL ||
+ kind == facebook::velox::TypeKind::DOUBLE || kind == facebook::velox::TypeKind::BOOLEAN ||
+ kind == facebook::velox::TypeKind::TIMESTAMP || kind == facebook::velox::TypeKind::VARCHAR);
+ pushU8(emitSupported ? 1 : 0);
+ pushU32(static_cast<uint32_t>(s.nullCount));
+ // PartitionStatistics.count = numRows (vanilla gatherNullStats increments count for null
+ // rows too; subtracting nullCount inverts the IsNotNull predicate).
+ pushU32(numRows);
+ pushU64(0); // sizeInBytes placeholder
+ if (emitSupported) {
+ switch (kind) {
+ case facebook::velox::TypeKind::BIGINT:
+ pushU32(8);
+ pushI64LE(s.lowerBound.value<int64_t>());
+ pushU32(8);
+ pushI64LE(s.upperBound.value<int64_t>());
+ break;
+ case facebook::velox::TypeKind::INTEGER:
+ pushU32(4);
+ pushU32(static_cast<uint32_t>(s.lowerBound.value<int32_t>()));
+ pushU32(4);
+ pushU32(static_cast<uint32_t>(s.upperBound.value<int32_t>()));
+ break;
+ case facebook::velox::TypeKind::SMALLINT:
+ pushU32(2);
+ pushU16LE(static_cast<uint16_t>(s.lowerBound.value<int16_t>()));
+ pushU32(2);
+ pushU16LE(static_cast<uint16_t>(s.upperBound.value<int16_t>()));
+ break;
+ case facebook::velox::TypeKind::TINYINT:
+ pushU32(1);
+ pushU8(static_cast<uint8_t>(s.lowerBound.value<int8_t>()));
+ pushU32(1);
+ pushU8(static_cast<uint8_t>(s.upperBound.value<int8_t>()));
+ break;
+ case facebook::velox::TypeKind::HUGEINT: {
+ // 16 LE bytes: int128 split into low/high uint64 halves, low first. JVM reconstructs
+ // via BigInteger from signed two's-complement big-endian byte array (reverse on read).
+ auto pushI128LE = [&](int128_t v) {
+ pushU64(static_cast<uint64_t>(v));
+ pushU64(static_cast<uint64_t>(v >> 64));
+ };
+ pushU32(16);
+ pushI128LE(s.lowerBound.value<int128_t>());
+ pushU32(16);
+ pushI128LE(s.upperBound.value<int128_t>());
+ break;
+ }
+ case facebook::velox::TypeKind::REAL: {
+ uint32_t loBits, hiBits;
+ float lo = s.lowerBound.value<float>();
+ float hi = s.upperBound.value<float>();
+ std::memcpy(&loBits, &lo, sizeof(uint32_t));
+ std::memcpy(&hiBits, &hi, sizeof(uint32_t));
+ pushU32(4);
+ pushU32(loBits);
+ pushU32(4);
+ pushU32(hiBits);
+ break;
+ }
+ case facebook::velox::TypeKind::DOUBLE: {
+ uint64_t loBits, hiBits;
+ double lo = s.lowerBound.value<double>();
+ double hi = s.upperBound.value<double>();
+ std::memcpy(&loBits, &lo, sizeof(uint64_t));
+ std::memcpy(&hiBits, &hi, sizeof(uint64_t));
+ pushU32(8);
+ pushU64(loBits);
+ pushU32(8);
+ pushU64(hiBits);
+ break;
+ }
+ case facebook::velox::TypeKind::BOOLEAN:
+ pushU32(1);
+ pushU8(s.lowerBound.value<bool>() ? 1 : 0);
+ pushU32(1);
+ pushU8(s.upperBound.value<bool>() ? 1 : 0);
+ break;
+ case facebook::velox::TypeKind::TIMESTAMP: {
+ // Spark Timestamp / TimestampNTZ physical = Long microseconds; share the JVM LongType
+ // 8B wire arm. Velox Timestamp::toMicros() floors toward -infinity. Floor on lo widens
+ // the prune interval downward (conservative) but floor on hi can shrink it and
+ // false-negative drop rows whose true ts has nanos % 1000 != 0. Fix: ceil hi by +1us
+ // when there is any sub-microsecond residue.
+ const auto& loTs = s.lowerBound.value<facebook::velox::Timestamp>();
+ const auto& hiTs = s.upperBound.value<facebook::velox::Timestamp>();
+ int64_t loMicros = loTs.toMicros();
+ int64_t hiMicros = hiTs.toMicros();
+ if (hiTs.getNanos() % 1000 != 0) {
+ hiMicros += 1;
+ }
+ pushU32(8);
+ pushI64LE(loMicros);
+ pushU32(8);
+ pushI64LE(hiMicros);
+ break;
+ }
+ case facebook::velox::TypeKind::VARCHAR: {
+ // Truncation already applied by computeStats (256B + carry); emit raw bytes with
+ // u32 LE length prefix. variant.value<VARCHAR>() returns owned std::string&.
+ const auto& loStr = s.lowerBound.value<facebook::velox::TypeKind::VARCHAR>();
+ const auto& hiStr = s.upperBound.value<facebook::velox::TypeKind::VARCHAR>();
+ pushU32(static_cast<uint32_t>(loStr.size()));
+ for (auto c : loStr) {
+ pushU8(static_cast<uint8_t>(c));
+ }
+ pushU32(static_cast<uint32_t>(hiStr.size()));
+ for (auto c : hiStr) {
+ pushU8(static_cast<uint8_t>(c));
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ }
+ const uint32_t statsLen = static_cast<uint32_t>(statsBlob.size());
+
+ // Produce bytesBlob via the existing serializer path.
+ append(batch);
+ const int64_t bytesLen = maxSerializedSize();
+ std::vector<uint8_t> bytesBlob(bytesLen);
+ serializeTo(bytesBlob.data(), bytesLen);
+
+ // Assemble: [magic(4) | statsLen(u32 LE) | statsBlob | bytesLen(u32 LE) | bytesBlob].
+ std::vector<uint8_t> framed;
+ framed.reserve(4 + 4 + statsLen + 4 + bytesLen);
+ framed.push_back(0xFE);
+ framed.push_back(0xCA);
+ framed.push_back(0x53);
+ framed.push_back(0x02);
+ framed.push_back(static_cast<uint8_t>(statsLen & 0xFF));
+ framed.push_back(static_cast<uint8_t>((statsLen >> 8) & 0xFF));
+ framed.push_back(static_cast<uint8_t>((statsLen >> 16) & 0xFF));
+ framed.push_back(static_cast<uint8_t>((statsLen >> 24) & 0xFF));
+ framed.insert(framed.end(), statsBlob.begin(), statsBlob.end());
+ const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);
Review Comment:
**`uint32_t` truncation on serialized batch length — missing bounds check**
**Problem:** `maxSerializedSize()` returns `int64_t`, but the value is cast to
`uint32_t` for the framed wire format without a range check. If a single batch
serializes to more than `UINT32_MAX` bytes (just under 4 GiB; pathological, but
possible with very wide schemas or huge string columns), the length wraps
around silently and produces a corrupt frame that the JVM will mis-parse.
**Evidence:**
```cpp
const int64_t bytesLen = maxSerializedSize();
std::vector<uint8_t> bytesBlob(bytesLen);
// ...
const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen); // silent truncation
```
**Suggested Fix:**
```cpp
GLUTEN_CHECK(
    bytesLen >= 0 && bytesLen <= std::numeric_limits<uint32_t>::max(),
    "Serialized batch size (" + std::to_string(bytesLen) +
        ") exceeds u32 framing limit");
const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);
```
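The wraparound is plain modular arithmetic and is easy to reproduce. The
following sketch uses Java with hypothetical helper names (`U32Framing`,
`truncateToU32`, `encodeLengthLE`) to contrast an unchecked narrowing with a
range-checked little-endian encoding of a 4-byte length field. It illustrates
the guard only and is not the serializer's code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helpers illustrating a u32 length field; not part of the PR.
final class U32Framing {
  static final long U32_MAX = 0xFFFF_FFFFL;

  // Unchecked narrowing: keeps only the low 32 bits, like a cast to uint32_t.
  static long truncateToU32(long len) {
    return len & U32_MAX;
  }

  // Checked encoding: rejects lengths that do not fit, then writes 4 LE bytes.
  static byte[] encodeLengthLE(long len) {
    if (len < 0 || len > U32_MAX) {
      throw new IllegalArgumentException(
          "Serialized batch size (" + len + ") exceeds u32 framing limit");
    }
    return ByteBuffer.allocate(4)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putInt((int) len) // the low 32 bits are the full value after the check
        .array();
  }

  public static void main(String[] args) {
    long tooBig = (1L << 32) + 5;
    System.out.println("unchecked length field: " + truncateToU32(tooBig));
    try {
      encodeLengthLE(tooBig);
    } catch (IllegalArgumentException e) {
      System.out.println("checked: " + e.getMessage());
    }
  }
}
```

A length of 2^32 + 5 truncates to 5, which the reader would then take as the
payload size; the checked variant fails at write time instead.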
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
- .serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+ ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
+ // partition-stats conf is enabled. Capability is cached after first probe. When
+ // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+ // to the original serialize() path and emit stats=null; the buildFilter wrapper
+ // directs such batches through without pruning.
+ val partitionStatsEnabled =
+ GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+ if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
Review Comment:
**Config-gate negative test gap**
**Problem:** All E2E tests in `ColumnarCachedBatchE2ESuite` set
`partitionStats.enabled=true`. No integration test verifies that when the
config is `false` (the production default), stats are NOT computed and pruning
does NOT occur. A bug in the gating logic could silently activate stats for all
users.
**Evidence:** The gate check is here:
```scala
if (partitionStatsEnabled && ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
  val framed = jni.serializeWithStats(handle)
```
But this path is never exercised with `partitionStatsEnabled=false` in E2E
tests.
**Suggested Fix:** Add a test to `ColumnarCachedBatchE2ESuite`:
```scala
test("config disabled: no stats in cached batch, no pruning") {
  withSQLConf(
    GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false"
  ) {
    val df = spark.range(1000).selectExpr("id as c1").repartition(4).cache()
    try {
      df.count() // materialize the cache
      val filtered = df.filter("c1 = 500")
      checkAnswer(filtered, Row(500L)) // c1 is a Long column (from spark.range)
      // TODO: assert a full scan here (no cached batches pruned)
    } finally {
      df.unpersist()
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]