rahil-c commented on code in PR #18744:
URL: https://github.com/apache/hudi/pull/18744#discussion_r3267857675
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala:
##########
@@ -960,17 +970,228 @@ class TestLanceDataSource extends
HoodieSparkClientTestBase {
s"synthetic reference to the Lance file should be flagged managed
(id=$i)")
}
- // read_blob() materializes bytes via BatchedBlobReader, which always
reads with CONTENT
- // mode (actual bytes) regardless of the user's inline read mode setting.
+ // read_blob() on INLINE rows under DESCRIPTOR mode is unsupported by
design: DESCRIPTOR
+ // is metadata-only and the synthesized reference is an internal pointer
into the .lance
+ // file's storage layout, not user-facing metadata. BatchedBlobReader must
throw with a
+ // message that names both INLINE and DESCRIPTOR so the failure is
actionable.
val viewName = s"${tableName}_view"
- spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName)
+ spark.read.format("hudi")
+ .option(modeKey, "DESCRIPTOR")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
+ val ex = assertThrows(classOf[Throwable], new Executable {
+ override def execute(): Unit = {
+ spark.sql(s"SELECT id, read_blob(payload) AS bytes FROM $viewName
ORDER BY id").collect()
+ }
+ })
+ val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ !=
null)
+ .flatMap(t => Option(t.getMessage)).mkString(" | ")
+ assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"),
+ s"error must mention INLINE and DESCRIPTOR; got: $msgChain")
+
+ // Same table, same rows, but read under CONTENT mode: read_blob() takes
the 1-hop
+ // passthrough of inline_data and must return the originally-written
bytes. This pins the
+ // write -> read roundtrip integrity for INLINE blobs (compaction/merge
included for MOR)
+ // independently of the DESCRIPTOR-shape verification above.
+ val contentViewName = s"${tableName}_content_view"
+ spark.read.format("hudi")
+ .option(modeKey, "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(contentViewName)
val materialized = spark.sql(
- s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY
id").collect()
+ s"SELECT id, read_blob(payload) AS bytes FROM $contentViewName ORDER BY
id").collect()
assertEquals(numRows, materialized.length)
materialized.zipWithIndex.foreach { case (row, i) =>
+ assertEquals(i, row.getInt(row.fieldIndex("id")))
+ assertArrayEquals(expectedPayloads(i), row.getAs[Array[Byte]]("bytes"),
+ s"read_blob() bytes mismatch under CONTENT mode (id=$i)")
+ }
+ }
+
+ /**
+ * Mixed-storage table on Lance: one blob column holds both INLINE rows
(small payloads
+ * stored inline) and OUT_OF_LINE rows (external file references). Under
CONTENT mode,
+ * read_blob() must materialize the correct bytes for both shapes in a
single query —
+ * INLINE rows go through the 1-hop inline_data passthrough, OUT_OF_LINE
rows go through
+ * the external pread (with BatchedBlobReader merging consecutive ranges).
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBlobMixedInlineAndOutOfLineContentMode(tableType: HoodieTableType):
Unit = {
+ val tableName =
s"test_lance_blob_mixed_content_${tableType.name().toLowerCase}"
+ val tablePath = s"$basePath/$tableName"
+
+ val payloadLen = 256
+ val numInline = 3
+ val numOutOfLine = 3
+ val externalFileSize = numOutOfLine * payloadLen
+ val externalDir = Files.createDirectories(
+ Paths.get(s"$basePath/_blob_ext_mixed_${tableType.name().toLowerCase}"))
+ val extPath = BlobTestHelpers.createTestFile(externalDir,
"mixed_file.bin", externalFileSize)
+
+ val inlinePayloads: Seq[Array[Byte]] = (0 until numInline).map { i =>
+ (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+ }
+
+ val sparkSess = spark
+ import sparkSess.implicits._
+ val inlineDf = inlinePayloads.zipWithIndex.map { case (b, i) => (i, b) }
+ .toDF("id", "bytes")
+ .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes"))
+
+ val outOfLineDf = (0 until numOutOfLine).map { k =>
+ (numInline + k, extPath, (k * payloadLen).toLong, payloadLen.toLong)
+ }.toDF("id", "path", "offset", "length")
+ .select($"id", BlobTestHelpers.blobStructCol("payload", $"path",
$"offset", $"length"))
+
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload", BlobType().asInstanceOf[StructType], nullable =
true,
+ BlobTestHelpers.blobMetadata)
+ ))
+ val raw = inlineDf.unionByName(outOfLineDf)
+ val df = spark.createDataFrame(raw.rdd, canonicalSchema)
+
+ writeDataframe(tableType, tableName, tablePath, df, saveMode =
SaveMode.Overwrite,
+ operation = Some("bulk_insert"),
+ extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+
+ assertLanceBlobEncoding(tablePath)
+
+ val viewName = s"${tableName}_mixed_view"
+ spark.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "CONTENT")
+ .load(tablePath)
+ .createOrReplaceTempView(viewName)
+
+ val rows = spark.sql(
+ s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY
id").collect()
+ assertEquals(numInline + numOutOfLine, rows.length)
+
+ rows.foreach { row =>
+ val id = row.getInt(row.fieldIndex("id"))
val bytes = row.getAs[Array[Byte]]("bytes")
- assertArrayEquals(expectedPayloads(i), bytes,
- s"read_blob() bytes mismatch for id=$i")
+ if (id < numInline) {
+ assertArrayEquals(inlinePayloads(id), bytes,
+ s"INLINE row: read_blob() bytes mismatch (id=$id)")
+ } else {
+ val k = id - numInline
+ assertEquals(payloadLen, bytes.length,
+ s"OUT_OF_LINE row: read_blob() length mismatch (id=$id)")
+ BlobTestHelpers.assertBytesContent(bytes, expectedOffset = k *
payloadLen)
+ }
+ }
+ }
+
+ /**
+ * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default.
MOR compaction reads
+ * the base file via {@link HoodieSparkLanceReader}, which hard-pins CONTENT
regardless of the
+ * user-facing {@code hoodie.read.blob.inline.mode}. If that pin were to
honor the default
+ * (DESCRIPTOR), compaction would read null {@code data} and rewrite a base
file without bytes,
+ * silently corrupting untouched rows. This test inserts INLINE blobs,
upserts a subset to force
+ * compaction, and asserts that touched rows carry the new bytes while
untouched rows retain the
+ * originals.
+ */
+ @Test
+ def testBlobInlineCompactionRoundTrip(): Unit = {
Review Comment:
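Ensure that log files are actually created by the upsert before compaction runs
(e.g. assert that they exist); otherwise this test does not prove that it
exercises the MOR compaction path.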
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]