GitHub user danny0405 edited a discussion: RLI support for Flink streaming

## Background
Flink does not support RLI while Spark does, which causes inconsistency between 
engines: for tables migrated from Spark to Flink streaming, the index type 
needs to be switched to either `bucket` or `flink_state`, which is an overhead 
for users in production.

Another reason is multi-partition upserts: currently the only choice is the 
`flink_state` index, but `flink_state` actually costs a lot of memory and 
cannot be shared between different workloads.
## Goals
- Implement reliable and performant write and read support for RLI via Flink APIs 
[VC: list these out] 
- The RLI implementation is engine compatible, e.g., Flink can access and utilize the 
RLI written by Spark and vice versa
- The RLI is global, so upserts across partitions are supported; also support 
partition-level RLI for large fact tables.
- Async compaction for the MDT when RLI is enabled, either in the writer pipeline 
or in a table services background job.
- Smart caching of the RLI
- Clearly set limits on the write throughput supported by RLI (based 
on a certain average response time for RLI access, like from x0ms to x00ms) 
via empirical benchmarks
- Ability to expand to arbitrary secondary indexing on different columns, 
or at least be compatible with how such secondary indexes can be built 
outside the Flink pipeline, in a consistent fashion. 

## Non Goals 

[VC: what are these?] 

## The Design
The high-level ideas:
- an RLI-based index backend to replace the `flink_state` index;
- a cache of the RLI to speed up access;
- a separate index function to write the RLI/SI payloads;
- the MDT RLI files are written synchronously with the data table data files; 
the metadata is sent to the coordinator for a final commit to the MDT (after 
the `FILES` partition is ready);
- the MDT compaction is switched to async, and the data file compaction 
pipeline is reused to take up fewer task slots.

## The Impl
### The Write

#### The RLI Access
In the `BucketAssigner` operator, the RLI index metadata would be utilized as the 
index backend: the `BucketAssigner` operator probes the RLI with the 
incoming record keys to figure out whether a message is an update, insert, or delete. 
In other words, the RLI index metadata serves the same role as the 
`flink_state` index.
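
A minimal sketch (not Hudi's actual `BucketAssigner` code) of how such tagging could look; `RliLookup` and `RecordTagger` are hypothetical names standing in for the MDT RLI read path:

```java
import java.util.Optional;

/** Hypothetical abstraction over the MDT RLI read path; not Hudi's actual API. */
interface RliLookup {
  /** Returns the indexed location (partition path + file id) for the key, if any. */
  Optional<String> locate(String recordKey);
}

/** Classifies incoming records the way flink_state tagging does today. */
final class RecordTagger {
  private final RliLookup rli;

  RecordTagger(RliLookup rli) {
    this.rli = rli;
  }

  /** A key already present in the RLI targets an existing file group (update/delete); an unknown key is an insert. */
  boolean isUpdateOrDelete(String recordKey) {
    return rli.locate(recordKey).isPresent();
  }
}
```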

##### The Cache of RLI Access
We need fast access in streaming to have high throughput (ideally per-record 
access should be < 10ms), thus a general hotspot cache is needed. We will build 
an in-memory LRU cache keyed by the active upsert record keys; cache items will 
be force evicted once a configured memory threshold is reached.

We also need an in-memory cache for the index mappings of the current checkpoint, 
because they are not yet committed to the Hudi table and are therefore invisible.

The query will access the in-memory cache first, then the MDT RLI index:
<img width="1650" height="594" alt="image" src="https://github.com/user-attachments/assets/f0a4780c-0750-41c3-9ee6-59207a00023e" />
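
A minimal sketch of that layered lookup, reusing the hypothetical `RliLookup` from the sketch above; the byte-based memory threshold is simplified here to a maximum entry count, and the uncommitted mappings are assumed to be promoted on commit notification:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class LayeredRliCache {
  // Index mappings written in the current (not yet committed) checkpoint.
  private final Map<String, String> uncommitted = new HashMap<>();
  // Access-ordered LRU cache of hot keys; eldest entries are evicted at the cap.
  private final LinkedHashMap<String, String> lru;
  private final RliLookup mdtRli; // hypothetical MDT RLI reader (see previous sketch)

  LayeredRliCache(RliLookup mdtRli, int maxCachedKeys) {
    this.mdtRli = mdtRli;
    this.lru = new LinkedHashMap<String, String>(1024, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        // Simplification: evict by entry count instead of a byte-based memory threshold.
        return size() > maxCachedKeys;
      }
    };
  }

  /** Lookup order: uncommitted-checkpoint map, then LRU hotspot cache, then the MDT RLI. */
  Optional<String> locate(String recordKey) {
    String loc = uncommitted.get(recordKey);
    if (loc == null) {
      loc = lru.get(recordKey);
    }
    if (loc == null) {
      Optional<String> fromMdt = mdtRli.locate(recordKey);
      fromMdt.ifPresent(l -> lru.put(recordKey, l)); // populate the hotspot cache
      return fromMdt;
    }
    return Optional.of(loc);
  }

  /** Staged while the checkpoint is in flight and thus invisible in the Hudi table. */
  void stage(String recordKey, String location) {
    uncommitted.put(recordKey, location);
  }

  /** On successful commit: promote this checkpoint's mappings and clear the staging map. */
  void onCheckpointCommitted() {
    lru.putAll(uncommitted);
    uncommitted.clear();
  }
}
```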


#### The Shuffle of RLI Payloads
In the `StreamWrite` operator, the index items are inferred and sent to the 
`IndexWrite` operator in streaming style. The index records are shuffled by 
`hash(record_key) % num_rli_shards` (the **same hashing algorithm** as the MDT 
`RLI index` partitioner); this is critical to avoid writing _N*M_ files to the 
MDT partition (N is the number of RLI partition buckets, M is the number of 
data table buckets involved in the current write).
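
An illustrative Flink custom partitioner for this shuffle; the real hash must be the exact hashing used by the MDT RLI partitioner, and a plain non-negative `hashCode` is used here only as a stand-in:

```java
import org.apache.flink.api.common.functions.Partitioner;

/** Routes index records so that all keys of one RLI shard land on the same IndexWrite subtask. */
public class RliShardPartitioner implements Partitioner<String> {
  private final int numRliShards;

  public RliShardPartitioner(int numRliShards) {
    this.numRliShards = numRliShards;
  }

  @Override
  public int partition(String recordKey, int numPartitions) {
    // Stand-in hash; must be replaced with the MDT RLI partitioner's hashing algorithm.
    int shard = (recordKey.hashCode() & Integer.MAX_VALUE) % numRliShards;
    // With IndexWrite parallelism == numRliShards, each shard maps to one task,
    // so one checkpoint produces at most N index files instead of N*M.
    return shard % numPartitions;
  }
}
```

In the pipeline this could be wired in with `DataStream#partitionCustom(new RliShardPartitioner(numRliShards), keySelector)` so the `IndexWrite` parallelism matches the shard count.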

How do we ensure the data record and index record always belong to the same 
commit/checkpoint? The barrier flows together with the records in Flink, see 
[how-does-state-snapshotting-work](https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/#how-does-state-snapshotting-work):
 when the `StreamWrite` operator receives a record, it emits the corresponding 
index record in the same `#processElement` call, so we can always keep the binding 
between the two; in other words, no barrier can land between them.
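
A minimal sketch of this emission pattern using a Flink side output; the payload types and operator wiring are placeholders, not the actual Hudi `StreamWrite` implementation:

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/** Placeholder payload types for illustration only. */
class DataRecord {
  final String key;
  final String fileId;
  DataRecord(String key, String fileId) { this.key = key; this.fileId = fileId; }
}

class IndexRecord {
  final String key;
  final String fileId;
  IndexRecord(String key, String fileId) { this.key = key; this.fileId = fileId; }
}

/**
 * The data record and its index record are emitted within one #processElement call,
 * so a checkpoint barrier can never separate them.
 */
public class StreamWriteWithIndexEmit extends ProcessFunction<DataRecord, DataRecord> {

  public static final OutputTag<IndexRecord> INDEX_OUT = new OutputTag<IndexRecord>("rli-index") {};

  @Override
  public void processElement(DataRecord record, Context ctx, Collector<DataRecord> out) {
    out.collect(record);                                               // data record downstream
    ctx.output(INDEX_OUT, new IndexRecord(record.key, record.fileId)); // index payload to IndexWrite
  }
}
```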

#### The RLI Write
In the `IndexWrite` operator, the index items are buffered first and written to the 
MDT (triggered by the Flink checkpoint), and the write status metadata is sent to 
the `coordinator` (see the sketch after the list below). The metadata sent to the 
coordinator includes two parts:

- A: the written data file handles;
- B: the written MDT file handles (specifically under the RLI/SI partitions)
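
A minimal sketch of such a buffer-and-flush function, not the actual Hudi operator: `MdtRliWriter` and `CoordinatorGateway` are hypothetical abstractions, and `IndexRecord` reuses the placeholder type from the earlier sketch.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Buffers index records and flushes them to the MDT on each checkpoint. */
public class IndexWriteFunction extends ProcessFunction<IndexRecord, Void>
    implements CheckpointedFunction {

  /** Hypothetical handle that appends index payloads to the MDT RLI/SI file groups. */
  public interface MdtRliWriter extends Serializable {
    /** Returns the written MDT file handles for this flush. */
    List<String> flush(List<IndexRecord> records, long checkpointId);
  }

  /** Hypothetical channel from the subtask to the write coordinator. */
  public interface CoordinatorGateway extends Serializable {
    void sendIndexWriteMetadata(long checkpointId, List<String> mdtFileHandles);
  }

  private final MdtRliWriter writer;
  private final CoordinatorGateway coordinator;
  private transient List<IndexRecord> buffer;

  public IndexWriteFunction(MdtRliWriter writer, CoordinatorGateway coordinator) {
    this.writer = writer;
    this.coordinator = coordinator;
  }

  @Override
  public void initializeState(FunctionInitializationContext context) {
    this.buffer = new ArrayList<>();
  }

  @Override
  public void processElement(IndexRecord record, Context ctx, Collector<Void> out) {
    buffer.add(record); // buffer until the next checkpoint barrier triggers a flush
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    // Checkpoint-triggered: write the buffered payloads to the MDT and report the
    // resulting file handles (part B of the commit metadata) to the coordinator.
    List<String> mdtFileHandles = writer.flush(buffer, context.getCheckpointId());
    coordinator.sendIndexWriteMetadata(context.getCheckpointId(), mdtFileHandles);
    buffer.clear();
  }
}
```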

#### The Commit of the MDT (including RLI)
When committing to the data table, the MDT is committed first with the 
partial RLI/SI write metadata list (the MDT RLI/SI partition file handle info); 
the `RLI` and `SI` partition file handles are committed together 
with the `FILES` partition.

On a Flink checkpoint, each index/data writing task flushes all its records to the 
RLI and data files respectively, so the RLI and data files are always 
consistent. We commit both as we do now, from the coordinator, into a single Hudi 
commit.

<img width="3398" height="1178" alt="image" 
src="https://github.com/user-attachments/assets/a5bdcb07-0035-47fc-9fe4-19d798f47830";
 />
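
A hedged sketch of the coordinator-side commit ordering described above; all class and method names here are illustrative, not Hudi's actual coordinator API:

```java
import java.util.List;

class CommitSequencer {
  /** Placeholder for the file-handle metadata each task reports. */
  static class WriteMetadata {}

  void onCheckpointComplete(long checkpointId,
                            List<WriteMetadata> dataFileHandles,   // part A from StreamWrite
                            List<WriteMetadata> mdtIndexHandles) { // part B from IndexWrite
    // 1. Commit the metadata table first: the FILES partition together with the
    //    partial RLI/SI file handles reported by the IndexWrite tasks.
    commitMetadataTable(checkpointId, dataFileHandles, mdtIndexHandles);
    // 2. Then complete the data table commit, so data and index become visible
    //    as a single Hudi commit.
    commitDataTable(checkpointId, dataFileHandles);
  }

  private void commitMetadataTable(long checkpointId,
                                   List<WriteMetadata> dataFileHandles,
                                   List<WriteMetadata> mdtIndexHandles) {
    // Illustrative stub.
  }

  private void commitDataTable(long checkpointId, List<WriteMetadata> dataFileHandles) {
    // Illustrative stub.
  }
}
```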

In order to keep exactly-once semantics for job recovery, the write status 
metadata also needs to be stored in the `StreamWrite` operator, the 
`IndexWrite` operator, and the `coordinator`, pretty much the same behavior as 
the current maintenance of the data table metadata.

### The Compaction
In order not to take up too many task slots, we will reuse the current compaction 
sub-pipeline for scalable execution of the MDT compaction; it is applied automatically 
when RLI is enabled.
<img width="2558" height="620" alt="image" 
src="https://github.com/user-attachments/assets/4f539ae3-57ca-4e38-a0c6-e5bbe92e65ec";
 />

## Open Questions
We need to benchmark the read performance of index items in the `BucketAssigner` 
operator to see if we need to introduce layered cache strategies similar to RocksDB.

## Appendix

### SI support
Because the SI needs to be figured out on the fly after the data files are created, 
we generally need another SI write operator to handle the SI writes; the operator 
parallelism is the same as the number of SI partition buckets, and the shuffle 
strategy should be in line with the MDT SI partitioner.

The partial commit metadata of the SI is sent to the coordinator for the MDT commit.

This will increase the checkpoint time a lot and risks checkpoint timeouts 
and backpressure for high-volume workloads. ~~One solution is to build the SI 
asynchronously before queries, but that is another story and not in the scope of this 
design.~~

<img width="2254" height="1206" alt="image" 
src="https://github.com/user-attachments/assets/813b7b61-a7b0-4cfb-bbb3-64e9dc1515c3";
 />



GitHub link: https://github.com/apache/hudi/discussions/17452
