danny0405 commented on code in PR #17610:
URL: https://github.com/apache/hudi/pull/17610#discussion_r2740026551


##########
rfc/rfc-102/rfc-102.md:
##########
@@ -0,0 +1,258 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate records during upsert operations.
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata Table (MDT)** that maps each record key to its
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** extends this capability to non-record-key, non-unique-key columns.
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. Throughout this document, the term **"index"** refers broadly to both RLI and SI;
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during streaming writes
+- Document scale and performance limits for write throughput supported by indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when processing upserts. Without an efficient index, Hudi would
+need to scan the entire table to find whether a record already exists and where it is located. Different index types offer different
+trade-offs between write performance, read performance, and resource consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark datasource does, and has been proven at [massive production scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to Flink streaming. When migrating, users must switch
+the index type from RLI/SI to either `bucket` (a hash-based partitioning scheme) or `flink_state` (which uses Flink's state backend to
+maintain record-to-location mappings). This migration overhead complicates production deployments.
+
+Another key motivation is to provide scalable, efficient support for **cross-partition updates**: scenarios where a record's partition path changes between writes.
+Currently, the only option for handling cross-partition updates in Flink is the `flink_state` index, which maintains a global view of all record locations. However, this approach has significant drawbacks:
+it consumes substantial memory (proportional to the table size) and cannot be shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based index backend**: A new index implementation that can replace the current `flink_state` index, storing record-to-location mappings in the MDT rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI lookups, along with a cache invalidation mechanism to maintain consistency with the committed state of the table
+- **New Flink index operator**: A separate Flink operator (`IndexWrite`) responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written synchronously with the data table files within the same commit boundary; the metadata is then sent to the coordinator for a final commit to the MDT (after the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, reusing the existing data file compaction pipeline to minimize task slot consumption
+
+![Index Write Flow](./index-write-flow.png)
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for determining where each incoming record should be written.
+It must identify whether each record is an insert (new record), update (existing record), or delete. To make this determination, the operator needs to look up whether
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata stored in the MDT as its backend. It will probe the index
+with incoming record keys to determine the appropriate operation type (insert, update, or delete). In this design, the index serves the same role
+that the `flink_state` index currently serves. Since the existing `BucketAssigner` already supports both **global** and **non-global** index types,
+the global RLI will be used for **global** index configurations, while partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every `BucketAssigner` task, the input records will be shuffled
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm as the MDT's index partitioner, ensuring that
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index
+should complete very quickly. Reading an RLI entry from the MDT for each record would incur 10+ ms of latency per record and seriously degrade throughput.
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory LRU (Least Recently Used) cache keyed by active upsert record keys.
+Cache entries will be evicted when memory usage exceeds a configurable threshold.

Review Comment:
   remove this part because it is not necessary and we are not adding it in our impl.
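The shuffle rule quoted in the hunk above (`hash(record_key) % num_index_shards`) can be sketched as below. This is a hypothetical illustration only: `IndexShardRouter` is not a Hudi class, and `String.hashCode()` merely stands in for the MDT index partitioner's actual hashing algorithm.

```java
// Hypothetical sketch of routing record keys to index shards so that each
// BucketAssigner task only ever probes a stable subset of shards.
class IndexShardRouter {
    private final int numIndexShards;

    IndexShardRouter(int numIndexShards) {
        this.numIndexShards = numIndexShards;
    }

    // Non-negative shard id for hash(record_key) % num_index_shards.
    // Math.floorMod keeps the result in [0, numIndexShards) even when
    // the key's hash code is negative.
    int shardFor(String recordKey) {
        return Math.floorMod(recordKey.hashCode(), numIndexShards);
    }
}
```

The only requirement the RFC text implies is that this function be deterministic and match the MDT's own partitioner, so that shuffle targets and index shards line up.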



##########
rfc/rfc-102/rfc-102.md:
##########
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index
+should complete very quickly. Reading an RLI entry from the MDT for each record would incur 10+ ms of latency per record and seriously degrade throughput.
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory LRU (Least Recently Used) cache keyed by active upsert record keys.
+Cache entries will be evicted when memory usage exceeds a configurable threshold.

Review Comment:
   We are not checking the log files; we are cleaning the cache with committed checkpoint ids. With the OCC concurrency strategy, the cache would anyway be invalidated once a conflict happens after the task gets restarted.
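The invalidation scheme this comment describes, cleaning the cache by checkpoint id rather than inspecting log files, could look roughly like the sketch below. All names here (`CheckpointScopedCache`, its methods) are hypothetical and not part of the proposed implementation; the sketch only shows the idea of tagging cached entries with the checkpoint that produced them so a whole checkpoint's entries can be dropped at once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: cache entries are grouped by checkpoint id, so that
// invalidation (e.g. after an OCC conflict forces a task restart) removes an
// entire checkpoint's entries in one step, without reading any log files.
class CheckpointScopedCache {
    private final Map<Long, Map<String, String>> entriesByCheckpoint = new ConcurrentHashMap<>();

    // Record a key-to-location mapping under the checkpoint that wrote it.
    void put(long checkpointId, String recordKey, String fileGroupLocation) {
        entriesByCheckpoint
            .computeIfAbsent(checkpointId, id -> new ConcurrentHashMap<>())
            .put(recordKey, fileGroupLocation);
    }

    // Probe every live checkpoint scope for the key; null means a cache miss
    // and the caller falls back to reading the RLI from the MDT.
    String get(String recordKey) {
        for (Map<String, String> scope : entriesByCheckpoint.values()) {
            String location = scope.get(recordKey);
            if (location != null) {
                return location;
            }
        }
        return null;
    }

    // Drop all entries tagged with a checkpoint that is no longer trusted.
    void invalidate(long checkpointId) {
        entriesByCheckpoint.remove(checkpointId);
    }
}
```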



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
