GitHub user cshuo edited a discussion: Dynamic Bucket Index For Flink streaming

## Background

Hudi currently provides multiple bucket-based indexing options, but all have 
practical limitations for continuously growing production workloads.

* Simple bucket index

The main limitation of the simple bucket index is its weak rescaling capability: the number of buckets is fixed once configured. As data keeps growing over time, each bucket accumulates more and more records, which can eventually hurt query performance.

* Partition-level bucket index

The partition-level bucket index improves flexibility by allowing different partitions to use different bucket numbers. However, once configured for a partition, the bucket count still cannot be scaled online; it can only be changed through an offline rewrite process.

* Consistent bucket index

The consistent bucket index supports bucket split and merge, but it also has limitations: 1) the bucket resize lifecycle is coupled with clustering; 2) until clustering completes, writes during the transition rely on dual-write semantics, which adds write overhead and degrades write performance. For Flink streaming workloads, this model is too heavyweight.

This proposal takes a different direction:

* Reuse Hudi's partitioned record-level index (RLI) as the source of truth for bucket assignment.

* Dynamic bucket growth only affects new keys: existing keys always stay in their original bucket, so existing buckets never need to be split or merged.

With this approach, buckets can grow online during streaming ingestion, and the process stays lightweight because it is neither coupled with a background table service nor dependent on heavyweight dual writes.


## Goals

* Support a dynamic bucket index built on top of Hudi partitioned RLI.

* Keep a record key's bucket assignment immutable once assigned, so that bucket growth never relocates historical data.

* Support lazy bootstrap of the `key -> bucket` cache from the partitioned RLI.

* Keep memory usage bounded through partition-granularity cache lifecycle 
management.

* Reuse existing MDT / RLI infrastructure as much as possible.


## Non-Goals

* Introducing a new hash-based or consistent-hashing bucket model.

* Rebalancing historical keys after bucket growth.

* Solving hot-key skew caused by a small set of existing keys.



## The Design
<img width="2680" height="988" alt="image" src="https://github.com/user-attachments/assets/b3939712-5e2f-47a0-899f-617bf7bba32d" />

The high-level ideas:

* Use partitioned RLI as the persistent backend for dynamic bucket assigning.

* Set the initial bucket count to the number of bucket assigners.

* Follow the same fileId naming pattern as the simple bucket index: `<bucketId(8 digits)>-<uuid tail>`.

* Maintain a per-partition mapping cache in the bucket assigner.

  * `Partition -> { recordKeyHash -> bucketId }`

* Lazily bootstrap the in-memory cache from the partitioned RLI.

* Support partition-granularity cache eviction after commit and inactivity.

* Support metadata table (MDT) lookup of the index data for a specific data partition.
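The `<bucketId(8 digits)>-<uuid tail>` pattern can be sketched as follows. This is a minimal illustration, assuming the fileId is a random UUID whose first 8 characters are replaced by the zero-padded bucket id; `BucketFileIds` and its methods are hypothetical names, not Hudi's actual API.

```java
import java.util.UUID;

/** Hypothetical helper illustrating the "<bucketId(8 digits)>-<uuid tail>" fileId pattern. */
final class BucketFileIds {
    private BucketFileIds() {}

    /** Zero-pads a bucket id to 8 digits, e.g. 7 -> "00000007". */
    static String bucketIdStr(int bucketId) {
        if (bucketId < 0 || bucketId > 99_999_999) {
            throw new IllegalArgumentException("bucket id must fit in 8 digits: " + bucketId);
        }
        return String.format("%08d", bucketId);
    }

    /** Replaces the first 8 hex characters of a random UUID with the padded bucket id. */
    static String newFileId(int bucketId) {
        String uuid = UUID.randomUUID().toString(); // 8-4-4-4-12 layout, 36 characters
        return bucketIdStr(bucketId) + uuid.substring(8); // keeps "-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
    }

    /** Recovers the bucket id from the first 8 characters of a fileId. */
    static int bucketIdFromFileId(String fileId) {
        return Integer.parseInt(fileId.substring(0, 8));
    }
}
```

Because the bucket id is recoverable from the fileId prefix alone, any file group location stored in the RLI is enough to reconstruct the key's bucket.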



## The Impl

### The Initial Bucket Count

* The initial bucket count is set to the number of bucket assigners (the bucket assigner parallelism).

  This gives a natural initial routing space and aligns the initial bucket layout with write parallelism.

* The capacity of each bucket can be defined as either the maximum number of record keys or the maximum file size of the latest base file.
  - For new keys, existing buckets are preferred to avoid the small-file problem; a new bucket is created only when all existing buckets have reached capacity.
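A minimal sketch of the 'full' check, assuming a bucket counts as full once it reaches either limit; the class name and both thresholds are hypothetical, and a real policy might use only one of the two measures.

```java
/** Hypothetical capacity policy: a bucket is 'full' once it hits either limit. */
final class BucketCapacity {
    private final long maxRecordKeys;
    private final long maxBaseFileBytes;

    BucketCapacity(long maxRecordKeys, long maxBaseFileBytes) {
        this.maxRecordKeys = maxRecordKeys;
        this.maxBaseFileBytes = maxBaseFileBytes;
    }

    /**
     * @param recordKeyCount      number of record keys already assigned to the bucket
     * @param latestBaseFileBytes size of the bucket's latest base file (0 if none yet)
     */
    boolean isFull(long recordKeyCount, long latestBaseFileBytes) {
        return recordKeyCount >= maxRecordKeys || latestBaseFileBytes >= maxBaseFileBytes;
    }
}
```

New keys go to a bucket for which `isFull` is false; only when every existing bucket is full does the assigner open a new one.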


### The Bucket Assigning Strategy

Bucket ids are no longer computed from a static hash strategy. Instead, the bucket assigner maintains a mapping cache for each data partition:

```plaintext
Partition -> { recordKeyHash -> bucketId }
```


**Assigning behavior**:
<img width="2154" height="1596" alt="image" src="https://github.com/user-attachments/assets/7b733b71-1ecd-4ebe-90c2-4242c8b4744f" />


* If the record key already exists in the mapping, reuse its existing bucket id / file group id.

* If the record key does not exist:

  * Select a bucket that is not 'full' and assign it to the record key.

  * If all existing buckets are 'full', create a new bucket and assign it to the record key.
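The assigning behavior above can be sketched as a per-partition `recordKeyHash -> bucketId` map. This is a minimal, hypothetical illustration: capacity is measured only in record keys, per-bucket key counts live in memory (bootstrap from the RLI is ignored), and the policy picks the lowest-numbered non-full bucket; a least-loaded choice would satisfy the same rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: Partition -> { recordKeyHash -> bucketId }. */
final class DynamicBucketAssigner {
    private final int initialBucketCount; // e.g. the bucket assigner parallelism
    private final int maxKeysPerBucket;   // capacity, measured here in record keys
    private final Map<String, Map<Integer, Short>> keyToBucket = new HashMap<>();
    private final Map<String, List<Integer>> keysPerBucket = new HashMap<>();

    DynamicBucketAssigner(int initialBucketCount, int maxKeysPerBucket) {
        this.initialBucketCount = initialBucketCount;
        this.maxKeysPerBucket = maxKeysPerBucket;
    }

    short assign(String partition, String recordKey) {
        Map<Integer, Short> mapping = keyToBucket.computeIfAbsent(partition, p -> new HashMap<>());
        int hash = recordKey.hashCode();
        Short existing = mapping.get(hash);
        if (existing != null) {
            // Existing key (or a colliding hash): reuse the bucket, routing stays deterministic.
            return existing;
        }
        List<Integer> counts = keysPerBucket.computeIfAbsent(partition, p -> {
            List<Integer> init = new ArrayList<>();
            for (int i = 0; i < initialBucketCount; i++) {
                init.add(0);
            }
            return init;
        });
        int bucket = -1;
        for (int i = 0; i < counts.size(); i++) {
            if (counts.get(i) < maxKeysPerBucket) { // first bucket that is not 'full'
                bucket = i;
                break;
            }
        }
        if (bucket < 0) { // all existing buckets are full: create a new one
            if (counts.size() > Short.MAX_VALUE) {
                throw new IllegalStateException("bucket id exceeds short range");
            }
            counts.add(0);
            bucket = counts.size() - 1;
        }
        counts.set(bucket, counts.get(bucket) + 1);
        mapping.put(hash, (short) bucket);
        return (short) bucket;
    }
}
```

With 2 initial buckets of capacity 2, keys `k1`..`k4` fill buckets 0 and 1, `k5` opens bucket 2, and re-assigning `k1` still returns bucket 0.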



**Cache Size estimation**:

For memory efficiency, the cache maps the record key's hash code (`int`) to the bucket id (`short`), so each entry carries about 4 + 2 = 6 bytes of payload. Accounting for the map's auxiliary overhead, a rough estimate is:

* 100 million keys -> ~1 GB

The mapping is distributed across all bucket assigner tasks. For example, with a bucket assigner parallelism of 20 and keys spread evenly, each task needs only about 50 MB.
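The arithmetic behind the estimate, assuming keys spread evenly across tasks. Note that the ~1 GB figure presumes a primitive-keyed map (for example an open-addressing `int -> short` map); a boxed `java.util.HashMap<Integer, Short>` costs several tens of bytes per entry and would need several times more memory.

```math
\begin{aligned}
\text{raw payload} &= 10^{8}\ \text{keys} \times (4 + 2)\ \text{B} = 6 \times 10^{8}\ \text{B} \approx 0.6\ \text{GB} \\
\text{with map overhead} &\approx 1\ \text{GB} \\
\text{per task (parallelism } 20) &\approx 1\ \text{GB} / 20 = 50\ \text{MB}
\end{aligned}
```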



### Lazy Bootstrap for Bucket Assign Cache

The `recordKeyHash -> bucket` cache is bootstrapped lazily from partitioned RLI.

Behavior:

* No eager preloading of all partitions.

* When a partition becomes active, load its routing mapping from the partitioned RLI.

* Once loaded, serve bucket assignment from the in-memory cache.

This keeps memory proportional to active partitions instead of total table size.
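A minimal sketch of lazy bootstrap, assuming a hypothetical `rliLoader` function that stands in for an RLI lookup of one data partition and returns its `recordKeyHash -> bucketId` mapping. The loader runs only on the first access to a partition; later calls are served from memory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical lazily bootstrapped cache: one RLI load per partition, on first access. */
final class LazyBucketCache {
    // Stand-in for a partitioned RLI lookup: partition -> (recordKeyHash -> bucketId).
    private final Function<String, Map<Integer, Short>> rliLoader;
    private final Map<String, Map<Integer, Short>> cache = new HashMap<>();

    LazyBucketCache(Function<String, Map<Integer, Short>> rliLoader) {
        this.rliLoader = rliLoader;
    }

    /** Returns the partition's mapping, loading it from the RLI only if not cached yet. */
    Map<Integer, Short> mappingFor(String partition) {
        return cache.computeIfAbsent(partition, p -> new HashMap<>(rliLoader.apply(p)));
    }

    boolean isLoaded(String partition) {
        return cache.containsKey(partition);
    }

    int loadedPartitions() {
        return cache.size();
    }
}
```

Memory therefore tracks the set of partitions actually touched by the stream rather than the whole table.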



### Cache Eviction

The bucket assigning cache is managed at partition granularity. Each partition's bucket cache tracks two fields:

* `updated`: whether the partition's bucket cache was updated during the current checkpoint interval.

* `lastUpdatedCheckpoint`: the id of the last checkpoint interval during which the bucket cache was updated.

**Eviction flow**

<img width="2634" height="1726" alt="image" src="https://github.com/user-attachments/assets/a6f2ffc8-b4c7-4dad-9b81-0ca4ee817dd9" />

When the bucket assigner assigns a bucket for a record key:

1. If the record key hash does not exist in the cache, add the new mapping and set `updated` to true.

When the bucket assigner operator performs `snapshot()`:

1. Get `latestSuccessfulCheckpoint`, the id of the latest successful checkpoint, i.e. the checkpoint corresponding to the latest completed instant.

2. Check the state of each partition's bucket cache:

   1. If `updated` is true, set `lastUpdatedCheckpoint` to the current checkpoint id.

   2. If `updated` is false:

      1. If `lastUpdatedCheckpoint <= latestSuccessfulCheckpoint`, remove the partition's bucket cache.

      2. Otherwise, keep the partition's bucket cache, since it still holds new `key -> bucket` mappings that have not yet been committed to the partitioned RLI.

3. Finally, reset `updated` to false for each partition's bucket cache.

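This eviction strategy avoids unbounded cache growth while keeping hot partitions resident. Evicting a partition loses no information, since all of its mappings are already committed to the partitioned RLI and can be bootstrapped again lazily on the next access.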
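The eviction flow can be sketched as follows. This is a minimal illustration with hypothetical names (`EvictingBucketCache`, `putIfAbsent`, `onSnapshot`); the caller is assumed to supply the current checkpoint id and `latestSuccessfulCheckpoint`, and how the latter is derived from the latest completed instant is left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical sketch of partition-granularity eviction driven by checkpoints. */
final class EvictingBucketCache {
    static final class PartitionCache {
        final Map<Integer, Short> keyToBucket = new HashMap<>();
        boolean updated;                  // changed during the current checkpoint interval
        long lastUpdatedCheckpoint = -1L; // last interval in which the cache changed
    }

    private final Map<String, PartitionCache> partitions = new HashMap<>();

    /** Records a new key -> bucket mapping; existing mappings are never changed. */
    void putIfAbsent(String partition, int recordKeyHash, short bucketId) {
        PartitionCache pc = partitions.computeIfAbsent(partition, p -> new PartitionCache());
        if (!pc.keyToBucket.containsKey(recordKeyHash)) {
            pc.keyToBucket.put(recordKeyHash, bucketId);
            pc.updated = true;
        }
    }

    /** Called from the operator's snapshot with the current and latest successful checkpoint ids. */
    void onSnapshot(long currentCheckpoint, long latestSuccessfulCheckpoint) {
        Iterator<Map.Entry<String, PartitionCache>> it = partitions.entrySet().iterator();
        while (it.hasNext()) {
            PartitionCache pc = it.next().getValue();
            if (pc.updated) {
                pc.lastUpdatedCheckpoint = currentCheckpoint;
            } else if (pc.lastUpdatedCheckpoint <= latestSuccessfulCheckpoint) {
                it.remove(); // every mapping is committed to the RLI: safe to evict
                continue;
            }
            pc.updated = false; // reset for the next checkpoint interval
        }
    }

    boolean isCached(String partition) {
        return partitions.containsKey(partition);
    }
}
```

A partition updated in checkpoint 1 survives the next snapshot while checkpoint 1 is still uncommitted, and is evicted at the first snapshot after checkpoint 1 succeeds with no new keys in between.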



### The Write of Partitioned RLI

The index metadata is stored in the partitioned RLI of the MDT. Since the index write pipeline for the global RLI is already supported, the same pipeline can be reused for the partitioned RLI.

Index write rules:

* Only insert index entries for new record keys

* Existing keys never update their bucket assignment

This keeps index maintenance simple and lets the partitioned RLI data be updated incrementally.
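The write rules can be sketched as a filter over assigned records. This is a hypothetical illustration, assuming the bucket assigner tags each record with an `isNewKey` flag (true when the key was absent from the cache); only those records produce index inserts, and records for existing keys produce none.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: only newly assigned keys produce partitioned RLI inserts. */
final class PartitionedRliWrites {
    private PartitionedRliWrites() {}

    /** A record after bucket assignment; isNewKey is true when the key was absent from the cache. */
    static final class AssignedRecord {
        final String partition;
        final String recordKey;
        final String fileId;
        final boolean isNewKey;

        AssignedRecord(String partition, String recordKey, String fileId, boolean isNewKey) {
            this.partition = partition;
            this.recordKey = recordKey;
            this.fileId = fileId;
            this.isNewKey = isNewKey;
        }
    }

    /** An index entry to insert: (partition, recordKey) -> fileId. */
    static final class IndexEntry {
        final String partition;
        final String recordKey;
        final String fileId;

        IndexEntry(String partition, String recordKey, String fileId) {
            this.partition = partition;
            this.recordKey = recordKey;
            this.fileId = fileId;
        }
    }

    static List<IndexEntry> indexInserts(List<AssignedRecord> records) {
        List<IndexEntry> inserts = new ArrayList<>();
        for (AssignedRecord r : records) {
            if (r.isNewKey) { // existing keys never change bucket, so they need no index write
                inserts.add(new IndexEntry(r.partition, r.recordKey, r.fileId));
            }
        }
        return inserts;
    }
}
```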



### The Lookup Path

The system should support looking up the partitioned RLI data of a specific data partition from the MDT.

```java
public class HoodieBackedTableMetadata extends BaseTableMetadata {
    ...
    /**
     * Reads record locations from the partitioned record-level index for a specified data partition.
     */
    public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocations(String dataTablePartition);
    ...
}
```
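A minimal sketch of turning a partition lookup result into the bucket assigning cache, assuming the result has already been collected into `(recordKey, fileId)` pairs; the plain `List<Map.Entry<String, String>>` is a hypothetical stand-in for the `HoodiePairData<String, HoodieRecordGlobalLocation>` returned above, and the bucket id is parsed from the `<bucketId(8 digits)>-<uuid tail>` fileId prefix.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical conversion of one partition's RLI lookup result into recordKeyHash -> bucketId. */
final class BucketCacheBootstrap {
    private BucketCacheBootstrap() {}

    /**
     * @param locations (recordKey, fileId) pairs for one data partition
     * @return recordKeyHash -> bucketId
     */
    static Map<Integer, Short> toBucketCache(List<Map.Entry<String, String>> locations) {
        Map<Integer, Short> cache = new HashMap<>();
        for (Map.Entry<String, String> loc : locations) {
            // fileId layout: "<bucketId(8 digits)>-<uuid tail>"
            int bucketId = Integer.parseInt(loc.getValue().substring(0, 8));
            cache.put(loc.getKey().hashCode(), (short) bucketId);
        }
        return cache;
    }
}
```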



### Concurrent Writers

This design currently does not work well with concurrent writers. The main risks are:

1. Conflicting bucket assignment for the same new key

   * two writers may assign the same new key to different buckets

   * this breaks the correctness of the `key -> bucket` mapping

2. Conflicting bucket creation during bucket expansion

   * two writers may generate the same bucket id but bind it to different file 
groups

   * this creates bucket ownership conflicts, similar to the problem in simple 
bucket index.

Because of these risks, concurrent writers are not supported without additional 
coordination.



## Benefits

* No need to move/rewrite historical data when bucket count grows.

* No bucket lineage or transitional read semantics are required.

* Update routing remains simple because the same key always maps to the same 
bucket.

* Reuses Hudi's existing partitioned RLI / MDT infrastructure.

* More natural for workloads where new keys continuously arrive over time.



## Tradeoffs / Risks

* The in-memory cache can become large for hot, high-cardinality partitions.

* First access to a large partition may incur bootstrap latency.

* Bucket growth only helps future new keys, not existing hot keys.



## Summary

This proposal introduces a **Partitioned RLI-based Dynamic Bucket Index** for 
Hudi.

The key idea is to use partitioned RLI as the persistent routing backend for a 
stable per-key bucket assignment:

* initial bucket count is set by bucket assigner parallelism

* the in-memory cache is bootstrapped lazily per partition and evicted at partition granularity after commit and inactivity, to avoid OOM.

* only index entries for new keys are written into RLI

* bucket assignment never changes once inserted

In short, this design treats dynamic bucket routing as an explicit metadata 
indexing problem, using Hudi's own partitioned RLI as the source of truth.


GitHub link: https://github.com/apache/hudi/discussions/18514
