This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9565926dfda9 feat(blob): default blob.inline.mode to DESCRIPTOR for
Lance (#18744)
9565926dfda9 is described below
commit 9565926dfda9ffdc37c1811ebe0d3e46fe8e173a
Author: voonhous <[email protected]>
AuthorDate: Wed May 20 15:46:33 2026 +0800
feat(blob): default blob.inline.mode to DESCRIPTOR for Lance (#18744)
Co-authored-by: Rahil Chertara <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/common/config/HoodieReaderConfig.java | 10 +-
.../spark/sql/hudi/blob/BatchedBlobReader.scala | 35 +-
.../spark/sql/hudi/blob/ScalarFunctions.scala | 4 +
.../hudi/functional/TestLanceDataSource.scala | 479 ++++++++++++++++++++-
rfc/rfc-100/rfc-100.md | 103 +++--
5 files changed, 561 insertions(+), 70 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 942d1aeabb50..9cbab8f4468c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -107,13 +107,13 @@ public class HoodieReaderConfig extends HoodieConfig {
public static final String BLOB_INLINE_READ_MODE_DESCRIPTOR = "DESCRIPTOR";
public static final ConfigProperty<String> BLOB_INLINE_READ_MODE =
ConfigProperty
.key("hoodie.read.blob.inline.mode")
- .defaultValue(BLOB_INLINE_READ_MODE_CONTENT)
+ .defaultValue(BLOB_INLINE_READ_MODE_DESCRIPTOR)
.markAdvanced()
.sinceVersion("1.2.0")
.withValidValues(BLOB_INLINE_READ_MODE_CONTENT,
BLOB_INLINE_READ_MODE_DESCRIPTOR)
.withDocumentation("How Hudi interprets INLINE BLOB values on read. "
- + "CONTENT (default) returns the raw inline bytes. "
- + "DESCRIPTOR returns an OUT_OF_LINE-shaped reference pointing at
the backing "
- + "Lance file with the INLINE payload's position and size, so
callers can defer "
- + "the byte read via read_blob().");
+ + "DESCRIPTOR (default) returns an OUT_OF_LINE-shaped reference
pointing at the "
+ + "backing Lance file with the INLINE payload's position and size,
so callers can "
+ + "skip the byte content read. "
+ + "CONTENT returns the raw inline bytes directly in the data field
on every read.");
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
index a1c299cf2611..419fb2c8a8d5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/BatchedBlobReader.scala
@@ -208,16 +208,31 @@ class BatchedBlobReader(
// Dispatch based on storage_type (field 0)
val storageType = accessor.getString(blobStruct, 0)
if (storageType == HoodieSchema.Blob.INLINE) {
- // Case 1: Inline — bytes are in field 1
- val bytes = accessor.getBytes(blobStruct, 1)
- batch += RowInfo[R](
- originalRow = row,
- filePath = "",
- offset = -1,
- length = -1,
- index = rowIndex,
- inlineBytes = Some(bytes)
- )
+ // INLINE + CONTENT: inline_data is populated; return bytes
directly (1-hop).
+ // INLINE + DESCRIPTOR: inline_data is null and the scan
synthesized a
+ // reference pointing into the backing file's storage layout. We
refuse to
+ // materialize bytes here — DESCRIPTOR is a metadata-only mode
for INLINE
+ // rows, and the synthesized reference is an internal pointer,
not
+ // user-facing storage info. Callers must switch to CONTENT mode
or stop
+ // using read_blob() on INLINE columns under DESCRIPTOR.
+ if (!accessor.isNullAt(blobStruct, 1)) {
+ val bytes = accessor.getBytes(blobStruct, 1)
+ batch += RowInfo[R](
+ originalRow = row,
+ filePath = "",
+ offset = -1,
+ length = -1,
+ index = rowIndex,
+ inlineBytes = Some(bytes)
+ )
+ } else {
+ throw new IllegalStateException(
+ s"read_blob() cannot materialize bytes for an INLINE blob
under " +
+ s"DESCRIPTOR mode. Under
hoodie.read.blob.inline.mode=DESCRIPTOR, " +
+ s"INLINE blobs are returned as metadata-only
(inline_data=NULL, " +
+ s"synthesized reference). To read bytes, set " +
+ s"hoodie.read.blob.inline.mode=CONTENT")
+ }
} else if (storageType == HoodieSchema.Blob.OUT_OF_LINE) {
// Case 2 or 3: Out-of-line — get reference struct (field 2)
require(!accessor.isNullAt(blobStruct, 2), s"Out-of-line blob at
row $rowIndex must set reference")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala
index bf94b4519c33..56e7b3bac096 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/blob/ScalarFunctions.scala
@@ -81,6 +81,10 @@ object ScalarFunctions {
|Returns:
| Binary data read from the file
|
+ |Caveat:
+ | Throws on INLINE rows under
hoodie.read.blob.inline.mode=DESCRIPTOR.
+ | Set CONTENT mode to materialize INLINE bytes.
+ |
|Performance:
| - Configure batching: hoodie.blob.batching.max.gap.bytes (default
4096)
| - Configure lookahead: hoodie.blob.batching.lookahead.size
(default 50)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 093b6ee30873..abce02568633 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -37,9 +37,10 @@ import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
-import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals,
assertFalse, assertNotNull, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals,
assertFalse, assertNotNull, assertThrows, assertTrue}
import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource}
import org.lance.file.LanceFileReader
@@ -869,8 +870,11 @@ class TestLanceDataSource extends
HoodieSparkClientTestBase {
// Writer-side: prove the bytes actually routed through Lance's dedicated
blob writer.
assertLanceBlobEncoding(tablePath)
- // Reader-side: in CONTENT mode the INLINE bytes come back directly in
`data`.
- val readRows = spark.read.format("hudi").load(tablePath)
+ // Reader-side: in CONTENT mode the INLINE bytes come back directly in
`data`. Set the mode
+ // explicitly — the default is DESCRIPTOR, which would surface a reference
instead.
+ val readRows = spark.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "CONTENT")
+ .load(tablePath)
.select($"id", $"payload")
.orderBy($"id")
.collect()
@@ -894,9 +898,13 @@ class TestLanceDataSource extends
HoodieSparkClientTestBase {
}
}
- // read_blob() resolution path: INLINE payloads resolve to the same bytes.
+ // read_blob() resolution path: INLINE payloads resolve to the same bytes.
CONTENT is set
+ // explicitly here — under the DESCRIPTOR default, read_blob() throws for
INLINE rows.
val viewName = s"${tableName}_view"
- spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName)
+ spark.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
val materialized = spark.sql(
s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY
id").collect()
assertEquals(numRows, materialized.length)
@@ -917,8 +925,10 @@ class TestLanceDataSource extends
HoodieSparkClientTestBase {
* DESCRIPTOR mode on INLINE rows: user writes `data` bytes; on read with
* `hoodie.read.blob.inline.mode=DESCRIPTOR` each row comes back with type
still set to
* {@code INLINE} (preserving the original storage mode) but with {@code
data=null} and a
- * populated {@code reference} pointing at the Lance file. {@code
read_blob()} then preads
- * the bytes back from the .lance file via the reference.
+ * populated synthesized {@code reference} pointing at the Lance file. The
synthesized
+ * reference is an internal pointer, not user-facing storage. {@code
read_blob()} is
+ * therefore unsupported on INLINE rows in this mode and must throw a clear
error so
+ * callers don't conflate the synthesized pointer with durable metadata.
*/
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
@@ -978,17 +988,458 @@ class TestLanceDataSource extends
HoodieSparkClientTestBase {
s"synthetic reference to the Lance file should be flagged managed
(id=$i)")
}
- // read_blob() materializes bytes via BatchedBlobReader, which always
reads with CONTENT
- // mode (actual bytes) regardless of the user's inline read mode setting.
+ // read_blob() on INLINE rows under DESCRIPTOR mode is unsupported by
design: DESCRIPTOR
+ // is metadata-only and the synthesized reference is an internal pointer
into the .lance
+ // file's storage layout, not user-facing metadata. BatchedBlobReader must
throw with a
+ // message that names both INLINE and DESCRIPTOR so the failure is
actionable.
val viewName = s"${tableName}_view"
- spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName)
- val materialized = spark.sql(
+ spark.read.format("hudi")
+ .option(modeKey, "DESCRIPTOR")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
+ val ex = assertThrows(classOf[Throwable], new Executable {
+ override def execute(): Unit = {
+ spark.sql(s"SELECT id, read_blob(payload) AS bytes FROM $viewName
ORDER BY id").collect()
+ }
+ })
+ val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ !=
null)
+ .flatMap(t => Option(t.getMessage)).mkString(" | ")
+ assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"),
+ s"error must mention INLINE and DESCRIPTOR; got: $msgChain")
+ }
+
+ /**
+ * Mixed-storage table on Lance: one blob column holds both INLINE rows
(small payloads
+ * stored inline) and OUT_OF_LINE rows (external file references). Under
CONTENT mode,
+ * read_blob() must materialize the correct bytes for both shapes in a
single query —
+ * INLINE rows go through the 1-hop inline_data passthrough, OUT_OF_LINE
rows go through
+ * the external pread (with BatchedBlobReader merging consecutive ranges).
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBlobMixedInlineAndOutOfLineContentMode(tableType: HoodieTableType):
Unit = {
+ val tableName =
s"test_lance_blob_mixed_content_${tableType.name().toLowerCase}"
+ val tablePath = s"$basePath/$tableName"
+
+ val payloadLen = 256
+ val numInline = 3
+ val numOutOfLine = 3
+ val externalFileSize = numOutOfLine * payloadLen
+ val externalDir = Files.createDirectories(
+ Paths.get(s"$basePath/_blob_ext_mixed_${tableType.name().toLowerCase}"))
+ val extPath = BlobTestHelpers.createTestFile(externalDir,
"mixed_file.bin", externalFileSize)
+
+ val inlinePayloads: Seq[Array[Byte]] = (0 until numInline).map { i =>
+ (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+ }
+
+ val sparkSess = spark
+ import sparkSess.implicits._
+ val inlineDf = inlinePayloads.zipWithIndex.map { case (b, i) => (i, b) }
+ .toDF("id", "bytes")
+ .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes"))
+
+ val outOfLineDf = (0 until numOutOfLine).map { k =>
+ (numInline + k, extPath, (k * payloadLen).toLong, payloadLen.toLong)
+ }.toDF("id", "path", "offset", "length")
+ .select($"id", BlobTestHelpers.blobStructCol("payload", $"path",
$"offset", $"length"))
+
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload", BlobType().asInstanceOf[StructType], nullable =
true,
+ BlobTestHelpers.blobMetadata)
+ ))
+ val raw = inlineDf.unionByName(outOfLineDf)
+ val df = spark.createDataFrame(raw.rdd, canonicalSchema)
+
+ writeDataframe(tableType, tableName, tablePath, df, saveMode =
SaveMode.Overwrite,
+ operation = Some("bulk_insert"),
+ extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+
+ assertLanceBlobEncoding(tablePath)
+
+ val viewName = s"${tableName}_mixed_view"
+ spark.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
+
+ val rows = spark.sql(
s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY
id").collect()
+ assertEquals(numInline + numOutOfLine, rows.length)
+
+ rows.foreach { row =>
+ val id = row.getInt(row.fieldIndex("id"))
+ val bytes = row.getAs[Array[Byte]]("bytes")
+ if (id < numInline) {
+ assertArrayEquals(inlinePayloads(id), bytes,
+ s"INLINE row: read_blob() bytes mismatch (id=$id)")
+ } else {
+ val k = id - numInline
+ assertEquals(payloadLen, bytes.length,
+ s"OUT_OF_LINE row: read_blob() length mismatch (id=$id)")
+ BlobTestHelpers.assertBytesContent(bytes, expectedOffset = k *
payloadLen)
+ }
+ }
+ }
+
+ /**
+ * Shared writer for multi-blob-column INLINE tests: writes a table with two
INLINE blob
+ * columns ({@code payload_a}, {@code payload_b}) and returns the table path
plus the
+ * payloads written for each column so individual tests can assert on read.
+ *
+ * Distinct byte patterns are used per column so a column-swap regression
would surface
+ * immediately as a byte mismatch rather than silently passing.
+ */
+ private def writeMultiBlobInlineTable(
+ tableType: HoodieTableType,
+ tableName: String,
+ numRows: Int = 4,
+ payloadLen: Int = 512): (String, Seq[Array[Byte]], Seq[Array[Byte]]) = {
+ val tablePath = s"$basePath/$tableName"
+ val payloadsA: Seq[Array[Byte]] = (0 until numRows).map { i =>
+ (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+ }
+ val payloadsB: Seq[Array[Byte]] = (0 until numRows).map { i =>
+ (0 until payloadLen).map(j => ((i + j + 128) % 256).toByte).toArray
+ }
+ val sparkSess = spark
+ import sparkSess.implicits._
+
+ val baseDf = (0 until numRows).map(i => (i, payloadsA(i), payloadsB(i)))
+ .toDF("id", "bytes_a", "bytes_b")
+ val rawDf = baseDf.select(
+ $"id",
+ BlobTestHelpers.inlineBlobStructCol("payload_a", $"bytes_a"),
+ BlobTestHelpers.inlineBlobStructCol("payload_b", $"bytes_b"))
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload_a", BlobType().asInstanceOf[StructType], nullable =
true,
+ BlobTestHelpers.blobMetadata),
+ StructField("payload_b", BlobType().asInstanceOf[StructType], nullable =
true,
+ BlobTestHelpers.blobMetadata)
+ ))
+ val df = spark.createDataFrame(rawDf.rdd, canonicalSchema)
+
+ writeDataframe(tableType, tableName, tablePath, df, saveMode =
SaveMode.Overwrite,
+ operation = Some("bulk_insert"),
+ extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+ assertLanceBlobEncoding(tablePath)
+ (tablePath, payloadsA, payloadsB)
+ }
+
+ /**
+ * Plain struct projection across two INLINE blob columns: {@code SELECT
payload_a, payload_b
+ * FROM table}. The single-column shape is already pinned by {@code
testBlobInlineRoundTrip}
+ * (CONTENT) and {@code testBlobInlineDescriptorMode} (DESCRIPTOR); this
test only asserts the
+ * per-column-independence properties that are unique to the multi-column
case:
+ *
+ * - CONTENT: each column's {@code data} carries its own written bytes
(distinct byte
+ * patterns per column rule out cross-column aliasing).
+ * - DESCRIPTOR (default): both columns independently get {@code
data=null} and a populated
+ * synthesized {@code reference} — i.e. the synthesis fires per-column,
not just on the
+ * first blob column.
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBlobInlineMultipleColumnsPlainSelect(tableType: HoodieTableType):
Unit = {
+ val tableName =
s"test_lance_blob_multi_plain_${tableType.name().toLowerCase}"
+ val payloadLen = 512
+ val numRows = 4
+ val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable(
+ tableType, tableName, numRows, payloadLen)
+ val sparkSess = spark
+ import sparkSess.implicits._
+ val modeKey = "hoodie.read.blob.inline.mode"
+
+ // CONTENT: per-column bytes must not alias — payload_a carries payloadsA,
payload_b carries
+ // payloadsB. The byte patterns differ by construction (see
writeMultiBlobInlineTable).
+ val contentRows = spark.read.format("hudi")
+ .option(modeKey, "CONTENT")
+ .load(tablePath)
+ .select($"id", $"payload_a", $"payload_b")
+ .orderBy($"id")
+ .collect()
+ assertEquals(numRows, contentRows.length)
+ contentRows.zipWithIndex.foreach { case (row, i) =>
+ val a = row.getStruct(row.fieldIndex("payload_a"))
+ val b = row.getStruct(row.fieldIndex("payload_b"))
+ assertArrayEquals(payloadsA(i),
a.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD),
+ s"payload_a: bytes must match written payloadsA under CONTENT (id=$i)")
+ assertArrayEquals(payloadsB(i),
b.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD),
+ s"payload_b: bytes must match written payloadsB under CONTENT (id=$i)")
+ }
+
+ // DESCRIPTOR (default): descriptor synthesis must fire per-column.
data=null and a
+ // populated reference on BOTH columns is the property that distinguishes
this from the
+ // single-column DESCRIPTOR test.
+ val descRows = spark.read.format("hudi")
+ .load(tablePath)
+ .select($"id", $"payload_a", $"payload_b")
+ .orderBy($"id")
+ .collect()
+ assertEquals(numRows, descRows.length)
+ descRows.foreach { row =>
+ val id = row.getInt(row.fieldIndex("id"))
+ Seq("payload_a", "payload_b").foreach { col =>
+ val payload = row.getStruct(row.fieldIndex(col))
+
assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+ s"$col: data should be null under DESCRIPTOR (id=$id)")
+
assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+ s"$col: reference should be populated under DESCRIPTOR (id=$id)")
+ }
+ }
+ }
+
+ /**
+ * Materializing both INLINE blob columns via {@code read_blob()} in a
single query under
+ * CONTENT mode: {@code SELECT read_blob(payload_a), read_blob(payload_b)
FROM table}. Each
+ * column must resolve via the 1-hop {@code inline_data} passthrough with
its own bytes —
+ * distinct byte patterns per column would surface a cross-column aliasing
regression.
+ *
+ * The DESCRIPTOR-mode failure path for {@code read_blob()} on INLINE rows
is pinned by
+ * {@code testBlobInlineDescriptorMode} (single column) and {@code
+ * testBlobInlineMultipleColumnsMixedSelect} (one read_blob + one struct
projection); it is
+ * not re-asserted here.
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBlobInlineMultipleColumnsReadBlobAll(tableType: HoodieTableType):
Unit = {
+ val tableName =
s"test_lance_blob_multi_readblob_${tableType.name().toLowerCase}"
+ val numRows = 4
+ val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable(
+ tableType, tableName, numRows)
+ val modeKey = "hoodie.read.blob.inline.mode"
+
+ val contentView = s"${tableName}_content_view"
+ spark.read.format("hudi")
+ .option(modeKey, "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(contentView)
+ val materialized = spark.sql(
+ s"SELECT id, read_blob(payload_a) AS bytes_a, read_blob(payload_b) AS
bytes_b " +
+ s"FROM $contentView ORDER BY id").collect()
assertEquals(numRows, materialized.length)
materialized.zipWithIndex.foreach { case (row, i) =>
+ assertEquals(i, row.getInt(row.fieldIndex("id")))
+ assertArrayEquals(payloadsA(i), row.getAs[Array[Byte]]("bytes_a"),
+ s"read_blob(payload_a) should match under CONTENT (id=$i)")
+ assertArrayEquals(payloadsB(i), row.getAs[Array[Byte]]("bytes_b"),
+ s"read_blob(payload_b) should match under CONTENT (id=$i)")
+ }
+ }
+
+ /**
+ * Mixed projection across two INLINE blob columns: {@code SELECT
read_blob(payload_a),
+ * payload_b FROM table}. One column is materialized via {@code
read_blob()}, the other is
+ * left as a struct. This is the case explicitly raised in PR review — under
the DESCRIPTOR
+ * default, a mixed query asking for bytes on one column and a pointer on
another must fail
+ * loudly rather than silently returning one materialized and one
synthesized shape.
+ *
+ * - CONTENT: {@code payload_a} resolves to bytes (1-hop), {@code
payload_b} comes back as
+ * the same content-shape struct as {@code
testBlobInlineMultipleColumnsPlainSelect}
+ * (data=bytes, reference present-but-empty). Pinning both shapes in the
same row
+ * confirms the projection doesn't bleed across columns.
+ * - DESCRIPTOR (default): the {@code read_blob()} call on {@code
payload_a} still hits
+ * the INLINE+DESCRIPTOR branch even though {@code payload_b} is only
being projected as
+ * a struct. {@code payload_b}'s shape doesn't soften the failure.
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBlobInlineMultipleColumnsMixedSelect(tableType: HoodieTableType):
Unit = {
+ val tableName =
s"test_lance_blob_multi_mixed_${tableType.name().toLowerCase}"
+ val numRows = 4
+ val (tablePath, payloadsA, payloadsB) = writeMultiBlobInlineTable(
+ tableType, tableName, numRows)
+ val modeKey = "hoodie.read.blob.inline.mode"
+
+ // CONTENT: read_blob() materializes payload_a; payload_b returned as
content-shape struct.
+ val contentView = s"${tableName}_content_view"
+ spark.read.format("hudi")
+ .option(modeKey, "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(contentView)
+ val rows = spark.sql(
+ s"SELECT id, read_blob(payload_a) AS bytes_a, payload_b " +
+ s"FROM $contentView ORDER BY id").collect()
+ assertEquals(numRows, rows.length)
+ rows.zipWithIndex.foreach { case (row, i) =>
+ assertEquals(i, row.getInt(row.fieldIndex("id")))
+ assertArrayEquals(payloadsA(i), row.getAs[Array[Byte]]("bytes_a"),
+ s"read_blob(payload_a) should return bytes under CONTENT (id=$i)")
+ val payloadB = row.getStruct(row.fieldIndex("payload_b"))
+ assertEquals(HoodieSchema.Blob.INLINE,
+ payloadB.getString(payloadB.fieldIndex(HoodieSchema.Blob.TYPE)),
+ s"payload_b: type should remain INLINE under CONTENT (id=$i)")
+ assertArrayEquals(payloadsB(i),
+ payloadB.getAs[Array[Byte]](HoodieSchema.Blob.INLINE_DATA_FIELD),
+ s"payload_b: data should match written bytes under CONTENT (id=$i)")
+ val refIdx = payloadB.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)
+ if (!payloadB.isNullAt(refIdx)) {
+ val ref = payloadB.getStruct(refIdx)
+
assertTrue(ref.isNullAt(ref.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE_PATH)),
+ s"payload_b: reference.external_path should be null under CONTENT
(id=$i)")
+ }
+ }
+
+ // DESCRIPTOR (default): read_blob(payload_a) trips even though payload_b
is just a struct.
+ val descView = s"${tableName}_desc_view"
+ spark.read.format("hudi")
+ .load(tablePath)
+ .createOrReplaceTempView(descView)
+ val ex = assertThrows(classOf[Throwable], new Executable {
+ override def execute(): Unit = {
+ spark.sql(
+ s"SELECT id, read_blob(payload_a) AS bytes_a, payload_b " +
+ s"FROM $descView ORDER BY id").collect()
+ }
+ })
+ val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ !=
null)
+ .flatMap(t => Option(t.getMessage)).mkString(" | ")
+ assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"),
+ s"read_blob(payload_a) under DESCRIPTOR must throw INLINE+DESCRIPTOR
error even when " +
+ s"mixed with a struct projection of another blob column; got:
$msgChain")
+ }
+
+ /**
+ * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default.
MOR compaction reads
+ * the base file via {@link HoodieSparkLanceReader}, which hard-pins CONTENT
regardless of the
+ * user-facing {@code hoodie.read.blob.inline.mode}. If that pin were to
honor the default
+ * (DESCRIPTOR), compaction would read null {@code data} and rewrite a base
file without bytes,
+ * silently corrupting untouched rows. This test inserts INLINE blobs,
upserts a subset to force
+ * compaction, and asserts that touched rows carry the new bytes while
untouched rows retain the
+ * originals.
+ */
+ @Test
+ def testBlobInlineCompactionRoundTrip(): Unit = {
+ val tableType = HoodieTableType.MERGE_ON_READ
+ val tableName = "test_lance_blob_inline_compact_mor"
+ val tablePath = s"$basePath/$tableName"
+
+ val payloadLen = 1024
+ val numRows = 6
+ val initialPayloads: Seq[Array[Byte]] = (0 until numRows).map { i =>
+ (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+ }
+ val sparkSess = spark
+ import sparkSess.implicits._
+
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload", BlobType().asInstanceOf[StructType], nullable =
true,
+ BlobTestHelpers.blobMetadata)
+ ))
+ def asInlineDf(idToBytes: Seq[(Int, Array[Byte])]): DataFrame = {
+ val rawDf = idToBytes.toDF("id", "bytes")
+ .select($"id", BlobTestHelpers.inlineBlobStructCol("payload",
$"bytes"))
+ spark.createDataFrame(rawDf.rdd, canonicalSchema)
+ }
+
+ // First commit: bulk_insert ids 0..5 with the initial pattern. Lands in a
base file.
+ writeDataframe(tableType, tableName, tablePath,
+ asInlineDf(initialPayloads.zipWithIndex.map { case (b, i) => (i, b) }),
+ saveMode = SaveMode.Overwrite,
+ operation = Some("bulk_insert"),
+ extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+
+ assertLanceBlobEncoding(tablePath)
+
+ // Second commit: upsert ids 0..2 with all-0xEE payloads, triggering
inline compaction. The
+ // compactor reads the base file + log via the CONTENT-pinned reader and
rewrites a new base
+ // file. Ids 3..5 are untouched: their bytes must survive the compaction
read/rewrite even
+ // though the user-facing default is now DESCRIPTOR.
+ val updatedPayloadByte: Byte = 0xEE.toByte
+ val updatedIds = 0 until 3
+ val updatedPayloads = updatedIds.map(i => (i,
Array.fill[Byte](payloadLen)(updatedPayloadByte)))
+ writeDataframe(tableType, tableName, tablePath,
+ asInlineDf(updatedPayloads),
+ operation = Some("upsert"),
+ extraOptions = Map(PRECOMBINE_FIELD.key() -> "id",
+ "hoodie.compact.inline" -> "true",
+ "hoodie.compact.inline.max.delta.commits" -> "1"))
+
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+ val completedInstants =
metaClient.reloadActiveTimeline().filterCompletedInstants()
+ .getInstants.asScala
+ val deltaCommits = completedInstants.filter(_.getAction == "deltacommit")
+ assertTrue(deltaCommits.nonEmpty,
+ "Upsert must have written a deltacommit on MOR — without log files the
compaction " +
+ "round-trip below would be a no-op and the test would silently pass
even if the " +
+ "CONTENT-pin in HoodieSparkLanceReader were broken.")
+ val compactionCommits = completedInstants.filter(_.getAction == "commit")
+ assertTrue(compactionCommits.nonEmpty, "Compaction commit should be
present after upsert")
+
+ // Walk file groups in the (non-partitioned) table and verify at least one
historical file
+ // slice carries log files. After compaction the latest slice is
post-compaction (no logs),
+ // but the pre-compaction slice is still in the FSV's history, so
`hasLogFiles` will flag
+ // it. This catches a regression where the upsert silently fell into a
CoW-like path.
+ val engineCtx = new HoodieLocalEngineContext(metaClient.getStorageConf)
+ val metadataCfg = HoodieMetadataConfig.newBuilder.build
+ val viewManager = FileSystemViewManager.createViewManager(
+ engineCtx, metadataCfg, FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build,
+ (mc: HoodieTableMetaClient) => metaClient.getTableFormat
+ .getMetadataFactory.create(engineCtx, mc.getStorage, metadataCfg,
tablePath))
+ val fsView = viewManager.getFileSystemView(metaClient)
+ try {
+ fsView.loadAllPartitions()
+ val anyHadLogs = fsView.getAllFileGroups("").iterator().asScala.exists {
fg =>
+ fg.getAllFileSlices.iterator().asScala.exists(_.hasLogFiles)
+ }
+ assertTrue(anyHadLogs,
+ s"MOR upsert must have produced log files in at least one file slice
at $tablePath; " +
+ s"none observed — upsert may have silently bypassed the deltacommit
path")
+ } finally {
+ fsView.close()
+ }
+
+ val expected: Map[Int, Array[Byte]] = (
+ updatedIds.map(i => i ->
Array.fill[Byte](payloadLen)(updatedPayloadByte)) ++
+ (updatedIds.length until numRows).map(i => i -> initialPayloads(i))
+ ).toMap
+
+ // Verify via the realistic user-facing path. After the flip, a plain read
yields the
+ // DESCRIPTOR shape: INLINE type, null `data`, populated reference. This
confirms the new
+ // default is in effect end-to-end.
+ val readRows = spark.read.format("hudi")
+ .load(tablePath)
+ .select($"id", $"payload")
+ .orderBy($"id")
+ .collect()
+ assertEquals(numRows, readRows.length)
+ readRows.foreach { row =>
+ val id = row.getInt(row.fieldIndex("id"))
+ val payload = row.getStruct(row.fieldIndex("payload"))
+ assertEquals(HoodieSchema.Blob.INLINE,
+ payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)),
+ s"Type must remain INLINE post-compaction (id=$id)")
+
assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+ s"DESCRIPTOR default should null `data` on plain read (id=$id)")
+
assertNotNull(payload.getStruct(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+ s"DESCRIPTOR default should populate reference on plain read (id=$id)")
+ }
+
+ // read_blob() under CONTENT mode is what we use to verify the
post-compaction bytes
+ // because read_blob() on INLINE rows throws under the DESCRIPTOR default.
The bytes can
+ // only come back if HoodieSparkLanceReader's CONTENT pin held during the
compactor's
+ // base-file read — otherwise untouched ids 3..5 would have been rewritten
with null
+ // `data` and CONTENT-mode read would surface that.
+ val viewName = s"${tableName}_view"
+ spark.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
+ val materialized = spark.sql(
+ s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY
id").collect()
+ assertEquals(numRows, materialized.length)
+ materialized.foreach { row =>
+ val id = row.getInt(row.fieldIndex("id"))
val bytes = row.getAs[Array[Byte]]("bytes")
- assertArrayEquals(expectedPayloads(i), bytes,
- s"read_blob() bytes mismatch for id=$i")
+ assertArrayEquals(expected(id), bytes,
+ s"read_blob() must return correct bytes post-compaction (id=$id)")
}
}
diff --git a/rfc/rfc-100/rfc-100.md b/rfc/rfc-100/rfc-100.md
index 2c637ccb2db2..2fc8e33d781f 100644
--- a/rfc/rfc-100/rfc-100.md
+++ b/rfc/rfc-100/rfc-100.md
@@ -130,65 +130,86 @@ SELECT id, url, read_blob(image_blob) as image_bytes FROM
my_table;
#### Read Modes: `read_blob` vs. `SELECT *`
-`read_blob(<blob_column>)` is the canonical, universal API for materializing
raw blob bytes in a query. It always returns the underlying `bytes` regardless
of:
-- Storage strategy (`INLINE` vs `OUT_OF_LINE`)
-- Base file format (Parquet, Lance, …)
-- Any reader-side config such as `hoodie.read.blob.inline.mode`
+`read_blob(<blob_column>)` is the canonical API for materializing raw blob
bytes in a query. It returns the underlying `bytes` for:
+- Any `OUT_OF_LINE` row (via external pread), regardless of mode.
+- `INLINE` rows under `CONTENT` mode (1-hop passthrough of `data`).
+- `INLINE` rows on Parquet under either mode (Parquet's DESCRIPTOR is a no-op
today; rows arrive in CONTENT shape).
+
+`INLINE` rows on Lance under `DESCRIPTOR` mode are **not supported** by
`read_blob()` and the call throws a clear error. DESCRIPTOR is a metadata-only
mode for INLINE rows: bytes aren't materialized during the scan, and the
synthesized `reference` is an internal pointer into the `.lance` file's storage
layout, not user-facing metadata. To read bytes, set
`hoodie.read.blob.inline.mode=CONTENT`.
Selecting the blob column directly (e.g. `SELECT image_blob FROM t` or `SELECT
*`) returns the underlying `Blob` struct as-is. The contents of that struct
depend on the storage strategy, the file format, and the read mode, as
summarized below.
**Reader Configuration**
-- `hoodie.read.blob.inline.mode` — values `CONTENT` (default) | `DESCRIPTOR`.
+- `hoodie.read.blob.inline.mode` — values `DESCRIPTOR` (default) | `CONTENT`.
+ - `DESCRIPTOR` (default): metadata-only for `INLINE` rows. On Lance, `data` is `NULL` and the engine returns a synthesized `reference` pointing at the backing file, so callers can inspect `(path, offset, length)` without materializing bytes. That reference is an internal pointer into the `.lance` file's storage layout, not user-facing storage. `read_blob()` on these rows is **unsupported and throws**. To get bytes, switch to `CONTENT`, then read `col.data` or call `read_blob()`.
- `CONTENT`: the engine eagerly materializes inline bytes into the struct's
`data` field.
- - `DESCRIPTOR`: the engine returns an `OUT_OF_LINE`-shaped descriptor in the
`reference` field where the underlying file format supports it (Lance today),
enabling lazy byte materialization via `read_blob`. For file formats without a
native descriptor for inline payloads (Parquet), both `data` and `reference`
are returned `NULL`, and the caller must use `read_blob` to retrieve bytes.
- - This config governs `INLINE` reads only. For `OUT_OF_LINE` storage, the
engine always returns a populated `reference` regardless of this setting.
+ - This config governs `INLINE` reads only. For `OUT_OF_LINE` storage, the
engine always returns a populated `reference` regardless of this setting, and
`read_blob()` always materializes bytes.
**Behavior matrix**
-| Access pattern | Storage | File format |
`hoodie.read.blob.inline.mode` | `data` field | `reference` field |
Raw bytes available? |
-|------------------|--------------|-------------|--------------------------------|--------------|------------------------------|---------------------------------------------------|
-| `SELECT read_blob(col) FROM table` | INLINE | Parquet | (any)
| n/a | n/a | Yes —
returns bytes |
-| `SELECT read_blob(col) FROM table` | INLINE | Lance | (any)
| n/a | n/a | Yes —
returns bytes |
-| `SELECT read_blob(col) FROM table` | OUT_OF_LINE | (any) | (any)
| n/a | n/a | Yes —
returns bytes |
-| `SELECT col FROM table` | INLINE | Parquet | `CONTENT`
(default) | bytes | NULL | Yes — via
`data` |
-| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR`
| **NULL** | **NULL** | No — must call
`read_blob` |
-| `SELECT col FROM table` | INLINE | Lance | `CONTENT`
(default) | bytes | NULL | Yes — via
`data` |
-| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR`
| NULL | populated (Lance blob enc.) | No — descriptor
visible; use `read_blob` for bytes|
-| `SELECT col FROM table` | OUT_OF_LINE | (any) | (irrelevant)
| NULL | populated | No — must call
`read_blob` |
+| Access pattern | Storage | File format |
`hoodie.read.blob.inline.mode` | `data` field | `reference` field |
Raw bytes available? |
+|------------------|--------------|-------------|--------------------------------|--------------|-----------------------------|---------------------------------------------------|
+| `SELECT read_blob(col) FROM table` | INLINE | Parquet | (any)
| n/a | n/a | Yes —
returns bytes (1-hop) |
+| `SELECT read_blob(col) FROM table` | INLINE | Lance | `CONTENT`
| n/a | n/a | Yes —
returns bytes (1-hop) |
+| `SELECT read_blob(col) FROM table` | INLINE | Lance | `DESCRIPTOR` (default) | n/a | n/a | **No — throws.** Switch to `CONTENT` (`data` is `NULL` under `DESCRIPTOR`). |
+| `SELECT read_blob(col) FROM table` | OUT_OF_LINE | (any) | (any)
| n/a | n/a | Yes —
returns bytes (external pread) |
+| `SELECT col FROM table` | INLINE | Parquet | `CONTENT`
| bytes | NULL | Yes — via `data`
|
+| `SELECT col FROM table` | INLINE | Parquet | `DESCRIPTOR` (default) | bytes (mode is a no-op on Parquet today) | NULL | Yes — via `data` |
+| `SELECT col FROM table` | INLINE | Lance | `CONTENT`
| bytes | NULL | Yes — via `data`
|
+| `SELECT col FROM table` | INLINE | Lance | `DESCRIPTOR`
(default) | NULL | populated (synthesized) | No —
metadata-only; switch mode to read bytes |
+| `SELECT col FROM table` | OUT_OF_LINE | (any) | (irrelevant)
| NULL | populated | No — must call
`read_blob` |
+
**Why Parquet and Lance differ in `DESCRIPTOR` mode**
-Lance's native blob encoding stores blobs in a way that already exposes a
`(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it
directly in the `reference` field — effectively letting INLINE blobs be read
with the same deferred-materialization path used for OUT_OF_LINE references.
Parquet has no equivalent native descriptor for an inline byte array, so both
fields are `NULL` in `DESCRIPTOR` mode and the caller must use `read_blob` to
materialize bytes.
+Lance's native blob encoding already exposes a `(file, offset, length)` descriptor cheaply, so `DESCRIPTOR` mode surfaces it directly in the `reference` field as metadata. This lets callers skip blob decoding when they don't need the bytes. The reference is an internal pointer into the `.lance` file, so `read_blob()` does not follow it for INLINE rows and throws instead; set `CONTENT` to materialize bytes. Parquet has no equivalent native descriptor for an inline byte array, which is why the `DESCRIPTOR` path is currently a no-op there.
**Visual**
+What the user gets back, grouped by storage type (set at write time) and then
by query shape:
+
+```mermaid
+flowchart TD
+ ST{storage_type}
+
+ ST -->|OUT_OF_LINE| QO{Query}
+ QO -->|"SELECT col"| OOL["type = OUT_OF_LINE<br/>inline_data =
NULL<br/>reference = user-supplied"]
+ QO -->|"SELECT read_blob(col)"| RBO(["bytes — materialized<br/>via the
external reference"])
+
+ ST -->|INLINE| QI{Query}
+ QI -->|"SELECT col"| M{hoodie.read.blob.inline.mode}
+ M -->|CONTENT| CONT["type = INLINE<br/>inline_data = bytes<br/>reference =
NULL"]
+ M -->|DESCRIPTOR default| F{file format}
+ F -->|Lance| LD["type = INLINE<br/>inline_data = NULL<br/>reference =
synthetic managed<br/>path, offset, length, is_managed=true"]
+ F -->|"Parquet (today: mode no-op)"| PD["Parquet reader does
not<br/>implement DESCRIPTOR yet —<br/>returns CONTENT shape:<br/>inline_data =
bytes, reference = NULL"]
+
+ QI -->|"SELECT read_blob(col)"| RM{hoodie.read.blob.inline.mode}
+ RM -->|CONTENT| RBC(["bytes from inline_data on the row<br/>1 hop"])
+ RM -->|DESCRIPTOR default| RF{file format}
+ RF -->|Lance| RBL(["error: read_blob unsupported<br/>on INLINE rows under
DESCRIPTOR<br/>(metadata-only mode)"])
+ RF -->|"Parquet (today: mode no-op)"| RBP(["bytes from inline_data on the
row<br/>1 hop — same as CONTENT"])
```
- ┌──────────────────────────────────────────────────────────────────┐
- │ read_blob(col) ── universal, always materializes bytes ──│
- │ │ │
- │ ▼ │
- │ ┌─────────────┐ INLINE ───► read inline payload │
- │ │ Hudi reader │ ──┤ │
- │ └─────────────┘ OUT_OF_LINE ► follow reference → read bytes │
- └──────────────────────────────────────────────────────────────────┘
-
- ┌──────────────────────────────────────────────────────────────────┐
- │ SELECT col (returns Blob struct as-is) │
- │ │ │
- │ ▼ │
- │ storage = OUT_OF_LINE ─────────────► data=NULL, reference=set │
- │ │
- │ storage = INLINE, │
- │ inline.mode = CONTENT (default) ───► data=<bytes>, ref=NULL │
- │ │
- │ storage = INLINE, │
- │ inline.mode = DESCRIPTOR │
- │ ├─ Parquet ─────────────────────► data=NULL, ref=NULL │
- │ └─ Lance ─────────────────────► data=NULL, ref=set │
- └──────────────────────────────────────────────────────────────────┘
+
+`read_blob(col)` byte resolution — hop count depends on the row shape that
arrives. INLINE rows under DESCRIPTOR mode are rejected; the mode is
metadata-only and the synthesized reference is an internal pointer, not
user-facing storage.
+
+```mermaid
+flowchart LR
+ RB[/"SELECT read_blob(col)"/] --> Scan["Scan emits Blob struct"]
+ Scan --> Shape{"row shape"}
+ Shape -->|"inline_data populated<br/>(CONTENT mode, or Parquet)"|
Direct["BatchedBlobReader:<br/>read inline_data off the row<br/><b>1 hop
total</b>"]
+ Shape -->|"reference populated"| Type{"storage_type"}
+ Type -->|"OUT_OF_LINE"|
Indirect["BatchedBlobReader:<br/>openSeekable(external_path)<br/>seek(offset),
readFully(length)<br/><b>2 hops total</b>"]
+ Type -->|"INLINE<br/>(DESCRIPTOR + Lance)"| Err["error: read_blob
unsupported<br/>on INLINE rows under DESCRIPTOR"]
+ Direct --> Bytes(["bytes"])
+ Indirect --> Bytes
```
+Notes:
+- INLINE + DESCRIPTOR + `read_blob()` is unsupported by design. DESCRIPTOR is
metadata-only for INLINE rows: bytes aren't materialized during the scan, and
the synthesized `reference` is an internal pointer into the `.lance` file's
storage layout, not user-facing metadata. To read bytes, set
`hoodie.read.blob.inline.mode=CONTENT`.
+- OUT_OF_LINE is unaffected by `hoodie.read.blob.inline.mode` — the descriptor
is real user metadata and `read_blob()` always works via an external pread.
+- Plain `SELECT col` (no `read_blob`) is always 1 hop. DESCRIPTOR's win is
that hop 1 skips blob decoding when bytes aren't needed.
+
### 3. Writer
#### Phase 1: External Blob Support
The writer will be updated to support writing blob data as out-of-line
references.