vinothchandar commented on code in PR #17610: URL: https://github.com/apache/hudi/pull/17610#discussion_r2761979250
########## rfc/rfc-102/rfc-102.md: ##########
@@ -0,0 +1,258 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-102: Record Level and Secondary Index Support for Flink Writers
+
+## Proposers
+
+- @danny0405
+
+## Approvers
+ - @geserdugarov
+ - @vinothchandar
+ - @cshuo
+
+## Status
+
+GH Discussion: https://github.com/apache/hudi/discussions/17452
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Apache Hudi provides multiple indexing strategies to efficiently locate records during upsert operations.
+The **Record Level Index (RLI)** is a global index stored in Hudi's **Metadata Table (MDT)** that maps each record key to its
+exact file group location, enabling O(1) lookups. The **Secondary Index (SI)** extends this capability to non-record-key, non-unique-key columns.
+Currently, Spark reads/writes support RLI & SI while Flink does not, creating feature disparity between the two engines for Hudi table reads and writes.
+
+This RFC proposes adding RLI and SI support for Flink streaming writes. Throughout this document, the term **"index"** refers broadly to both RLI and SI;
+when discussing behavior specific to one type, the terms "RLI" or "SI" will be used explicitly.
+
+The goals of this RFC are:
+
+- Provide reliable and performant write support for RLI/SI using Flink APIs
+- Ensure cross-engine compatibility so that Flink can access and utilize indexes written by Spark, and vice versa
+- Support global RLI for cross-partition upserts, as well as partition-level RLI for large fact tables
+- Enable asynchronous compaction for MDT when indexing is enabled, either within the writer pipeline or via background table services
+- Implement smart caching of index data for low-latency access during streaming writes
+- Document scale and performance limits for write throughput supported by indexing (based on empirical benchmarks)
+- Design the implementation to be extensible for arbitrary secondary indexing on different columns
+
+## Background
+
+Apache Hudi uses indexes to determine the location of existing records when processing upserts. Without an efficient index, Hudi would
+need to scan the entire table to find whether a record already exists and where it is located. Different index types offer different
+trade-offs between write performance, read performance, and resource consumption.
+
+Currently, the Flink Hudi sink does not support RLI or SI, while the Hudi Spark datasource does and has been proven at [massive production scale](https://hudi.apache.org/blog/2023/11/01/record-level-index/).
+This inconsistency causes friction for users who migrate tables from Spark to Flink streaming. When migrating, users must switch
+the index type from RLI/SI to either `bucket` (a hash-based partitioning scheme) or `flink_state` (which uses Flink's state backend to
+maintain record-to-location mappings). This migration overhead complicates production deployments.
+
+Another key motivation is to provide scalable, efficient support for **cross-partition updates**—scenarios where a record's partition path changes between writes.
+Currently, the only option for handling cross-partition updates in Flink is the `flink_state` index, which maintains a global view of all record locations. However, this approach has significant drawbacks:
+it consumes substantial memory (proportional to the table size) and cannot be shared across different workloads or job restarts without state migration.
+
+## High Level Design
+
+The high-level design introduces the following components:
+
+- **MDT-based Index backend**: A new index implementation that can replace the current `flink_state` index, storing record-to-location mappings in the MDT rather than in Flink's state backend
+- **Index cache with invalidation**: An in-memory cache to accelerate RLI lookups, along with a cache invalidation mechanism to maintain consistency with the committed state of the table
+- **New Flink Index operator**: A separate Flink operator (`IndexWrite`) responsible for writing RLI/SI payloads to the MDT
+- **Synchronous MDT writes**: The MDT's RLI and SI files are written synchronously with the data table files within the same commit boundary; the metadata is then sent to the coordinator for a final commit to the MDT (after the `FILES` partition update is computed)
+- **Asynchronous MDT compaction**: MDT compaction is performed asynchronously, reusing the existing data file compaction pipeline to minimize task slot consumption
+
+
+
+### Detailed Design
+
+### The Index Access
+
+In Hudi's Flink integration, the `BucketAssigner` operator is responsible for determining where each incoming record should be written.
+It must identify whether each record is an insert (new record), update (existing record), or delete. To make this determination, the operator needs to look up whether
+the record key already exists in the table and, if so, where it is located.
+
+With index support, the `BucketAssigner` operator will use the index metadata stored in the MDT as its backend. It will probe the index
+with incoming record keys to determine the appropriate operation type (insert, update, or delete). In this design, the index serves the same role
+that the `flink_state` index currently serves. Since the existing `BucketAssigner` already supports both **global** and **non-global** index types,
+the global RLI will be used for **global** index configurations, while partitioned RLI will be used for **non-global** configurations.
+
+To optimize index access patterns and avoid caching all index shards in every `BucketAssigner` task, the input records will be shuffled
+by `hash(record_key) % num_index_shards`. This uses the same hashing algorithm as the MDT's index partitioner, ensuring that
+each `BucketAssigner` task only needs to read from a subset of index shards.
+
+#### Index Cache
+
+Streaming workloads require low-latency processing of each record to achieve high throughput. Thus, each record lookup against the index
+must complete with very low latency. Reading an RLI entry from the MDT for every record would incur 10+ ms of latency per record and severely degrade throughput.
+
+We need two layers of caching to meet this requirement.
+
+**General-purpose hotspot cache:** The implementation will use an in-memory LRU (Least Recently Used) cache keyed by active upsert record keys.
+Cache entries will be evicted when memory usage exceeds a configurable threshold.
+
+**New index mappings cache:** Additionally, a separate memory cache is needed for index mappings created during the current checkpoint.
+These mappings are not yet committed to the Hudi table and are therefore invisible to MDT queries. This cache must not be cleared until the
+corresponding checkpoint/instant is committed to Hudi, which indicates that the index payloads have also been committed.
+This ensures that multiple records for the same record key (e.g., an insert to a key followed by an update within the same commit boundary)
+are routed consistently to the same file group, preserving the 1:1 mapping from record key to file group.
+
+The cache stores `key -> location` mappings at the record level, using an LRU eviction strategy with a configurable size limit.
+(Note that the MDT reader also maintains its own native file-level cache.)
+
+The actual index writes occur in the `IndexWrite` operator, while the `BucketAssigner` operator performs the cache lookups and MDT queries to determine record locations and propagates the resolved locations downstream.
+The cache is updated for new records and location changes, while the MDT is queried only for existing key locations.
+
+The cache update flow is as follows:
+
+1. Probe the cache for the key. If found, update the cache entry if the location has changed.
+2. If the key is not in the cache, fall back to querying the MDT. If the key exists in the MDT, add it to the cache with its location.
+3. If the key does not exist in the MDT either, add the new key and its assigned location to the cache.
+
+#### Job restarts and lost checkpoint acknowledgements
+
+Hudi uses a two-phase commit protocol where each Flink checkpoint corresponds to a completed Hudi instant. During a checkpoint, Flink completes the data writes and collects the Hudi commit metadata.
+Once the checkpoint acknowledgment event is received, Flink knows the checkpoint completed successfully, and the corresponding Hudi instant can be committed.
+However, the acknowledgment message is sent asynchronously on a best-effort basis and may be lost in corner cases. A Hudi instant cannot be committed without receiving this acknowledgment.
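The shard routing and the two-layer cache flow described in the Index Cache section could be sketched roughly as below. This is an illustrative sketch only, not a proposed implementation: `IndexCacheSketch`, `locate`, `shardFor`, and `onCheckpointCommitted` are hypothetical names rather than Hudi or Flink APIs, plain `Map`s stand in for the MDT reader, and the in-place update of a changed location (step 1 of the flow) is omitted for brevity.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the two-layer index cache; not a real Hudi class. */
class IndexCacheSketch {

  /** Hotspot LRU cache: record key -> file group location. */
  private final Map<String, String> hotspotCache;
  /** Mappings created during the current checkpoint; kept until the instant commits. */
  private final Map<String, String> uncommitted = new HashMap<>();
  /** Stand-in for a record-level-index lookup against the MDT. */
  private final Map<String, String> mdt;

  IndexCacheSketch(int maxHotspotEntries, Map<String, String> mdtContents) {
    this.mdt = mdtContents;
    // An access-ordered LinkedHashMap gives a simple LRU eviction policy.
    this.hotspotCache = new LinkedHashMap<String, String>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        return size() > maxHotspotEntries;
      }
    };
  }

  /** Shuffle key: route records so each task reads only a subset of index shards. */
  static int shardFor(String recordKey, int numIndexShards) {
    return Math.floorMod(recordKey.hashCode(), numIndexShards);
  }

  /** Resolve a record key's location, following the three-step flow above. */
  String locate(String recordKey, String newLocationIfInsert) {
    // 1. Probe the uncommitted mappings first (same-commit consistency), then the hotspot cache.
    String loc = uncommitted.get(recordKey);
    if (loc == null) {
      loc = hotspotCache.get(recordKey);
    }
    if (loc != null) {
      return loc; // existing key: update path
    }
    // 2. Fall back to the MDT; cache a hit for future lookups.
    loc = mdt.get(recordKey);
    if (loc != null) {
      hotspotCache.put(recordKey, loc);
      return loc;
    }
    // 3. New key: remember the assigned location until the checkpoint commits, so that
    //    later records for the same key in this commit go to the same file group.
    uncommitted.put(recordKey, newLocationIfInsert);
    return newLocationIfInsert;
  }

  /** Once the checkpoint/instant commits, the mappings become visible through the MDT. */
  void onCheckpointCommitted() {
    uncommitted.forEach(hotspotCache::put);
    uncommitted.clear();
  }
}
```

The separate `uncommitted` map is what keeps an insert and a follow-up update for the same key within one commit boundary pointing at the same file group, since the MDT cannot see mappings that are not yet committed.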
+
+During job restarts or task failovers, there are scenarios where a Flink checkpoint succeeds but the corresponding Hudi instant remains uncommitted due to the two-phase commit mechanism:

Review Comment:
   So when we say "Flink checkpoint succeeds" - we just mean every operator snapshotted successfully and the job manager received the ack, but the message to the Hudi write coordinator was lost, so it was not committed?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
