hudi-agent commented on code in PR #18683:
URL: https://github.com/apache/hudi/pull/18683#discussion_r3244377074
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala:
##########
@@ -237,11 +237,16 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
override def buildFileFormat(): FileFormat = {
val tableConfig = metaClient.getTableConfig
Review Comment:
🤖 nit: could you rename `blobDescriptorMode` to `isBlobDescriptorMode`? It's
a boolean and the constructor parameter it's passed into
(`isBlobDescriptorMode`) already uses the `is` prefix — the local val not
following suit is a small but jarring inconsistency.
- AI-generated; verify before applying.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -457,6 +491,46 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
}
}
+ /**
+ * Detects BLOB columns and strips the {@code data} sub-field when DESCRIPTOR mode is active.
+ * Only applies to Parquet format; other formats handle DESCRIPTOR mode natively.
+ *
+ * @param forceContentCols Top-level column names that must keep their {@code data} sub-field
+ * (i.e. the columns the current query reads via {@code read_blob()}).
+ * These are excluded from the strip set so the bytes are materialized.
+ */
+ private def withBlobDescriptorRewrite(schema: StructType,
+ forceContentCols: Set[String]): (StructType, Set[Int]) = {
+ if (hoodieFileFormat != HoodieFileFormat.PARQUET) {
+ (schema, Set.empty[Int])
+ } else {
+ import scala.collection.JavaConverters._
+ val detected = VectorConversionUtils.detectBlobColumnsFromMetadata(schema).asScala.map(_.intValue()).toSet
+ val toStrip = if (forceContentCols.isEmpty) detected
+ else detected.filterNot(idx => forceContentCols.contains(schema.fields(idx).name))
+ if (toStrip.isEmpty) {
+ (schema, Set.empty[Int])
+ } else {
+ val javaBlobCols: java.util.Set[Integer] = toStrip.map(Integer.valueOf).asJava
+ (VectorConversionUtils.stripBlobDataField(schema, javaBlobCols), toStrip)
+ }
+ }
+ }
+
+ /**
+ * Wraps an iterator to re-insert null {@code data} fields into blob structs
+ * after Parquet DESCRIPTOR mode read (expanding 2-field → 3-field structs).
+ */
+ private def wrapWithBlobNullPadding(iter: Iterator[InternalRow],
Review Comment:
🤖 nit: the body of this `wrapWithBlobNullPadding` (create projection →
create mapper → wrap iterator) is almost identical to
`SparkFileFormatInternalRowReaderContext.wrapWithBlobNullPadding` at line 421
of that file — only the iterator type differs. Have you considered
consolidating the shared setup into `VectorConversionUtils` or a small shared
utility so the two call sites can't silently diverge if
`buildBlobNullPadRowMapper`'s signature changes?
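The strip-then-pad flow in `withBlobDescriptorRewrite` / `wrapWithBlobNullPadding`, and the consolidation this comment suggests, can be sketched without Spark. This is a minimal, hypothetical model, not Hudi code: rows and blob structs are plain `Seq[Any]`, the position of `data` inside the full blob struct is passed in as `dataIndex` (an assumption), and `stripOrdinals`, `nullPadMapper`, and `wrapWithNullPad` are illustrative names rather than `VectorConversionUtils` APIs. Filtering against an empty `forceContentCols` set already yields the full detected set, so the sketch drops the `isEmpty` short-circuit. The design point is that the mapper is built in exactly one place and each call site only supplies how to map its own iterator type.

```scala
// Hypothetical, Spark-free model of DESCRIPTOR-mode stripping and null padding.
// Rows and blob structs are plain Seq[Any]; none of these names are Hudi APIs.
object BlobDescriptorSketch {

  /** Blob ordinals to strip: all detected blob columns except those read via read_blob(). */
  def stripOrdinals(fieldNames: IndexedSeq[String],
                    detectedBlobOrdinals: Set[Int],
                    forceContentCols: Set[String]): Set[Int] =
    detectedBlobOrdinals.filterNot(idx => forceContentCols.contains(fieldNames(idx)))

  /**
   * Row mapper that re-inserts a null `data` value into each stripped blob struct,
   * turning a 2-field struct back into a 3-field one. `dataIndex` is the assumed
   * position of `data` in the full blob struct.
   */
  def nullPadMapper(stripped: Set[Int], dataIndex: Int): Seq[Any] => Seq[Any] =
    row => row.zipWithIndex.map {
      case (struct: Seq[_], idx) if stripped.contains(idx) =>
        val fields: Seq[Any] = struct
        val (before, after) = fields.splitAt(dataIndex)
        (before :+ null) ++ after
      case (value, _) => value
    }

  /**
   * Shared setup: builds the mapper once; each call site passes only `mapIter`,
   * which knows how to map its own iterator-like type `I`.
   */
  def wrapWithNullPad[I](iter: I, stripped: Set[Int], dataIndex: Int)
                        (mapIter: (I, Seq[Any] => Seq[Any]) => I): I =
    if (stripped.isEmpty) iter
    else mapIter(iter, nullPadMapper(stripped, dataIndex))
}
```

With a plain `Iterator` the call site reduces to `wrapWithNullPad(rows.iterator, Set(1), 1)((it, f) => it.map(f))`; a call site holding a different iterator type would pass its own mapping function, while the mapper construction stays shared.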
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/blob/TestReadBlobSQL.scala:
##########
@@ -495,4 +499,296 @@ class TestReadBlobSQL extends HoodieClientTestBase {
assertBytesContent(row.getAs[Array[Byte]]("data"), expectedOffset = idx * 100)
}
}
+
+ // ------------------------------------------------------------------
+ // Parquet DESCRIPTOR-mode interaction tests
+ //
+ // These exercise the per-query rewrite added by ReadBlobRule that
+ // injects BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS into the
+ // LogicalRelation's options when a query uses read_blob(). The
+ // contract: read_blob(col) always returns bytes; plain SELECT keeps
+ // DESCRIPTOR's I/O savings (data sub-field is null) for the columns
+ // that aren't referenced by read_blob().
+ // ------------------------------------------------------------------
+
+ /**
+ * Helper for the DESCRIPTOR-mode tests. Builds a Hudi table containing
+ * a single INLINE blob column (`payload`) and returns the table path.
+ */
+ private def writeInlineBlobTable(name: String,
+ tableType: HoodieTableType,
+ payloads: Seq[Array[Byte]]): String = {
+ val tablePath = s"$tempDir/$name"
+ val rawDf = sparkSession.createDataFrame(
+ payloads.zipWithIndex.map { case (bytes, i) => (i + 1, bytes) })
+ .toDF("id", "bytes")
+ .withColumn("payload", inlineBlobStructCol("payload", col("bytes")))
+ .select("id", "payload")
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+ ))
+ val df = sparkSession.createDataFrame(rawDf.rdd, canonicalSchema)
+ df.write.format("hudi")
+ .option("hoodie.table.name", name)
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id")
+ .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType.name())
+ .option(DataSourceWriteOptions.OPERATION.key(), "bulk_insert")
+ .mode("overwrite")
+ .save(tablePath)
+ tablePath
+ }
+
+ /**
+ * Core contract: read_blob() always materializes bytes, even under
+ * DESCRIPTOR mode, on both COW and MOR base files. Without this fix,
+ * read_blob() would see a null `data` sub-field (column-pruned by
+ * Parquet) and silently return null.
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testReadBlobUnderDescriptorMaterializesBytes(tableType: HoodieTableType): Unit = {
+ val payloads = Seq(
+ Array.fill[Byte](128)(0x1.toByte),
+ Array.fill[Byte](128)(0x2.toByte),
+ Array.fill[Byte](128)(0x3.toByte))
+ val tablePath = writeInlineBlobTable(
+ s"read_blob_desc_${tableType.name().toLowerCase}", tableType, payloads)
+
+ sparkSession.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+ .load(tablePath)
+ .createOrReplaceTempView("rb_desc_view")
+
+ val rows = sparkSession.sql(
+ "SELECT id, read_blob(payload) AS bytes FROM rb_desc_view ORDER BY id"
+ ).collect()
+ assertEquals(3, rows.length)
+ rows.zip(payloads).foreach { case (row, expected) =>
+ val bytes = row.getAs[Array[Byte]]("bytes")
+ assertNotNull(bytes, s"read_blob() must materialize bytes under DESCRIPTOR (id=${row.getInt(0)})")
+ assertArrayEquals(expected, bytes, s"bytes mismatch for id=${row.getInt(0)}")
+ }
+ }
+
+ /**
+ * DESCRIPTOR savings preserved when read_blob() is NOT in the query:
+ * commit 1's column projection still strips `data`, and the rule writes
+ * no force-content option.
+ */
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testDescriptorWithoutReadBlobStillSkipsData(tableType: HoodieTableType): Unit = {
+ val payloads = Seq(
+ Array.fill[Byte](128)(0x1.toByte),
+ Array.fill[Byte](128)(0x2.toByte))
+ val tablePath = writeInlineBlobTable(
+ s"desc_no_rb_${tableType.name().toLowerCase}", tableType, payloads)
+
+ val rows = sparkSession.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+ .load(tablePath)
+ .select(col("id"), col("payload"))
+ .orderBy(col("id"))
+ .collect()
+
+ assertEquals(2, rows.length)
+ rows.foreach { row =>
+ val payload = row.getStruct(row.fieldIndex("payload"))
+ assertEquals(HoodieSchema.Blob.INLINE,
+ payload.getString(payload.fieldIndex(HoodieSchema.Blob.TYPE)))
+ assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+ s"DESCRIPTOR should null-pad data when read_blob() is absent (id=${row.getInt(0)})")
+ assertTrue(payload.isNullAt(payload.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)),
+ "Parquet has no native byte-range descriptor; reference is null")
+ }
+ }
+
+ /**
+ * The per-column granularity claim. Query references read_blob(payload_a)
+ * only — payload_a must materialize bytes, payload_b must remain
+ * stripped (DESCRIPTOR savings preserved for the column the user
+ * didn't ask about).
+ *
+ * Uses multiple rows with distinct per-row byte patterns so that any
+ * row-iteration bug (e.g., reusing a row buffer without copying, or
+ * mis-indexing the blob struct ordinal across rows) would surface as
+ * mismatched bytes per row rather than slipping through a single-row
+ * happy path.
+ */
+ @Test
+ def testDescriptorPerColumnGranularity(): Unit = {
+ val tablePath = s"$tempDir/desc_per_column"
+ // Distinct fill byte AND distinct length per row, AND per-row distinction
+ // between payload_a and payload_b — a cross-row or cross-column leak fails an assertion.
+ val rows = Seq(
+ (1, Array.fill[Byte](80)(0xA1.toByte), Array.fill[Byte](160)(0xB1.toByte)),
+ (2, Array.fill[Byte](64)(0xA2.toByte), Array.fill[Byte](192)(0xB2.toByte)),
+ (3, Array.fill[Byte](96)(0xA3.toByte), Array.fill[Byte](128)(0xB3.toByte))
+ )
+ val rawDf = sparkSession.createDataFrame(rows)
+ .toDF("id", "bytes_a", "bytes_b")
+ .withColumn("payload_a", inlineBlobStructCol("payload_a", col("bytes_a")))
+ .withColumn("payload_b", inlineBlobStructCol("payload_b", col("bytes_b")))
+ .select("id", "payload_a", "payload_b")
+ val canonicalSchema = StructType(Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("payload_a", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata),
+ StructField("payload_b", BlobType().asInstanceOf[StructType], nullable = true, blobMetadata)
+ ))
+ sparkSession.createDataFrame(rawDf.rdd, canonicalSchema).write.format("hudi")
+ .option("hoodie.table.name", "desc_per_column")
+ .option("hoodie.datasource.write.recordkey.field", "id")
+ .option("hoodie.datasource.write.operation", "bulk_insert")
+ .mode("overwrite")
+ .save(tablePath)
+
+ sparkSession.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+ .load(tablePath)
+ .createOrReplaceTempView("desc_per_column_view")
+
+ val outRows = sparkSession.sql(
+ "SELECT id, read_blob(payload_a) AS bytes_a, payload_b " +
+ "FROM desc_per_column_view ORDER BY id"
+ ).collect()
+ assertEquals(rows.length, outRows.length)
+ outRows.zip(rows).foreach { case (row, (expectedId, expectedA, expectedB)) =>
+ assertEquals(expectedId, row.getInt(0))
+ val bytesA = row.getAs[Array[Byte]]("bytes_a")
+ assertArrayEquals(expectedA, bytesA,
+ s"read_blob(payload_a) bytes mismatch at id=$expectedId — expected length ${expectedA.length}, " +
+ s"got length ${if (bytesA == null) -1 else bytesA.length}")
+ val payloadB = row.getStruct(row.fieldIndex("payload_b"))
+ assertTrue(payloadB.isNullAt(payloadB.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)),
+ s"DESCRIPTOR savings must be preserved for payload_b at id=$expectedId")
+ // Sanity: payload_b's type marker survived even though `data` was stripped.
+ assertEquals(HoodieSchema.Blob.INLINE,
+ payloadB.getString(payloadB.fieldIndex(HoodieSchema.Blob.TYPE)),
+ s"payload_b type marker must survive stripping at id=$expectedId")
+ // expectedB is intentionally unused: the contract for payload_b is that its
+ // `data` sub-field is null (asserted above), not that it round-trips the written bytes.
+ val _ = expectedB
+ }
+ }
+
+ /**
+ * read_blob() in WHERE clause must also trigger the per-query rewrite.
+ * ReadBlobRule's Filter case wraps the condition in BatchedBlobRead;
+ * the second pass collects the blobAttr the same way.
+ */
+ @Test
+ def testReadBlobInWhereClauseUnderDescriptor(): Unit = {
+ val payloads = Seq(
+ Array.fill[Byte](100)(0xA.toByte),
+ Array.fill[Byte](200)(0xB.toByte),
+ Array.fill[Byte](100)(0xC.toByte))
+ val tablePath = writeInlineBlobTable(
+ "desc_where_clause", HoodieTableType.COPY_ON_WRITE, payloads)
+
+ sparkSession.read.format("hudi")
+ .option("hoodie.read.blob.inline.mode", "DESCRIPTOR")
+ .load(tablePath)
+ .createOrReplaceTempView("desc_where_view")
+
+ val rows = sparkSession.sql(
+ "SELECT id, read_blob(payload) AS bytes FROM desc_where_view " +
+ "WHERE length(read_blob(payload)) = 200"
+ ).collect()
+ assertEquals(1, rows.length)
+ assertEquals(2, rows(0).getInt(0))
+ assertArrayEquals(payloads(1), rows(0).getAs[Array[Byte]]("bytes"))
+ }
+
+ /**
+ * JOIN of two Hudi Parquet tables, both in DESCRIPTOR mode, both with
+ * a blob column. The query uses read_blob() on only the left side's
+ * blob.
+ *
+ * This exercises ReadBlobRule's per-relation option routing: the
+ * BLOB_INLINE_READ_FORCE_CONTENT_COLUMNS option must land on the
+ * left table's LogicalRelation only, and the right table's payload
+ * must come back with DESCRIPTOR's null `data`. A bug where the
+ * rule writes the option to every Hudi LogicalRelation, or to
Review Comment:
🤖 Heads-up: `@Ignore` is from JUnit 4 (`org.junit.Ignore`), but the
surrounding test class uses JUnit 5 (`@Test` from `org.junit.jupiter.api`). The
Jupiter engine doesn't honor JUnit 4's `@Ignore`, so this test will actually
execute rather than be skipped — the rest of the file uses `@Disabled` from
`org.junit.jupiter.api.Disabled` for that purpose. Given the `TODO to
re-enable` comment, was the intent to skip this? If so, switching to
`@Disabled("reason")` would do it.
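Separately, the rationale given for `testDescriptorPerColumnGranularity` (multiple rows with distinct per-row byte patterns, so that a reader that reuses a row buffer without copying fails instead of slipping through a single-row happy path) can be demonstrated with a small Spark-free sketch. `MutableRow` and `reusingIterator` are hypothetical stand-ins for a reused row object and a deliberately buggy reader; they are not Hudi or Spark classes.

```scala
// Hypothetical illustration of why multi-row tests with distinct per-row payloads
// catch row-buffer reuse. Not Hudi or Spark code.
object RowReuseSketch {

  /** Stand-in for a mutable row object that a reader may reuse across records. */
  final class MutableRow(var bytes: Array[Byte]) {
    /** Defensive copy, similar in spirit to copying a reused Spark row before retaining it. */
    def copy(): MutableRow = new MutableRow(bytes.clone())
  }

  /** Deliberately buggy reader: hands out the same MutableRow instance for every record. */
  def reusingIterator(payloads: Seq[Array[Byte]]): Iterator[MutableRow] = {
    val shared = new MutableRow(Array.emptyByteArray)
    payloads.iterator.map { p =>
      shared.bytes = p
      shared
    }
  }

  /** True when each collected row holds exactly the bytes expected at its position. */
  def matches(collected: Seq[MutableRow], expected: Seq[Array[Byte]]): Boolean =
    collected.length == expected.length &&
      collected.zip(expected).forall { case (row, exp) => row.bytes.sameElements(exp) }
}
```

With one row, or with several identical rows, the buggy reader still matches the expected bytes. Only distinct per-row payloads expose that every collected element is the same object holding the last row's bytes; copying each row before collecting (here `copy()`) restores the expected result.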
--
This is an automated message from the Apache Git Service.