This is an automated email from the ASF dual-hosted git repository.

yaooqinn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bfd45ca98 [VL] Stop using Input.available() to probe trailing markers 
in CachedColumnarBatch (#12147)
7bfd45ca98 is described below

commit 7bfd45ca983eeedafc59fddb01e78ea2905cf816
Author: Kent Yao <[email protected]>
AuthorDate: Thu May 28 11:11:07 2026 +0800

    [VL] Stop using Input.available() to probe trailing markers in 
CachedColumnarBatch (#12147)
    
    ColumnarCachedBatchSerializer.read guarded the trailing hasStats /
    hasSchema booleans with `input.available() > 0` to tolerate the V1
    wire format that predates those markers. The intent was correct --
    the existing ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks
    absent-trailing as silent null, and that contract must be preserved
    -- but `Input.available()` is the wrong probe.
    
    Kryo `Input.available()` returns
    `(limit - position) + underlyingStream.available()`, and the JDK
    `InputStream.available()` contract permits any implementation to
    return 0 even when more data follows -- BufferedInputStream over
    shuffle-spill / network chunk boundaries routinely does so. When the
    Kryo buffer is drained AND the underlying stream reports 0 at the
    trailing-boolean byte position, the probe falsely concludes EOF,
    skips hasStats, and the next `readClassAndObject` interprets the
    stats payload (which contains the schema JSON) as a class name --
    surfacing as `ClassNotFoundException: {"type":"struct",...}` with
    the stack topped by `DefaultClassResolver.readName`.
    
    Replace the probe with a try/readBoolean/catch on the narrow Kryo
    "Buffer underflow" surface. This catches the real EOF when the V1
    wire has no trailing booleans (preserves the silent-null contract)
    without ever consulting `available()`, so a V2 wire under
    chunked-fill always reads the trailing markers correctly.
    
    The catch is intentionally narrow (message-prefix match on
    "Buffer underflow") so that genuine corruption -- including
    ClassNotFoundException wrapped during readClassAndObject -- is
    never swallowed.
    
    The length-bound `require(... <= maxLen ...)` guard from commit
    491070bf34 (defending against NegativeArraySizeException /
    oversized allocation) is preserved -- that part is orthogonal to
    the V1 probe and remains useful.
    
    A new test ColumnarCachedBatchKryoBoundaryProbeBugSuite locks the
    chunked-fill probe contract: a 1-byte-per-read InputStream that
    returns `available() == 0` must still round-trip multi-batch V2
    wire correctly. The existing V1-wire silent-null test in
    ColumnarCachedBatchKryoSuite continues to pass unchanged.
---
 .../execution/ColumnarCachedBatchSerializer.scala  |  50 ++++++++--
 ...umnarCachedBatchKryoBoundaryProbeBugSuite.scala | 111 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 11 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index 7bf7a17546..bb548b2f9c 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
-import com.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => 
KryoSerializer}
 import com.esotericsoftware.kryo.DefaultSerializer
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.arrow.c.ArrowSchema
@@ -152,14 +152,30 @@ class CachedColumnarBatchKryoSerializer extends 
KryoSerializer[CachedColumnarBat
     )
     val bytes = new Array[Byte](payloadLen)
     input.readBytes(bytes)
-    // Backward-compat with the V1 wire format (no trailing hasStats / 
hasSchema booleans):
-    // legacy CachedColumnarBatch instances persisted on disk (DISK_ONLY / 
MEMORY_AND_DISK)
-    // surviving a rolling upgrade lack these fields. available() is 
best-effort -- treats
-    // unavailable suffix as "absent" instead of throwing KryoException.
-    val hasStats = input.available() > 0 && input.readBoolean()
-    // Even when hasStats=false we still consume the hasSchema tag to keep the 
stream aligned.
-    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics 
and the typed
-    // pattern match throws MatchError at runtime.
+    // Read the trailing hasStats marker. Catching a Buffer-underflow 
KryoException
+    // here preserves backward compatibility with the V1 wire format (no 
trailing
+    // hasStats / hasSchema booleans), which the existing
+    // ColumnarCachedBatchKryoSuite#"V1 wire ..." test locks as a contract:
+    // an absent trailing byte must read as null, not throw.
+    //
+    // Why a try/catch instead of `input.available() > 0 && readBoolean`:
+    // Kryo `Input.available()` returns `(limit - position) + 
underlyingStream.available()`,
+    // and the JDK `InputStream.available()` contract permits any 
implementation to
+    // return 0 even when more data follows -- BufferedInputStream over 
shuffle-spill
+    // / network chunk boundaries routinely does so. When the Kryo buffer is 
drained
+    // AND the underlying stream reports 0 at the trailing-boolean byte 
position, the
+    // probe falsely concludes EOF, skips hasStats, and the next 
readClassAndObject
+    // interprets the stats payload (which contains the schema JSON) as a 
class name --
+    // surfacing as `ClassNotFoundException: {"type":"struct",...}` with the 
stack
+    // topped by `DefaultClassResolver.readName`. A try/catch on the real EOF 
surface
+    // (Kryo "Buffer underflow") avoids the false-EOF probe while still 
tolerating
+    // V1 wire.
+    //
+    // NB: avoid `val (a: T, b: U) = ...` -- Scala 2.13 erases Tuple2 generics 
and the
+    // typed pattern match throws MatchError at runtime.
+    val hasStats =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
     val statsAndSchema: (InternalRow, StructType) = if (hasStats) {
       val statsLen = input.readInt()
       require(
@@ -177,9 +193,21 @@ class CachedColumnarBatchKryoSerializer extends 
KryoSerializer[CachedColumnarBat
     CachedColumnarBatch(numRows, sizeInBytes, bytes, statsAndSchema._1, 
statsAndSchema._2)
   }
 
+  // Kryo signals end-of-input by throwing KryoException with a message 
starting
+  // with "Buffer underflow". There is no dedicated subclass, so a 
message-prefix
+  // check is the narrowest filter we can apply without swallowing real 
corruption
+  // (e.g. ClassNotFoundException wrapped during readClassAndObject).
+  private def isBufferUnderflow(e: KryoException): Boolean = {
+    val msg = e.getMessage
+    msg != null && msg.startsWith("Buffer underflow")
+  }
+
   private def readOptionalSchema(input: Input, maxLen: Long): StructType = {
-    // Treat absent trailing bytes as "no schema": V1 wire format predates 
this field.
-    if (input.available() <= 0 || !input.readBoolean()) {
+    // Trailing schema marker. See the hasStats read above for the same 
V1-vs-chunked-fill rationale.
+    val hasSchema =
+      try input.readBoolean()
+      catch { case e: KryoException if isBufferUnderflow(e) => false }
+    if (!hasSchema) {
       null
     } else {
       val schemaLen = input.readInt()
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
new file mode 100644
index 0000000000..15bbcb3f84
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/ColumnarCachedBatchKryoBoundaryProbeBugSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+
+/**
+ * Deterministic repro for the L154/L180 Input.available() boolean-probe bug
+ * (the trailing hasStats / hasSchema probes in CachedColumnarBatchKryoSerializer).
+ *
+ * Trigger conditions (all required):
+ *   (1) Multi-batch deserialize via kryo.readClassAndObject from one stream.
+ *   (2) Kryo Input wraps an InputStream (not byte[]).
+ *   (3) At a batch's trailing hasStats/hasSchema position, the underlying
+ *       InputStream returns available()=0 AND the Kryo Input buffer is drained
+ *       (limit==position). Both conditions must hit the SAME byte position.
+ *
+ * Path observed in production:
+ *   BufferedInputStream over shuffle-spill / network ManagedBuffer chunk
+ *   boundary -> stream.available()=0 between chunks, Kryo Input.available()
+ *   = (limit-pos) + 0 -> reads 0 when buffer drained.
+ *
+ * Fixture: 1-byte-per-read stream + lying available()=0 -> every byte boundary
+ * satisfies (3); any trailing-boolean byte aligned with a Kryo refill triggers
+ * the false-EOF.
+ */
+class ColumnarCachedBatchKryoBoundaryProbeBugSuite extends AnyFunSuite {
+
+  final private class LyingOneByteStream(src: InputStream) extends InputStream 
{
+    override def read(): Int = src.read()
+    override def read(b: Array[Byte], off: Int, len: Int): Int = {
+      if (len == 0) 0
+      else {
+        val c = src.read()
+        if (c == -1) -1
+        else {
+          b(off) = c.toByte
+          1
+        }
+      }
+    }
+    override def available(): Int = 0
+  }
+
+  private def mkBatch(i: Int): CachedColumnarBatch = {
+    // PartitionStatistics per-column slots:
+    //   [lower(typed) upper(typed) count(Int) nullCount(Int) sizeBytes(Long)]
+    val stats: InternalRow =
+      new GenericInternalRow(Array[Any](i.toLong, (i * 10).toLong, i, 0, 8L))
+    val schema = StructType(Seq(StructField(s"col$i", LongType, nullable = 
true)))
+    val bytes = Array.fill[Byte](128)(i.toByte)
+    CachedColumnarBatch(
+      numRows = i,
+      sizeInBytes = bytes.length.toLong,
+      bytes = bytes,
+      stats = stats,
+      schema = schema)
+  }
+
+  test("multi-batch deserialize survives boundary-aligned trailing-boolean 
probe") {
+    val kryo = new Kryo()
+    val ser = new CachedColumnarBatchKryoSerializer()
+    kryo.register(classOf[CachedColumnarBatch], ser)
+
+    val baos = new ByteArrayOutputStream()
+    val out = new Output(baos)
+    val originals = (1 to 10).map(mkBatch)
+    originals.foreach(b => kryo.writeClassAndObject(out, b))
+    out.close()
+
+    val raw = baos.toByteArray
+    val in = new Input(new LyingOneByteStream(new ByteArrayInputStream(raw)), 
32)
+
+    val read = (1 to 10).map(_ => 
kryo.readClassAndObject(in).asInstanceOf[CachedColumnarBatch])
+    in.close()
+
+    originals.zip(read).zipWithIndex.foreach {
+      case ((o, r), i) =>
+        info(s"batch $i: orig.stats=${o.stats != null} schema=${o.schema}")
+        info(s"batch $i: read.stats=${r.stats != null} schema=${r.schema}")
+        assert(r.numRows == o.numRows, s"batch $i numRows mismatch")
+        assert(r.bytes.toSeq == o.bytes.toSeq, s"batch $i bytes mismatch")
+        assert(r.stats != null, s"batch $i stats lost (BUG)")
+        assert(r.schema == o.schema, s"batch $i schema mismatch (BUG)")
+    }
+  }
+
+  // V1 wire backward-compat is locked by ColumnarCachedBatchKryoSuite#"V1 
wire ..."
+  // -- not duplicated here. This suite only covers the chunked-fill probe 
path.
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to