yaooqinn commented on code in PR #12092:
URL: https://github.com/apache/gluten/pull/12092#discussion_r3247613389
##########
gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchSerializerJniWrapper.java:
##########
@@ -48,4 +54,27 @@ public long rtHandle() {
public native long deserializeDirect(long serializerHandle, long offset, int
len);
public native void close(long serializerHandle);
+
+ // Capability check for the serializeWithStats JNI extension. A new Gluten jar paired with an
+ // older libgluten.so may lack this symbol; the cache write path consults this helper and
+ // falls back to the legacy serialize() when false. Probing is lazy and one-shot: reflection
+ // on the declared native method proves the JVM side is wired (the cpp symbol is verified at
+ // first real invocation; callers must catch UnsatisfiedLinkError there too).
+ private static volatile Boolean supportsStatsExtCached = null;
+
+ public static boolean supportsStatsExt() {
+ Boolean cached = supportsStatsExtCached;
+ if (cached != null) {
+ return cached;
+ }
+ boolean result;
+ try {
+
ColumnarBatchSerializerJniWrapper.class.getDeclaredMethod("serializeWithStats",
long.class);
+ result = true;
+ } catch (NoSuchMethodException e) {
+ result = false;
+ }
+ supportsStatsExtCached = result;
+ return result;
Review Comment:
Addressed in `9145670865` (callsite-fallback). Thank you for the advice!
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -213,14 +665,40 @@ class ColumnarCachedBatchSerializer extends
CachedBatchSerializer with Logging {
override def next(): CachedBatch = {
val batch = veloxBatches.next()
- val unsafeBuffer = ColumnarBatchSerializerJniWrapper
- .create(
- Runtimes.contextInstance(
- BackendsApiManager.getBackendName,
- "ColumnarCachedBatchSerializer#serialize"))
-
.serialize(ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName,
batch))
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(batch.numRows(), bytes.length, bytes)
+ val jni = ColumnarBatchSerializerJniWrapper.create(
+ Runtimes.contextInstance(
+ BackendsApiManager.getBackendName,
+ "ColumnarCachedBatchSerializer#serialize"))
+ val handle =
+
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
+ // Route through serializeWithStats when the JNI extension is available AND the
+ // partition-stats conf is enabled. Capability is cached after first probe. When
+ // unavailable (older libgluten.so without the symbol, or conf left off) we fall back
+ // to the original serialize() path and emit stats=null; the buildFilter wrapper
+ // directs such batches through without pruning.
+ val partitionStatsEnabled =
+
GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
+ if (partitionStatsEnabled &&
ColumnarBatchSerializerJniWrapper.supportsStatsExt()) {
+ val framed = jni.serializeWithStats(handle)
+ // Carry the per-batch StructType so the Kryo (spill / disk cache) read path can
+ // dispatch by dataType.
+ val structSchema = StructType(
+ schema.map(
+ a =>
+ StructField(a.name, a.dataType, a.nullable)))
+ val (stats, bytesBlob) =
+ CachedColumnarBatchKryoSerializer.parseFramedBytes(framed,
structSchema)
+ CachedColumnarBatch(batch.numRows(), bytesBlob.length,
bytesBlob, stats, structSchema)
+ } else {
+ val unsafeBuffer = jni.serialize(handle)
+ val bytes = unsafeBuffer.toByteArray
+ CachedColumnarBatch(
+ batch.numRows(),
+ bytes.length,
+ bytes,
+ stats = null,
+ schema = null)
+ }
Review Comment:
Addressed in `9145670865` (callsite-fallback). Thank you for the advice!
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala:
##########
@@ -81,43 +133,443 @@ class CachedColumnarBatchKryoSerializer extends
KryoSerializer[CachedColumnarBat
length != Kryo.NULL,
"The object 'CachedColumnarBatch.bytes' is invalid or malformed to " +
s"deserialize using ${this.getClass.getName}")
- val bytes = new Array[Byte](length - 1) // -1 to restore
+ val bytes = new Array[Byte](length - 1)
input.readBytes(bytes)
- CachedColumnarBatch(numRows, sizeInBytes, bytes)
+ val hasStats = input.readBoolean()
+ // Even when hasStats=false we still consume the hasSchema tag to keep the stream aligned.
+ // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics and the typed
+ // pattern match throws MatchError at runtime.
+ val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
+ val statsLen = input.readInt()
+ val statsBytes = new Array[Byte](statsLen)
+ input.readBytes(statsBytes)
+ val sch = readOptionalSchema(input)
+ (CachedColumnarBatchKryoSerializer.deserializeStats(statsBytes, sch),
sch)
+ } else {
+ (null, readOptionalSchema(input))
+ }
+ CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1,
statsAndSchema._2)
+ }
+
+ private def readOptionalSchema(input: Input): StructType = {
+ if (!input.readBoolean()) {
+ null
+ } else {
+ val schemaLen = input.readInt()
+ val schemaBytes = new Array[Byte](schemaLen)
+ input.readBytes(schemaBytes)
+ DataType.fromJson(new String(schemaBytes,
UTF_8)).asInstanceOf[StructType]
+ }
Review Comment:
Addressed in `491070bf34` (kryo-bounds-and-v1-back-compat). Thank you for
the advice!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]