GitHub user danny0405 edited a discussion: RLI support for Flink streaming

## Background
Flink does not support RLI (record-level index) while Spark does. This causes an inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either `bucket` or `flink_state`, which creates overhead for users in production.

Another reason is multi-partition upserts: currently the only choice is the `flink_state` index, but `flink_state` actually costs a lot of memory and cannot be shared between different workloads.
## Goal
Implement basic write and read support for RLI for medium-volume workloads. For super large workloads RLI may not work well in streaming, and users can choose the bucket index instead.

## Design 
### The Write
In the `BucketAssigner` operator, the RLI metadata partition would be utilized as the index state backend: the operator probes the index with the incoming record keys to figure out whether each message is an insert, update, or delete. We also need an in-memory cache for the index mappings of the current checkpoint, because they are not yet committed to the Hudi table and are therefore invisible to the lookup.
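
A minimal sketch of this lookup, where `RecordIndexReader` and `HoodieRecordLocation` are simplified stand-ins for the real Hudi types and all names are illustrative:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-ins for the Hudi types, for illustration only. */
record HoodieRecordLocation(String partitionPath, String fileId) {}
interface RecordIndexReader {
  Optional<HoodieRecordLocation> getRecordLocation(String recordKey);
}

public class RliLookup {
  // Mappings produced in the current checkpoint, not yet committed to the MDT.
  private final Map<String, HoodieRecordLocation> uncommittedCache = new ConcurrentHashMap<>();
  private final RecordIndexReader rliReader;

  public RliLookup(RecordIndexReader rliReader) {
    this.rliReader = rliReader;
  }

  /** Non-empty result means the key already exists (update/delete); empty means insert. */
  public Optional<HoodieRecordLocation> locate(String recordKey) {
    // Probe the in-memory cache first: the current checkpoint's mappings are
    // invisible in the committed MDT until the instant completes.
    HoodieRecordLocation cached = uncommittedCache.get(recordKey);
    if (cached != null) {
      return Optional.of(cached);
    }
    // Fall back to the committed record-level index in the MDT.
    return rliReader.getRecordLocation(recordKey);
  }

  /** Remember a mapping written in the current checkpoint. */
  public void cacheNewMapping(String recordKey, HoodieRecordLocation location) {
    uncommittedCache.put(recordKey, location);
  }

  /** Drop the cache once the instant commits and the mappings become visible. */
  public void onCommitCompleted() {
    uncommittedCache.clear();
  }
}
```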

The index items are then shuffled by record key with the **same hashing algorithm** as the MDT `RLI` partitioner. This is critical to avoid writing _N*M_ files to the MDT partition (N is the number of RLI partition buckets, M is the number of data table buckets involved in the current write): with aligned hashing, all index items for a given RLI file group land on the same writer task.
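
A minimal sketch of the shuffle as a custom Flink `Partitioner`. The helper `mdtFileGroupIndex` is a placeholder: the real implementation must delegate to the exact hash the MDT RLI partitioner uses, and the `hashCode`-based body below is illustrative only:

```java
import org.apache.flink.api.common.functions.Partitioner;

public class RliIndexPartitioner implements Partitioner<String> {
  private final int numRliFileGroups; // N: RLI partition bucket count

  public RliIndexPartitioner(int numRliFileGroups) {
    this.numRliFileGroups = numRliFileGroups;
  }

  @Override
  public int partition(String recordKey, int numPartitions) {
    // Must match the MDT partitioner bit-for-bit, otherwise index items for
    // one RLI file group would be scattered across writer tasks.
    return mdtFileGroupIndex(recordKey, numRliFileGroups) % numPartitions;
  }

  /** Placeholder: delegate to the same hash the MDT RLI partitioner uses. */
  private static int mdtFileGroupIndex(String recordKey, int numFileGroups) {
    return Math.floorMod(recordKey.hashCode(), numFileGroups); // illustrative only
  }
}
```

Wired in via `DataStream#partitionCustom`, this routes every index item for a given RLI file group to one task, so the write produces N files rather than N*M.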

In the `StreamWrite` operator, the index items are buffered first and written to the MDT after the data items are flushed; the write status metadata is then sent to the `coordinator` together with the data table write status metadata.

When committing to the data table, the MDT is committed first with the partial RLI write metadata list.
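
A minimal sketch of that ordering in the coordinator, assuming hypothetical handles `metadataWriter` and `dataWriteClient` (neither name comes from the actual codebase, and `WriteStatus` is a simplified stand-in):

```java
import java.util.List;

/** Hypothetical handles and types, for illustration only. */
interface MetadataTableWriter {
  void commit(String instantTime, List<WriteStatus> rliWriteStatuses);
}
interface DataTableWriteClient {
  void commit(String instantTime, List<WriteStatus> dataWriteStatuses);
}
record WriteStatus(String fileId, long numWrites) {}

public class CommitSequence {
  private final MetadataTableWriter metadataWriter;
  private final DataTableWriteClient dataWriteClient;

  public CommitSequence(MetadataTableWriter metadataWriter, DataTableWriteClient dataWriteClient) {
    this.metadataWriter = metadataWriter;
    this.dataWriteClient = dataWriteClient;
  }

  public void commit(String instantTime, List<WriteStatus> dataStatuses, List<WriteStatus> rliStatuses) {
    // 1. Commit the MDT delta commit carrying the partial RLI write metadata.
    metadataWriter.commit(instantTime, rliStatuses);
    // 2. Only then commit the data table: any reader that sees the data
    //    instant is guaranteed to also see the matching RLI entries.
    dataWriteClient.commit(instantTime, dataStatuses);
  }
}
```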

<img width="2572" height="1232" alt="image" 
src="https://github.com/user-attachments/assets/6d29b776-94b0-4bb0-9b34-59a5f8354b45";
 />

In order to keep exactly-once semantics across job recovery, the write status metadata also needs to be stored in both the `StreamWrite` operator state and the `coordinator`, pretty much the same behavior as the current maintenance of the data table write metadata.
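
For the operator side, a minimal sketch of persisting the pending write statuses in Flink operator state (names illustrative; statuses serialized as bytes for brevity):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class RliWriteStatusState implements CheckpointedFunction {
  private transient ListState<byte[]> writeStatusState;
  // Serialized RLI write statuses not yet acknowledged by the coordinator.
  private final List<byte[]> pendingStatuses = new ArrayList<>();

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    // Persist the pending statuses so a restarted job can resend them to the
    // coordinator instead of losing or duplicating the MDT write.
    writeStatusState.update(pendingStatuses);
  }

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    writeStatusState = ctx.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("rli-write-statuses", byte[].class));
    if (ctx.isRestored()) {
      pendingStatuses.clear();
      writeStatusState.get().forEach(pendingStatuses::add);
    }
  }
}
```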
### The Compaction
In order not to take up too many task slots, we will reuse the current compaction sub-pipeline for scalable execution of the MDT compaction; it is applied automatically when RLI is enabled.
<img width="2558" height="620" alt="image" 
src="https://github.com/user-attachments/assets/4f539ae3-57ca-4e38-a0c6-e5bbe92e65ec";
 />


## Open Questions
We need to benchmark the read performance of index items in the `BucketAssigner` operator, to see whether we need to introduce layered cache strategies similar to RocksDB's.
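
For reference, one possible shape of such a layered cache is a small on-heap LRU in front of the slower MDT read path, in the spirit of RocksDB's block cache. A sketch reusing the stand-in types (`RecordIndexReader`, `HoodieRecordLocation`) from the write-path example above, with all names illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class LruIndexCache {
  private final int maxEntries;
  private final Map<String, HoodieRecordLocation> lru;

  public LruIndexCache(int maxEntries) {
    this.maxEntries = maxEntries;
    // Access-ordered LinkedHashMap that evicts the least recently used entry.
    this.lru = new LinkedHashMap<>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, HoodieRecordLocation> eldest) {
        return size() > LruIndexCache.this.maxEntries;
      }
    };
  }

  /** Serve from the hot tier; fall through to the MDT read on a miss. */
  public Optional<HoodieRecordLocation> get(String key, RecordIndexReader slowPath) {
    HoodieRecordLocation hit = lru.get(key);
    if (hit != null) {
      return Optional.of(hit);
    }
    Optional<HoodieRecordLocation> loaded = slowPath.getRecordLocation(key);
    loaded.ifPresent(loc -> lru.put(key, loc));
    return loaded;
  }
}
```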


GitHub link: https://github.com/apache/hudi/discussions/17452
