This is an automated email from the ASF dual-hosted git repository.

yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a897401d86 [VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer (#12183)
a897401d86 is described below

commit a897401d866a263dba98856ce1b5d974a638b4b9
Author: Kent Yao <[email protected]>
AuthorDate: Fri May 29 16:29:17 2026 +0800

    [VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer (#12183)
    
    * [VL] Extract serializeOneBatchWithStats helper from ColumnarCachedBatchSerializer
    
    Refactor only; no behavior change. Move the per-batch serializeWithStats fast
    path (with the UnsatisfiedLinkError → markStatsExtUnavailable + legacy fallback
    catch arm) out of the anonymous Iterator's next() in
    convertColumnarBatchToCachedBatch and into a package-private method on the
    companion object:
    
      ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
          jni, handle, numRows, structSchema, fallbackToLegacy)
    
    The legacy-only branch is extracted into a local def legacySerializeInline() so
    that both the else branch and the fallback closure can call it without
    duplication.
    
    Sentinel suites (all green, no regression):
      ColumnarCachedBatchE2ESuite                  15/0
      ColumnarCachedBatchStatsBlobSuite             4/0
      ColumnarCachedBatchFramedBytesSuite           3/0
      ColumnarCachedBatchBuildFilterSuite           1/0
      ColumnarCachedBatchBuildFilterPruneSuite      3/0
      ColumnarCachedBatchKryoSuite                  6/0
      ColumnarCachedBatchIntFamilyMarshalSuite      8/0
      ColumnarCachedBatchKryoBoundaryProbeBugSuite  1/0
    
    This extraction creates a unit-test hook point for an upcoming change that
    widens the catch to handle corrupt/undecodable stats frames per-batch via
    NonFatal without tripping the JVM-lifetime capability latch.
    
    Generated-by: Claude claude-opus-4.7
    
    * [VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer
    
    The V2 serialize-with-stats fast path used to catch only UnsatisfiedLinkError.
    Any IllegalArgumentException from parseFramedBytes (corrupt magic, truncated
    frame, statsLen/bytesLen mismatch) or KryoException from deserializeStats would
    bubble out and fail the entire Spark task. Since V2 is an opt-in optimization,
    a malformed frame for one cached batch should not be user-visible: the
    existing legacy serialize() fallback should absorb it, just as it already
    absorbs an UnsatisfiedLinkError.
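One detail the layered catch relies on: `scala.util.control.NonFatal` matches ordinary exceptions such as IllegalArgumentException and RuntimeException subclasses (KryoException is one), but excludes VirtualMachineError, ThreadDeath, InterruptedException, LinkageError and ControlThrowable. UnsatisfiedLinkError is a LinkageError, so a NonFatal arm alone would never catch it; it needs its own explicit arm. The sketch below classifies a few throwables the way a two-arm catch sees them. `FakeKryoException` and `classify` are hypothetical names used only so the example runs without Kryo on the classpath.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for com.esotericsoftware.kryo.KryoException, which is
// also a RuntimeException subclass; defined here so no Kryo jar is needed.
class FakeKryoException(msg: String) extends RuntimeException(msg)

// Classify a throwable the way a two-arm catch (ULE first, then NonFatal) would.
def classify(t: Throwable): String = t match {
  case _: UnsatisfiedLinkError => "ule"      // explicit arm: trips the latch
  case NonFatal(_)             => "nonfatal" // per-batch arm: latch untouched
  case _                       => "fatal"    // anything else propagates
}

val samples: Seq[Throwable] = Seq(
  new IllegalArgumentException("corrupt magic"),
  new FakeKryoException("bad statsBlob"),
  new UnsatisfiedLinkError("serializeWithStats"),
  new OutOfMemoryError("simulated")
)
samples.foreach(t => println(s"${t.getClass.getSimpleName} -> ${classify(t)}"))
```

Note that the arm order matters less than it might seem: even with NonFatal listed first, an UnsatisfiedLinkError would fall through it, because NonFatal rejects every LinkageError.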
    
    This change layers the catch:
    
      case e: UnsatisfiedLinkError =>
        // capability gone for the JVM lifetime; trip the latch
        markStatsExtUnavailable(e); fallbackToLegacy()
      case NonFatal(e) =>
        // per-batch corruption; do NOT trip the latch
        warnCorruptStatsFrame(e); fallbackToLegacy()
    
    The capability latch (statsExtAvailableFlag) is deliberately one-way for the
    JVM lifetime, because a native-symbol mismatch isn't recoverable. Corrupt-frame
    events, in contrast, are per-batch and shouldn't poison the latch: the next
    batch retries the fast path.
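A minimal, self-contained model of the layered catch and the one-way latch, assuming strings in place of CachedBatch and plain closures in place of the JNI call and parseFramedBytes. `StatsPath`, `serializeOne` and the AtomicBoolean latch are illustrative names, not Gluten's API (the commit uses a flag field, statsExtAvailableFlag). Unlike the real code, which checks the latch at the call site, this sketch folds that check into the helper for brevity.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Illustrative model only: `fastPath` stands in for jni.serializeWithStats plus
// parseFramedBytes, `fallback` for the legacy serialize() closure, and String
// for CachedBatch.
object StatsPath {
  // One-way capability latch: starts true and is only ever set to false.
  private val available = new AtomicBoolean(true)

  def statsExtAvailable: Boolean = available.get()

  def serializeOne(fastPath: () => String, fallback: () => String): String =
    if (!statsExtAvailable) fallback()
    else
      try fastPath()
      catch {
        case _: UnsatisfiedLinkError =>
          available.set(false) // native symbol missing: trip the latch for good
          fallback()
        case NonFatal(_) =>
          fallback() // per-batch corruption: latch left untouched
      }
}

val legacy = () => "legacy"
val fastOk = StatsPath.serializeOne(() => "fast", legacy)
val corrupt = StatsPath.serializeOne(() => throw new IllegalArgumentException("bad magic"), legacy)
val latchAfterCorrupt = StatsPath.statsExtAvailable // still true
val retried = StatsPath.serializeOne(() => "fast", legacy) // fast path retried
val ule = StatsPath.serializeOne(() => throw new UnsatisfiedLinkError("no symbol"), legacy)
val afterUle = StatsPath.serializeOne(() => "fast", legacy) // latch tripped: legacy
```

The usage lines show the property this change is after: a corrupt frame falls back for one batch and the next batch still takes the fast path, while an UnsatisfiedLinkError sends every later batch to the legacy path.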
    
    To keep log output bounded when a native regression produces malformed frames
    batch after batch, warnCorruptStatsFrame caps warnings at 100 per JVM and emits
    a final summary line.
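The warning cap can be sketched with an AtomicLong counter, as warnCorruptStatsFrame does. Here `CappedWarner` and its `log` sink are hypothetical: the sink replaces Spark's logWarning so the output can be inspected, and the cap is a constructor parameter rather than the hard-coded 100. Because incrementAndGet hands every caller a distinct n, exactly one caller sees n == cap, so the summary line is emitted once even when several tasks hit corrupt frames concurrently.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of a per-JVM warning cap. `log` replaces Spark's
// logWarning; the commit hard-codes the cap at 100.
final class CappedWarner(cap: Long, log: String => Unit) {
  private val count = new AtomicLong(0L)

  def warn(cause: Throwable): Unit = {
    // incrementAndGet gives each caller a distinct n, even across threads.
    val n = count.incrementAndGet()
    if (n <= cap) {
      log(s"corrupt stats frame, falling back to legacy [$n/$cap]: ${cause.getMessage}")
      // Only the caller that observes n == cap emits the summary line.
      if (n == cap) log(s"further corrupt-frame warnings suppressed (cap=$cap)")
    }
  }
}

val lines = ArrayBuffer.empty[String]
val warner = new CappedWarner(3L, line => lines.synchronized { lines += line; () })
(1 to 10).foreach(i => warner.warn(new IllegalArgumentException(s"frame $i")))
lines.foreach(println)
```

After suppression the counter keeps incrementing, so each further corrupt frame costs only one atomic increment and a comparison.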
    
    Test coverage (new ColumnarCachedBatchSerializerHelperSuite, 4 cases):
      - corrupt magic           -> fallback CachedBatch with stats=null
      - truncated framed bytes  -> fallback CachedBatch with stats=null
      - Kryo-corrupt statsBlob  -> fallback CachedBatch with stats=null
      - UnsatisfiedLinkError    -> still trips capability latch (regression)
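The framing cases above can be reasoned about with a small parser. This sketch assumes the layout the new suite builds by hand (4-byte magic FE CA 53 02, little-endian u32 statsLen, statsBlob, little-endian u32 bytesLen, bytesBlob). `FrameSketch` is hypothetical and is not Gluten's parseFramedBytes, whose exact checks and exception types may differ. The Kryo case is well-formed at the framing level; it only fails later, when statsBlob is decoded.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical parser for the assumed frame layout; not Gluten's code.
object FrameSketch {
  private val Magic = Array[Byte](0xfe.toByte, 0xca.toByte, 0x53.toByte, 0x02.toByte)

  // Returns (statsBlob, bytesBlob) or throws IllegalArgumentException.
  def parse(framed: Array[Byte]): (Array[Byte], Array[Byte]) = {
    require(framed.length >= 8, s"truncated header: ${framed.length} bytes")
    require(framed.take(4).sameElements(Magic), "corrupt magic")
    val buf = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN)
    buf.position(4)
    val statsLen = buf.getInt()
    // Long arithmetic so a huge statsLen cannot overflow the bounds check.
    require(statsLen >= 0 && buf.remaining().toLong >= statsLen.toLong + 4, "truncated frame")
    val stats = new Array[Byte](statsLen)
    buf.get(stats)
    val bytesLen = buf.getInt()
    require(bytesLen == buf.remaining(), "bytesLen does not match remaining bytes")
    val bytes = new Array[Byte](bytesLen)
    buf.get(bytes)
    (stats, bytes)
  }

  // Builds a frame in the same layout as the suite's writeU32LE-based builder.
  def build(stats: Array[Byte], bytes: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(12 + stats.length + bytes.length).order(ByteOrder.LITTLE_ENDIAN)
    buf.put(Magic).putInt(stats.length).put(stats).putInt(bytes.length).put(bytes)
    buf.array()
  }
}

val ok = FrameSketch.build(Array[Byte](9, 9), Array[Byte](1, 2, 3))
val (statsOut, bytesOut) = FrameSketch.parse(ok)
val badMagic = scala.util.Try(FrameSketch.parse(new Array[Byte](12)))
val truncated = scala.util.Try(FrameSketch.parse(ok.take(8)))
```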
    
    Also drops resetStatsExtAvailableForTesting(): it had no remaining callers
    after the new helper suite started resetting the flag directly via reflection.
    
    Sentinel suites (all green, no regression):
      ColumnarCachedBatchE2ESuite                  15/0
      ColumnarCachedBatchStatsBlobSuite             4/0
      ColumnarCachedBatchFramedBytesSuite           3/0
      ColumnarCachedBatchBuildFilterSuite           1/0
      ColumnarCachedBatchBuildFilterPruneSuite      3/0
      ColumnarCachedBatchKryoSuite                  6/0
      ColumnarCachedBatchIntFamilyMarshalSuite      8/0
      ColumnarCachedBatchKryoBoundaryProbeBugSuite  1/0
      ColumnarCachedBatchSerializerHelperSuite      4/0 (new)
    
    Generated-by: Claude claude-opus-4.7
---
 .../execution/ColumnarCachedBatchSerializer.scala  | 109 ++++++++++-----
 .../ColumnarCachedBatchSerializerHelperSuite.scala | 154 +++++++++++++++++++++
 2 files changed, 227 insertions(+), 36 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 18e2d56bd1..e53dd18c2e 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -56,6 +56,8 @@ import java.nio.{ByteBuffer, ByteOrder}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Arrays
 
+import scala.util.control.NonFatal
+
 /**
  * A Velox columnar cache batch carrying per-partition column statistics.
  *
@@ -760,36 +762,7 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
           override def next(): CachedBatch = {
             val batch = veloxBatches.next()
             val handle = ColumnarBatches.getNativeHandle(backendName, batch)
-            // Route through serializeWithStats when the partition-stats conf is enabled and the
-            // JNI extension is linked in libgluten.so. Capability is detected lazily at the
-            // call site: a new Gluten jar paired with an older native library will throw
-            // UnsatisfiedLinkError on the first invocation; we catch it once, cache the
-            // result, and fall back to the legacy serialize() path emitting stats=null. The
-            // buildFilter wrapper directs such batches through without pruning.
-            if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
-              try {
-                val framed = jni.serializeWithStats(handle)
-                val (stats, bytesBlob) =
-                  CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
-                CachedColumnarBatch(
-                  batch.numRows(),
-                  bytesBlob.length,
-                  bytesBlob,
-                  stats,
-                  structSchema)
-              } catch {
-                case e: UnsatisfiedLinkError =>
-                  ColumnarCachedBatchSerializer.markStatsExtUnavailable(e)
-                  val unsafeBuffer = jni.serialize(handle)
-                  val bytes = unsafeBuffer.toByteArray
-                  CachedColumnarBatch(
-                    batch.numRows(),
-                    bytes.length,
-                    bytes,
-                    stats = null,
-                    schema = null)
-              }
-            } else {
+            def legacySerializeInline(): CachedBatch = {
               val unsafeBuffer = jni.serialize(handle)
               val bytes = unsafeBuffer.toByteArray
               CachedColumnarBatch(
@@ -799,6 +772,22 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
                 stats = null,
                 schema = null)
             }
+            // Route through serializeWithStats when the partition-stats conf is enabled and the
+            // JNI extension is linked in libgluten.so. Capability is detected lazily at the
+            // call site: a new Gluten jar paired with an older native library will throw
+            // UnsatisfiedLinkError on the first invocation; we catch it once, cache the
+            // result, and fall back to the legacy serialize() path emitting stats=null. The
+            // buildFilter wrapper directs such batches through without pruning.
+            if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
+              ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+                jni,
+                handle,
+                batch.numRows(),
+                structSchema,
+                () => legacySerializeInline())
+            } else {
+              legacySerializeInline()
+            }
           }
         }
     }
@@ -967,6 +956,60 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
 }
 
 object ColumnarCachedBatchSerializer extends Logging {
+  // Encapsulates the per-batch serializeWithStats fast path so the catch arms can
+  // be exercised in unit tests with a stubbed jni wrapper. Two-arm catch:
+  //   - UnsatisfiedLinkError trips the JVM-lifetime capability latch via
+  //     markStatsExtUnavailable (one-way; native symbol gone for the whole JVM).
+  //   - NonFatal absorbs per-batch corruption (corrupt magic, truncated frame,
+  //     Kryo decode failure) without tripping the latch; the next batch retries
+  //     the fast path. A separate counter (warnCorruptStatsFrame) caps the number
+  //     of warnings so high-throughput workloads don't drown the executor log.
+  private[execution] def serializeOneBatchWithStats(
+      jni: ColumnarBatchSerializerJniWrapper,
+      handle: Long,
+      numRows: Int,
+      structSchema: StructType,
+      fallbackToLegacy: () => CachedBatch): CachedBatch = {
+    try {
+      val framed = jni.serializeWithStats(handle)
+      val (stats, bytesBlob) =
+        CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+      CachedColumnarBatch(numRows, bytesBlob.length, bytesBlob, stats, structSchema)
+    } catch {
+      case e: UnsatisfiedLinkError =>
+        markStatsExtUnavailable(e)
+        fallbackToLegacy()
+      case NonFatal(e) =>
+        warnCorruptStatsFrame(e)
+        fallbackToLegacy()
+    }
+  }
+
+  // Per-JVM cap on corrupt-frame warnings to avoid log flooding when a native
+  // regression produces malformed frames batch after batch. Capability latch is
+  // intentionally NOT tripped here: a corrupt frame is a per-batch event, not a
+  // capability loss, and the next batch should still attempt the fast path.
+  private val corruptFrameWarnCount = new java.util.concurrent.atomic.AtomicLong(0L)
+  private val CORRUPT_FRAME_WARN_CAP = 100L
+
+  def warnCorruptStatsFrame(cause: Throwable): Unit = {
+    val n = corruptFrameWarnCount.incrementAndGet()
+    if (n <= CORRUPT_FRAME_WARN_CAP) {
+      logWarning(
+        s"serializeWithStats produced a corrupt/undecodable frame for one cached batch; " +
+          s"falling back to legacy serialize() for this batch (stats=null). Capability " +
+          s"latch unchanged. [$n/$CORRUPT_FRAME_WARN_CAP]",
+        cause
+      )
+      if (n == CORRUPT_FRAME_WARN_CAP) {
+        logWarning(
+          s"Further corrupt-frame warnings suppressed for the JVM lifetime " +
+            s"(cap=$CORRUPT_FRAME_WARN_CAP reached). Capability latch remains active; " +
+            s"investigate native serializeWithStats output.")
+      }
+    }
+  }
+
   // Lazy capability flag for the serializeWithStats JNI symbol. A new Gluten jar paired with an
   // older libgluten.so will throw UnsatisfiedLinkError on the first invocation; the call site
   // catches it once via markStatsExtUnavailable() and we degrade to the legacy serialize() path
@@ -986,10 +1029,4 @@ object ColumnarCachedBatchSerializer extends Logging {
       )
     }
   }
-
-  // Visible for testing: reset the capability flag so a unit test can re-exercise the
-  // probe-once semantics.
-  private[execution] def resetStatsExtAvailableForTesting(): Unit = {
-    statsExtAvailableFlag = true
-  }
 }
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala
new file mode 100644
index 0000000000..badc4324b1
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
+
+import org.apache.spark.sql.columnar.CachedBatch
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+import org.mockito.ArgumentMatchers.anyLong
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.funsuite.AnyFunSuite
+
+/**
+ * Unit tests for `ColumnarCachedBatchSerializer.serializeOneBatchWithStats`. Exercises the
+ * fast-path / fallback catch arms with a Mockito-stubbed JNI wrapper, without requiring a Velox
+ * runtime or native libraries.
+ */
+class ColumnarCachedBatchSerializerHelperSuite extends AnyFunSuite {
+
+  private val structSchema: StructType =
+    StructType(Seq(StructField("k", LongType, nullable = true)))
+
+  // A canned legacy CachedBatch that the fallback closure produces.
+  private val legacyCachedBytes: Array[Byte] = Array[Byte](7, 7, 7)
+
+  private def newFallbackProbe(): (() => CachedBatch, () => Boolean) = {
+    @volatile var called = false
+    val closure: () => CachedBatch = () => {
+      called = true
+      CachedColumnarBatch(1, legacyCachedBytes.length, legacyCachedBytes, null, null)
+    }
+    (closure, () => called)
+  }
+
+  test("corrupt magic frame is absorbed into legacy fallback (stats=null)") {
+    // 12 bytes: 4 bogus magic + 4-byte statsLen=0 + 4-byte bytesLen=0
+    val corruptFramed: Array[Byte] = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+    when(jni.serializeWithStats(anyLong())).thenReturn(corruptFramed)
+    val (fallback, wasCalled) = newFallbackProbe()
+    val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+      jni,
+      0L,
+      1,
+      structSchema,
+      fallback)
+    assert(wasCalled(), "fallback closure should be invoked on corrupt magic")
+    assert(
+      cb.asInstanceOf[CachedColumnarBatch].stats == null,
+      "fallback CachedBatch should carry stats=null")
+  }
+
+  test("truncated framed bytes absorbed into legacy fallback") {
+    val truncated: Array[Byte] = Array[Byte](
+      0xfe.toByte,
+      0xca.toByte,
+      0x53.toByte,
+      0x02.toByte,
+      0,
+      0,
+      0,
+      0
+    ) // magic + statsLen only; no statsBlob, no bytesLen
+    val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+    when(jni.serializeWithStats(anyLong())).thenReturn(truncated)
+    val (fallback, wasCalled) = newFallbackProbe()
+    val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+      jni,
+      0L,
+      1,
+      structSchema,
+      fallback)
+    assert(wasCalled(), "fallback closure should be invoked on truncated frame")
+    assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+  }
+
+  test("Kryo-corrupt statsBlob absorbed into legacy fallback") {
+    // Build a framed payload with valid magic, plausible statsLen=8, then 0xff filler bytes that
+    // are not a valid Kryo InternalRow serialization. Kryo throws KryoException (RuntimeException
+    // subclass, so NonFatal), which the previous ULE-only catch did not handle.
+    val statsBlob: Array[Byte] = Array.fill(8)(0xff.toByte) // garbage
+    val bytesBlob: Array[Byte] = Array[Byte](1, 2, 3)
+    val out = new java.io.ByteArrayOutputStream()
+    out.write(Array[Byte](0xfe.toByte, 0xca.toByte, 0x53.toByte, 0x02.toByte))
+    def writeU32LE(v: Int): Unit = {
+      out.write(v & 0xff)
+      out.write((v >>> 8) & 0xff)
+      out.write((v >>> 16) & 0xff)
+      out.write((v >>> 24) & 0xff)
+    }
+    writeU32LE(statsBlob.length)
+    out.write(statsBlob)
+    writeU32LE(bytesBlob.length)
+    out.write(bytesBlob)
+    val framed = out.toByteArray
+
+    val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+    when(jni.serializeWithStats(anyLong())).thenReturn(framed)
+    val (fallback, wasCalled) = newFallbackProbe()
+    val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+      jni,
+      0L,
+      1,
+      structSchema,
+      fallback)
+    assert(wasCalled(), "fallback closure should be invoked on Kryo-corrupt statsBlob")
+    assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+  }
+
+  test("UnsatisfiedLinkError still trips capability latch (regression)") {
+    // Reset latch via reflection so this test doesn't depend on prior test ordering.
+    val flagField =
+      ColumnarCachedBatchSerializer.getClass.getDeclaredField("statsExtAvailableFlag")
+    flagField.setAccessible(true)
+    flagField.setBoolean(ColumnarCachedBatchSerializer, true)
+    assert(
+      ColumnarCachedBatchSerializer.statsExtAvailable,
+      "precondition: capability latch must start true")
+
+    val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+    when(jni.serializeWithStats(anyLong()))
+      .thenThrow(new UnsatisfiedLinkError("serializeWithStats (test injection)"))
+    val (fallback, wasCalled) = newFallbackProbe()
+    val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+      jni,
+      0L,
+      1,
+      structSchema,
+      fallback)
+    assert(wasCalled(), "fallback closure should be invoked on ULE")
+    assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+    assert(
+      !ColumnarCachedBatchSerializer.statsExtAvailable,
+      "ULE should trip the JVM-lifetime capability latch")
+
+    // Restore for subsequent tests / suites in the same JVM.
+    flagField.setBoolean(ColumnarCachedBatchSerializer, true)
+  }
+}

