kbuci opened a new pull request, #18958:
URL: https://github.com/apache/hudi/pull/18958

   ### Describe the issue this Pull Request addresses
   
   Flink can already declare BLOB columns in Hudi table schemas (the 
`HoodieSchema.Blob` ROW shape is recognized by `HoodieSchemaConverter`), but 
writing out-of-line (OOL) BLOB references through the Flink writer did not work 
end-to-end:
   
   1. A BLOB column declared via Flink SQL `CREATE TABLE` was silently demoted 
to a generic record in the committed schema. Flink does not preserve `NOT NULL` 
on nested `ROW` fields in the physical `RowType` exposed by 
`ResolvedSchema#toPhysicalRowDataType()`, so `isBlobStructure` (which required 
exact nested nullability) failed to recognize the column and `convertToSchema` 
produced a plain RECORD instead of the BLOB logical type.
   2. Once the schema was a proper BLOB, MOR upserts failed at write time with 
`AvroTypeException: value OUT_OF_LINE (a Utf8) is not a blob_storage_type`. The 
BLOB `type` discriminator is an Avro ENUM, but the Flink RowData -> Avro 
converter emitted a plain `Utf8` for all string fields.
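
   The relaxed check that addresses item 1 can be pictured as a structural comparison that looks at field names and base types, recursing into nested ROWs, but never compares nullability. This is a minimal, dependency-free sketch. `FieldSpec`, `BlobStructureSketch`, and the type labels are hypothetical stand-ins, not Flink or Hudi APIs. The position-by-position walk is an assumption about how the real `isBlobStructure` compares fields.

   ```java
   import java.util.List;

   // Hypothetical stand-in for a Flink RowType field: name, base type label, nullability, children.
   record FieldSpec(String name, String baseType, boolean nullable, List<FieldSpec> children) {
       static FieldSpec leaf(String name, String baseType, boolean nullable) {
           return new FieldSpec(name, baseType, nullable, List.of());
       }

       static FieldSpec row(String name, boolean nullable, FieldSpec... children) {
           return new FieldSpec(name, "ROW", nullable, List.of(children));
       }
   }

   final class BlobStructureSketch {
       private BlobStructureSketch() {}

       // True when names and base types match position by position, recursing into
       // nested ROWs. The nullable flag is deliberately never compared.
       static boolean matchesIgnoringNullability(List<FieldSpec> expected, List<FieldSpec> actual) {
           if (expected.size() != actual.size()) {
               return false;
           }
           for (int i = 0; i < expected.size(); i++) {
               FieldSpec e = expected.get(i);
               FieldSpec a = actual.get(i);
               if (!e.name().equals(a.name())
                       || !e.baseType().equals(a.baseType())
                       || !matchesIgnoringNullability(e.children(), a.children())) {
                   return false;
               }
           }
           return true;
       }
   }
   ```

   With this shape, a BLOB ROW whose nested fields all come back nullable from Flink still matches, while a renamed field or a changed base type does not.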
   
   This change makes Flink write OOL BLOB references for both COW and MOR 
tables while preserving the `HoodieSchema` BLOB type on disk (not a generic 
record).
   
   ### Summary and Changelog
   
   **User-facing outcome:** Flink SQL / DataStream upserts can write OOL BLOB 
references (`external_path`, `offset`, `length`, `managed`) to COW base files 
and MOR log files, and the committed table schema keeps the BLOB logical type.
   
   #### Changes
   
   - **`HoodieSchemaConverter#isBlobStructure`**
     - Match a BLOB column by the expected field names and types only; nested 
`NOT NULL` flags no longer have to match, because Flink often strips those 
flags from nested `ROW` fields even when the DDL declares `NOT NULL`. Before 
this change, a valid BLOB column could be misclassified as a plain RECORD. Once 
a column matches, `HoodieSchema.createBlob()` still applies the canonical BLOB 
schema.
     - References:
       - Flink docs on nested ROW nullability and 
`table.legacy-nested-row-nullability`: 
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/config/ and 
https://nightlies.apache.org/flink/flink-docs-stable/release-notes/flink-2.2/
       - Physical row type from `ResolvedSchema#toPhysicalRowDataType()`: 
https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/table/catalog/ResolvedSchema.html#toPhysicalRowDataType()
   
   - **`RowDataToAvroConverters` (CHAR/VARCHAR converter)**
     - When the target `HoodieSchema` field is an `ENUM` (such as the BLOB 
`type` discriminator), emit a `GenericData.EnumSymbol` instead of a plain 
`Utf8`, so MOR Avro log-block writes of BLOB rows succeed. Plain string fields 
are unchanged.
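
   The ENUM gating in the string converter can be sketched without Avro on the classpath. `TargetKind`, `EnumValue` (standing in for `GenericData.EnumSymbol`), and `StringFieldConverterSketch` are hypothetical; the real converter works on Flink `StringData` and Avro types. The early symbol-membership check is a choice made for this sketch, not something this PR describes.

   ```java
   import java.util.List;
   import java.util.Objects;

   // Hypothetical target-type tag; the real code gates on HoodieSchemaType.ENUM.
   enum TargetKind { STRING, ENUM }

   // Hypothetical stand-in for Avro's GenericData.EnumSymbol: a symbol plus its allowed set.
   record EnumValue(String symbol, List<String> allowed) {
       EnumValue {
           // Sketch-only choice: reject unknown symbols at construction time.
           if (!allowed.contains(symbol)) {
               throw new IllegalArgumentException(symbol + " is not one of " + allowed);
           }
       }
   }

   final class StringFieldConverterSketch {
       private StringFieldConverterSketch() {}

       // ENUM targets get an enum value; every other string target keeps the plain-string
       // behavior (a java.lang.String here, standing in for Avro's Utf8).
       static Object convert(String value, TargetKind kind, List<String> enumSymbols) {
           Objects.requireNonNull(value, "value");
           if (kind == TargetKind.ENUM) {
               return new EnumValue(value, enumSymbols);
           }
           return value;
       }
   }
   ```

   Keeping the default branch identical to the old behavior is what limits the change to ENUM-typed targets such as the BLOB `type` field.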
   
   #### Tests
   
   - **`ITTestBlobWrite`** (new, Flink-only)
     - Parameterized over COW / MOR: insert OOL BLOB references, upsert the 
same keys (exercising the MOR Avro log write path), read back and verify the 
references, and assert the committed table schema keeps the BLOB logical type 
via `HoodieSchema#isBlobField`.
     - `testFlinkSqlDdlPhysicalRowTypeStillMapsToHoodieBlob`: `CREATE TABLE` 
with a `blackhole` sink and nested DDL `NOT NULL` on the BLOB ROW; assert 
Flink's physical `RowType` relaxes at least one nested constraint, then assert 
`HoodieSchemaConverter.convertToSchema` (same inputs as 
`HoodieTableFactory#inferAvroSchema`) still maps `blob_col` to a Hoodie BLOB.
   
   - **`TestRowDataToAvroConverters`**
     - `testRowDataToAvroBlobTypeFieldWritesEnumSymbol`: asserts the BLOB 
`type` field is written as an Avro `EnumSymbol`, not a `Utf8`.
   
   - **`TestHoodieSchemaConverter`**
     - Positive case for an all-nullable-nested BLOB ROW (synthetic analogue of 
the Flink DDL physical shape). Existing negative cases (wrong field names / 
base types) still hold.
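
   The "relaxes at least one nested constraint" assertion in the DDL-focused test reduces to comparing the nullability the DDL declared with the nullability the physical `RowType` reports. The sketch below assumes both have already been flattened into maps keyed by a nested field path; that representation and `NullabilityRelaxationSketch` are hypothetical, chosen to keep the example free of Flink dependencies.

   ```java
   import java.util.Map;

   final class NullabilityRelaxationSketch {
       private NullabilityRelaxationSketch() {}

       // declaredNotNull: nested field path -> true if the DDL declared it NOT NULL.
       // physicalNullable: nested field path -> true if the physical RowType reports it nullable.
       // Returns true if at least one field declared NOT NULL came back nullable.
       static boolean relaxesAnyNotNull(Map<String, Boolean> declaredNotNull,
                                        Map<String, Boolean> physicalNullable) {
           return declaredNotNull.entrySet().stream()
               .filter(entry -> Boolean.TRUE.equals(entry.getValue()))
               .anyMatch(entry -> Boolean.TRUE.equals(physicalNullable.get(entry.getKey())));
       }
   }
   ```

   If a future Flink version preserves every nested `NOT NULL`, this check returns false, which is exactly the case the Risk Level section flags for revisiting.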
   
   ### Impact
   
   - **Write path:** Flink COW and MOR writes can now encode OOL BLOB 
references instead of either degrading the schema to a generic record or 
failing at MOR log-write time.
   - **Read path:** unchanged.
   - **On-disk layout:** BLOB column retains the `HoodieSchema.Blob` shape 
(enum `type`, nullable `data`, nullable `reference`) consistent with Spark.
   - **No public API change.** Inline BLOB writes, the `read_blob` batched 
reader, managed-blob cleaning, and dynamic inline/OOL placement remain out of 
scope.
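
   For reference, the BLOB value shape listed under on-disk layout can be modeled with plain Java records. The field names come from this PR (`type`, `data`, `reference`, `external_path`, `offset`, `length`, `managed`); the record names, the Java field types, and the rule that an out-of-line value leaves `data` null are assumptions for illustration only.

   ```java
   import java.util.Objects;

   // Only OUT_OF_LINE is named in this PR; the real enum may define other symbols.
   enum BlobStorageKind { OUT_OF_LINE }

   // Hypothetical model of the `reference` sub-record; Java types are assumed.
   record BlobRef(String externalPath, long offset, long length, boolean managed) {
       BlobRef {
           Objects.requireNonNull(externalPath, "external_path");
       }
   }

   // Hypothetical model of one BLOB value: enum `type`, nullable `data`, nullable `reference`.
   record BlobCell(BlobStorageKind type, byte[] data, BlobRef reference) {
       BlobCell {
           Objects.requireNonNull(type, "type");
       }

       // Assumed convention for this sketch: an out-of-line value carries a reference
       // and leaves `data` null.
       static BlobCell outOfLine(BlobRef ref) {
           return new BlobCell(BlobStorageKind.OUT_OF_LINE, null, Objects.requireNonNull(ref, "reference"));
       }
   }
   ```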
   
   ### Risk Level
   
   **Low/Medium**
   
   - The `isBlobStructure` relaxation only drops nullability assertions; 
field-name and base-type checks are unchanged, so non-BLOB records are 
unaffected.
   - The ENUM branch in the string converter is gated on 
`HoodieSchemaType.ENUM` and falls through to the existing `Utf8` behavior for 
all other string fields.
   - The new DDL-focused IT asserts that Flink relaxes at least one nested DDL 
`NOT NULL` inside the BLOB ROW; if a future Flink version preserves all of 
them, that assertion may need revisiting.
   
   ### Test plan
   
   - [ ] `-Pflink2.1` CI: `TestHoodieSchemaConverter`
   - [ ] `-Pflink2.1` CI: `TestRowDataToAvroConverters`
   - [ ] `-Pflink2.1` CI: `ITTestBlobWrite` (COW + MOR + Flink DDL physical 
schema)
   
   ### Contributor's checklist
   
   - [x] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [x] Enough context is provided in the sections above
   - [x] Adequate tests were added if applicable
   