yaooqinn commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3247612418


##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
   public native long deserializeDirect(long serializerHandle, long offset, int 
len);
 
   public native void close(long serializerHandle);
+
+  // Capability check for the serializeWithStats JNI extension. A new Gluten 
jar paired with an
+  // older libgluten.so may lack this symbol; the cache write path consults 
this helper and
+  // falls back to the legacy serialize() when false. Probing is lazy and 
one-shot: reflection
+  // on the declared native method proves the JVM side is wired (the cpp 
symbol is verified at
+  // first real invocation; callers must catch UnsatisfiedLinkError there too).
+  private static volatile Boolean supportsStatsExtCached = null;
+
+  public static boolean supportsStatsExt() {

Review Comment:
   Addressed in `9145670865` (callsite-fallback). Thank you for the advice!



##########
cpp/velox/operators/serializer/VeloxColumnarBatchSerializer.cc:
##########
@@ -95,4 +98,405 @@ std::shared_ptr<ColumnarBatch> 
VeloxColumnarBatchSerializer::deserialize(uint8_t
   return std::make_shared<VeloxColumnarBatch>(result);
 }
 
+namespace {
+
+// Scans one FlatVector<T> column, tracking the min/max of its non-null values
+// and counting its nulls.
+//
+// Parameters:
+// - flat:    column to scan; dereferenced unconditionally, so must be non-null.
+// - tLo/tHi: receive the smallest / largest non-null (and non-NaN) value.
+//            Only meaningful when `seen` is true on return.
+// - nullCnt: incremented once per null row. It always covers the WHOLE column,
+//            including when the function returns false: computeStats stores it
+//            in ColumnStats::nullCount regardless of the return value, so a
+//            scan that stopped at the first NaN would report a partial count.
+// - seen:    set to true once at least one value has been folded into the
+//            bounds.
+//
+// Returns false when the column must be marked unsupported, i.e. at least one
+// NaN was observed in a floating-point column. C++ `<` / `>` are always false
+// for NaN, so a NaN can never widen [tLo, tHi]; Spark, on the other hand,
+// treats NaN as equal to NaN and greater than every other numeric value, so
+// bounds that ignore NaN would let min/max-based pruning silently drop
+// matching rows.
+//
+// Floating-point edge cases that DO NOT poison the column:
+// - +/-Infinity: ordered (-Inf < x < +Inf for finite x); participate in
+//   min/max normally.
+// - +0 and -0: IEEE 754 declares them equal under <, ==; the min/max bound is
+//   correct either way.
+// - subnormal (denormal) values: ordered like normal floats; no special
+//   handling needed.
+template <typename T>
+bool scanMinMax(const facebook::velox::FlatVector<T>* flat, T& tLo, T& tHi, int64_t& nullCnt, bool& seen) {
+  const auto size = flat->size();
+  const uint64_t* nulls = flat->rawNulls();
+  bool sawNaN = false;
+  for (vector_size_t i = 0; i < size; ++i) {
+    if (nulls != nullptr && bits::isBitNull(nulls, i)) {
+      ++nullCnt;
+      continue;
+    }
+    // Read through valueAt() rather than indexing rawValues(): Velox stores
+    // flat BOOLEAN vectors bit-packed (one bit per value), so rawValues()[i]
+    // would read the wrong data for T = bool.
+    const T v = flat->valueAt(i);
+    if constexpr (std::is_floating_point_v<T>) {
+      if (std::isnan(v)) {
+        // Remember the NaN but keep scanning so nullCnt stays complete.
+        sawNaN = true;
+        continue;
+      }
+    }
+    if (!seen) {
+      tLo = v;
+      tHi = v;
+      seen = true;
+    } else {
+      if (v < tLo) {
+        tLo = v;
+      }
+      if (v > tHi) {
+        tHi = v;
+      }
+    }
+  }
+  return !sawNaN;
+}
+
+} // namespace
+
+// Builds one ColumnStats entry per top-level child of `rowVector`.
+//
+// - Null children and children that are not flat-encoded keep their
+//   default-constructed ColumnStats.
+// - Flat children of a handled TypeKind are scanned once with scanMinMax.
+//   nullCount is always recorded; lower/upper bounds are set only when the
+//   scan succeeded (no NaN) and saw at least one non-null value.
+// - Flat children of any other TypeKind are not scanned: no bounds, and
+//   nullCount is written as 0.
+std::vector<ColumnStats> VeloxColumnarBatchSerializer::computeStats(RowVectorPtr rowVector) {
+  const auto numCols = rowVector->childrenSize();
+  std::vector<ColumnStats> result(numCols);
+
+  for (column_index_t col = 0; col < numCols; ++col) {
+    auto child = rowVector->childAt(col);
+    if (child == nullptr || !child->isFlatEncoding()) {
+      continue;
+    }
+    auto& stats = result[col];
+    int64_t nullCnt = 0;
+
+    // Shared path for every fixed-width kind. `init` fixes both the element
+    // type T and the starting value of the bounds; the bounds are published
+    // only when the scan succeeded and saw at least one value.
+    auto scanFixedWidth = [&](auto init) {
+      using T = decltype(init);
+      T lo = init;
+      T hi = init;
+      bool seen = false;
+      if (scanMinMax<T>(child->asFlatVector<T>(), lo, hi, nullCnt, seen) && seen) {
+        stats.hasLowerBound = true;
+        stats.hasUpperBound = true;
+        stats.lowerBound = variant(lo);
+        stats.upperBound = variant(hi);
+      }
+    };
+
+    switch (child->typeKind()) {
+      case TypeKind::BIGINT:
+        scanFixedWidth(int64_t{0});
+        break;
+      case TypeKind::INTEGER:
+        scanFixedWidth(int32_t{0});
+        break;
+      case TypeKind::SMALLINT:
+        scanFixedWidth(int16_t{0});
+        break;
+      case TypeKind::TINYINT:
+        scanFixedWidth(int8_t{0});
+        break;
+      case TypeKind::REAL:
+        scanFixedWidth(0.f);
+        break;
+      case TypeKind::DOUBLE:
+        scanFixedWidth(0.0);
+        break;
+      case TypeKind::BOOLEAN:
+        scanFixedWidth(false);
+        break;
+      case TypeKind::HUGEINT:
+        // Long decimals (precision > 18) arrive as int128 values.
+        scanFixedWidth(int128_t{0});
+        break;
+      case TypeKind::TIMESTAMP:
+        // Bounds stay as Timestamp here; conversion to microseconds is left
+        // to the code that marshals the stats.
+        scanFixedWidth(Timestamp{});
+        break;
+      case TypeKind::VARCHAR: {
+        // Bounds are copied into owned std::string values, so they do not
+        // reference the RowVector's buffers after this function returns.
+        // Each bound is capped at kStatsStringTruncateLen bytes:
+        //  - lower bound: a prefix compares <= the full string, so it is safe;
+        //  - upper bound: a prefix may compare < the full string, so a
+        //    truncated prefix is incremented as a big-endian byte string to
+        //    keep it >= the original. If every byte is 0xFF the increment
+        //    overflows and the column gets no bounds.
+        // NOTE(review): ordering is assumed to match Spark's unsigned
+        // byte-wise string comparison -- confirm against the JVM side.
+        constexpr size_t kStatsStringTruncateLen = 256;
+        StringView lo, hi;
+        bool seen = false;
+        if (!scanMinMax<StringView>(child->asFlatVector<StringView>(), lo, hi, nullCnt, seen) || !seen) {
+          break;
+        }
+
+        const size_t loSrcLen = static_cast<size_t>(lo.size());
+        const size_t hiSrcLen = static_cast<size_t>(hi.size());
+        std::string loBytes(lo.data(), std::min(loSrcLen, kStatsStringTruncateLen));
+        std::string hiBytes(hi.data(), std::min(hiSrcLen, kStatsStringTruncateLen));
+
+        bool hiOk = true;
+        if (hiSrcLen > kStatsStringTruncateLen) {
+          // Walk from the last byte towards the first, propagating the carry.
+          hiOk = false;
+          for (auto it = hiBytes.rbegin(); it != hiBytes.rend(); ++it) {
+            const uint8_t bumped = static_cast<uint8_t>(static_cast<uint8_t>(*it) + 1);
+            if (bumped != 0) {
+              *it = static_cast<char>(bumped);
+              hiOk = true;
+              break;
+            }
+            *it = 0; // 0xFF wrapped to 0x00; carry into the next byte.
+          }
+        }
+
+        if (hiOk) {
+          stats.hasLowerBound = true;
+          stats.hasUpperBound = true;
+          stats.lowerBound = variant(std::move(loBytes));
+          stats.upperBound = variant(std::move(hiBytes));
+        }
+        break;
+      }
+      default:
+        // Unhandled kind: bounds stay unset.
+        break;
+    }
+    stats.nullCount = nullCnt;
+  }
+  return result;
+}
+
+std::vector<uint8_t> VeloxColumnarBatchSerializer::framedSerializeWithStats(
+    const std::shared_ptr<ColumnarBatch>& batch) {
+  // Compute stats over the inbound rowVector BEFORE delegating to the append 
path (which may
+  // consume / mutate iterator state on subsequent calls).
+  auto rowVector = VeloxColumnarBatch::from(veloxPool_.get(), 
batch)->getRowVector();
+  const uint32_t numRows = static_cast<uint32_t>(rowVector->size());
+  std::vector<ColumnStats> perCol = computeStats(rowVector);
+  const uint32_t numCols = static_cast<uint32_t>(perCol.size());
+
+  // Marshal statsBlob (LE primitives via lambdas).
+  std::vector<uint8_t> statsBlob;
+  auto pushU8 = [&](uint8_t v) { statsBlob.push_back(v); };
+  auto pushU32 = [&](uint32_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 16) & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 24) & 0xFF));
+  };
+  auto pushU64 = [&](uint64_t v) {
+    for (int i = 0; i < 8; ++i) {
+      statsBlob.push_back(static_cast<uint8_t>((v >> (8 * i)) & 0xFF));
+    }
+  };
+  auto pushI64LE = [&](int64_t v) { pushU64(static_cast<uint64_t>(v)); };
+  auto pushU16LE = [&](uint16_t v) {
+    statsBlob.push_back(static_cast<uint8_t>(v & 0xFF));
+    statsBlob.push_back(static_cast<uint8_t>((v >> 8) & 0xFF));
+  };
+
+  pushU32(numCols);
+  for (const auto& s : perCol) {
+    auto kind = s.lowerBound.kind();
+    bool emitSupported = s.hasLowerBound && s.hasUpperBound && 
s.lowerBound.kind() == s.upperBound.kind() &&
+        (kind == facebook::velox::TypeKind::BIGINT || kind == 
facebook::velox::TypeKind::INTEGER ||
+         kind == facebook::velox::TypeKind::SMALLINT || kind == 
facebook::velox::TypeKind::TINYINT ||
+         kind == facebook::velox::TypeKind::HUGEINT || kind == 
facebook::velox::TypeKind::REAL ||
+         kind == facebook::velox::TypeKind::DOUBLE || kind == 
facebook::velox::TypeKind::BOOLEAN ||
+         kind == facebook::velox::TypeKind::TIMESTAMP || kind == 
facebook::velox::TypeKind::VARCHAR);
+    pushU8(emitSupported ? 1 : 0);
+    pushU32(static_cast<uint32_t>(s.nullCount));
+    // PartitionStatistics.count = numRows (vanilla gatherNullStats increments 
count for null
+    // rows too; subtracting nullCount inverts the IsNotNull predicate).
+    pushU32(numRows);
+    pushU64(0); // sizeInBytes placeholder
+    if (emitSupported) {
+      switch (kind) {
+        case facebook::velox::TypeKind::BIGINT:
+          pushU32(8);
+          pushI64LE(s.lowerBound.value<int64_t>());
+          pushU32(8);
+          pushI64LE(s.upperBound.value<int64_t>());
+          break;
+        case facebook::velox::TypeKind::INTEGER:
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.lowerBound.value<int32_t>()));
+          pushU32(4);
+          pushU32(static_cast<uint32_t>(s.upperBound.value<int32_t>()));
+          break;
+        case facebook::velox::TypeKind::SMALLINT:
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.lowerBound.value<int16_t>()));
+          pushU32(2);
+          pushU16LE(static_cast<uint16_t>(s.upperBound.value<int16_t>()));
+          break;
+        case facebook::velox::TypeKind::TINYINT:
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.lowerBound.value<int8_t>()));
+          pushU32(1);
+          pushU8(static_cast<uint8_t>(s.upperBound.value<int8_t>()));
+          break;
+        case facebook::velox::TypeKind::HUGEINT: {
+          // 16 LE bytes: int128 split into low/high uint64 halves, low first. 
JVM reconstructs
+          // via BigInteger from signed two's-complement big-endian byte array 
(reverse on read).
+          auto pushI128LE = [&](int128_t v) {
+            pushU64(static_cast<uint64_t>(v));
+            pushU64(static_cast<uint64_t>(v >> 64));
+          };
+          pushU32(16);
+          pushI128LE(s.lowerBound.value<int128_t>());
+          pushU32(16);
+          pushI128LE(s.upperBound.value<int128_t>());
+          break;
+        }
+        case facebook::velox::TypeKind::REAL: {
+          uint32_t loBits, hiBits;
+          float lo = s.lowerBound.value<float>();
+          float hi = s.upperBound.value<float>();
+          std::memcpy(&loBits, &lo, sizeof(uint32_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint32_t));
+          pushU32(4);
+          pushU32(loBits);
+          pushU32(4);
+          pushU32(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::DOUBLE: {
+          uint64_t loBits, hiBits;
+          double lo = s.lowerBound.value<double>();
+          double hi = s.upperBound.value<double>();
+          std::memcpy(&loBits, &lo, sizeof(uint64_t));
+          std::memcpy(&hiBits, &hi, sizeof(uint64_t));
+          pushU32(8);
+          pushU64(loBits);
+          pushU32(8);
+          pushU64(hiBits);
+          break;
+        }
+        case facebook::velox::TypeKind::BOOLEAN:
+          pushU32(1);
+          pushU8(s.lowerBound.value<bool>() ? 1 : 0);
+          pushU32(1);
+          pushU8(s.upperBound.value<bool>() ? 1 : 0);
+          break;
+        case facebook::velox::TypeKind::TIMESTAMP: {
+          // Spark Timestamp / TimestampNTZ physical = Long microseconds; 
share the JVM LongType
+          // 8B wire arm. Velox Timestamp::toMicros() floors toward -infinity. 
Floor on lo widens
+          // the prune interval downward (conservative) but floor on hi can 
shrink it and
+          // false-negative drop rows whose true ts has nanos % 1000 != 0. 
Fix: ceil hi by +1us
+          // when there is any sub-microsecond residue.
+          const auto& loTs = s.lowerBound.value<facebook::velox::Timestamp>();
+          const auto& hiTs = s.upperBound.value<facebook::velox::Timestamp>();
+          int64_t loMicros = loTs.toMicros();
+          int64_t hiMicros = hiTs.toMicros();
+          if (hiTs.getNanos() % 1000 != 0) {
+            hiMicros += 1;
+          }
+          pushU32(8);
+          pushI64LE(loMicros);
+          pushU32(8);
+          pushI64LE(hiMicros);
+          break;
+        }
+        case facebook::velox::TypeKind::VARCHAR: {
+          // Truncation already applied by computeStats (256B + carry); emit 
raw bytes with
+          // u32 LE length prefix. variant.value<VARCHAR>() returns owned 
std::string&.
+          const auto& loStr = 
s.lowerBound.value<facebook::velox::TypeKind::VARCHAR>();
+          const auto& hiStr = 
s.upperBound.value<facebook::velox::TypeKind::VARCHAR>();
+          pushU32(static_cast<uint32_t>(loStr.size()));
+          for (auto c : loStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          pushU32(static_cast<uint32_t>(hiStr.size()));
+          for (auto c : hiStr) {
+            pushU8(static_cast<uint8_t>(c));
+          }
+          break;
+        }
+        default:
+          break;
+      }
+    }
+  }
+  const uint32_t statsLen = static_cast<uint32_t>(statsBlob.size());
+
+  // Produce bytesBlob via the existing serializer path.
+  append(batch);
+  const int64_t bytesLen = maxSerializedSize();
+  std::vector<uint8_t> bytesBlob(bytesLen);
+  serializeTo(bytesBlob.data(), bytesLen);
+
+  // Assemble: [magic(4) | statsLen(u32 LE) | statsBlob | bytesLen(u32 LE) | 
bytesBlob].
+  std::vector<uint8_t> framed;
+  framed.reserve(4 + 4 + statsLen + 4 + bytesLen);
+  framed.push_back(0xFE);
+  framed.push_back(0xCA);
+  framed.push_back(0x53);
+  framed.push_back(0x02);
+  framed.push_back(static_cast<uint8_t>(statsLen & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 8) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 16) & 0xFF));
+  framed.push_back(static_cast<uint8_t>((statsLen >> 24) & 0xFF));
+  framed.insert(framed.end(), statsBlob.begin(), statsBlob.end());
+  const uint32_t bytesLen32 = static_cast<uint32_t>(bytesLen);

Review Comment:
   Addressed in `94d7207af1` (u32-framing-check). Thank you for the advice!



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends 
KryoSerializer[CachedColumnarBat
       length != Kryo.NULL,
       "The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
         s"deserialize using ${this.getClass.getName}")
-    val bytes = new Array[Byte](length - 1) // -1 to restore
+    val bytes = new Array[Byte](length - 1)
     input.readBytes(bytes)
-    CachedColumnarBatch(numRows, sizeInBytes, bytes)
+    val hasStats = input.readBoolean()

Review Comment:
   Addressed in `491070bf34` (kryo-bounds-and-v1-back-compat). Thank you for 
the advice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to