hudi-agent commented on code in PR #18867:
URL: https://github.com/apache/hudi/pull/18867#discussion_r3314587661
##########
website/docs/sql_queries.md:
##########
@@ -325,13 +343,32 @@ see all changes in a given time window and not just the latest values.
Please refer to [configurations](basic_configurations.md) section for the important configuration options.
+:::note Column pruning for incremental queries
+Starting with Hudi 1.2.0, **column pruning** is applied to incremental queries. Only the columns
+actually referenced in the query are read from the base files, reducing I/O for wide tables. No
+configuration change is required; this optimization is applied automatically.
+:::
+
:::note Incremental Query Checkpointing between Hudi 0.x and 1.0.
In Hudi 1.0, we switched the incremental and CDC queries to use completion time, instead of instant time, to determine the
range of commits to incrementally pull from. The checkpoint stored for Hudi incremental source and related sources is
also changed to use completion time. To support compatibility, Hudi does a checkpoint translation from requested instant
time to completion time depending on the source table version.
:::
+### Vector Similarity Search
+
+Hudi 1.2.0 introduces a `hudi_vector_search` table-valued function (TVF) for approximate
+nearest-neighbor (ANN) search over `VECTOR` columns. This is an extension of the
+`hudi_table_changes` TVF pattern.
+
+```sql
+-- Find the 10 nearest neighbors to a query vector in the 'embedding' column
+SELECT * FROM hudi_vector_search('db.embeddings_table', 'embedding', ARRAY(0.1, 0.2, ...), 10);
+```
+
+See [Vector Search](vector_search.md) for the full API, supported metrics, and setup instructions.
Review Comment:
🤖 The illustrative `ARRAY(0.1, 0.2, ...)` literal isn't valid SQL; readers
who copy this verbatim will get a parse error. It might help to either (a) show
a concrete 3β4 element array (e.g., `ARRAY(0.1, 0.2, 0.3, 0.4)`) matching a
plausible `dim`, or (b) add a one-line note that the `...` is a placeholder for
the remaining elements up to the column's declared dimension.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
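For readers unsure what the TVF computes, here is a minimal brute-force sketch of exact k-nearest-neighbor search, which is the result an ANN index approximates. It assumes Euclidean (L2) distance and a 4-dimensional vector column; the class and method names are hypothetical, and the metrics `hudi_vector_search` actually supports are documented in `vector_search.md`, not here. It also uses the kind of concrete, fixed-length query vector the comment suggests in place of `...`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical reference sketch: exact k-NN by brute force (not Hudi code).
// An ANN index trades some recall for speed; this computes the exact answer.
public class BruteForceKnn {

    // Euclidean (L2) distance; both vectors must have the same dimension.
    static double l2(float[] a, float[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("dimension mismatch");
        }
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Returns the row indices of the k vectors closest to the query, nearest first.
    static List<Integer> topK(List<float[]> rows, float[] query, int k) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            ids.add(i);
        }
        ids.sort(Comparator.comparingDouble(i -> l2(rows.get(i), query)));
        return ids.subList(0, Math.min(k, ids.size()));
    }

    public static void main(String[] args) {
        List<float[]> rows = List.of(
            new float[] {0.1f, 0.2f, 0.3f, 0.4f},
            new float[] {0.9f, 0.9f, 0.9f, 0.9f},
            new float[] {0.1f, 0.2f, 0.3f, 0.5f});
        // Concrete 4-element query vector, matching a hypothetical dim = 4.
        float[] query = {0.1f, 0.2f, 0.3f, 0.4f};
        System.out.println(topK(rows, query, 2));
    }
}
```

Running `main` prints `[0, 2]`: row 0 is identical to the query and row 2 differs only in its last element.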
##########
website/docs/writing_data.md:
##########
@@ -441,5 +441,33 @@ inputDF.write.format("hudi")
.save(basePath)
```
+### Rolling Extra Metadata
+
+Rolling extra metadata allows you to automatically carry forward selected commit metadata keys to every subsequent commit and clean instant without having to walk the full timeline. This is particularly useful for persisting checkpoint information such as Kafka offsets or Flink checkpoints across commits.
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.write.rolling.metadata.keys` | `""` (disabled) | Comma-separated list of extra metadata keys to carry forward to each new commit and clean instant. Values are read from recent completed instants and written into the new commit metadata, so they remain accessible without walking the timeline. New values override old ones. Only applies to data table commits and clean instants. |
+| `hoodie.write.rolling.metadata.timeline.lookback.commits` | `10` | Maximum number of completed instants to walk back when searching for the configured rolling metadata keys. Higher values improve resilience at a small performance cost. |
+
+**Example:**
+
+```java
+inputDF.write.format("hudi")
+ .option("hoodie.write.rolling.metadata.keys", "kafka.offset.partition.0,kafka.offset.partition.1")
+ .option("hoodie.write.rolling.metadata.timeline.lookback.commits", "10")
+ // ... other options
+ .save(basePath)
+```
+
+### Advanced Storage Options
+
+The following advanced storage configuration options are available in 1.2.0:
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.parquet.write.config.injector.class` | (none) | Fully-qualified class name of a custom `HoodieParquetConfigInjector` implementation. Use this to inject custom Parquet writer properties (e.g., disable dictionary encoding, set bloom filter sizes) without modifying the Hudi source. The implementing class must implement `org.apache.hudi.io.HoodieParquetConfigInjector`. |
Review Comment:
🤖 The `hoodie.parquet.write.config.injector.class` description references
the interface `org.apache.hudi.io.HoodieParquetConfigInjector`. It might help
to confirm the exact package path (the `org.apache.hudi.io` package contains a
large number of writer classes and a `HoodieParquetConfigInjector` interface
located there should be cross-checked) and to add a one-line note on the
lifecycle: is the injector instantiated once per writer, per task, or per
file? Users implementing this SPI will need that detail.
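The carry-forward rule described in the rolling-metadata table of this hunk can be sketched as: walk at most `lookback` completed instants newest first, keep the newest value seen for each configured key, then let values written by the current commit override them. The class, method, and in-memory timeline model are hypothetical; this is not Hudi's implementation, and clean instants are not modeled separately.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "rolling extra metadata"; not Hudi's implementation.
public class RollingMetadataSketch {

    /**
     * @param completedNewestFirst extra metadata of completed instants, newest first
     * @param rollingKeys          keys from hoodie.write.rolling.metadata.keys
     * @param lookback             hoodie.write.rolling.metadata.timeline.lookback.commits
     * @param newCommitMetadata    extra metadata supplied by the current writer
     */
    static Map<String, String> buildCommitMetadata(
            List<Map<String, String>> completedNewestFirst,
            Set<String> rollingKeys,
            int lookback,
            Map<String, String> newCommitMetadata) {
        Map<String, String> carried = new HashMap<>();
        int limit = Math.min(lookback, completedNewestFirst.size());
        // Stop early once every configured key has been found.
        for (int i = 0; i < limit && carried.size() < rollingKeys.size(); i++) {
            for (String key : rollingKeys) {
                String value = completedNewestFirst.get(i).get(key);
                // Newest instant wins: only the first value seen for a key is kept.
                if (value != null) {
                    carried.putIfAbsent(key, value);
                }
            }
        }
        Map<String, String> result = new HashMap<>(carried);
        // Values written by the current commit override carried-forward ones.
        result.putAll(newCommitMetadata);
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, String>> timeline = List.of(
            Map.of("kafka.offset.partition.0", "120"),   // newest completed instant
            Map.of("kafka.offset.partition.0", "100",
                   "kafka.offset.partition.1", "80"));   // older completed instant
        Set<String> keys = Set.of("kafka.offset.partition.0", "kafka.offset.partition.1");
        Map<String, String> out = buildCommitMetadata(timeline, keys, 10, Map.of());
        System.out.println(out.get("kafka.offset.partition.0") + " " + out.get("kafka.offset.partition.1"));
    }
}
```

`main` prints `120 80`. With a lookback of 1 only the newest instant is scanned, so `kafka.offset.partition.1` would not be carried forward, which is the resilience trade-off the lookback setting describes.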
##########
website/docs/record_merger.md:
##########
@@ -207,6 +207,19 @@ Result data after merging using `DefaultHoodieRecordPayload` (always honors orde
1 2 name_2 price_2
```
+### ReverseOrderHoodieRecordPayload (deprecated)
Review Comment:
🤖 This adds a section for `ReverseOrderHoodieRecordPayload` and labels it
`(deprecated)` in the same change. If it's deprecated, can the section say what
the recommended replacement is (e.g., a specific merge mode or merger impl) so
new users don't adopt a deprecated payload? Otherwise it reads as "here is a
new option; also, don't use it".
##########
website/docs/variant_type.md:
##########
@@ -253,12 +253,34 @@ binary `value` field.
| Engine | VARIANT Support |
|:-------|:---------------|
-| **Spark 4.0+** | Native `VariantType`; full read/write/query |
+| **Spark 4.0** | Native `VariantType`; full read/write/query; MOR + VARIANT fully supported; native `df.write` with `VariantType` on V1 datasource |
+| **Spark 4.1** | Native `VariantType`; full read/write/query; MOR + VARIANT fixed in 1.2.0 |
| **Spark 3.x** | Reads as `STRUCT<value: BINARY, metadata: BINARY>`; backward compatible |
-| **Flink** | Reads as `ROW<metadata BYTES, value BYTES>`; cross-engine compatible |
+| **Flink 2.1+** | Reads and writes as `VARIANT` (Flink's native `LogicalTypeRoot.VARIANT`); cross-engine compatible |
+| **Flink < 2.1** | **Not supported**; throws `UnsupportedOperationException`. Upgrade to Flink 2.1 to use VARIANT columns. |
-A VARIANT table written by Spark 4.0 can be read by Spark 3.x or Flink, and vice versa. The
-binary encoding is engine-independent.
+A VARIANT table written by Spark 4.0/4.1 can be read by Spark 3.x or Flink 2.1+, and vice versa.
+The binary encoding is engine-independent.
+
+:::caution Flink version requirement
+VARIANT columns require **Flink 2.1 or later**. On Flink 2.0 and earlier, any operation involving
+a VARIANT column throws:
+> `UnsupportedOperationException: VARIANT type is only supported in Flink 2.1+. Please upgrade your
+> Flink version to use Variant columns.`
+:::
+
+## Metastore Sync
+
+When syncing VARIANT column schemas to external catalogs, Hudi maps the binary encoding to the
+target catalog's native struct type:
+
+| Catalog | VARIANT representation |
+|:--------|:----------------------|
+| Hive | `STRUCT<metadata:BINARY, value:BINARY>` |
+| BigQuery | `STRUCT` with `metadata` and `value` fields (`BYTES` type) |
+
+Query engines that support VARIANT (Spark 4.0+, Flink 2.1+) read the table directly using the
Review Comment:
🤖 The struct field order shown for VARIANT differs between rows: Spark 3.x
reads as `STRUCT<value: BINARY, metadata: BINARY>` (value first), while the new
Metastore Sync table below shows Hive as `STRUCT<metadata:BINARY,
value:BINARY>` (metadata first), and Flink originally had `ROW<metadata BYTES,
value BYTES>`. If both orderings are accurate to the actual implementations, a
brief sentence noting that field ordering may differ across engines/catalogs
but the binary payloads are interchangeable would help readers reason about
cross-engine reads.
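If both field orderings quoted in this hunk are accurate, one way to keep cross-engine reads order-independent is to bind the two binary fields by name rather than by position. The sketch below assumes exactly that; the types are hypothetical, and the byte arrays are placeholders, not a real Variant encoding.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: resolve VARIANT's two binary fields by name, not ordinal,
// so STRUCT<value, metadata> and STRUCT<metadata, value> decode identically.
public class VariantFieldsByName {

    record Field(String name, byte[] bytes) {}

    record VariantBinary(byte[] metadata, byte[] value) {}

    static VariantBinary fromStruct(List<Field> fields) {
        byte[] metadata = null;
        byte[] value = null;
        for (Field f : fields) {
            if (f.name().equals("metadata")) {
                metadata = f.bytes();
            } else if (f.name().equals("value")) {
                value = f.bytes();
            }
        }
        if (metadata == null || value == null) {
            throw new IllegalArgumentException("struct must contain 'metadata' and 'value'");
        }
        return new VariantBinary(metadata, value);
    }

    public static void main(String[] args) {
        byte[] m = {1, 0, 0};  // placeholder bytes only
        byte[] v = {12, 42};   // placeholder bytes only
        VariantBinary a = fromStruct(List.of(new Field("value", v), new Field("metadata", m)));
        VariantBinary b = fromStruct(List.of(new Field("metadata", m), new Field("value", v)));
        System.out.println(Arrays.equals(a.value(), b.value()) && Arrays.equals(a.metadata(), b.metadata()));
    }
}
```

`main` prints `true`: both orderings yield the same `(metadata, value)` pair once fields are matched by name.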
##########
website/docs/syncing_metastore.md:
##########
@@ -297,3 +297,40 @@ While using hive beeline query, you need to enter settings:
```bash
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
```
+
+## Spark Catalog Metastore Client
+
+When running Hudi inside a Spark environment that already has Hive support enabled (e.g., SparkSQL with `spark.sql.catalogImplementation=hive`), the standard `IMetaStoreClient` initialization can conflict with Spark's own Hive classloader. Setting
+
+```properties
+hoodie.datasource.hive_sync.use_spark_catalog=true
+```
+
+(default: `false`) makes Hudi use `SparkCatalogMetaStoreClient`, a Spark-native `IMetaStoreClient` implementation, instead of creating its own. This avoids classloader conflicts in Hive-on-Spark setups. Requires a `SparkSession` with Hive support active.
+
+## HMS 4.x Support via JDBC Fallback
+
+HMS 4.x changed several Thrift API method signatures (e.g., `get_table` → `get_table_req`), which makes the standard Thrift-based HMS client incompatible. Hudi 1.2.0 adds automatic fallback: when the first Thrift call fails with a `TApplicationException` indicating an API mismatch, all subsequent metadata operations are transparently rerouted through the JDBC path.
Review Comment:
🤖 The HMS 4.x fallback description says the fallback engages 'when the first
Thrift call fails with a `TApplicationException` indicating an API mismatch.'
It would help to clarify: (1) is the fallback per-table/per-session (cached
after first detection) or re-detected on every call? and (2) what happens if
`mode=jdbc` is set but the JDBC URL/credentials are wrong β does the user see
the HMS error or the JDBC error? @yihua could you verify the exact trigger
conditions and failure-surface behavior here so the docs match the
implementation?
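To make question (1) concrete, here is one possible design, assuming detection is cached per client instance: the first API-mismatch failure flips a flag, every later call goes straight to JDBC, and JDBC errors then surface to the caller unwrapped. This is a hypothetical sketch, not the Hudi implementation; `ApiMismatchException` stands in for Thrift's `TApplicationException` so the example has no external dependencies.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical "detect once, then stay on JDBC" fallback; not Hudi's implementation.
public class FallbackMetaClientSketch {

    // Stand-in for Thrift's TApplicationException (unknown-method case).
    static class ApiMismatchException extends RuntimeException {
        ApiMismatchException(String msg) { super(msg); }
    }

    interface MetaClient {
        String getTable(String db, String table);
    }

    static class FallbackClient implements MetaClient {
        private final MetaClient thrift;
        private final MetaClient jdbc;
        // Cached per client instance: once a mismatch is seen, Thrift is not retried.
        private final AtomicBoolean useJdbc = new AtomicBoolean(false);

        FallbackClient(MetaClient thrift, MetaClient jdbc) {
            this.thrift = thrift;
            this.jdbc = jdbc;
        }

        @Override
        public String getTable(String db, String table) {
            if (!useJdbc.get()) {
                try {
                    return thrift.getTable(db, table);
                } catch (ApiMismatchException e) {
                    useJdbc.set(true);  // reroute this and all subsequent calls
                }
            }
            // Errors from the JDBC path (bad URL, credentials) propagate as-is.
            return jdbc.getTable(db, table);
        }

        boolean isUsingJdbc() { return useJdbc.get(); }
    }

    public static void main(String[] args) {
        int[] thriftCalls = {0};
        MetaClient thrift = (db, t) -> {
            thriftCalls[0]++;
            throw new ApiMismatchException("unknown method: get_table");
        };
        MetaClient jdbc = (db, t) -> db + "." + t + " via JDBC";
        FallbackClient client = new FallbackClient(thrift, jdbc);
        System.out.println(client.getTable("db", "t1"));
        System.out.println(client.getTable("db", "t2"));
        System.out.println("thrift calls: " + thriftCalls[0]);
    }
}
```

Under this design the Thrift path is attempted exactly once (`thrift calls: 1`). A per-call re-detection design would instead pay the failed Thrift round trip on every operation, which is the distinction the docs should pin down.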
##########
website/docs/sql_queries.md:
##########
@@ -325,13 +343,32 @@ see all changes in a given time window and not just the latest values.
Please refer to [configurations](basic_configurations.md) section for the important configuration options.
+:::note Column pruning for incremental queries
+Starting with Hudi 1.2.0, **column pruning** is applied to incremental queries. Only the columns
+actually referenced in the query are read from the base files, reducing I/O for wide tables. No
+configuration change is required; this optimization is applied automatically.
+:::
Review Comment:
🤖 This section could clarify the scope of column pruning for incremental
queries β is it applied to both COW and MOR tables, and to both Spark and Flink
incremental readers? A wide-table user on MOR will want to know whether pruning
also applies to log file reads or only to base files. Right now the note only
mentions base files, which leaves MOR semantics ambiguous.
##########
website/docs/concurrency_control.md:
##########
@@ -359,6 +391,32 @@ hoodie.write.lock.client.num_retries
*Setting the right values for these depends on a case by case basis; some defaults have been provided for general cases.*
+## Pre-Write Cleaner Policy
+
+When running multi-writer pipelines, failed writes can accumulate on storage if a writer crashes before a clean cycle runs. Hudi 1.2.0 introduces `hoodie.prewrite.cleaner.policy` to proactively handle this at write startup:
+
+| Config Key | Default | Description |
+|---|---|---|
+| `hoodie.prewrite.cleaner.policy` | `NONE` | Policy applied before starting a new ingestion write commit. `NONE`: no pre-write action (default). `CLEAN`: force a clean table service call (also rolls back failed writes). `ROLLBACK_FAILED_WRITES`: only roll back failed writes without running a full clean. |
+
+This is useful when a writer is perpetually crashing before completing a `CLEAN`. See [Cleaning](cleaning.md) for the full list of cleaning configurations.
Review Comment:
🤖 There's a description mismatch between this page and `cleaning.md` for
`hoodie.prewrite.cleaner.policy=CLEAN`. Here it says `CLEAN` will "force a
clean table service call (also rolls back failed writes)", but on `cleaning.md`
(Pre-Write Cleaner Policy section) `CLEAN` is described as "run a clean pass
before each write" without mentioning rollback semantics. If `CLEAN` does
implicitly roll back failed writes, it would help to align both pages on that
detail (and call out the relationship to `ROLLBACK_FAILED_WRITES`).
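The relationship the comment asks to align can be summarized as a mapping from policy value to pre-write actions. This sketch encodes the `concurrency_control.md` wording (a `CLEAN` also rolls back failed writes); the enum mirrors the documented values, while the action names and their order are illustrative assumptions, not Hudi method names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the hoodie.prewrite.cleaner.policy values relate,
// following the concurrency_control.md description; not Hudi code.
public class PreWriteCleanerPolicySketch {

    enum PreWriteCleanerPolicy { NONE, CLEAN, ROLLBACK_FAILED_WRITES }

    static List<String> actionsBeforeWrite(PreWriteCleanerPolicy policy) {
        List<String> actions = new ArrayList<>();
        switch (policy) {
            case NONE:
                break;  // start the write immediately
            case ROLLBACK_FAILED_WRITES:
                actions.add("rollbackFailedWrites");
                break;
            case CLEAN:
                // Per this page, a forced clean also rolls back failed writes,
                // so CLEAN is a superset of ROLLBACK_FAILED_WRITES.
                actions.add("rollbackFailedWrites");
                actions.add("clean");
                break;
        }
        return actions;
    }

    public static void main(String[] args) {
        for (PreWriteCleanerPolicy p : PreWriteCleanerPolicy.values()) {
            System.out.println(p + " -> " + actionsBeforeWrite(p));
        }
    }
}
```

If `cleaning.md` is the accurate page instead, the `CLEAN` branch would drop the rollback step, and that single difference is what the two pages need to agree on.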
##########
website/docs/lance_file_format.md:
##########
@@ -39,18 +68,21 @@ TBLPROPERTIES (
.option("hoodie.datasource.write.recordkey.field", "id")
.option("hoodie.record.merger.impls", "org.apache.hudi.DefaultSparkRecordMerger")
- .option("hoodie.datasource.write.base.file.format", "lance")
+ .option("hoodie.table.base.file.format", "lance")
.mode("overwrite")
.save("/path/to/my_ai_table"))
```
### Required Dependencies
-Add the Lance Spark bundle to your Spark classpath:
+The Lance JAR is not bundled in Hudi. Add the appropriate Lance Spark bundle to your Spark classpath:
-| Component | Maven Coordinates |
-|:----------|:-----------------|
-| Lance Spark Bundle (Spark 3.5) | `org.lance:lance-spark-bundle-3.5_2.12:0.4.0` |
+| Spark Version | Maven Coordinates |
+|:--------------|:-----------------|
+| Spark 3.4 | `org.lance:lance-spark-3.4_2.12:0.4.0` |
+| Spark 3.5 | `org.lance:lance-spark-3.5_2.12:0.4.0` |
+| Spark 4.0 | `org.lance:lance-spark-4.0_2.13:0.4.0` |
Review Comment:
🤖 The Maven coordinates here dropped `-bundle` from the artifact names (e.g.
`lance-spark-3.5_2.12` rather than `lance-spark-bundle-3.5_2.12`), but the
existing shell example just below still references
`lance-spark-bundle-3.5_2.12-0.4.0.jar` for the `LANCE_BUNDLE_JAR` env var.
Could you double-check which artifact id is the published one for 0.4.0 and
make the table + example consistent? As written, a user copying both will end
up with mismatched artifact names.
##########
website/docs/writing_data.md:
##########
@@ -441,5 +441,33 @@ inputDF.write.format("hudi")
.save(basePath)
```
+### Rolling Extra Metadata
+
+Rolling extra metadata allows you to automatically carry forward selected commit metadata keys to every subsequent commit and clean instant without having to walk the full timeline. This is particularly useful for persisting checkpoint information such as Kafka offsets or Flink checkpoints across commits.
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.write.rolling.metadata.keys` | `""` (disabled) | Comma-separated list of extra metadata keys to carry forward to each new commit and clean instant. Values are read from recent completed instants and written into the new commit metadata, so they remain accessible without walking the timeline. New values override old ones. Only applies to data table commits and clean instants. |
+| `hoodie.write.rolling.metadata.timeline.lookback.commits` | `10` | Maximum number of completed instants to walk back when searching for the configured rolling metadata keys. Higher values improve resilience at a small performance cost. |
+
+**Example:**
+
+```java
+inputDF.write.format("hudi")
+ .option("hoodie.write.rolling.metadata.keys", "kafka.offset.partition.0,kafka.offset.partition.1")
+ .option("hoodie.write.rolling.metadata.timeline.lookback.commits", "10")
+ // ... other options
+ .save(basePath)
+```
+
+### Advanced Storage Options
+
+The following advanced storage configuration options are available in 1.2.0:
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.parquet.write.config.injector.class` | (none) | Fully-qualified class name of a custom `HoodieParquetConfigInjector` implementation. Use this to inject custom Parquet writer properties (e.g., disable dictionary encoding, set bloom filter sizes) without modifying the Hudi source. The implementing class must implement `org.apache.hudi.io.HoodieParquetConfigInjector`. |
+| `hoodie.hfile.writes.allow.duplicates` | `false` | Allow duplicate keys to be written into HFile-format log blocks. This is an escape hatch for bootstrapping a Record Level Index (RLI) when the source data table contains pre-existing duplicates; enabling it avoids bootstrap job failures. Note that with duplicates present, there is no deterministic guarantee of which record will be retained in the index. Do not enable for normal write workloads. |
+
## Java Client
We can use plain java to write to hudi tables. To use the Java client, refer to the example [here](https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java)
Review Comment:
🤖 [Line 477] For `hoodie.hfile.writes.allow.duplicates`, the description
warns 'Do not enable for normal write workloads' β it might help to add an
explicit note about how to disable it again after the RLI bootstrap completes
(i.e., is it safe to set back to `false` on the next commit?) and whether
enabling it leaves any persistent state in the table.
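To illustrate the "no deterministic guarantee" warning, this hypothetical sketch builds a key-to-file index with last-write-wins insertion: the same two duplicate records, processed in a different order, retain a different location. Whether Hudi's HFile lookup resolves duplicates this way is an assumption; the point is only that the outcome depends on processing order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch (not Hudi code): with duplicate record keys and
// last-write-wins insertion, the retained location depends on input order.
public class DuplicateKeyIndexSketch {

    record Rec(String key, String fileId) {}

    static Map<String, String> buildIndex(List<Rec> records) {
        Map<String, String> index = new HashMap<>();
        for (Rec r : records) {
            index.put(r.key(), r.fileId());  // a later duplicate overwrites an earlier one
        }
        return index;
    }

    public static void main(String[] args) {
        Rec a = new Rec("id-1", "file-A");
        Rec b = new Rec("id-1", "file-B");  // same record key in a different file
        System.out.println(buildIndex(List.of(a, b)).get("id-1"));
        System.out.println(buildIndex(List.of(b, a)).get("id-1"));
    }
}
```

`main` prints `file-B` then `file-A`. In a distributed bootstrap the processing order is generally not fixed, which is why the table warns against relying on which duplicate survives.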
##########
website/docs/cli.md:
##########
@@ -340,6 +340,13 @@ $ hdfs dfs -ls /app/uber/trips/.hoodie/*.inflight
-rw-r--r-- 3 vinoth supergroup 321984 2016-10-05 23:18 /app/uber/trips/.hoodie/20161005225920.inflight
```
+To list all inflight and requested instants that have been running longer than a specified number of minutes, use `commits show_infights` (note the literal spelling of the command key):
Review Comment:
🤖 `commits show_infights` (with the parenthetical "note the literal spelling
of the command key") β is this a real CLI command name (typo carried into the
source) or a documentation typo? If the underlying command key really is
misspelled as `show_infights`, it would be worth filing a follow-up to add
`show_inflights` as an alias so users don't have to type the typo. If the
canonical key actually is `show_inflights`, this section needs a fix.
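For reference, the filter the new CLI text describes (instants running longer than N minutes) can be sketched as follows. It assumes instant times begin with a `yyyyMMddHHmmss` prefix, as in `20161005225920` in the listing above, and share a time zone with `now`; it is not the hudi-cli implementation, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "instants running longer than N minutes"; not hudi-cli code.
public class LongRunningInstants {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static List<String> runningLongerThan(List<String> instantTimes, LocalDateTime now, long minutes) {
        List<String> result = new ArrayList<>();
        for (String instant : instantTimes) {
            // Use only the 14-digit prefix; any trailing millisecond digits are ignored.
            LocalDateTime started = LocalDateTime.parse(instant.substring(0, 14), FMT);
            if (Duration.between(started, now).toMinutes() > minutes) {
                result.add(instant);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2016, 10, 5, 23, 30, 0);
        List<String> inflight = List.of("20161005225920", "20161005232500");
        System.out.println(runningLongerThan(inflight, now, 20));
    }
}
```

`main` prints `[20161005225920]`: that instant started about 30 minutes before `now`, while the other started 5 minutes before.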
##########
website/docs/ingestion_flink.md:
##########
@@ -112,15 +112,24 @@ the compaction options `compaction.delta_commits` and `compaction.delta_seconds`
For `INSERT` mode write operations, new Parquet files are written directly, and the [auto-file sizing](file_sizing.md) is not enabled.
-### In-Memory Buffer Sort
+### Append Write Buffer
-For append-only workloads, Hudi supports in-memory buffer sorting to improve Parquet compression ratio. When enabled, data is sorted within the write buffer before being flushed to disk. This improves columnar file compression efficiency by grouping similar values together.
+For append-only workloads, Hudi supports several write-buffer strategies that improve Parquet compression ratio and write throughput. Data is sorted or batched within the write buffer before being flushed to disk, grouping similar values together for better columnar compression.
-| Option Name | Required | Default | Remarks |
-|-----------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------|
-| `write.buffer.sort.enabled` | `false` | `false` | Whether to enable buffer sort within append write function. Improves Parquet compression ratio by sorting data before writing |
-| `write.buffer.sort.keys` | `false` | `N/A` | Sort keys concatenated by comma (e.g., `col1,col2`). Required when `write.buffer.sort.enabled` is `true` |
-| `write.buffer.size` | `false` | `1000` | Buffer size in number of records. When buffer reaches this size, data is sorted and flushed to disk |
+The buffer strategy is selected with `write.buffer.type`. In Hudi 1.2.0 this replaces the deprecated `write.buffer.sort.enabled` flag.
+
+| Option Name | Required | Default | Remarks |
+|------------------------------------------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.buffer.type` | `false` | `NONE` | Buffer type for append write. Values: `NONE` (no buffering), `BOUNDED_IN_MEMORY` (double buffer with async write), `DISRUPTOR` (ring-buffer with async write, recommended for higher throughput), `CONTINUOUS_SORT` (TreeMap-based continuous sort with incremental draining) |
+| `write.buffer.size` | `false` | `1000` | Record count threshold at which the buffer is flushed. Applies to all non-`NONE` buffer types |
+| `write.buffer.sort.keys` | `false` | `N/A` | Comma-separated sort key columns (e.g., `col1,col2`). Required for `DISRUPTOR` and `CONTINUOUS_SORT` modes |
+| `write.buffer.sort.continuous.drain.size`| `false` | `1` | Number of records drained per flush cycle in `CONTINUOUS_SORT` mode. Default 1 provides smooth incremental draining; increase for batching (e.g., 10-100) |
+
+:::note
+`write.buffer.sort.enabled` is deprecated as of 1.2.0. Use `write.buffer.type=DISRUPTOR` instead for equivalent behavior. The `DISRUPTOR` and `CONTINUOUS_SORT` modes require `write.buffer.sort.keys` to be set.
+:::
+
+For Disruptor-specific tuning options, see [flink_tuning.md](flink_tuning.md#disruptor-buffer-tuning).
Review Comment:
🤖 The note says "`write.buffer.sort.enabled` is deprecated as of 1.2.0. Use
`write.buffer.type=DISRUPTOR` instead for equivalent behavior." But the table
above describes `DISRUPTOR` as a "ring-buffer with async write, recommended for
higher throughput" while the legacy `write.buffer.sort.enabled` was an
in-memory sort over a bounded buffer to improve Parquet compression;
`CONTINUOUS_SORT` (TreeMap-based) seems closer to the legacy behavior. Could
you clarify which new mode is the actual replacement for the old sort-enabled
path and what the migration recipe is (e.g., which keys to set for the same
on-disk layout)?
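To help answer which mode reproduces the legacy layout, the sketch below contrasts a legacy-style batch sort (fill, sort, flush everything) with a `CONTINUOUS_SORT`-style buffer as the table describes it (TreeMap-based, draining `write.buffer.sort.continuous.drain.size` records once `write.buffer.size` is reached). Both are hypothetical models over sort-key strings, not Hudi code, and `DISRUPTOR` is not modeled. Under these assumptions the two produce different record orders from the same input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; records are modeled as their sort-key strings only.
public class BufferSortSketch {

    // Legacy-style: fill the buffer, sort it, flush everything, repeat.
    static List<String> batchSort(List<String> input, int bufferSize) {
        List<String> out = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String r : input) {
            buffer.add(r);
            if (buffer.size() >= bufferSize) {
                buffer.sort(null);  // natural order
                out.addAll(buffer);
                buffer.clear();
            }
        }
        buffer.sort(null);
        out.addAll(buffer);
        return out;
    }

    // CONTINUOUS_SORT-style: keep records in a TreeMap and, whenever the buffer
    // is full, drain only the `drainSize` smallest keys.
    static List<String> continuousSort(List<String> input, int bufferSize, int drainSize) {
        List<String> out = new ArrayList<>();
        TreeMap<String, Deque<String>> sorted = new TreeMap<>();
        int size = 0;
        for (String r : input) {
            sorted.computeIfAbsent(r, k -> new ArrayDeque<>()).add(r);  // duplicates kept
            size++;
            if (size >= bufferSize) {
                size -= drain(sorted, out, drainSize);
            }
        }
        drain(sorted, out, size);  // end of input: flush the rest in key order
        return out;
    }

    private static int drain(TreeMap<String, Deque<String>> sorted, List<String> out, int n) {
        int drained = 0;
        while (drained < n && !sorted.isEmpty()) {
            Map.Entry<String, Deque<String>> first = sorted.firstEntry();
            out.add(first.getValue().poll());
            if (first.getValue().isEmpty()) {
                sorted.remove(first.getKey());
            }
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        List<String> input = List.of("d", "a", "c", "b", "f", "e");
        System.out.println(batchSort(input, 3));
        System.out.println(continuousSort(input, 3, 1));
    }
}
```

With buffer size 3 and drain size 1, `main` prints `[a, c, d, b, e, f]` for the batch sort and `[a, b, c, d, e, f]` for the continuous variant, so a migration recipe would need to state which of these layouts "equivalent behavior" refers to.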
##########
website/docs/metadata_indexing.md:
##########
@@ -36,7 +36,7 @@ For more information on these indexes please refer [metadata section](metadata/#
:::note
Please note in order to create secondary index:
1. The table must have a primary key and merge mode should be [COMMIT_TIME_ORDERING](record_merger.md#commit_time_ordering).
-2. Record index must be enabled. This can be done by setting `hoodie.metadata.record.index.enable=true` and then creating `record_index`. Please note the example below.
+2. Record index must be enabled. This can be done by setting `hoodie.metadata.global.record.level.index.enable=true` and then creating `record_index`. Please note the example below.
Review Comment:
🤖 Step 2 was changed from `hoodie.metadata.record.index.enable=true` to
`hoodie.metadata.global.record.level.index.enable=true` as the prerequisite for
`CREATE INDEX record_index`. This implies that the SQL `record_index`
corresponds to the **global** RLI. Could you confirm that's correct, and that
the partitioned RLI is not what's being created by `CREATE INDEX record_index`?
Right now a reader has to cross-reference the renames table later in the file
to know which RLI variant they are enabling.
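The two RLI variants differ in the key they index, per the `index.type` description in `ingestion_flink.md`: the partitioned RLI keys on (partition path, record key), the global RLI on the record key alone. The hypothetical sketch below shows that scope difference, which is the practical thing a reader needs to know about whichever variant `CREATE INDEX record_index` builds.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of RLI uniqueness scope; not Hudi code.
public class RliScopeSketch {

    record Rec(String partitionPath, String recordKey) {}

    // Index key under each scope: (partition path, record key) vs. record key only.
    static List<String> indexKey(Rec r, boolean global) {
        return global ? List.of(r.recordKey()) : List.of(r.partitionPath(), r.recordKey());
    }

    // Number of distinct index entries, i.e. how many records the scope treats as unique.
    static int distinctEntries(List<Rec> records, boolean global) {
        Set<List<String>> keys = new HashSet<>();
        for (Rec r : records) {
            keys.add(indexKey(r, global));
        }
        return keys.size();
    }

    public static void main(String[] args) {
        List<Rec> records = List.of(
            new Rec("dt=2024-01-01", "id-1"),
            new Rec("dt=2024-01-02", "id-1"));  // same record key, different partition
        System.out.println("partitioned=" + distinctEntries(records, false)
            + " global=" + distinctEntries(records, true));
    }
}
```

`main` prints `partitioned=2 global=1`: the same record key in two partitions is two entries under the partitioned scope but one under the global scope.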
##########
website/docs/metadata_indexing.md:
##########
@@ -300,6 +300,40 @@ spark-submit \
--spark-memory 2g
```
+## New in 1.2.0
+
+### Record-Level Index Config Key Renames
+
+Hudi 1.2.0 renamed several RLI configuration keys to better distinguish between the _global_ RLI (unique record key
+across the entire table) and the _partitioned_ RLI (unique record key within each partition). The old keys remain
Review Comment:
🤖 This rename table needs a careful re-check. As I read it: the existing key
`hoodie.metadata.record.index.enable` (which in 1.0/1.1 enabled the partitioned
RLI) is now described as an alias for
`hoodie.metadata.global.record.level.index.enable` β i.e. its semantic meaning
has changed from "partitioned RLI" to "global RLI". If that's correct, this is
a quietly breaking change for any existing user config and deserves a stronger
upgrade callout. If the alias preserves the original semantic (partitioned),
then the "New canonical key" column is wrong. @yihua could you double-check the
alias mapping in `HoodieMetadataConfig` so we don't mis-document this?
##########
website/docs/blob_unstructured_data.md:
##########
@@ -290,15 +292,44 @@ Out-of-line BLOBs keep the Hudi table footprint extremely small:
| Property | Default | Description |
|:---------|:--------|:------------|
-| `hoodie.read.blob.inline.mode` | `CONTENT` | Controls how INLINE BLOBs are read. `CONTENT` materializes raw bytes in the `data` column. `DESCRIPTOR` surfaces `(position, size)` coordinates rewritten as OUT_OF_LINE references. |
+| `hoodie.read.blob.inline.mode` | `DESCRIPTOR` | Controls how INLINE BLOBs are read. `DESCRIPTOR` (default) returns an out-of-line-shaped reference pointing at the in-file coordinates of the bytes; no bytes are materialized. `CONTENT` materializes the raw inline bytes directly in the `data` field on every read. |
Review Comment:
🤖 The default for `hoodie.read.blob.inline.mode` flipped from `CONTENT` to
`DESCRIPTOR`. For users upgrading from a pre-release that exposed `CONTENT` as
default, this changes the result of `SELECT data FROM table` (now returns a
descriptor reference instead of raw bytes). Could you add an explicit upgrade
note (or call out in the release-notes section of this page) so users who built
queries against the old default are warned?
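Under the new `DESCRIPTOR` default, a query returns coordinates rather than bytes, so code that previously read `data` directly needs a dereference step, which is the behavior change an upgrade note would warn about. A hypothetical sketch, assuming the descriptor carries a byte position and size into a file whose bytes in that range are the raw BLOB; this is not Hudi's API, the "file" is an in-memory array, and the path is made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: dereferencing a (position, size) descriptor into bytes.
public class BlobDescriptorSketch {

    record BlobDescriptor(String path, long position, long size) {}

    static byte[] resolve(byte[] fileBytes, BlobDescriptor d) {
        if (d.position() < 0 || d.size() < 0 || d.position() + d.size() > fileBytes.length) {
            throw new IllegalArgumentException("descriptor out of bounds");
        }
        int from = Math.toIntExact(d.position());
        return Arrays.copyOfRange(fileBytes, from, from + Math.toIntExact(d.size()));
    }

    public static void main(String[] args) {
        byte[] file = "HEADERhello-blobFOOTER".getBytes(StandardCharsets.UTF_8);
        BlobDescriptor d = new BlobDescriptor("part-0001.parquet", 6, 10);  // hypothetical path
        System.out.println(new String(resolve(file, d), StandardCharsets.UTF_8));
    }
}
```

`main` prints `hello-blob`. Under the old `CONTENT` default the query itself would have returned those bytes.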
##########
website/docs/ingestion_flink.md:
##########
@@ -349,10 +358,216 @@ For Flink streaming reads, rate limiting helps avoid backpressure when processin
The average read rate can be calculated as: **`read.splits.limit` / `read.streaming.check-interval`** splits per second.
+Hudi 1.2.0 adds `read.commits.limit`, which complements `read.splits.limit` by capping the number of commits (instants) consumed per check interval. This is useful when tables have many small commits; limiting commits bounds the number of splits regardless of their individual size.
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|---------------------------------|----------|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.rate.limit` | `false` | `0` | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
+| `read.splits.limit` | `false` | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.commits.limit` | `false` | `(none)` | Maximum number of commits (instants) allowed to read in each check interval. Complements `read.splits.limit`. Average rate = `read.commits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.streaming.check-interval` | `false` | `60` | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+
+## Flink Source V2 (RFC-95)
+
+Hudi 1.2.0 introduces a new Flink source implementation based on [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface), available as an opt-in feature via the `read.source-v2.enabled` flag.
+
+### Why Source V2?
+
+The legacy Hudi Flink source was built on Flink's `SourceFunction` API. The FLIP-27 rewrite brings:
+
+- **Resumable split assignment**: splits can be checkpointed independently, enabling finer-grained recovery
+- **Checkpoint alignment**: the new API participates in Flink's coordinated checkpoint protocol, improving end-to-end consistency
+- **Push-down support**: predicate push-down, partition pruning, and `LIMIT` push-down are supported through the new source interface, reducing data scanned at the source level
+
+### Enabling Source V2
+
+```sql
+CREATE TABLE t1 (
+ uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+ name VARCHAR(10),
+ age INT,
+ ts TIMESTAMP(3),
+ `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = '${path}',
+ 'table.type' = 'MERGE_ON_READ',
+ 'read.source-v2.enabled' = 'true' -- enable the FLIP-27 source
+);
+```
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|---------------------------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| `read.source-v2.enabled` | `false` | `false` | Whether to use the FLIP-27 new source (Source V2) to consume data files. Default is the legacy source |
+
+### Savepoint Incompatibility
+
+:::warning
+Savepoints taken with the **legacy source** (`read.source-v2.enabled=false`) are **not compatible** with the Source V2 source, and vice versa. When switching from the legacy source to Source V2, start a fresh job without restoring from a legacy savepoint. If you need to preserve read progress, record the last committed instant time and use `read.start-commit` to resume from that point.
+:::
+
+## Record-Level Index (RLI) Bucket Indexing for Flink
+
+As of Hudi 1.2.0, the Flink writer supports the Record-Level Index (RLI) backed by the metadata table, in addition to the existing `FLINK_STATE` and `BUCKET` index types. RLI is stored in the metadata table and avoids the state-backend overhead of `FLINK_STATE`, while supporting full global or partition-scoped uniqueness guarantees.
+
+Two RLI variants are available via `index.type`:
+
+- `RECORD_LEVEL_INDEX`: partitioned RLI; enforces uniqueness per (partition path, record key) pair
+- `GLOBAL_RECORD_LEVEL_INDEX`: global RLI; enforces uniqueness across all partitions
+
+### Bootstrap
+
+When enabling RLI on an existing table, the bootstrap process loads existing record locations into RocksDB before the first write. Bootstrap is triggered by setting `index.bootstrap.enabled=true`.
+
+```sql
+CREATE TABLE my_hudi_table (
+ id BIGINT,
+ name STRING,
+ ts BIGINT,
+ dt STRING,
+ PRIMARY KEY (id) NOT ENFORCED
+)
+PARTITIONED BY (dt)
+WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/my_hudi_table',
+ 'table.type' = 'MERGE_ON_READ',
+ 'index.type' = 'RECORD_LEVEL_INDEX',
+ 'metadata.enabled' = 'true',
+ 'index.bootstrap.enabled' = 'true', -- enable bootstrap on first run
+ 'index.bootstrap.rocksdb.path' = '/tmp/hudi-rli-rocksdb'
+);
+```
+
+After the first successful checkpoint (which completes bootstrap), restart the job with `index.bootstrap.enabled=false`.
+
+### In-Pipeline MDT Compaction
+
+For RLI workloads, the metadata table (MDT) accumulates log files that need periodic compaction. The option `metadata.compaction.async.enabled` (default `true`) runs MDT compaction inside the Flink pipeline after every `metadata.compaction.delta_commits` (default `10`) delta commits.
+
+### Options
+
+| Option Name | Required | Default | Remarks |
+|-------------------------------------|----------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `index.type` | `false` | `FLINK_STATE` | Set to `RECORD_LEVEL_INDEX` or `GLOBAL_RECORD_LEVEL_INDEX` to use the metadata-table-backed RLI |
+| `index.bootstrap.enabled` | `false` | `false` | Bootstrap the index from the existing table on first run. Blocks checkpoints during bootstrap |
+| `index.bootstrap.rocksdb.path` | `false` | system temp dir | Local path for RocksDB storage during RLI bootstrap. Each task manager creates a unique subdirectory under this path |
+| `index.rli.cache.size` | `false` | `256` | Maximum memory in MB for the RLI cache per bucket-assign task. Dynamically adjusted based on historical usage |
+| `index.rli.lookup.minibatch.size` | `false` | `1000` | Maximum records buffered per mini-batch during RLI lookup. Mini-batching reduces individual index lookups. Minimum effective value is 1000 |
+| `metadata.compaction.async.enabled` | `false` | `true` | Whether to run MDT compaction asynchronously within the Flink pipeline. Recommended to keep enabled for RLI workloads |
+| `metadata.compaction.delta_commits` | `false` | `10` | Number of MDT delta commits that trigger in-pipeline compaction |
+
+:::note
+`GLOBAL_RECORD_LEVEL_INDEX` requires `metadata.enabled=true` and `index.global.enabled=true`. The Flink table factory validates these constraints automatically.
+:::
+
+## Lookup Join
+
+Hudi 1.2.0 adds a RocksDB-backed cache option for Flink lookup joins against Hudi dimension tables. This avoids JVM heap pressure when the dimension table is large.
+
### Options
-| Option Name | Required | Default | Remarks |
-|---------------------------------|----------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `write.rate.limit` | `false` | `0` | Write record rate limit per second to prevent traffic jitter and improve stability. Default is 0 (no limit) |
-| `read.splits.limit` | `false` | `Integer.MAX_VALUE` | Maximum number of splits allowed to read in each instant check for streaming reads. Average read rate = `read.splits.limit`/`read.streaming.check-interval`. Default is no limit |
-| `read.streaming.check-interval` | `false` | `60` | Check interval in seconds for streaming reads. Default is 60 seconds (1 minute) |
+| Option Name | Required | Default | Remarks |
+|--------------------------------|----------|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| `lookup.join.cache.type` | `false` | `heap` | Storage backend for the lookup join cache. `heap` (default) stores rows in JVM heap; `rocksdb` stores rows off-heap in an embedded RocksDB instance |
+| `lookup.join.rocksdb.path` | `false` | `${java.io.tmpdir}/hudi-lookup-rocksdb` | Local directory for RocksDB data when `lookup.join.cache.type=rocksdb`. Cleaned up when the lookup function closes |
+| `lookup.async` | `false` | `false` | Whether to enable async lookup join. Async join can improve throughput when the lookup function has high latency |
+| `lookup.async-thread-number` | `false` | `16` | Number of threads for async lookup join |
+
+### Example
+
+```sql
+-- Streaming fact table
+CREATE TABLE orders (
+ order_id BIGINT,
+ customer_id BIGINT,
+ amount DOUBLE,
+ ts TIMESTAMP(3),
+ WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
+ PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/orders',
+ 'table.type' = 'MERGE_ON_READ',
+ 'read.streaming.enabled' = 'true'
+);
+
+-- Hudi dimension table with RocksDB-backed lookup cache
+CREATE TABLE customers (
+ customer_id BIGINT,
+ name STRING,
+ city STRING,
+ PRIMARY KEY (customer_id) NOT ENFORCED
+) WITH (
+ 'connector' = 'hudi',
+ 'path' = 'hdfs:///warehouse/customers',
+ 'lookup.join.cache.type' = 'rocksdb',
+ 'lookup.join.rocksdb.path' = '/tmp/hudi-lookup-rocksdb'
+);
+
+-- Lookup join
+SELECT o.order_id, c.name, o.amount
+FROM orders AS o
+JOIN customers FOR SYSTEM_TIME AS OF o.ts AS c
+ ON o.customer_id = c.customer_id;
+```
+
+## Virtual Metadata Columns
+
+Flink 1.2.0 supports declaring Hudi metadata fields as `METADATA VIRTUAL` columns in the DDL. This allows accessing system metadata (e.g., commit time, record key) without storing them as regular data columns.
+
+```sql
+CREATE TABLE events (
+ event_id BIGINT,
+ payload STRING,
+ -- virtual metadata columns (read-only, not persisted as data)
+ _hoodie_commit_time STRING METADATA VIRTUAL,
+ _hoodie_record_key STRING METADATA VIRTUAL,
+ _hoodie_partition_path STRING METADATA VIRTUAL,
+ PRIMARY KEY (event_id) NOT ENFORCED
+)
Review Comment:
🤖 Looks like a typo: this section starts with "Flink 1.2.0 supports
declaring Hudi metadata fields..." but everywhere else in this PR the new
feature is attributed to **Hudi** 1.2.0 (Flink itself does not have a 1.2.0
release in this version range). Should this read "Hudi 1.2.0 supports declaring
Hudi metadata fields as `METADATA VIRTUAL` columns in Flink DDL"?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
website/docs/indexes.md:
##########
@@ -214,10 +214,12 @@ for more details. All these, support the index types mentioned [above](#addition
For Flink DataStream and Flink SQL, Bucket index and Flink state index are supported.
Following are the basic configs that control the indexing behavior. Please refer [the Flink configurations](configurations.md#Flink-Options-advanced-configs) for advanced configs.
-| Config Name | Default | Description |
-|----------------------------|------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| index.type | FLINK_STATE (Optional) | Index type of Flink write job, default is using state backed index. Possible values:<br /> <ul><li>FLINK_STATE</li><li>BUCKET</li></ul><br /> `Config Param: INDEX_TYPE` |
-| hoodie.index.bucket.engine | SIMPLE (Optional) | org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of bucketing or hashing to use when `hoodie.index.type` is set to `BUCKET`. Possible Values: <br /> <ul><li>SIMPLE</li><li>CONSISTENT_HASHING</li></ul> |
+| Config Name | Default | Description |
+|----------------------------|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| index.type | FLINK_STATE (Optional) | Index type of Flink write job, default is using state backed index. Possible values:<br /> <ul><li>FLINK_STATE</li><li>BUCKET</li><li>RECORD_LEVEL_INDEX (since 1.2.0)</li><li>GLOBAL_RECORD_LEVEL_INDEX (since 1.2.0)</li></ul><br /> `Config Param: INDEX_TYPE` |
Review Comment:
🤖 The note here says `RECORD_LEVEL_INDEX` enforces uniqueness per (partition
path, record key) pair while `GLOBAL_RECORD_LEVEL_INDEX` enforces uniqueness
across all partitions. That maps to the partitioned/global RLI distinction in
metadata_indexing.md. However, on metadata_indexing.md the new config
`hoodie.metadata.global.record.level.index.enable` is described as the rename
of the old `hoodie.metadata.record.index.enable`. It would help readers if this
page linked to the specific config (and the renames table) and called out that
`RECORD_LEVEL_INDEX` requires `hoodie.metadata.record.level.index.enable=true`
while `GLOBAL_RECORD_LEVEL_INDEX` requires
`hoodie.metadata.global.record.level.index.enable=true` (plus
`index.global.enabled=true`), to avoid mis-pairings.
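A paired example on that page could look like the Flink SQL below. It is only a sketch: the table name and path are hypothetical, and it assumes the `hoodie.metadata.global.record.level.index.enable` key described in metadata_indexing.md is accepted unchanged in the Flink `WITH` clause, which should be checked against the actual `ConfigProperty` before it goes into the docs.

```sql
-- Hypothetical table. The MDT-side key below is an assumption taken from
-- metadata_indexing.md in this PR and has not been verified against a release.
CREATE TABLE my_global_rli_table (
  id BIGINT,
  name STRING,
  ts BIGINT,
  dt STRING,
  PRIMARY KEY (id) NOT ENFORCED
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/my_global_rli_table',
  'table.type' = 'MERGE_ON_READ',
  -- global RLI: record keys are unique across all partitions
  'index.type' = 'GLOBAL_RECORD_LEVEL_INDEX',
  -- constraints the note says the Flink table factory validates
  'metadata.enabled' = 'true',
  'index.global.enabled' = 'true',
  -- assumed MDT partition switch for the global variant
  'hoodie.metadata.global.record.level.index.enable' = 'true'
);
```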
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
website/docs/ingestion_flink.md:
##########
@@ -185,7 +194,7 @@ Hudi Flink writer supports two types of writer indexes:
| Cross-Partition Changes | Cannot handle changes among partitions (unless input is a CDC stream) | No limit on handling cross-partition changes |
:::note
-Bucket index supports only the `UPSERT` write operation and cannot be used with the [append mode](#append-mode) in Flink.
+Bucket index supports `UPSERT` write operations on both COW and MOR tables. As of Hudi 1.2.0, MOR + bucket index + upsert is fully supported. Bucket index cannot be used with the [append mode](#append-mode) in Flink.
Review Comment:
🤖 The updated note says "As of Hudi 1.2.0, MOR + bucket index + upsert is
fully supported." Earlier text on this page (and in older releases) restricted
bucket index to COW-only for upsert in some scenarios. Could you add a
one-liner about any caveats (for example, whether log compaction is required,
or whether `CONSISTENT_HASHING` is supported on MOR) so readers know what
"fully supported" includes?
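To make the question concrete, the combination the note covers would presumably be declared along these lines in Flink SQL. This is a sketch: the table name, path, and bucket count are illustrative, and the engine is left at `SIMPLE` because whether `CONSISTENT_HASHING` also works here on MOR is exactly the open question.

```sql
-- Hypothetical table illustrating MOR + bucket index + upsert.
CREATE TABLE my_bucket_mor_table (
  id BIGINT,
  name STRING,
  ts BIGINT,
  dt STRING,
  PRIMARY KEY (id) NOT ENFORCED
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///warehouse/my_bucket_mor_table',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'index.type' = 'BUCKET',
  -- SIMPLE hashing; CONSISTENT_HASHING on MOR is the unresolved case
  'hoodie.index.bucket.engine' = 'SIMPLE',
  -- illustrative bucket count only
  'hoodie.bucket.index.num.buckets' = '4'
);
```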
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
website/docs/metadata.md:
##########
@@ -129,6 +129,38 @@ can bring up the writers sequentially after stopping the writers for enabling me
configurations to only a subset of writers or table services is unsafe and can lead to loss of data. So, please ensure you enable
metadata table across all writers.
+## New in 1.2.0
+
+### Auto-Delete of Disabled MDT Partitions
+
+When an index is disabled in the write config, Hudi previously deleted the corresponding metadata table partition
+automatically. Hudi 1.2.0 makes this behavior configurable.
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.metadata.auto.delete.partitions` | `true` | When enabled (default), metadata table partitions (indexes) that are disabled in the write config are automatically deleted. Set to `false` to prevent accidental deletion in multi-writer environments where not all writers may have the same config; users must then drop indexes explicitly via Hudi CLI or `DROP INDEX`. |
+
+### MDT Cleaner Policy Derivation
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.metadata.derive.from.datatable.clean.policy` | `true` | When enabled, the metadata table's cleaner uses the same cleaning policy (retention count, hours, etc.) as the data table. See also [cleaning](cleaning.md#mdt-cleaner-inherits-data-table-policy). |
+
+### MDT Compaction Delegation
+
+The metadata table's compaction and log compaction can be delegated to an external table service platform. See
+[compaction](compaction.md#delegating-mdt-compaction-to-an-external-platform) for the full config reference.
+
+### Timeline Archival Controls
+
+Two new configs in `HoodieArchivalConfig` let you fine-tune how the timeline manifest and archival interact with the
+most recent clean.
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.timeline.manifest.retained.versions` | `3` | Number of timeline manifest file versions to retain. Older manifest versions are pruned during archival. |
Review Comment:
🤖 The config name `hoodie.archive.block.on.latest.clean.ectr` uses a
four-letter abbreviation `ectr` (Earliest Commit To Retain). All other Hudi
configs spell out the concept (or use widely-known acronyms). Is this really
the canonical key in `HoodieArchivalConfig`? If so, it might be worth a short
note expanding `ectr` in-line; if not, the docs key should match the actual
ConfigProperty name.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.