kbuci opened a new pull request, #18958:
URL: https://github.com/apache/hudi/pull/18958
### Describe the issue this Pull Request addresses
Flink can already declare BLOB columns in Hudi table schemas (the
`HoodieSchema.Blob` ROW shape is recognized by `HoodieSchemaConverter`), but
writing out-of-line (OOL) BLOB references through the Flink writer did not work
end-to-end:
1. A BLOB column declared via Flink SQL `CREATE TABLE` was silently demoted
to a generic record in the committed schema. Flink does not preserve `NOT NULL`
on nested `ROW` fields in the physical `RowType` exposed by
`ResolvedSchema#toPhysicalRowDataType()`, so `isBlobStructure` (which required
exact nested nullability) failed to recognize the column and `convertToSchema`
produced a plain RECORD instead of the BLOB logical type.
2. Once the schema was a proper BLOB, MOR upserts failed at write time with
`AvroTypeException: value OUT_OF_LINE (a Utf8) is not a blob_storage_type`. The
BLOB `type` discriminator is an Avro ENUM, but the Flink RowData -> Avro
converter emitted a plain `Utf8` for all string fields.
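Item 1 comes down to whether the BLOB shape check compares nested nullability. The following is a minimal sketch, not Hudi's code. `Field` and `BlobShapeCheck` are hypothetical stand-ins for Flink's `RowType` and `HoodieSchemaConverter#isBlobStructure`. Only the field names come from this PR; the nested base types and `NOT NULL` flags of the expected layout are assumptions for illustration. The sketch contrasts a strict check with a relaxed one.

```java
import java.util.List;

// Hypothetical, simplified field model; NOT Flink's RowType or Hudi's HoodieSchema.
record Field(String name, String baseType, boolean nullable, List<Field> children) {
  static Field leaf(String name, String baseType, boolean nullable) {
    return new Field(name, baseType, nullable, List.of());
  }

  static Field row(String name, boolean nullable, Field... children) {
    return new Field(name, "ROW", nullable, List.of(children));
  }
}

// Hypothetical stand-in for the BLOB shape check in HoodieSchemaConverter#isBlobStructure.
final class BlobShapeCheck {
  // Field names come from the PR; base types and NOT NULL flags are assumed for illustration.
  static final List<Field> EXPECTED_BLOB_CHILDREN = List.of(
      Field.leaf("type", "STRING", false),
      Field.leaf("data", "BYTES", true),
      Field.row("reference", true,
          Field.leaf("external_path", "STRING", false),
          Field.leaf("offset", "BIGINT", false),
          Field.leaf("length", "BIGINT", false),
          Field.leaf("managed", "BOOLEAN", false)));

  private BlobShapeCheck() {}

  // True if the column is a ROW whose children match the expected BLOB layout.
  static boolean isBlobLike(Field column, boolean checkNullability) {
    return "ROW".equals(column.baseType())
        && childrenMatch(EXPECTED_BLOB_CHILDREN, column.children(), checkNullability);
  }

  // Compares children positionally and recursively.
  private static boolean childrenMatch(
      List<Field> expected, List<Field> actual, boolean checkNullability) {
    if (expected.size() != actual.size()) {
      return false;
    }
    for (int i = 0; i < expected.size(); i++) {
      Field e = expected.get(i);
      Field a = actual.get(i);
      // Names and base types must always match.
      if (!e.name().equals(a.name()) || !e.baseType().equals(a.baseType())) {
        return false;
      }
      // Strict mode also compares NOT NULL flags; relaxed mode skips this step.
      if (checkNullability && e.nullable() != a.nullable()) {
        return false;
      }
      if (!childrenMatch(e.children(), a.children(), checkNullability)) {
        return false;
      }
    }
    return true;
  }
}
```

For a Flink-like physical shape where every nested field is nullable, `isBlobLike(physical, true)` returns false and `isBlobLike(physical, false)` returns true. This mirrors the demotion in item 1 and the relaxed match described below. Positional comparison keeps the sketch short; a real matcher may look fields up by name instead.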
This change makes Flink write OOL BLOB references for both COW and MOR
tables while preserving the `HoodieSchema` BLOB type on disk (not a generic
record).
### Summary and Changelog
**User-facing outcome:** Flink SQL / DataStream upserts can write OOL BLOB
references (`external_path`, `offset`, `length`, `managed`) to COW base files
and MOR log files, and the committed table schema keeps the BLOB logical type.
#### Changes
- **`HoodieSchemaConverter#isBlobStructure`**
  - Match a BLOB column by the expected field names and types only. We no
    longer require nested `NOT NULL` flags to match, because Flink often strips
    those flags from `ROW` columns even when the DDL says `NOT NULL`. Before this
    change, a valid BLOB column could be misclassified as a plain RECORD; once a
    column matches, `HoodieSchema.createBlob()` still applies the canonical BLOB
    schema.
  - Flink docs on nested ROW nullability and
    `table.legacy-nested-row-nullability`:
    https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/config/ and
    https://nightlies.apache.org/flink/flink-docs-stable/release-notes/flink-2.2/
  - Physical row type from `ResolvedSchema#toPhysicalRowDataType()`:
    https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/table/catalog/ResolvedSchema.html#toPhysicalRowDataType()
- **`RowDataToAvroConverters` (CHAR/VARCHAR converter)**
  - Emit a `GenericData.EnumSymbol` when the target `HoodieSchema` field is
    an `ENUM` (the BLOB `type` discriminator) instead of a plain `Utf8`, so MOR
    Avro log-block writes of BLOB rows succeed. Plain strings are unchanged.
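The ENUM gating in the converter change can be sketched as follows. This is a hedged illustration, not the actual `RowDataToAvroConverters` code. `EnumSymbol`, `TargetKind`, `StringFieldConverter`, and `EnumFieldWriter` are hypothetical stand-ins for Avro's `GenericData.EnumSymbol`, `HoodieSchemaType`, the CHAR/VARCHAR converter, and Avro's writer-side enum check. A Java `String` stands in for Avro's `Utf8`.

```java
import java.util.List;

// Hypothetical stand-in for Avro's GenericData.EnumSymbol (not the real class).
record EnumSymbol(String symbol) {}

// Hypothetical target kinds; the real converter checks HoodieSchemaType.ENUM.
enum TargetKind { STRING, ENUM }

final class StringFieldConverter {
  private StringFieldConverter() {}

  // A Java String stands in for Avro's Utf8 on the plain-string path.
  static Object convert(String value, TargetKind target) {
    // Only ENUM targets (e.g. the BLOB `type` discriminator) get an enum symbol;
    // every other string field keeps the existing plain-string behavior.
    return target == TargetKind.ENUM ? new EnumSymbol(value) : value;
  }
}

// Hypothetical writer-side check, loosely mimicking why Avro rejects a Utf8 for an enum field.
final class EnumFieldWriter {
  private EnumFieldWriter() {}

  static void write(Object value, String enumName, List<String> symbols) {
    if (!(value instanceof EnumSymbol symbol)) {
      String kind = value == null ? "null" : value.getClass().getSimpleName();
      throw new IllegalArgumentException(
          "value " + value + " (a " + kind + ") is not a " + enumName);
    }
    if (!symbols.contains(symbol.symbol())) {
      throw new IllegalArgumentException(
          "unknown symbol " + symbol.symbol() + " for " + enumName);
    }
  }
}
```

Converting `"OUT_OF_LINE"` for an ENUM target yields an `EnumSymbol` that `EnumFieldWriter.write` accepts. Passing the plain-string result to the same check fails with a message shaped like the `AvroTypeException` quoted in the issue description. Gating on the target type, rather than on the value, leaves every non-ENUM string field on its existing path.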
#### Tests
- **`ITTestBlobWrite`** (new, Flink-only)
  - Parameterized over COW / MOR: insert OOL BLOB references, upsert the
    same keys (exercising the MOR Avro log write path), read back and verify the
    references, and assert the committed table schema keeps the BLOB logical type
    via `HoodieSchema#isBlobField`.
  - `testFlinkSqlDdlPhysicalRowTypeStillMapsToHoodieBlob`: `CREATE TABLE`
    with a `blackhole` sink and nested DDL `NOT NULL` on the BLOB ROW; assert
    Flink's physical `RowType` relaxes at least one nested constraint, then assert
    `HoodieSchemaConverter.convertToSchema` (same inputs as
    `HoodieTableFactory#inferAvroSchema`) still maps `blob_col` to a Hoodie BLOB.
- **`TestRowDataToAvroConverters`**
  - `testRowDataToAvroBlobTypeFieldWritesEnumSymbol`: asserts the BLOB
    `type` field is written as an Avro `EnumSymbol`, not a `Utf8`.
- **`TestHoodieSchemaConverter`**
  - Positive case for an all-nullable-nested BLOB ROW (synthetic analogue of
    the Flink DDL physical shape). Existing negative cases (wrong field names /
    base types) still hold.
### Impact
- **Write path:** Flink COW and MOR writes can now encode OOL BLOB
references instead of either degrading the schema to a generic record or
failing at MOR log-write time.
- **Read path:** unchanged.
- **On-disk layout:** BLOB column retains the `HoodieSchema.Blob` shape
(enum `type`, nullable `data`, nullable `reference`) consistent with Spark.
- **No public API change.** Inline BLOB writes, the `read_blob` batched
reader, managed-blob cleaning, and dynamic inline/OOL placement remain out of
scope.
### Risk Level
**Low/Medium**
- The `isBlobStructure` relaxation only drops nullability assertions;
field-name and base-type checks are unchanged, so only ROW columns that already
match the BLOB field names and base types are affected.
- The ENUM branch in the string converter is gated on
`HoodieSchemaType.ENUM` and falls through to the existing `Utf8` behavior for
all other string fields.
- The new DDL-focused IT asserts that Flink relaxes at least one nested DDL
`NOT NULL` inside the BLOB ROW; if a future Flink version preserves all of
them, that assertion may need revisiting.
### Test plan
- [ ] `-Pflink2.1` CI: `TestHoodieSchemaConverter`
- [ ] `-Pflink2.1` CI: `TestRowDataToAvroConverters`
- [ ] `-Pflink2.1` CI: `ITTestBlobWrite` (COW + MOR + Flink DDL physical
schema)
### Contributor's checklist
- [x] Read through [contributor's
guide](https://hudi.apache.org/contribute/how-to-contribute)
- [x] Enough context is provided in the sections above
- [x] Adequate tests were added if applicable
--
This is an automated message from the Apache Git Service.