hudi-agent commented on code in PR #18867:
URL: https://github.com/apache/hudi/pull/18867#discussion_r3314587661


##########
website/docs/sql_queries.md:
##########
@@ -325,13 +343,32 @@ see all changes in a given time window and not just the 
latest values.
 
 Please refer to [configurations](basic_configurations.md) section for the 
important configuration options.
 
+:::note Column pruning for incremental queries
+Starting with Hudi 1.2.0, **column pruning** is applied to incremental 
queries. Only the columns
+actually referenced in the query are read from the base files, reducing I/O 
for wide tables. No
+configuration change is required - this optimization is applied automatically.
+:::
+
 :::note Incremental Query Checkpointing between Hudi 0.x and 1.0.
 In Hudi 1.0, we switch the incremental and CDC query to used completion time, 
instead of instant time, to determine the
 range of commits to incrementally pull from. The checkpoint stored for Hudi 
incremental source and related sources is
 also changed to use completion time. To support compatiblity, Hudi does a 
checkpoint translation from requested instant
 time to completion time depending on the source table version.
 :::
 
+### Vector Similarity Search
+
+Hudi 1.2.0 introduces a `hudi_vector_search` table-valued function (TVF) for 
approximate
+nearest-neighbor (ANN) search over `VECTOR` columns. This is an extension of 
the
+`hudi_table_changes` TVF pattern.
+
+```sql
+-- Find the 10 nearest neighbors to a query vector in the 'embedding' column
+SELECT * FROM hudi_vector_search('db.embeddings_table', 'embedding', 
ARRAY(0.1, 0.2, ...), 10);
+```
+
+See [Vector Search](vector_search.md) for the full API, supported metrics, and 
setup instructions.

Review Comment:
   🤖 The illustrative `ARRAY(0.1, 0.2, ...)` literal isn't valid SQL, so readers 
who copy this verbatim will get a parse error. It might help to either (a) show 
a concrete 3–4 element array (e.g., `ARRAY(0.1, 0.2, 0.3, 0.4)`) matching a 
plausible `dim`, or (b) add a one-line note that the `...` is a placeholder for 
the remaining elements up to the column's declared dimension.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
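
For option (a), a minimal Python sketch of how a doc example or client could render a concrete, parseable `ARRAY(...)` literal from a query vector. The helper names `array_literal` and `vector_search_sql` are hypothetical, and the argument order of `hudi_vector_search` is copied from the snippet in the diff rather than verified against the implementation.

```python
def array_literal(vector):
    """Render a sequence of numbers as a SQL ARRAY(...) literal with no placeholders."""
    if not vector:
        raise ValueError("query vector must not be empty")
    return "ARRAY(" + ", ".join(repr(float(x)) for x in vector) + ")"


def vector_search_sql(table, column, vector, k):
    """Build the TVF call shown in the docs (hypothetical helper).

    Assumes `table` and `column` are trusted identifiers; no quoting or
    escaping is attempted here.
    """
    return (
        f"SELECT * FROM hudi_vector_search('{table}', '{column}', "
        f"{array_literal(vector)}, {int(k)})"
    )


query = vector_search_sql("db.embeddings_table", "embedding", [0.1, 0.2, 0.3, 0.4], 10)
print(query)
```

The rendered statement contains every element explicitly, so it can be pasted into the docs without the `...` placeholder.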



##########
website/docs/writing_data.md:
##########
@@ -441,5 +441,33 @@ inputDF.write.format("hudi")
        .save(basePath)
 ```
 
+### Rolling Extra Metadata
+
+Rolling extra metadata allows you to automatically carry forward selected 
commit metadata keys to every subsequent commit and clean instant without 
having to walk the full timeline. This is particularly useful for persisting 
checkpoint information such as Kafka offsets or Flink checkpoints across 
commits.
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.write.rolling.metadata.keys` | `""` (disabled) | Comma-separated 
list of extra metadata keys to carry forward to each new commit and clean 
instant. Values are read from recent completed instants and written into the 
new commit metadata, so they remain accessible without walking the timeline. 
New values override old ones. Only applies to data table commits and clean 
instants. |
+| `hoodie.write.rolling.metadata.timeline.lookback.commits` | `10` | Maximum 
number of completed instants to walk back when searching for the configured 
rolling metadata keys. Higher values improve resilience at a small performance 
cost. |
+
+**Example:**
+
+```java
+inputDF.write.format("hudi")
+       .option("hoodie.write.rolling.metadata.keys", 
"kafka.offset.partition.0,kafka.offset.partition.1")
+       .option("hoodie.write.rolling.metadata.timeline.lookback.commits", "10")
+       // ... other options
+       .save(basePath)
+```
+
+### Advanced Storage Options
+
+The following advanced storage configuration options are available in 1.2.0:
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.parquet.write.config.injector.class` | (none) | Fully-qualified 
class name of a custom `HoodieParquetConfigInjector` implementation. Use this 
to inject custom Parquet writer properties (e.g., disable dictionary encoding, 
set bloom filter sizes) without modifying the Hudi source. The implementing 
class must implement `org.apache.hudi.io.HoodieParquetConfigInjector`. |

Review Comment:
   🤖 The `hoodie.parquet.write.config.injector.class` description references 
the interface `org.apache.hudi.io.HoodieParquetConfigInjector`. It might help 
to confirm the exact package path (the `org.apache.hudi.io` package holds a 
large number of writer classes, so the interface's location there should be 
cross-checked) and to add a one-line note on the lifecycle: is the injector 
instantiated once per writer, per task, or per file? Users implementing this 
SPI will need that detail.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/record_merger.md:
##########
@@ -207,6 +207,19 @@ Result data after merging using 
`DefaultHoodieRecordPayload` (always honors orde
     1       2       name_2  price_2
 ```
 
+### ReverseOrderHoodieRecordPayload (deprecated)

Review Comment:
   🤖 This adds a section for `ReverseOrderHoodieRecordPayload` and labels it 
`(deprecated)` in the same change. If it's deprecated, can the section say what 
the recommended replacement is (e.g., a specific merge mode or merger impl) so 
new users don't adopt a deprecated payload? Otherwise it reads as "here is a 
new option; also, don't use it".
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/variant_type.md:
##########
@@ -253,12 +253,34 @@ binary `value` field.
 
 | Engine | VARIANT Support |
 |:-------|:---------------|
-| **Spark 4.0+** | Native `VariantType` - full read/write/query |
+| **Spark 4.0** | Native `VariantType` - full read/write/query; MOR + VARIANT 
fully supported; native `df.write` with `VariantType` on V1 datasource |
+| **Spark 4.1** | Native `VariantType` - full read/write/query; MOR + VARIANT 
fixed in 1.2.0 |
 | **Spark 3.x** | Reads as `STRUCT<value: BINARY, metadata: BINARY>` - 
backward compatible |
-| **Flink** | Reads as `ROW<metadata BYTES, value BYTES>` - cross-engine 
compatible |
+| **Flink 2.1+** | Reads and writes as `VARIANT` (Flink's native 
`LogicalTypeRoot.VARIANT`) - cross-engine compatible |
+| **Flink < 2.1** | **Not supported** - throws 
`UnsupportedOperationException`. Upgrade to Flink 2.1 to use VARIANT columns. |
 
-A VARIANT table written by Spark 4.0 can be read by Spark 3.x or Flink, and 
vice versa. The
-binary encoding is engine-independent.
+A VARIANT table written by Spark 4.0/4.1 can be read by Spark 3.x or Flink 
2.1+, and vice versa.
+The binary encoding is engine-independent.
+
+:::caution Flink version requirement
+VARIANT columns require **Flink 2.1 or later**. On Flink 2.0 and earlier, any 
operation involving
+a VARIANT column throws:
+> `UnsupportedOperationException: VARIANT type is only supported in Flink 
2.1+. Please upgrade your
+> Flink version to use Variant columns.`
+:::
+
+## Metastore Sync
+
+When syncing VARIANT column schemas to external catalogs, Hudi maps the binary 
encoding to the
+target catalog's native struct type:
+
+| Catalog | VARIANT representation |
+|:--------|:----------------------|
+| Hive | `STRUCT<metadata:BINARY, value:BINARY>` |
+| BigQuery | `STRUCT` with `metadata` and `value` fields (`BYTES` type) |
+
+Query engines that support VARIANT (Spark 4.0+, Flink 2.1+) read the table 
directly using the

Review Comment:
   🤖 The struct field order shown for VARIANT differs between rows: Spark 3.x 
reads as `STRUCT<value: BINARY, metadata: BINARY>` (value first), while the new 
Metastore Sync table below shows Hive as `STRUCT<metadata:BINARY, 
value:BINARY>` (metadata first), and Flink originally had `ROW<metadata BYTES, 
value BYTES>`. If both orderings are accurate to the actual implementations, a 
brief sentence noting that field ordering may differ across engines/catalogs 
but the binary payloads are interchangeable would help readers reason about 
cross-engine reads.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/syncing_metastore.md:
##########
@@ -297,3 +297,40 @@ While using hive beeline query, you need to enter settings:
 ```bash
 set hive.input.format = 
org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
 ```
+
+## Spark Catalog Metastore Client
+
+When running Hudi inside a Spark environment that already has Hive support 
enabled (e.g., SparkSQL with `spark.sql.catalogImplementation=hive`), the 
standard `IMetaStoreClient` initialization can conflict with Spark's own Hive 
classloader. Setting
+
+```properties
+hoodie.datasource.hive_sync.use_spark_catalog=true
+```
+
+(default: `false`) makes Hudi use `SparkCatalogMetaStoreClient` - a 
Spark-native `IMetaStoreClient` implementation - instead of creating its own. 
This avoids classloader conflicts in Hive-on-Spark setups. Requires a 
`SparkSession` with Hive support active.
+
+## HMS 4.x Support via JDBC Fallback
+
+HMS 4.x changed several Thrift API method signatures (e.g., `get_table` → 
`get_table_req`), which makes the standard Thrift-based HMS client 
incompatible. Hudi 1.2.0 adds automatic fallback: when the first Thrift call 
fails with a `TApplicationException` indicating an API mismatch, all subsequent 
metadata operations are transparently rerouted through the JDBC path.

Review Comment:
   🤖 The HMS 4.x fallback description says the fallback engages 'when the first 
Thrift call fails with a `TApplicationException` indicating an API mismatch.' 
It would help to clarify: (1) is the fallback per-table/per-session (cached 
after first detection) or re-detected on every call? and (2) what happens if 
`mode=jdbc` is set but the JDBC URL/credentials are wrong: does the user see 
the HMS error or the JDBC error? @yihua could you verify the exact trigger 
conditions and failure-surface behavior here so the docs match the 
implementation?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
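
To make question (1) concrete, the Python sketch below shows what "cached after first detection" could look like. It illustrates one possible semantics only and is not Hudi's implementation: `ApiMismatchError` (standing in for a `TApplicationException` that signals an API mismatch), `StickyFallbackClient`, and the two callables are all hypothetical.

```python
class ApiMismatchError(Exception):
    """Hypothetical stand-in for a Thrift error that signals an API mismatch."""


class StickyFallbackClient:
    """Tries the Thrift path until the first mismatch, then stays on JDBC."""

    def __init__(self, thrift_call, jdbc_call):
        self._thrift_call = thrift_call
        self._jdbc_call = jdbc_call
        self.use_jdbc = False  # flipped once, never reset in this sketch

    def call(self, op):
        if not self.use_jdbc:
            try:
                return self._thrift_call(op)
            except ApiMismatchError:
                self.use_jdbc = True  # remember the detection result
        return self._jdbc_call(op)


calls = {"thrift": 0, "jdbc": 0}


def thrift(op):
    # Simulates an HMS whose Thrift API no longer matches the client.
    calls["thrift"] += 1
    raise ApiMismatchError(op)


def jdbc(op):
    calls["jdbc"] += 1
    return f"jdbc:{op}"


client = StickyFallbackClient(thrift, jdbc)
results = [client.call(op) for op in ("get_table", "get_partitions", "alter_table")]
print(calls)
```

Under this reading the Thrift path is attempted exactly once per client instance. The alternative reading, re-detecting on every call, would attempt Thrift three times in the same run, which is the difference the docs should spell out.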



##########
website/docs/sql_queries.md:
##########
@@ -325,13 +343,32 @@ see all changes in a given time window and not just the 
latest values.
 
 Please refer to [configurations](basic_configurations.md) section for the 
important configuration options.
 
+:::note Column pruning for incremental queries
+Starting with Hudi 1.2.0, **column pruning** is applied to incremental 
queries. Only the columns
+actually referenced in the query are read from the base files, reducing I/O 
for wide tables. No
+configuration change is required - this optimization is applied automatically.
+:::

Review Comment:
   🤖 This section could clarify the scope of column pruning for incremental 
queries: is it applied to both COW and MOR tables, and to both Spark and Flink 
incremental readers? A wide-table user on MOR will want to know whether pruning 
also applies to log file reads or only to base files. Right now the note only 
mentions base files, which leaves MOR semantics ambiguous.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/concurrency_control.md:
##########
@@ -359,6 +391,32 @@ hoodie.write.lock.client.num_retries
 
 *Setting the right values for these depends on a case by case basis; some 
defaults have been provided for general cases.*
 
+## Pre-Write Cleaner Policy
+
+When running multi-writer pipelines, failed writes can accumulate on storage 
if a writer crashes before a clean cycle runs. Hudi 1.2.0 introduces 
`hoodie.prewrite.cleaner.policy` to proactively handle this at write startup:
+
+| Config Key | Default | Description |
+|---|---|---|
+| `hoodie.prewrite.cleaner.policy` | `NONE` | Policy applied before starting a 
new ingestion write commit. `NONE`: no pre-write action (default). `CLEAN`: 
force a clean table service call (also rolls back failed writes). 
`ROLLBACK_FAILED_WRITES`: only roll back failed writes without running a full 
clean. |
+
+This is useful when a writer is perpetually crashing before completing a 
`CLEAN`. See [Cleaning](cleaning.md) for the full list of cleaning 
configurations.

Review Comment:
   🤖 There's a description mismatch between this page and `cleaning.md` for 
`hoodie.prewrite.cleaner.policy=CLEAN`. Here it says `CLEAN` will "force a 
clean table service call (also rolls back failed writes)", but on `cleaning.md` 
(Pre-Write Cleaner Policy section) `CLEAN` is described as "run a clean pass 
before each write" without mentioning rollback semantics. If `CLEAN` does 
implicitly roll back failed writes, it would help to align both pages on that 
detail (and call out the relationship to `ROLLBACK_FAILED_WRITES`).
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/lance_file_format.md:
##########
@@ -39,18 +68,21 @@ TBLPROPERTIES (
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.record.merger.impls",
            "org.apache.hudi.DefaultSparkRecordMerger")
-   .option("hoodie.datasource.write.base.file.format", "lance")
+   .option("hoodie.table.base.file.format", "lance")
    .mode("overwrite")
    .save("/path/to/my_ai_table"))
 ```
 
 ### Required Dependencies
 
-Add the Lance Spark bundle to your Spark classpath:
+The Lance JAR is not bundled in Hudi. Add the appropriate Lance Spark bundle 
to your Spark classpath:
 
-| Component | Maven Coordinates |
-|:----------|:-----------------|
-| Lance Spark Bundle (Spark 3.5) | 
`org.lance:lance-spark-bundle-3.5_2.12:0.4.0` |
+| Spark Version | Maven Coordinates |
+|:--------------|:-----------------|
+| Spark 3.4 | `org.lance:lance-spark-3.4_2.12:0.4.0` |
+| Spark 3.5 | `org.lance:lance-spark-3.5_2.12:0.4.0` |
+| Spark 4.0 | `org.lance:lance-spark-4.0_2.13:0.4.0` |

Review Comment:
   🤖 The Maven coordinates here dropped `-bundle` from the artifact names (e.g. 
`lance-spark-3.5_2.12` rather than `lance-spark-bundle-3.5_2.12`), but the 
existing shell example just below still references 
`lance-spark-bundle-3.5_2.12-0.4.0.jar` for the `LANCE_BUNDLE_JAR` env var. 
Could you double-check which artifact id is the published one for 0.4.0 and 
make the table + example consistent? As written, a user copying both will end 
up with mismatched artifact names.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
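
The mismatch can be checked mechanically. The Python sketch below derives the default Maven jar file name (`artifactId-version.jar`) from each coordinate in the new table and compares it with the jar name quoted from the shell example. The helper `jar_file_name` is hypothetical, and the check says nothing about which artifact id is actually published for 0.4.0.

```python
def jar_file_name(coordinates):
    """Return the default Maven jar name for 'groupId:artifactId:version'."""
    group_id, artifact_id, version = coordinates.split(":")
    return f"{artifact_id}-{version}.jar"


# Coordinates as listed in the updated table in this diff.
table_coordinates = [
    "org.lance:lance-spark-3.4_2.12:0.4.0",
    "org.lance:lance-spark-3.5_2.12:0.4.0",
    "org.lance:lance-spark-4.0_2.13:0.4.0",
]

# Jar name referenced by the existing shell example, per the comment above.
example_jar = "lance-spark-bundle-3.5_2.12-0.4.0.jar"

expected_jars = [jar_file_name(c) for c in table_coordinates]
consistent = example_jar in expected_jars
print(expected_jars)
print("table and example consistent:", consistent)
```

A check like this could run in the docs build to catch the table and the example drifting apart again.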



##########
website/docs/writing_data.md:
##########
@@ -441,5 +441,33 @@ inputDF.write.format("hudi")
        .save(basePath)
 ```
 
+### Rolling Extra Metadata
+
+Rolling extra metadata allows you to automatically carry forward selected 
commit metadata keys to every subsequent commit and clean instant without 
having to walk the full timeline. This is particularly useful for persisting 
checkpoint information such as Kafka offsets or Flink checkpoints across 
commits.
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.write.rolling.metadata.keys` | `""` (disabled) | Comma-separated 
list of extra metadata keys to carry forward to each new commit and clean 
instant. Values are read from recent completed instants and written into the 
new commit metadata, so they remain accessible without walking the timeline. 
New values override old ones. Only applies to data table commits and clean 
instants. |
+| `hoodie.write.rolling.metadata.timeline.lookback.commits` | `10` | Maximum 
number of completed instants to walk back when searching for the configured 
rolling metadata keys. Higher values improve resilience at a small performance 
cost. |
+
+**Example:**
+
+```java
+inputDF.write.format("hudi")
+       .option("hoodie.write.rolling.metadata.keys", 
"kafka.offset.partition.0,kafka.offset.partition.1")
+       .option("hoodie.write.rolling.metadata.timeline.lookback.commits", "10")
+       // ... other options
+       .save(basePath)
+```
+
+### Advanced Storage Options
+
+The following advanced storage configuration options are available in 1.2.0:
+
+| Config | Default | Description |
+|---|---|---|
+| `hoodie.parquet.write.config.injector.class` | (none) | Fully-qualified 
class name of a custom `HoodieParquetConfigInjector` implementation. Use this 
to inject custom Parquet writer properties (e.g., disable dictionary encoding, 
set bloom filter sizes) without modifying the Hudi source. The implementing 
class must implement `org.apache.hudi.io.HoodieParquetConfigInjector`. |
+| `hoodie.hfile.writes.allow.duplicates` | `false` | Allow duplicate keys to 
be written into HFile-format log blocks. This is an escape hatch for 
bootstrapping a Record Level Index (RLI) when the source data table contains 
pre-existing duplicates; enabling it avoids bootstrap job failures. Note that 
with duplicates present, there is no deterministic guarantee of which record 
will be retained in the index. Do not enable for normal write workloads. |
+
 ## Java Client
 We can use plain java to write to hudi tables. To use Java client we can 
refere 
[here](https://github.com/apache/hudi/blob/master/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java)

Review Comment:
   🤖 [Line 477] For `hoodie.hfile.writes.allow.duplicates`, the description 
warns 'Do not enable for normal write workloads'; it might help to add an 
explicit note about how to disable it again after the RLI bootstrap completes 
(i.e., is it safe to set back to `false` on the next commit?) and whether 
enabling it leaves any persistent state in the table.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/cli.md:
##########
@@ -340,6 +340,13 @@ $ hdfs dfs -ls /app/uber/trips/.hoodie/*.inflight
 -rw-r--r--   3 vinoth supergroup     321984 2016-10-05 23:18 
/app/uber/trips/.hoodie/20161005225920.inflight
 ```
 
+To list all inflight and requested instants that have been running longer than 
a specified number of minutes, use `commits show_infights` (note the literal 
spelling of the command key):

Review Comment:
   🤖 `commits show_infights` (with the parenthetical "note the literal spelling 
of the command key"): is this a real CLI command name (typo carried into the 
source) or a documentation typo? If the underlying command key really is 
misspelled as `show_infights`, it would be worth filing a follow-up to add 
`show_inflights` as an alias so users don't have to type the typo. If the 
canonical key actually is `show_inflights`, this section needs a fix.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/ingestion_flink.md:
##########
@@ -112,15 +112,24 @@ the compaction options `compaction.delta_commits` and 
`compaction.delta_seconds`
 
 For `INSERT` mode write operations, new Parquet files are written directly, 
and the [auto‑file sizing](file_sizing.md) is not enabled.
 
-### In-Memory Buffer Sort
+### Append Write Buffer
 
-For append-only workloads, Hudi supports in-memory buffer sorting to improve 
Parquet compression ratio. When enabled, data is sorted within the write buffer 
before being flushed to disk. This improves columnar file compression 
efficiency by grouping similar values together.
+For append-only workloads, Hudi supports several write-buffer strategies that 
improve Parquet compression ratio and write throughput. Data is sorted or 
batched within the write buffer before being flushed to disk, grouping similar 
values together for better columnar compression.
 
-| Option Name                 | Required | Default | Remarks                   
                                                                                
                    |
-|-----------------------------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------|
-| `write.buffer.sort.enabled` | `false`  | `false` | Whether to enable buffer 
sort within append write function. Improves Parquet compression ratio by 
sorting data before writing |
-| `write.buffer.sort.keys`    | `false`  | `N/A`   | Sort keys concatenated by 
comma (e.g., `col1,col2`). Required when `write.buffer.sort.enabled` is `true`  
                    |
-| `write.buffer.size`         | `false`  | `1000`  | Buffer size in number of 
records. When buffer reaches this size, data is sorted and flushed to disk      
                     |
+The buffer strategy is selected with `write.buffer.type`. In Hudi 1.2.0 this 
replaces the deprecated `write.buffer.sort.enabled` flag.
+
+| Option Name                              | Required | Default    | Remarks   
                                                                                
                                                                                
                                                       |
+|------------------------------------------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.buffer.type`                      | `false`  | `NONE`     | Buffer 
type for append write. Values: `NONE` (no buffering), `BOUNDED_IN_MEMORY` 
(double buffer with async write), `DISRUPTOR` (ring-buffer with async write, 
recommended for higher throughput), `CONTINUOUS_SORT` (TreeMap-based continuous 
sort with incremental draining) |
+| `write.buffer.size`                      | `false`  | `1000`     | Record 
count threshold at which the buffer is flushed. Applies to all non-`NONE` 
buffer types                                                                    
                                                                |
+| `write.buffer.sort.keys`                 | `false`  | `N/A`      | 
Comma-separated sort key columns (e.g., `col1,col2`). Required for `DISRUPTOR` 
and `CONTINUOUS_SORT` modes                                                     
                                                                  |
+| `write.buffer.sort.continuous.drain.size`| `false`  | `1`        | Number of 
records drained per flush cycle in `CONTINUOUS_SORT` mode. Default 1 provides 
smooth incremental draining; increase for batching (e.g., 10–100)               
                                                        |
+
+:::note
+`write.buffer.sort.enabled` is deprecated as of 1.2.0. Use 
`write.buffer.type=DISRUPTOR` instead for equivalent behavior. The `DISRUPTOR` 
and `CONTINUOUS_SORT` modes require `write.buffer.sort.keys` to be set.
+:::
+
+For Disruptor-specific tuning options, see 
[flink_tuning.md](flink_tuning.md#disruptor-buffer-tuning).

Review Comment:
   🤖 The note says "`write.buffer.sort.enabled` is deprecated as of 1.2.0. Use 
`write.buffer.type=DISRUPTOR` instead for equivalent behavior." But the table 
above describes `DISRUPTOR` as a "ring-buffer with async write, recommended for 
higher throughput" while the legacy `write.buffer.sort.enabled` was an 
in-memory sort over a bounded buffer to improve Parquet compression; 
`CONTINUOUS_SORT` (TreeMap-based) seems closer to the legacy behavior. Could 
you clarify which new mode is the actual replacement for the old sort-enabled 
path and what the migration recipe is (e.g., which keys to set for the same 
on-disk layout)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/metadata_indexing.md:
##########
@@ -36,7 +36,7 @@ For more information on these indexes please refer [metadata 
section](metadata/#
 :::note
 Please note in order to create secondary index:
 1. The table must have a primary key and merge mode should be 
[COMMIT_TIME_ORDERING](record_merger.md#commit_time_ordering).
-2. Record index must be enabled. This can be done by setting 
`hoodie.metadata.record.index.enable=true` and then creating `record_index`. 
Please note the example below.
+2. Record index must be enabled. This can be done by setting 
`hoodie.metadata.global.record.level.index.enable=true` and then creating 
`record_index`. Please note the example below.

Review Comment:
   🤖 Step 2 was changed from `hoodie.metadata.record.index.enable=true` to 
`hoodie.metadata.global.record.level.index.enable=true` as the prerequisite for 
`CREATE INDEX record_index`. This implies that the SQL `record_index` 
corresponds to the **global** RLI. Could you confirm that's correct, and that 
the partitioned RLI is not what's being created by `CREATE INDEX record_index`? 
Right now a reader has to cross-reference the renames table later in the file 
to know which RLI variant they are enabling.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/metadata_indexing.md:
##########
@@ -300,6 +300,40 @@ spark-submit \
 --spark-memory 2g
 ```
 
+## New in 1.2.0
+
+### Record-Level Index Config Key Renames
+
+Hudi 1.2.0 renamed several RLI configuration keys to better distinguish 
between the _global_ RLI (unique record key
+across the entire table) and the _partitioned_ RLI (unique record key within 
each partition). The old keys remain

Review Comment:
   🤖 This rename table needs a careful re-check. As I read it: the existing key 
`hoodie.metadata.record.index.enable` (which in 1.0/1.1 enabled the partitioned 
RLI) is now described as an alias for 
`hoodie.metadata.global.record.level.index.enable`, i.e. its semantic meaning 
has changed from "partitioned RLI" to "global RLI". If that's correct, this is 
a quietly breaking change for any existing user config and deserves a stronger 
upgrade callout. If the alias preserves the original semantic (partitioned), 
then the "New canonical key" column is wrong. @yihua could you double-check the 
alias mapping in `HoodieMetadataConfig` so we don't mis-document this?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/blob_unstructured_data.md:
##########
@@ -290,15 +292,44 @@ Out-of-line BLOBs keep the Hudi table footprint extremely 
small:
 
 | Property | Default | Description |
 |:---------|:--------|:------------|
-| `hoodie.read.blob.inline.mode` | `CONTENT` | Controls how INLINE BLOBs are 
read. `CONTENT` materializes raw bytes in the `data` column. `DESCRIPTOR` 
surfaces `(position, size)` coordinates rewritten as OUT_OF_LINE references. |
+| `hoodie.read.blob.inline.mode` | `DESCRIPTOR` | Controls how INLINE BLOBs 
are read. `DESCRIPTOR` (default) returns an out-of-line-shaped reference 
pointing at the in-file coordinates of the bytes - no bytes are materialized. 
`CONTENT` materializes the raw inline bytes directly in the `data` field on 
every read. |

Review Comment:
   🤖 The default for `hoodie.read.blob.inline.mode` flipped from `CONTENT` to 
`DESCRIPTOR`. For users upgrading from a pre-release that exposed `CONTENT` as 
default, this changes the result of `SELECT data FROM table` (now returns a 
descriptor reference instead of raw bytes). Could you add an explicit upgrade 
note (or call out in the release-notes section of this page) so users who built 
queries against the old default are warned?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/ingestion_flink.md:
##########
@@ -349,10 +358,216 @@ For Flink streaming reads, rate limiting helps avoid 
backpressure when processin
 
 The average read rate can be calculated as: **`read.splits.limit` / 
`read.streaming.check-interval`** splits per second.
 
+Hudi 1.2.0 adds `read.commits.limit`, which complements `read.splits.limit` by 
capping the number of commits (instants) consumed per check interval. This is 
useful when tables have many small commits - limiting commits bounds the number 
of splits regardless of their individual size.
+
+### Options
+
+| Option Name                     | Required | Default             | Remarks   
                                                                                
                                                                                
                 |
+|---------------------------------|----------|---------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `write.rate.limit`              | `false`  | `0`                 | Write 
record rate limit per second to prevent traffic jitter and improve stability. 
Default is 0 (no limit)                                                         
                       |
+| `read.splits.limit`             | `false`  | `Integer.MAX_VALUE` | Maximum 
number of splits allowed to read in each instant check for streaming reads. 
Average read rate = `read.splits.limit`/`read.streaming.check-interval`. 
Default is no limit           |
+| `read.commits.limit`            | `false`  | `(none)`            | Maximum 
number of commits (instants) allowed to read in each check interval. 
Complements `read.splits.limit`. Average rate = 
`read.commits.limit`/`read.streaming.check-interval`. Default is no limit |
+| `read.streaming.check-interval` | `false`  | `60`                | Check 
interval in seconds for streaming reads. Default is 60 seconds (1 minute)       
                                                                                
                     |
+
+## Flink Source V2 (RFC-95)
+
+Hudi 1.2.0 introduces a new Flink source implementation based on 
[FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface),
 available as an opt-in feature via the `read.source-v2.enabled` flag.
+
+### Why Source V2?
+
+The legacy Hudi Flink source was built on Flink's `SourceFunction` API. The 
FLIP-27 rewrite brings:
+
+- **Resumable split assignment** - splits can be checkpointed independently, 
enabling finer-grained recovery
+- **Checkpoint alignment** - the new API participates in Flink's coordinated 
checkpoint protocol, improving end-to-end consistency
+- **Push-down support** - predicate push-down, partition pruning, and `LIMIT` 
push-down are supported through the new source interface, reducing data scanned 
at the source level
+
+### Enabling Source V2
+
+```sql
+CREATE TABLE t1 (
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  age INT,
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${path}',
+  'table.type' = 'MERGE_ON_READ',
+  'read.source-v2.enabled' = 'true'  -- enable the FLIP-27 source
+);
+```
+
+### Options
+
+| Option Name               | Required | Default | Remarks                     
                                                                            |
+|---------------------------|----------|---------|---------------------------------------------------------------------------------------------------------|
+| `read.source-v2.enabled`  | `false`  | `false` | Whether to use the FLIP-27 
new source (Source V2) to consume data files. Default is the legacy source  |
+
+### Savepoint Incompatibility
+
+:::warning
+Savepoints taken with the **legacy source** (`read.source-v2.enabled=false`) 
are **not compatible** with the Source V2 source, and vice versa. When 
switching from the legacy source to Source V2, start a fresh job without 
restoring from a legacy savepoint. If you need to preserve read progress, 
record the last committed instant time and use `read.start-commit` to resume 
from that point.
+:::
+
+## Record-Level Index (RLI) Bucket Indexing for Flink
+
+As of Hudi 1.2.0, the Flink writer supports the Record-Level Index (RLI) 
backed by the metadata table, in addition to the existing `FLINK_STATE` and 
`BUCKET` index types. RLI is stored in the metadata table and avoids the 
state-backend overhead of `FLINK_STATE`, while supporting full global or 
partition-scoped uniqueness guarantees.
+
+Two RLI variants are available via `index.type`:
+
+- `RECORD_LEVEL_INDEX`: partitioned RLI; enforces uniqueness per (partition 
path, record key) pair
+- `GLOBAL_RECORD_LEVEL_INDEX`: global RLI; enforces uniqueness across all 
partitions
+
+### Bootstrap
+
+When enabling RLI on an existing table, the bootstrap process loads existing 
record locations into RocksDB before the first write. Bootstrap is triggered by 
setting `index.bootstrap.enabled=true`.
+
+```sql
+CREATE TABLE my_hudi_table (
+  id BIGINT,
+  name STRING,
+  ts BIGINT,
+  dt STRING,
+  PRIMARY KEY (id) NOT ENFORCED
+)
+PARTITIONED BY (dt)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/my_hudi_table',
+  'table.type' = 'MERGE_ON_READ',
+  'index.type' = 'RECORD_LEVEL_INDEX',
+  'metadata.enabled' = 'true',
+  'index.bootstrap.enabled' = 'true',  -- enable bootstrap on first run
+  'index.bootstrap.rocksdb.path' = '/tmp/hudi-rli-rocksdb'
+);
+```
+
+After the first successful checkpoint (which completes bootstrap), restart the 
job with `index.bootstrap.enabled=false`.
+
+### In-Pipeline MDT Compaction
+
+For RLI workloads, the metadata table (MDT) accumulates log files that need 
periodic compaction. The option `metadata.compaction.async.enabled` (default 
`true`) runs MDT compaction inside the Flink pipeline after every 
`metadata.compaction.delta_commits` (default `10`) delta commits.
+
+### Options
+
+| Option Name                         | Required | Default  | Remarks          
                                                                                
                                                       |
+|-------------------------------------|----------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `index.type`                        | `false`  | `FLINK_STATE` | Set to 
`RECORD_LEVEL_INDEX` or `GLOBAL_RECORD_LEVEL_INDEX` to use the 
metadata-table-backed RLI                                                    |
+| `index.bootstrap.enabled`           | `false`  | `false`  | Bootstrap the 
index from the existing table on first run. Blocks checkpoints during bootstrap 
                                                          |
+| `index.bootstrap.rocksdb.path`      | `false`  | system temp dir | Local 
path for RocksDB storage during RLI bootstrap. Each task manager creates a 
unique subdirectory under this path                             |
+| `index.rli.cache.size`              | `false`  | `256`    | Maximum memory 
in MB for the RLI cache per bucket-assign task. Dynamically adjusted based on 
historical usage                                           |
+| `index.rli.lookup.minibatch.size`   | `false`  | `1000`   | Maximum records 
buffered per mini-batch during RLI lookup. Mini-batching reduces individual 
index lookups. Minimum effective value is 1000              |
+| `metadata.compaction.async.enabled` | `false`  | `true`   | Whether to run 
MDT compaction asynchronously within the Flink pipeline. Recommended to keep 
enabled for RLI workloads                                  |
+| `metadata.compaction.delta_commits` | `false`  | `10`     | Number of MDT 
delta commits that trigger in-pipeline compaction                               
                                                          |
+
+:::note
+`GLOBAL_RECORD_LEVEL_INDEX` requires `metadata.enabled=true` and 
`index.global.enabled=true`. The Flink table factory validates these 
constraints automatically.
+:::
+
+## Lookup Join
+
+Hudi 1.2.0 adds a RocksDB-backed cache option for Flink lookup joins against 
Hudi dimension tables. This avoids JVM heap pressure when the dimension table 
is large.
+
 ### Options
 
-| Option Name                     | Required | Default             | Remarks   
                                                                                
                                                                                
       |
-|---------------------------------|----------|---------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `write.rate.limit`              | `false`  | `0`                 | Write 
record rate limit per second to prevent traffic jitter and improve stability. 
Default is 0 (no limit)                                                         
             |
-| `read.splits.limit`             | `false`  | `Integer.MAX_VALUE` | Maximum 
number of splits allowed to read in each instant check for streaming reads. 
Average read rate = `read.splits.limit`/`read.streaming.check-interval`. 
Default is no limit |
-| `read.streaming.check-interval` | `false`  | `60`                | Check 
interval in seconds for streaming reads. Default is 60 seconds (1 minute)       
                                                                                
           |
+| Option Name                    | Required | Default                         
| Remarks                                                                       
                                                                  |
+|--------------------------------|----------|---------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
+| `lookup.join.cache.type`       | `false`  | `heap`                          
| Storage backend for the lookup join cache. `heap` (default) stores rows in 
JVM heap; `rocksdb` stores rows off-heap in an embedded RocksDB instance |
+| `lookup.join.rocksdb.path`     | `false`  | 
`${java.io.tmpdir}/hudi-lookup-rocksdb` | Local directory for RocksDB data when 
`lookup.join.cache.type=rocksdb`. Cleaned up when the lookup function closes    
                |
+| `lookup.async`                 | `false`  | `false`                         
| Whether to enable async lookup join. Async join can improve throughput when 
the lookup function has high latency                                  |
+| `lookup.async-thread-number`   | `false`  | `16`                            
| Number of threads for async lookup join                                       
                                                                  |
+
+### Example
+
+```sql
+-- Streaming fact table
+CREATE TABLE orders (
+  order_id BIGINT,
+  customer_id BIGINT,
+  amount DOUBLE,
+  ts TIMESTAMP(3),
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
+  PRIMARY KEY (order_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/orders',
+  'table.type' = 'MERGE_ON_READ',
+  'read.streaming.enabled' = 'true'
+);
+
+-- Hudi dimension table with RocksDB-backed lookup cache
+CREATE TABLE customers (
+  customer_id BIGINT,
+  name STRING,
+  city STRING,
+  PRIMARY KEY (customer_id) NOT ENFORCED
+) WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///warehouse/customers',
+  'lookup.join.cache.type' = 'rocksdb',
+  'lookup.join.rocksdb.path' = '/tmp/hudi-lookup-rocksdb'
+);
+
+-- Lookup join
+SELECT o.order_id, c.name, o.amount
+FROM orders AS o
+JOIN customers FOR SYSTEM_TIME AS OF o.ts AS c
+  ON o.customer_id = c.customer_id;
+```
+
+## Virtual Metadata Columns
+
+Flink 1.2.0 supports declaring Hudi metadata fields as `METADATA VIRTUAL` 
columns in the DDL. This allows accessing system metadata (e.g., commit time, 
record key) without storing them as regular data columns.
+
+```sql
+CREATE TABLE events (
+  event_id BIGINT,
+  payload STRING,
+  -- virtual metadata columns (read-only, not persisted as data)
+  _hoodie_commit_time     STRING METADATA VIRTUAL,
+  _hoodie_record_key      STRING METADATA VIRTUAL,
+  _hoodie_partition_path  STRING METADATA VIRTUAL,
+  PRIMARY KEY (event_id) NOT ENFORCED
+)

Review Comment:
   🤖 Looks like a typo: this section starts with "Flink 1.2.0 supports 
declaring Hudi metadata fields..." but everywhere else in this PR the new 
feature is attributed to **Hudi** 1.2.0 (Flink itself does not have a 1.2.0 
release in this version range). Should this read "Hudi 1.2.0 supports declaring 
Hudi metadata fields as `METADATA VIRTUAL` columns in Flink DDL"?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/indexes.md:
##########
@@ -214,10 +214,12 @@ for more details. All these, support the index types 
mentioned [above](#addition
 For Flink DataStream and Flink SQL, Bucket index and Flink state index are 
supported.
 Following are the basic configs that control the indexing behavior. Please 
refer [the Flink 
configurations](configurations.md#Flink-Options-advanced-configs) for advanced 
configs.
 
-| Config Name                | Default                | Description            
                                                                                
                                                                                
                                                 |
-|----------------------------|------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| index.type                 | FLINK_STATE (Optional) | Index type of Flink 
write job, default is using state backed index. Possible values:<br /> 
<ul><li>FLINK_STATE</li><li>BUCKET</li></ul><br />  `Config Param: INDEX_TYPE`  
                                                             |
-| hoodie.index.bucket.engine | SIMPLE (Optional)      | 
org.apache.hudi.index.HoodieIndex$BucketIndexEngineType: Determines the type of 
bucketing or hashing to use when `hoodie.index.type` is set to `BUCKET`.    
Possible Values: <br /> <ul><li>SIMPLE</li><li>CONSISTENT_HASHING</li></ul> |
+| Config Name                | Default                | Description            
                                                                                
                                                                                
                                                                                
                          |
+|----------------------------|------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| index.type                 | FLINK_STATE (Optional) | Index type of Flink 
write job, default is using state backed index. Possible values:<br /> 
<ul><li>FLINK_STATE</li><li>BUCKET</li><li>RECORD_LEVEL_INDEX (since 
1.2.0)</li><li>GLOBAL_RECORD_LEVEL_INDEX (since 1.2.0)</li></ul><br />  `Config 
Param: INDEX_TYPE`                               |

Review Comment:
   🤖 The note here says `RECORD_LEVEL_INDEX` enforces uniqueness per (partition 
path, record key) pair while `GLOBAL_RECORD_LEVEL_INDEX` enforces uniqueness 
across all partitions. That maps to the partitioned/global RLI distinction in 
metadata_indexing.md. However, on metadata_indexing.md the new config 
`hoodie.metadata.global.record.level.index.enable` is described as the rename 
of the old `hoodie.metadata.record.index.enable`. It would help readers if this 
page linked to the specific config (and renames table) and called out that 
`RECORD_LEVEL_INDEX` requires `hoodie.metadata.record.level.index.enable=true` 
while `GLOBAL_RECORD_LEVEL_INDEX` requires 
`hoodie.metadata.global.record.level.index.enable=true` (plus 
`index.global.enabled=true`), to avoid mis-pairings.
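
   For illustration only, the global pairing described above might look like 
this in Flink DDL. The table name, schema, and path are hypothetical, and 
whether the `hoodie.metadata.*` key must be set explicitly (rather than being 
implied by `index.type`) is an assumption to verify against the Flink table 
factory:

   ```sql
   -- Hypothetical table; option keys are taken from this PR's docs, not verified.
   CREATE TABLE rli_example (
     id BIGINT,
     dt STRING,
     PRIMARY KEY (id) NOT ENFORCED
   )
   PARTITIONED BY (dt)
   WITH (
     'connector' = 'hudi',
     'path' = '/tmp/rli_example',
     'index.type' = 'GLOBAL_RECORD_LEVEL_INDEX',
     'metadata.enabled' = 'true',
     'index.global.enabled' = 'true',
     'hoodie.metadata.global.record.level.index.enable' = 'true'
   );
   ```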
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/ingestion_flink.md:
##########
@@ -185,7 +194,7 @@ Hudi Flink writer supports two types of writer indexes:
 | Cross-Partition Changes | Cannot handle changes among partitions (unless 
input is a CDC stream)                                                          
                                                                                
                                                             | No limit on 
handling cross-partition changes                                                
                           |
 
 :::note
-Bucket index supports only the `UPSERT` write operation and cannot be used 
with the [append mode](#append-mode) in Flink.
+Bucket index supports `UPSERT` write operations on both COW and MOR tables. As 
of Hudi 1.2.0, MOR + bucket index + upsert is fully supported. Bucket index 
cannot be used with the [append mode](#append-mode) in Flink.

Review Comment:
   🤖 The updated note says "As of Hudi 1.2.0, MOR + bucket index + upsert is 
fully supported." Earlier text on this page (and in older releases) restricted 
bucket index to COW-only for upsert in some scenarios. Could you add a 
one-liner about any caveats (for example, whether log compaction is required, 
or whether `CONSISTENT_HASHING` is supported on MOR) so readers know what 
"fully supported" includes?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
website/docs/metadata.md:
##########
@@ -129,6 +129,38 @@ can bring up the writers sequentially after stopping the 
writers for enabling me
 configurations to only a subset of writers or table services is unsafe and can 
lead to loss of data. So, please ensure you enable 
 metadata table across all writers.
 
+## New in 1.2.0
+
+### Auto-Delete of Disabled MDT Partitions
+
+When an index is disabled in the write config, Hudi previously deleted the 
corresponding metadata table partition
+automatically. Hudi 1.2.0 makes this behavior configurable.
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.metadata.auto.delete.partitions` | `true` | When enabled (default), 
metadata table partitions (indexes) that are disabled in the write config are 
automatically deleted. Set to `false` to prevent accidental deletion in 
multi-writer environments where not all writers may have the same config; 
users must then drop indexes explicitly via Hudi CLI or `DROP INDEX`. |
+
+### MDT Cleaner Policy Derivation
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.metadata.derive.from.datatable.clean.policy` | `true` | When 
enabled, the metadata table's cleaner uses the same cleaning policy (retention 
count, hours, etc.) as the data table. See also 
[cleaning](cleaning.md#mdt-cleaner-inherits-data-table-policy). |
+
+### MDT Compaction Delegation
+
+The metadata table's compaction and log compaction can be delegated to an 
external table service platform. See
+[compaction](compaction.md#delegating-mdt-compaction-to-an-external-platform) 
for the full config reference.
+
+### Timeline Archival Controls
+
+Two new configs in `HoodieArchivalConfig` let you fine-tune how the timeline 
manifest and archival interact with the
+most recent clean.
+
+| Config Name | Default | Description |
+|---|---|---|
+| `hoodie.timeline.manifest.retained.versions` | `3` | Number of timeline 
manifest file versions to retain. Older manifest versions are pruned during 
archival. |

Review Comment:
   🤖 The config name `hoodie.archive.block.on.latest.clean.ectr` uses a 
four-letter abbreviation `ectr` (Earliest Commit To Retain). All other Hudi 
configs spell out the concept (or use widely-known acronyms). Is this really 
the canonical key in `HoodieArchivalConfig`? If so, it might be worth a short 
note expanding `ectr` in-line; if not, the docs key should match the actual 
ConfigProperty name.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
