GitHub user cshuo edited a discussion: Dynamic Bucket for Flink streaming with Partitioned RLI
## Background

Hudi currently provides multiple bucket-based indexing options, but all of them have practical limitations for continuously growing production workloads.

* Simple bucket index

  The main limitation of the simple bucket index is weak rescaling capability: the number of buckets is fixed once configured. As data keeps growing over time, each bucket accumulates more and more records, which can eventually hurt query performance.

* Partition-level bucket index

  The partition-level bucket index improves flexibility by allowing different partitions to use different bucket numbers. However, once configured for a partition, the bucket count still cannot be scaled dynamically online; it can only be rescaled through an offline rewrite process.

* Consistent bucket index

  The consistent bucket index supports bucket split and merge, but it also has limitations: 1) the overall bucket resize lifecycle is coupled with clustering; 2) before clustering completes, writes during the transition rely on dual-write semantics, which introduces extra write overhead and affects write performance. For Flink streaming workloads, this model is too heavy to use.

This proposal takes a different direction:

* Reuse Hudi's partitioned RLI as the source of truth for bucket assigning.
* Dynamic bucket growth only affects new keys; old keys always go to their original bucket, so no split/merge is needed for existing buckets.

With this approach, dynamic bucket growth can be performed online during streaming ingestion and stays lightweight, with no coupling to any background table service and no heavy dual-write.

## Goals

* Support dynamic bucket assigning built on top of partitioned RLI.
* Keep bucket assignment immutable once a record key is assigned, to avoid historical data relocation for bucket growth.
* Support lazy bootstrap of `key -> bucket` cache from partitioned RLI.
* Keep memory usage bounded through partition-granularity cache lifecycle management.
* Reuse existing MDT / RLI infrastructure as much as possible.

## Non Goals

* Introducing a new hash-based or consistent-hashing bucket index.
* Rebalancing historical keys after bucket growth.
* Solving hot-key skew caused by a small set of existing keys.
* Supporting multiple-writer scenarios.

## The Design

<img width="2680" height="988" alt="image" src="https://github.com/user-attachments/assets/b3939712-5e2f-47a0-899f-617bf7bba32d" />

The high-level ideas:

* Use partitioned RLI as the persistent backend for dynamic bucket assigning.
* Set the initial bucket/file-group count to the number of bucket assigners.
* Maintain a per-partition mapping cache in the bucket assigner, lazily bootstrapped from partitioned RLI.
  * `Partition -> { recordKey -> fileId }`
  * The memory usage of the cache is bounded, and the cache can spill to disk.
* Support partition-granularity cache eviction after commit and inactivity.
* Support MDT lookup for the index data of a specific data partition.

## The Impl

### The Initial Bucket Count

* The initial bucket count is set to the number of bucket assigners. This gives a natural initial routing space and aligns the initial bucket layout with write parallelism.
* The capacity of each bucket can be defined as the maximum file size of the latest file slice.
  - For new keys, existing buckets are preferred to avoid the small-files problem; new buckets are created only after the capacity of all existing buckets is exceeded.

### The Bucket Assigning Strategy

The bucket id is no longer calculated with a static hash strategy.
Instead, for each data partition, the bucket assigner maintains a per-partition mapping cache:

```plaintext
Partition -> { recordKey -> fileId }
```

**Assigning behavior**:

<img width="2154" height="1596" alt="image" src="https://github.com/user-attachments/assets/7b733b71-1ecd-4ebe-90c2-4242c8b4744f" />

* If the record key already exists in the mapping, use the existing bucket id / file group id.
* If the record key does not exist:
  * Select a bucket that is not 'full' and assign it to the record key.
  * Create a new bucket if all the existing buckets are 'full'.

### Lazy Bootstrap for Bucket Assign Cache

The `recordKey -> fileId` cache is bootstrapped lazily from partitioned RLI.

Behavior:

* No eager preload for all partitions.
* When a partition becomes active, load its routing mapping from partitioned RLI.
* Once loaded, serve bucket assigning from the cache.
* Set a total memory cap for the cache; the cache spills to disk/RocksDB if the limit is exceeded.

This keeps memory proportional to active partitions instead of total table size.

### Cache Eviction

The bucket assign cache is managed at partition granularity. Each partition's bucket cache carries one flag:

* `lastUpdatedCheckpoint`: denotes the last checkpoint interval during which the bucket cache was updated.

**Eviction flow**

<img width="2226" height="1416" alt="image" src="https://github.com/user-attachments/assets/88cb2a0d-c8c8-4c90-84c4-a0283b79a625" />

When the bucket assigner assigns a bucket for a record key:

1. If the record key does not exist in the cache, update the cache and set `lastUpdatedCheckpoint` to the current checkpoint id.

When the bucket assigner operator receives a checkpoint complete notification:

1. Get the latest successful checkpoint id `lastestSuccessfulCheckpoint` corresponding to the latest completed instant.
2. Save `lastestSuccessfulCheckpoint` in the bucket assign operator; it is used to decide whether a bucket assign cache is evictable.
3. The bucket assign cache is evicted lazily:
   - If the total memory usage of the cache does not exceed the limit, the bucket assign cache for a partition is not evicted even if it is inactive.
   - Before creating a new bucket assign cache for a new partition, if there is not enough memory, inactive caches are evicted.

This eviction strategy avoids unbounded cache growth while keeping hot partitions resident.

### The Write of Partitioned RLI

The index metadata is stored under the partitioned RLI in MDT. Since the index write pipeline for global RLI is already supported, we can reuse that pipeline for partitioned RLI.

Index write rules:

* Only insert index entries for new record keys.
* Existing keys never update their bucket assignment.

This keeps index maintenance simple and the partitioned RLI data updated incrementally.

### The Lookup Path

The system should support lookup queries of partitioned RLI data for a specific data partition from MDT.

```java
public class HoodieBackedTableMetadata extends BaseTableMetadata {
  ...
  /**
   * Reads record locations from partitioned record-level index with a specified data partition.
   */
  public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocations(String dataTablePartition);
  ...
}
```

### Concurrent Writers

This design currently does not work well under the concurrent writers scenario. The main risks are:

1. Conflicting bucket assignment for the same new key
   * two writers may assign the same new key to different buckets
   * this breaks the correctness of the `key -> bucket` mapping
2. Conflicting bucket creation during bucket expansion
   * two writers may generate the same bucket id but bind it to different file groups
   * this creates bucket ownership conflicts, similar to the problem in simple bucket index

Because of these risks, concurrent writers are not supported without additional coordination.

## Benefits

* No need to move/rewrite historical data when bucket count grows.
* No bucket lineage or transitional read semantics are required.
* Update routing remains simple because the same key always maps to the same bucket.
* Reuses Hudi's existing partitioned RLI / MDT infrastructure.
* More natural for workloads where new keys continuously arrive over time.

## Tradeoffs / Risks

* The cache can become large for hot, high-cardinality partitions.
* First access to a large partition may incur bootstrap latency.
* Bucket growth only helps future new keys, not existing hot keys.

## Summary

This proposal introduces a **Partitioned RLI-based Dynamic Bucket Index** for Hudi.

The key idea is to use partitioned RLI as the persistent routing backend for a stable per-key bucket assignment:

* the initial bucket count is set by bucket assigner parallelism
* the cache is bootstrapped lazily per partition and evicted gradually to avoid OOM
* only index entries for new keys are written into RLI

In short, this design treats dynamic bucket routing as an explicit metadata indexing problem, using Hudi's own partitioned RLI as the source of truth.

GitHub link: https://github.com/apache/hudi/discussions/18514
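
To illustrate the assigning behavior described under "The Bucket Assigning Strategy", here is a minimal Java sketch. It is not Hudi code: the class `DynamicBucketAssignerSketch`, its fields, and the `bucket-N` file ids are hypothetical. It also assumes a per-bucket key count as the 'full' threshold (the proposal defines capacity by file size) and picks the first non-full bucket, a selection policy the proposal leaves open.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; none of these names are Hudi APIs. */
final class DynamicBucketAssignerSketch {
  // partition -> (recordKey -> fileId); stands in for the cache bootstrapped from partitioned RLI.
  private final Map<String, Map<String, String>> keyToFileId = new HashMap<>();
  // partition -> (fileId -> assigned key count); key count is an assumed proxy for file size.
  private final Map<String, Map<String, Integer>> bucketLoad = new HashMap<>();
  private final int initialBuckets;   // assumed to equal the number of bucket assigners
  private final int maxKeysPerBucket; // assumed 'full' threshold

  DynamicBucketAssignerSketch(int initialBuckets, int maxKeysPerBucket) {
    this.initialBuckets = initialBuckets;
    this.maxKeysPerBucket = maxKeysPerBucket;
  }

  String assign(String partition, String recordKey) {
    Map<String, String> mapping = keyToFileId.computeIfAbsent(partition, p -> new HashMap<>());
    // Existing keys always keep their original bucket.
    String existing = mapping.get(recordKey);
    if (existing != null) {
      return existing;
    }
    // Lazily create the initial buckets the first time a partition is seen.
    Map<String, Integer> load = bucketLoad.computeIfAbsent(partition, p -> {
      Map<String, Integer> initial = new LinkedHashMap<>();
      for (int i = 0; i < initialBuckets; i++) {
        initial.put("bucket-" + i, 0);
      }
      return initial;
    });
    // New key: prefer the first existing bucket that is not 'full'.
    String target = null;
    for (Map.Entry<String, Integer> entry : load.entrySet()) {
      if (entry.getValue() < maxKeysPerBucket) {
        target = entry.getKey();
        break;
      }
    }
    // All existing buckets are 'full': create a new one.
    if (target == null) {
      target = "bucket-" + load.size();
    }
    load.merge(target, 1, Integer::sum);
    mapping.put(recordKey, target);
    return target;
  }

  public static void main(String[] args) {
    DynamicBucketAssignerSketch assigner = new DynamicBucketAssignerSketch(1, 2);
    for (String key : new String[] {"k1", "k2", "k3", "k1"}) {
      System.out.println(key + " -> " + assigner.assign("2024-01-01", key));
    }
  }
}
```

With one initial bucket and a threshold of two keys, the third new key lands in a newly created bucket, while a repeated key returns its original assignment. In the actual proposal, the in-memory maps would be bootstrapped from partitioned RLI and new assignments written back to it.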
