GitHub user cshuo edited a discussion: Dynamic Bucket Index For Flink streaming
## Background

Hudi currently provides multiple bucket-based indexing options, but all have practical limitations for continuously growing production workloads.

* Simple bucket index

  The main limitation of the simple bucket index is weak rescaling capability: the number of buckets is fixed once configured. As data keeps growing over time, each bucket accumulates more and more records, which can eventually hurt query performance.
* Partition-level bucket index

  The partition-level bucket index improves flexibility by allowing different partitions to use different bucket numbers. However, once configured for a partition, the bucket count is still not dynamically scalable online and can only be rescaled through an offline rewrite process.
* Consistent bucket index

  The consistent bucket index supports bucket split and merge, but it also has limitations:
  1) The overall bucket resize lifecycle is coupled with clustering;
  2) Before clustering completes, writes during the transition rely on dual-write semantics, which introduces extra write overhead and affects write performance.

  For Flink streaming workloads, this model is too heavy to use.

This proposal takes a different direction:

* Reuse Hudi's partitioned RLI (record-level index) as the source of truth for bucket assigning.
* Dynamic bucket growth only affects new keys; old keys always go to their original bucket, so existing buckets never need to be split or merged.

With this approach, dynamic bucket growth can be performed online during streaming ingestion. It also stays lightweight: it is not coupled with any background table service and needs no heavy dual-write.

## Goals

* Support a dynamic bucket index built on top of Hudi partitioned RLI.
* Keep bucket assignment immutable once a record key is assigned, to avoid relocating historical data when buckets grow.
* Support lazy bootstrap of the `key -> bucket` cache from partitioned RLI.
* Keep memory usage bounded through partition-granularity cache lifecycle management.
* Reuse existing MDT (metadata table) / RLI infrastructure as much as possible.

## Non Goals

* Introducing a new hash-based or consistent-hashing bucket model.
* Rebalancing historical keys after bucket growth.
* Solving hot-key skew caused by a small set of existing keys.
* Supporting the multiple-writers scenario.

## The Design

<img width="2680" height="988" alt="image" src="https://github.com/user-attachments/assets/b3939712-5e2f-47a0-899f-617bf7bba32d" />

The high-level ideas:

* Use partitioned RLI as the persistent backend for dynamic bucket assigning.
* Set the initial bucket count to the number of bucket assigners.
* Follow the same fileId naming pattern as simple bucket index: `<bucketId(8 digits)>-<uuid tail>`
* Maintain a per-partition mapping cache in the bucket assigner.
  * `Partition -> { recordKeyHash -> bucketId }`
* Lazily bootstrap the in-memory cache from partitioned RLI.
* Support partition-granularity cache eviction after commit and inactivity.
* Support MDT lookup for the index data of a specific data partition.

## The Impl

### The Initial Bucket Count

* The initial bucket count is set to the number of bucket assigners. This gives a natural initial routing space and aligns the initial bucket layout with write parallelism.
* The capacity of each bucket can be defined as the maximum number of record keys or the maximum file size of the latest base file.
  - For new keys, existing buckets are preferred to avoid the small-files problem; a new bucket is created only once the capacity of every existing bucket is exceeded.

### The Bucket Assigning Strategy

Bucket ids are no longer calculated with a static hash strategy.
Instead, for each data partition, the bucket assigner maintains a per-partition mapping cache:

```plaintext
Partition -> { recordKeyHash -> bucketId }
```

**Assigning behavior**:

<img width="2154" height="1596" alt="image" src="https://github.com/user-attachments/assets/7b733b71-1ecd-4ebe-90c2-4242c8b4744f" />

* If the record key already exists in the mapping, use the existing bucket id / file group id.
* If the record key does not exist:
  * Select a bucket that is not 'full' and assign it to the record key.
  * Create a new bucket if all the existing buckets are 'full'.

**Cache size estimation**:

For memory efficiency, the cache maps the hash code (INT) of each record key to the bucket id (Short), so each entry carries about 4 bytes + 2 bytes of payload. Allowing for the auxiliary cost of the map, a rough estimate is:

* 100 million keys -> \~1 GB

The mapping is distributed among all bucket assigner tasks, e.g., if the parallelism of the bucket assigner is 20, each task needs only about 50 MB of memory.

### Lazy Bootstrap for Bucket Assign Cache

The `recordKeyHash -> bucket` cache is bootstrapped lazily from partitioned RLI.

Behavior:

* No eager preload for all partitions.
* When a partition becomes active, load its routing mapping from partitioned RLI.
* Once loaded, serve bucket assigning from the in-memory cache.

This keeps memory proportional to the active partitions instead of the total table size.

### Cache Eviction

The bucket assigning cache is managed at partition granularity. Each partition bucket cache has two flags:

* `updated`: indicates whether the bucket cache of the partition was updated during the current checkpoint interval.
* `lastUpdatedCheckpoint`: denotes the last checkpoint interval during which the bucket cache was updated.

**Eviction flow**

<img width="2634" height="1726" alt="image" src="https://github.com/user-attachments/assets/a6f2ffc8-b4c7-4dad-9b81-0ca4ee817dd9" />

When the bucket assigner assigns a bucket for a record key:
If the record key hash does not exist in the cache, update the mapping and set `updated` to true.

When the bucket assigner operator performs snapshot():

1. Get the latest successful checkpoint id `latestSuccessfulCheckpoint`, corresponding to the latest completed instant.
2. Check the flags of the bucket cache of each partition:
   1. If `updated` is true, set `lastUpdatedCheckpoint` to the current checkpoint id.
   2. If `updated` is false:
      1. If `lastUpdatedCheckpoint <= latestSuccessfulCheckpoint`: remove the bucket cache.
      2. Else: keep the partition bucket cache, since it holds new `key-bucket` mappings that are not yet committed into partitioned RLI.
3. Finally, set `updated` to false for the bucket cache of each partition.

This eviction strategy avoids unbounded cache growth while keeping hot partitions resident.

### The Write of Partitioned RLI

The index metadata is stored under the partitioned RLI in MDT. Since the index write pipeline for global RLI is already supported, the same pipeline can be reused for partitioned RLI.

Index write rules:

* Only insert index entries for new record keys.
* Existing keys never update their bucket assignment.

This keeps index maintenance simple and lets the partitioned RLI data be updated incrementally.

### The Lookup Path

The system should support looking up the partitioned RLI data of a specific data partition from MDT.

```java
public class HoodieBackedTableMetadata extends BaseTableMetadata {
  ...
  /**
   * Reads record locations from partitioned record-level index with a specified data partition.
   */
  public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocations(String dataTablePartition);
  ...
}
```

### Concurrent Writers

This design currently does not work well under the concurrent writers scenario. The main risks are:

1. Conflicting bucket assignment for the same new key
   * two writers may assign the same new key to different buckets
   * this breaks the correctness of the `key -> bucket` mapping
2. Conflicting bucket creation during bucket expansion
   * two writers may generate the same bucket id but bind it to different file groups
   * this creates bucket ownership conflicts, similar to the problem in simple bucket index.

Because of these risks, concurrent writers are not supported without additional coordination.

## Benefits

* No need to move/rewrite historical data when the bucket count grows.
* No bucket lineage or transitional read semantics are required.
* Update routing remains simple because the same key always maps to the same bucket.
* Reuses Hudi's existing partitioned RLI / MDT infrastructure.
* More natural for workloads where new keys continuously arrive over time.

## Tradeoffs / Risks

* The in-memory cache can become large for hot, high-cardinality partitions.
* First access to a large partition may incur bootstrap latency.
* Bucket growth only helps future new keys, not existing hot keys.

## Summary

This proposal introduces a **Partitioned RLI-based Dynamic Bucket Index** for Hudi.

The key idea is to use partitioned RLI as the persistent routing backend for a stable per-key bucket assignment:

* The initial bucket count is set by the bucket assigner parallelism.
* The in-memory cache is bootstrapped lazily per partition and evicted gradually to avoid OOM.
* Only index entries for new keys are written into RLI.
* Bucket assignment never changes once inserted.

In short, this design treats dynamic bucket routing as an explicit metadata indexing problem, using Hudi's own partitioned RLI as the source of truth.

GitHub link: https://github.com/apache/hudi/discussions/18514
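
For illustration only, the assigning behavior and the eviction flow described under "The Impl" could be sketched in Java roughly as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: the class `DynamicBucketAssignerSketch`, its fields, and its methods are hypothetical names and not Hudi APIs. Bucket capacity is modeled as a maximum number of keys per bucket, and a newly seen partition starts with empty buckets instead of being bootstrapped from partitioned RLI.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: class, field, and method names are illustrative, not Hudi APIs.
class DynamicBucketAssignerSketch {

  // Cache of one data partition: recordKeyHash -> bucketId, plus the two eviction flags.
  static final class PartitionCache {
    final Map<Integer, Short> hashToBucket = new HashMap<>();
    final List<Integer> keysPerBucket = new ArrayList<>(); // list index = bucket id
    boolean updated = false;
    long lastUpdatedCheckpoint = -1L;
  }

  private final Map<String, PartitionCache> caches = new HashMap<>();
  private final int initialBuckets;   // assumed equal to the number of bucket assigners
  private final int maxKeysPerBucket; // assumed capacity rule: max record keys per bucket

  DynamicBucketAssignerSketch(int initialBuckets, int maxKeysPerBucket) {
    this.initialBuckets = initialBuckets;
    this.maxKeysPerBucket = maxKeysPerBucket;
  }

  // Returns the bucket of a record key, assigning one if the key hash is new.
  short assign(String partition, String recordKey) {
    // A real assigner would lazily bootstrap this cache from partitioned RLI;
    // the sketch simply starts with empty initial buckets.
    PartitionCache cache = caches.computeIfAbsent(partition, p -> newCache());
    int hash = recordKey.hashCode();
    Short existing = cache.hashToBucket.get(hash);
    if (existing != null) {
      return existing; // known key: the assignment never changes
    }
    short bucket = pickBucket(cache);
    cache.hashToBucket.put(hash, bucket);
    cache.keysPerBucket.set(bucket, cache.keysPerBucket.get(bucket) + 1);
    cache.updated = true; // a new mapping was added in this checkpoint interval
    return bucket;
  }

  // Applies the eviction flow when the operator takes a snapshot.
  void snapshot(long currentCheckpoint, long latestSuccessfulCheckpoint) {
    Iterator<PartitionCache> it = caches.values().iterator();
    while (it.hasNext()) {
      PartitionCache cache = it.next();
      if (cache.updated) {
        cache.lastUpdatedCheckpoint = currentCheckpoint;
      } else if (cache.lastUpdatedCheckpoint <= latestSuccessfulCheckpoint) {
        it.remove(); // every mapping is already committed, safe to evict
        continue;
      }
      cache.updated = false; // reset for the next checkpoint interval
    }
  }

  boolean isCached(String partition) {
    return caches.containsKey(partition);
  }

  private PartitionCache newCache() {
    PartitionCache cache = new PartitionCache();
    for (int i = 0; i < initialBuckets; i++) {
      cache.keysPerBucket.add(0);
    }
    return cache;
  }

  // Prefers an existing bucket that is not full; otherwise creates a new bucket.
  private short pickBucket(PartitionCache cache) {
    for (int b = 0; b < cache.keysPerBucket.size(); b++) {
      if (cache.keysPerBucket.get(b) < maxKeysPerBucket) {
        return (short) b;
      }
    }
    cache.keysPerBucket.add(0);
    return (short) (cache.keysPerBucket.size() - 1);
  }
}
```

In this sketch a partition cache is dropped only when it received no new key in the current interval and its last update is covered by a successful checkpoint, so mappings that are not yet committed are never lost. Picking the first non-full bucket is one simple policy; the proposal does not prescribe how to choose among non-full buckets.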
