rahil-c commented on code in PR #18744:
URL: https://github.com/apache/hudi/pull/18744#discussion_r3261315624


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala:
##########
@@ -960,17 +970,228 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
         s"synthetic reference to the Lance file should be flagged managed 
(id=$i)")
     }
 
-    // read_blob() materializes bytes via BatchedBlobReader, which always 
reads with CONTENT
-    // mode (actual bytes) regardless of the user's inline read mode setting.
+    // read_blob() on INLINE rows under DESCRIPTOR mode is unsupported by 
design: DESCRIPTOR
+    // is metadata-only and the synthesized reference is an internal pointer 
into the .lance
+    // file's storage layout, not user-facing metadata. BatchedBlobReader must 
throw with a
+    // message that names both INLINE and DESCRIPTOR so the failure is 
actionable.
     val viewName = s"${tableName}_view"
-    spark.read.format("hudi").load(tablePath).createOrReplaceTempView(viewName)
+    spark.read.format("hudi")
+      .option(modeKey, "DESCRIPTOR")
+      .load(tablePath)
+      .createOrReplaceTempView(viewName)
+    val ex = assertThrows(classOf[Throwable], new Executable {
+      override def execute(): Unit = {
+        spark.sql(s"SELECT id, read_blob(payload) AS bytes FROM $viewName 
ORDER BY id").collect()
+      }
+    })
+    val msgChain = Iterator.iterate[Throwable](ex)(_.getCause).takeWhile(_ != 
null)
+      .flatMap(t => Option(t.getMessage)).mkString(" | ")
+    assertTrue(msgChain.contains("INLINE") && msgChain.contains("DESCRIPTOR"),
+      s"error must mention INLINE and DESCRIPTOR; got: $msgChain")
+
+    // Same table, same rows, but read under CONTENT mode: read_blob() takes 
the 1-hop
+    // passthrough of inline_data and must return the originally-written 
bytes. This pins the
+    // write -> read roundtrip integrity for INLINE blobs (compaction/merge 
included for MOR)
+    // independently of the DESCRIPTOR-shape verification above.
+    val contentViewName = s"${tableName}_content_view"
+    spark.read.format("hudi")
+      .option(modeKey, "CONTENT")
+      .load(tablePath)
+      .createOrReplaceTempView(contentViewName)
     val materialized = spark.sql(
-      s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY 
id").collect()
+      s"SELECT id, read_blob(payload) AS bytes FROM $contentViewName ORDER BY 
id").collect()
     assertEquals(numRows, materialized.length)
     materialized.zipWithIndex.foreach { case (row, i) =>
+      assertEquals(i, row.getInt(row.fieldIndex("id")))
+      assertArrayEquals(expectedPayloads(i), row.getAs[Array[Byte]]("bytes"),
+        s"read_blob() bytes mismatch under CONTENT mode (id=$i)")
+    }
+  }
+
+  /**
+   * Mixed-storage table on Lance: one blob column holds both INLINE rows 
(small payloads
+   * stored inline) and OUT_OF_LINE rows (external file references). Under 
CONTENT mode,
+   * read_blob() must materialize the correct bytes for both shapes in a 
single query —
+   * INLINE rows go through the 1-hop inline_data passthrough, OUT_OF_LINE 
rows go through
+   * the external pread (with BatchedBlobReader merging consecutive ranges).
+   */
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testBlobMixedInlineAndOutOfLineContentMode(tableType: HoodieTableType): 
Unit = {
+    val tableName = 
s"test_lance_blob_mixed_content_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    val payloadLen = 256
+    val numInline = 3
+    val numOutOfLine = 3
+    val externalFileSize = numOutOfLine * payloadLen
+    val externalDir = Files.createDirectories(
+      Paths.get(s"$basePath/_blob_ext_mixed_${tableType.name().toLowerCase}"))
+    val extPath = BlobTestHelpers.createTestFile(externalDir, 
"mixed_file.bin", externalFileSize)
+
+    val inlinePayloads: Seq[Array[Byte]] = (0 until numInline).map { i =>
+      (0 until payloadLen).map(j => ((i + j) % 256).toByte).toArray
+    }
+
+    val sparkSess = spark
+    import sparkSess.implicits._
+    val inlineDf = inlinePayloads.zipWithIndex.map { case (b, i) => (i, b) }
+      .toDF("id", "bytes")
+      .select($"id", BlobTestHelpers.inlineBlobStructCol("payload", $"bytes"))
+
+    val outOfLineDf = (0 until numOutOfLine).map { k =>
+      (numInline + k, extPath, (k * payloadLen).toLong, payloadLen.toLong)
+    }.toDF("id", "path", "offset", "length")
+      .select($"id", BlobTestHelpers.blobStructCol("payload", $"path", 
$"offset", $"length"))
+
+    val canonicalSchema = StructType(Seq(
+      StructField("id", IntegerType, nullable = false),
+      StructField("payload", BlobType().asInstanceOf[StructType], nullable = 
true,
+        BlobTestHelpers.blobMetadata)
+    ))
+    val raw = inlineDf.unionByName(outOfLineDf)
+    val df = spark.createDataFrame(raw.rdd, canonicalSchema)
+
+    writeDataframe(tableType, tableName, tablePath, df, saveMode = 
SaveMode.Overwrite,
+      operation = Some("bulk_insert"),
+      extraOptions = Map(PRECOMBINE_FIELD.key() -> "id"))
+
+    assertLanceBlobEncoding(tablePath)
+
+    val viewName = s"${tableName}_mixed_view"
+    spark.read.format("hudi")
+      .option("hoodie.read.blob.inline.mode", "CONTENT")
+      .load(tablePath)
+      .createOrReplaceTempView(viewName)
+
+    val rows = spark.sql(
+      s"SELECT id, read_blob(payload) AS bytes FROM $viewName ORDER BY 
id").collect()
+    assertEquals(numInline + numOutOfLine, rows.length)
+
+    rows.foreach { row =>
+      val id = row.getInt(row.fieldIndex("id"))
       val bytes = row.getAs[Array[Byte]]("bytes")
-      assertArrayEquals(expectedPayloads(i), bytes,
-        s"read_blob() bytes mismatch for id=$i")
+      if (id < numInline) {
+        assertArrayEquals(inlinePayloads(id), bytes,
+          s"INLINE row: read_blob() bytes mismatch (id=$id)")
+      } else {
+        val k = id - numInline
+        assertEquals(payloadLen, bytes.length,
+          s"OUT_OF_LINE row: read_blob() length mismatch (id=$id)")
+        BlobTestHelpers.assertBytesContent(bytes, expectedOffset = k * 
payloadLen)
+      }
+    }
+  }
+
+  /**
+   * Compaction must preserve INLINE blob bytes under the DESCRIPTOR default. 
MOR compaction reads

Review Comment:
   I think the first sentence here is not accurate, right? For compaction, I 
believe the DESCRIPTOR default will not matter and it always comes back as 
CONTENT, as you mention further down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to