This is an automated email from the ASF dual-hosted git repository.
yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a897401d86 [VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer (#12183)
a897401d86 is described below
commit a897401d866a263dba98856ce1b5d974a638b4b9
Author: Kent Yao <[email protected]>
AuthorDate: Fri May 29 16:29:17 2026 +0800
[VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer (#12183)
* [VL] Extract serializeOneBatchWithStats helper from ColumnarCachedBatchSerializer
Refactor only; no behavior change. Move the per-batch serializeWithStats
fast path (including its catch arm for UnsatisfiedLinkError, which calls
markStatsExtUnavailable and then falls back to legacy serialize()) out of
the anonymous Iterator in convertColumnarBatchToCachedBatch and into a
package-private method on the companion object:
  ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
    jni, handle, numRows, structSchema, fallbackToLegacy)
The legacy-only branch is extracted into a local def,
legacySerializeInline(), so that both the else branch and the fallback
closure can call it without duplication.
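A minimal sketch of the refactored shape, with hypothetical simplifications: a `StatsJni` trait stands in for `ColumnarBatchSerializerJniWrapper`, `Array[Byte]` stands in for `CachedBatch`, and frame parsing is omitted. Because the helper receives the JNI object and the fallback closure as parameters, a test can pass it a stub that throws `UnsatisfiedLinkError` without loading any native library, which is what makes the helper a unit-test hook point.

```scala
// Hypothetical stand-in for the JNI wrapper; not the real Gluten API.
trait StatsJni {
  def serializeWithStats(handle: Long): Array[Byte]
  def serialize(handle: Long): Array[Byte]
}

object RefactorSketch {
  // Companion-style helper: owns the fast path and its (pre-change, ULE-only) catch arm.
  def serializeOneBatchWithStats(
      jni: StatsJni,
      handle: Long,
      fallbackToLegacy: () => Array[Byte]): Array[Byte] =
    try jni.serializeWithStats(handle)
    catch { case _: UnsatisfiedLinkError => fallbackToLegacy() }

  // Call site: one local def serves both the else branch and the fallback closure.
  def next(jni: StatsJni, handle: Long, statsEnabled: Boolean): Array[Byte] = {
    def legacySerializeInline(): Array[Byte] = jni.serialize(handle)
    if (statsEnabled) serializeOneBatchWithStats(jni, handle, () => legacySerializeInline())
    else legacySerializeInline()
  }
}
```

A stub that throws from `serializeWithStats` models a new Gluten jar paired with an older `libgluten.so`; both branches then end up in the same legacy code.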
Sentinel suites (all green, no regression):
ColumnarCachedBatchE2ESuite 15/0
ColumnarCachedBatchStatsBlobSuite 4/0
ColumnarCachedBatchFramedBytesSuite 3/0
ColumnarCachedBatchBuildFilterSuite 1/0
ColumnarCachedBatchBuildFilterPruneSuite 3/0
ColumnarCachedBatchKryoSuite 6/0
ColumnarCachedBatchIntFamilyMarshalSuite 8/0
ColumnarCachedBatchKryoBoundaryProbeBugSuite 1/0
This extraction creates a unit-test hook point for an upcoming change that
widens the catch to handle corrupt/undecodable stats frames per-batch via
NonFatal without tripping the JVM-lifetime capability latch.
Generated-by: Claude claude-opus-4.7
* [VL] Catch corrupt stats frames per-batch in ColumnarCachedBatchSerializer
The V2 serialize-with-stats fast path used to catch only UnsatisfiedLinkError.
Any IllegalArgumentException from parseFramedBytes (corrupt magic, truncated
frame, statsLen/bytesLen mismatch) or KryoException from deserializeStats would
bubble out and fail the entire Spark task. Since V2 is an opt-in optimization,
a malformed frame for one cached batch should not be user-visible: the existing
legacy serialize() fallback should absorb it, just as it already absorbs an
UnsatisfiedLinkError.
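The failure modes above are ordinary exceptions raised while decoding a single frame. A minimal sketch of such a decoder follows, assuming the layout the new helper suite builds by hand: magic bytes `fe ca 53 02`, a little-endian u32 statsLen, the stats blob, a little-endian u32 bytesLen, then the bytes blob. `FrameSketch.parseFrame` is hypothetical, not the real `parseFramedBytes`; it stops before any Kryo decoding, and treating leftover bytes as a length mismatch is an assumption of the sketch.

```scala
import java.nio.{ByteBuffer, ByteOrder}

object FrameSketch {
  // Magic prefix as written by the hand-built frame in the helper suite.
  val Magic: Array[Byte] = Array(0xfe.toByte, 0xca.toByte, 0x53.toByte, 0x02.toByte)

  // Splits a frame into (statsBlob, bytesBlob). Every malformed input surfaces
  // as an IllegalArgumentException (via require), which NonFatal matches.
  def parseFrame(framed: Array[Byte]): (Array[Byte], Array[Byte]) = {
    require(framed.length >= 4 && framed.take(4).sameElements(Magic), "corrupt magic")
    val buf = ByteBuffer.wrap(framed).order(ByteOrder.LITTLE_ENDIAN)
    buf.position(4)
    // Reads one u32 length prefix and the blob it describes.
    def readBlob(what: String): Array[Byte] = {
      require(buf.remaining() >= 4, s"truncated frame: missing $what")
      val len = buf.getInt()
      require(len >= 0 && len <= buf.remaining(), s"truncated frame: $what=$len")
      val out = new Array[Byte](len)
      buf.get(out)
      out
    }
    val stats = readBlob("statsLen")
    val bytes = readBlob("bytesLen")
    require(!buf.hasRemaining, s"length mismatch: ${buf.remaining()} trailing bytes")
    (stats, bytes)
  }
}
```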
This change layers the catch:
  case e: UnsatisfiedLinkError =>
    // capability gone for the JVM lifetime; trip the latch
    markStatsExtUnavailable(e); fallbackToLegacy()
  case NonFatal(e) =>
    // per-batch corruption; do NOT trip the latch
    warnCorruptStatsFrame(e); fallbackToLegacy()
The capability latch (statsExtAvailableFlag) is deliberately one-way for the
JVM lifetime, because a native-symbol mismatch is not recoverable. Corrupt-frame
events, in contrast, are per-batch and should not poison the latch: the next
batch retries the fast path.
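A minimal sketch of the two-arm catch and the one-way latch, using a hypothetical `LatchSketch` object and `Array[Byte]` results instead of the real serializer types. One detail makes the explicit first arm necessary rather than stylistic: `scala.util.control.NonFatal` deliberately does not match `LinkageError`, and `UnsatisfiedLinkError` is a `LinkageError`, so a `NonFatal` arm on its own would let a missing native symbol escape and fail the task.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

object LatchSketch {
  // One-way capability latch: starts true and can only be cleared.
  private val statsExtAvailable = new AtomicBoolean(true)
  def available: Boolean = statsExtAvailable.get()

  def serializeOne(
      fastPath: () => Array[Byte],
      fallbackToLegacy: () => Array[Byte]): Array[Byte] =
    try fastPath()
    catch {
      case _: UnsatisfiedLinkError =>
        // Native symbol missing: disable the fast path for the rest of the JVM.
        statsExtAvailable.set(false)
        fallbackToLegacy()
      case NonFatal(_) =>
        // Per-batch corruption: fall back for this batch only; latch untouched.
        fallbackToLegacy()
    }
}
```

Fatal errors such as `OutOfMemoryError` still propagate, since neither arm matches them.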
To keep log output bounded when a native regression produces malformed frames
batch after batch, warnCorruptStatsFrame caps warnings at 100 per JVM and logs
a final summary line when the cap is reached.
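A minimal sketch of the capped-warning counter, assuming a queue of strings in place of Spark's `logWarning` so the output can be inspected; `CappedWarnSketch` is a hypothetical name. Like the counter in the diff, it uses an `AtomicLong` that is never reset, so executor threads can share it without locks.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

object CappedWarnSketch {
  val Cap: Long = 100L
  // Never reset, so the cap applies for the lifetime of the JVM.
  private val count = new AtomicLong(0L)
  // Stand-in for logWarning so the sketch runs without a logging backend.
  val emitted = new ConcurrentLinkedQueue[String]()

  def warnCorrupt(cause: Throwable): Unit = {
    // incrementAndGet hands each caller a distinct n, so exactly one caller sees n == Cap.
    val n = count.incrementAndGet()
    if (n <= Cap) {
      emitted.add(s"corrupt frame, falling back to legacy [$n/$Cap]: ${cause.getMessage}")
      if (n == Cap) emitted.add(s"further corrupt-frame warnings suppressed (cap=$Cap)")
    }
  }
}
```

Calls past the cap still pay one atomic increment but log nothing, so 150 corrupt batches produce 100 warnings plus one summary line.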
Test coverage (new ColumnarCachedBatchSerializerHelperSuite, 4 cases):
- corrupt magic -> fallback CachedBatch with stats=null
- truncated framed bytes -> fallback CachedBatch with stats=null
- Kryo-corrupt statsBlob -> fallback CachedBatch with stats=null
- UnsatisfiedLinkError -> still trips capability latch (regression)
Also drops resetStatsExtAvailableForTesting(): it has no remaining callers now
that the new helper suite resets the flag directly via reflection.
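A minimal sketch of the reflective reset the helper suite now uses, applied to a hypothetical `CapabilitySketch` object. A Scala `object` compiles to a singleton class, and a `private var` that is only accessed inside the object typically becomes a private field with the same name, so `getDeclaredField` can find and overwrite it. The trade-off is that renaming the field breaks the test with a `NoSuchFieldException` at runtime rather than at compile time.

```scala
object CapabilitySketch {
  // Plays the role of statsExtAvailableFlag: a private var with no test-only setter.
  private var flag: Boolean = true
  def available: Boolean = flag
  def markUnavailable(): Unit = { flag = false }
}

object ReflectiveReset {
  // Looks up the private field on the object's singleton class and forces it back to true.
  def resetFlag(): Unit = {
    val field = CapabilitySketch.getClass.getDeclaredField("flag")
    field.setAccessible(true)
    field.setBoolean(CapabilitySketch, true)
  }
}
```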
Sentinel suites (all green, no regression):
ColumnarCachedBatchE2ESuite 15/0
ColumnarCachedBatchStatsBlobSuite 4/0
ColumnarCachedBatchFramedBytesSuite 3/0
ColumnarCachedBatchBuildFilterSuite 1/0
ColumnarCachedBatchBuildFilterPruneSuite 3/0
ColumnarCachedBatchKryoSuite 6/0
ColumnarCachedBatchIntFamilyMarshalSuite 8/0
ColumnarCachedBatchKryoBoundaryProbeBugSuite 1/0
ColumnarCachedBatchSerializerHelperSuite 4/0 (new)
Generated-by: Claude claude-opus-4.7
---
.../execution/ColumnarCachedBatchSerializer.scala | 109 ++++++++++-----
.../ColumnarCachedBatchSerializerHelperSuite.scala | 154 +++++++++++++++++++++
2 files changed, 227 insertions(+), 36 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 18e2d56bd1..e53dd18c2e 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -56,6 +56,8 @@ import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Arrays
+import scala.util.control.NonFatal
+
/**
* A Velox columnar cache batch carrying per-partition column statistics.
*
@@ -760,36 +762,7 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
override def next(): CachedBatch = {
val batch = veloxBatches.next()
val handle = ColumnarBatches.getNativeHandle(backendName, batch)
- // Route through serializeWithStats when the partition-stats conf is enabled and the
- // JNI extension is linked in libgluten.so. Capability is detected lazily at the
- // call site: a new Gluten jar paired with an older native library will throw
- // UnsatisfiedLinkError on the first invocation; we catch it once, cache the
- // result, and fall back to the legacy serialize() path emitting stats=null. The
- // buildFilter wrapper directs such batches through without pruning.
- if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
- try {
- val framed = jni.serializeWithStats(handle)
- val (stats, bytesBlob) =
- CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
- CachedColumnarBatch(
- batch.numRows(),
- bytesBlob.length,
- bytesBlob,
- stats,
- structSchema)
- } catch {
- case e: UnsatisfiedLinkError =>
- ColumnarCachedBatchSerializer.markStatsExtUnavailable(e)
- val unsafeBuffer = jni.serialize(handle)
- val bytes = unsafeBuffer.toByteArray
- CachedColumnarBatch(
- batch.numRows(),
- bytes.length,
- bytes,
- stats = null,
- schema = null)
- }
- } else {
+ def legacySerializeInline(): CachedBatch = {
val unsafeBuffer = jni.serialize(handle)
val bytes = unsafeBuffer.toByteArray
CachedColumnarBatch(
@@ -799,6 +772,22 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
stats = null,
schema = null)
}
+ // Route through serializeWithStats when the partition-stats conf is enabled and the
+ // JNI extension is linked in libgluten.so. Capability is detected lazily at the
+ // call site: a new Gluten jar paired with an older native library will throw
+ // UnsatisfiedLinkError on the first invocation; we catch it once, cache the
+ // result, and fall back to the legacy serialize() path emitting stats=null. The
+ // buildFilter wrapper directs such batches through without pruning.
+ if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
+ ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+ jni,
+ handle,
+ batch.numRows(),
+ structSchema,
+ () => legacySerializeInline())
+ } else {
+ legacySerializeInline()
+ }
}
}
}
@@ -967,6 +956,60 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
}
object ColumnarCachedBatchSerializer extends Logging {
+ // Encapsulates the per-batch serializeWithStats fast path so the catch arms can
+ // be exercised in unit tests with a stubbed jni wrapper. Two-arm catch:
+ // - UnsatisfiedLinkError trips the JVM-lifetime capability latch via
+ // markStatsExtUnavailable (one-way; native symbol gone for the whole JVM).
+ // - NonFatal absorbs per-batch corruption (corrupt magic, truncated frame,
+ // Kryo decode failure) without tripping the latch; the next batch retries
+ // the fast path. A separate counter (warnCorruptStatsFrame) caps the
+ // warning floor so high-throughput workloads don't drown the executor log.
+ private[execution] def serializeOneBatchWithStats(
+ jni: ColumnarBatchSerializerJniWrapper,
+ handle: Long,
+ numRows: Int,
+ structSchema: StructType,
+ fallbackToLegacy: () => CachedBatch): CachedBatch = {
+ try {
+ val framed = jni.serializeWithStats(handle)
+ val (stats, bytesBlob) =
+ CachedColumnarBatchKryoSerializer.parseFramedBytes(framed, structSchema)
+ CachedColumnarBatch(numRows, bytesBlob.length, bytesBlob, stats, structSchema)
+ } catch {
+ case e: UnsatisfiedLinkError =>
+ markStatsExtUnavailable(e)
+ fallbackToLegacy()
+ case NonFatal(e) =>
+ warnCorruptStatsFrame(e)
+ fallbackToLegacy()
+ }
+ }
+
+ // Per-JVM cap on corrupt-frame warnings to avoid log flooding when a native
+ // regression produces malformed frames batch after batch. Capability latch is
+ // intentionally NOT tripped here: a corrupt frame is a per-batch event, not a
+ // capability loss, and the next batch should still attempt the fast path.
+ private val corruptFrameWarnCount = new java.util.concurrent.atomic.AtomicLong(0L)
+ private val CORRUPT_FRAME_WARN_CAP = 100L
+
+ def warnCorruptStatsFrame(cause: Throwable): Unit = {
+ val n = corruptFrameWarnCount.incrementAndGet()
+ if (n <= CORRUPT_FRAME_WARN_CAP) {
+ logWarning(
+ s"serializeWithStats produced a corrupt/undecodable frame for one cached batch; " +
+ s"falling back to legacy serialize() for this batch (stats=null). Capability " +
+ s"latch unchanged. [$n/$CORRUPT_FRAME_WARN_CAP]",
+ cause
+ )
+ if (n == CORRUPT_FRAME_WARN_CAP) {
+ logWarning(
+ s"Further corrupt-frame warnings suppressed for the JVM lifetime " +
+ s"(cap=$CORRUPT_FRAME_WARN_CAP reached). Capability latch remains active; " +
+ s"investigate native serializeWithStats output.")
+ }
+ }
+ }
+
// Lazy capability flag for the serializeWithStats JNI symbol. A new Gluten jar paired with an
// older libgluten.so will throw UnsatisfiedLinkError on the first invocation; the call site
// catches it once via markStatsExtUnavailable() and we degrade to the legacy serialize() path
@@ -986,10 +1029,4 @@ object ColumnarCachedBatchSerializer extends Logging {
)
}
}
-
- // Visible for testing: reset the capability flag so a unit test can re-exercise the
- // probe-once semantics.
- private[execution] def resetStatsExtAvailableForTesting(): Unit = {
- statsExtAvailableFlag = true
- }
}
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala
new file mode 100644
index 0000000000..badc4324b1
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializerHelperSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.vectorized.ColumnarBatchSerializerJniWrapper
+
+import org.apache.spark.sql.columnar.CachedBatch
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+import org.mockito.ArgumentMatchers.anyLong
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.funsuite.AnyFunSuite
+
+/**
+ * Unit tests for `ColumnarCachedBatchSerializer.serializeOneBatchWithStats`. Exercises the
+ * fast-path / fallback catch arms with a Mockito-stubbed JNI wrapper, without requiring a Velox
+ * runtime or native libraries.
+ */
+class ColumnarCachedBatchSerializerHelperSuite extends AnyFunSuite {
+
+ private val structSchema: StructType =
+ StructType(Seq(StructField("k", LongType, nullable = true)))
+
+ // A canned legacy CachedBatch that the fallback closure produces.
+ private val legacyCachedBytes: Array[Byte] = Array[Byte](7, 7, 7)
+
+ private def newFallbackProbe(): (() => CachedBatch, () => Boolean) = {
+ @volatile var called = false
+ val closure: () => CachedBatch = () => {
+ called = true
+ CachedColumnarBatch(1, legacyCachedBytes.length, legacyCachedBytes, null, null)
+ }
+ (closure, () => called)
+ }
+
+ test("corrupt magic frame is absorbed into legacy fallback (stats=null)") {
+ // 12 bytes: 4 bogus magic + 4-byte statsLen=0 + 4-byte bytesLen=0
+ val corruptFramed: Array[Byte] = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+ when(jni.serializeWithStats(anyLong())).thenReturn(corruptFramed)
+ val (fallback, wasCalled) = newFallbackProbe()
+ val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+ jni,
+ 0L,
+ 1,
+ structSchema,
+ fallback)
+ assert(wasCalled(), "fallback closure should be invoked on corrupt magic")
+ assert(
+ cb.asInstanceOf[CachedColumnarBatch].stats == null,
+ "fallback CachedBatch should carry stats=null")
+ }
+
+ test("truncated framed bytes absorbed into legacy fallback") {
+ val truncated: Array[Byte] = Array[Byte](
+ 0xfe.toByte,
+ 0xca.toByte,
+ 0x53.toByte,
+ 0x02.toByte,
+ 0,
+ 0,
+ 0,
+ 0
+ ) // magic + statsLen only; no statsBlob, no bytesLen
+ val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+ when(jni.serializeWithStats(anyLong())).thenReturn(truncated)
+ val (fallback, wasCalled) = newFallbackProbe()
+ val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+ jni,
+ 0L,
+ 1,
+ structSchema,
+ fallback)
+ assert(wasCalled(), "fallback closure should be invoked on truncated frame")
+ assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+ }
+
+ test("Kryo-corrupt statsBlob absorbed into legacy fallback") {
+ // Build a framed payload with valid magic, plausible statsLen=8, then random bytes that
+ // are not a valid Kryo InternalRow serialization. Kryo throws KryoException (RuntimeException
+ // subclass, so NonFatal) which the current ULE-only catch does not handle.
+ val statsBlob: Array[Byte] = Array.fill(8)(0xff.toByte) // garbage
+ val bytesBlob: Array[Byte] = Array[Byte](1, 2, 3)
+ val out = new java.io.ByteArrayOutputStream()
+ out.write(Array[Byte](0xfe.toByte, 0xca.toByte, 0x53.toByte, 0x02.toByte))
+ def writeU32LE(v: Int): Unit = {
+ out.write(v & 0xff)
+ out.write((v >>> 8) & 0xff)
+ out.write((v >>> 16) & 0xff)
+ out.write((v >>> 24) & 0xff)
+ }
+ writeU32LE(statsBlob.length)
+ out.write(statsBlob)
+ writeU32LE(bytesBlob.length)
+ out.write(bytesBlob)
+ val framed = out.toByteArray
+
+ val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+ when(jni.serializeWithStats(anyLong())).thenReturn(framed)
+ val (fallback, wasCalled) = newFallbackProbe()
+ val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+ jni,
+ 0L,
+ 1,
+ structSchema,
+ fallback)
+ assert(wasCalled(), "fallback closure should be invoked on Kryo-corrupt statsBlob")
+ assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+ }
+
+ test("UnsatisfiedLinkError still trips capability latch (regression)") {
+ // Reset latch via reflection so this test doesn't depend on prior test ordering.
+ val flagField =
+ ColumnarCachedBatchSerializer.getClass.getDeclaredField("statsExtAvailableFlag")
+ flagField.setAccessible(true)
+ flagField.setBoolean(ColumnarCachedBatchSerializer, true)
+ assert(
+ ColumnarCachedBatchSerializer.statsExtAvailable,
+ "precondition: capability latch must start true")
+
+ val jni = mock(classOf[ColumnarBatchSerializerJniWrapper])
+ when(jni.serializeWithStats(anyLong()))
+ .thenThrow(new UnsatisfiedLinkError("serializeWithStats (test injection)"))
+ val (fallback, wasCalled) = newFallbackProbe()
+ val cb = ColumnarCachedBatchSerializer.serializeOneBatchWithStats(
+ jni,
+ 0L,
+ 1,
+ structSchema,
+ fallback)
+ assert(wasCalled(), "fallback closure should be invoked on ULE")
+ assert(cb.asInstanceOf[CachedColumnarBatch].stats == null)
+ assert(
+ !ColumnarCachedBatchSerializer.statsExtAvailable,
+ "ULE should trip the JVM-lifetime capability latch")
+
+ // Restore for subsequent tests / suites in the same JVM.
+ flagField.setBoolean(ColumnarCachedBatchSerializer, true)
+ }
+}