GitHub user danny0405 edited a discussion: RLI support for Flink streaming
## Background

Flink does not support the record level index (RLI) while Spark does, which causes inconsistency between engines: for tables migrated from Spark to Flink streaming, the index type needs to be switched to either `bucket` or `flink_state`, an overhead for users in production. Another motivation is multi-partition upserts: currently the only choice is the `flink_state` index, which costs a lot of memory and cannot be shared between different workloads.

## Goal

Implement basic write and read support of RLI for medium-volume workloads. For very large workloads the RLI may not work well in streaming mode, and users can choose the bucket index instead.

## Design

### The Write

In the `BucketAssigner` operator, the RLI metadata is utilized as the index state backend: the operator probes it with the incoming record keys to figure out whether a message is an update, an insert, or a delete. A memory cache is needed for the index mappings produced within the current checkpoint, because they are not committed to the Hudi table yet and thus invisible to the probe (see the probe sketch appended below).

The index items are then shuffled by record key with the **same hashing algorithm** as the MDT RLI partitioner. This is critical to avoid writing _N*M_ files to the MDT partition (N is the number of RLI partition buckets, M is the number of data-table buckets involved in the current write); a partitioner sketch is appended below.

In the `StreamWrite` operator, the index items are buffered first and written to the MDT after the data items are flushed; the index write status metadata is sent to the `coordinator` together with the data-table write status metadata. When committing to the data table, the MDT is committed first with the partial RLI write metadata list (see the commit-ordering sketch below).

<img width="2572" height="1232" alt="image" src="https://github.com/user-attachments/assets/6d29b776-94b0-4bb0-9b34-59a5f8354b45" />

To keep exactly-once semantics across job recovery, the write status metadata also needs to be stored in both the `StreamWrite` operator and the `coordinator`, pretty much the same behavior as the current maintenance of the data-table metadata.

### The Compaction

In order not to take up too many task slots, we reuse the current compaction sub-pipeline for scalable execution of the MDT compaction; it is applied automatically when RLI is enabled (a table-options sketch is appended below).

<img width="2558" height="620" alt="image" src="https://github.com/user-attachments/assets/4f539ae3-57ca-4e38-a0c6-e5bbe92e65ec" />

## Open Questions

We need to benchmark the read performance of index items in the `BucketAssigner` operator, to see whether we need to introduce layered cache strategies similar to RocksDB's (a minimal LRU sketch is appended below).

GitHub link: https://github.com/apache/hudi/discussions/17452
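---

## Appendix: illustrative sketches

The snippets below are minimal sketches to make the design concrete; the class and method names are hypothetical stand-ins, not code from the actual patch.

### Probing the RLI state in `BucketAssigner`

A minimal sketch of the probe path, assuming a hypothetical `RecordIndexReader` over the committed `record_index` partition and a plain `HashMap` overlay for the mappings produced within the current (uncommitted) checkpoint:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Sketch: tag an incoming record as insert or update by probing the
 * committed RLI state through an in-memory overlay of uncommitted mappings.
 */
public class RecordIndexProbe {

  /** Hypothetical reader over the committed MDT record_index partition. */
  public interface RecordIndexReader {
    Optional<String> getLocation(String recordKey); // fileId the key was written to
  }

  private final RecordIndexReader committedIndex;
  // Mappings written in the current checkpoint: not committed to the MDT
  // yet, hence invisible to the reader and cached locally.
  private final Map<String, String> currentCheckpointCache = new HashMap<>();

  public RecordIndexProbe(RecordIndexReader committedIndex) {
    this.committedIndex = committedIndex;
  }

  /** Returns the existing location if the key was seen before, else empty (insert). */
  public Optional<String> probe(String recordKey) {
    String cached = currentCheckpointCache.get(recordKey);
    if (cached != null) {
      return Optional.of(cached); // update within the same checkpoint
    }
    return committedIndex.getLocation(recordKey);
  }

  /** Record a new key -> fileId mapping produced in this checkpoint. */
  public void cache(String recordKey, String fileId) {
    currentCheckpointCache.put(recordKey, fileId);
  }

  /** Drop the overlay once the checkpoint commits and the MDT becomes visible. */
  public void onCheckpointCommitted() {
    currentCheckpointCache.clear();
  }
}
```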
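### Shuffling index items by the RLI hashing

The _N*M_ problem in concrete terms: if the M write tasks shuffled index items arbitrarily, every task could open files in all N RLI buckets, producing up to N*M files per write. Routing each item by the same key-to-bucket hash as the MDT means each bucket is written by exactly one task. The `hashCode`-based hash below is a stand-in; the real code must reuse the exact hashing of the MDT RLI partitioner:

```java
import org.apache.flink.api.common.functions.Partitioner;

/**
 * Sketch: route each index item to the write task that owns its target RLI
 * bucket (file group), so each of the N buckets is written by one task.
 */
public class RliFileGroupPartitioner implements Partitioner<String> {

  private final int numFileGroups; // N: buckets (file groups) of the RLI partition

  public RliFileGroupPartitioner(int numFileGroups) {
    this.numFileGroups = numFileGroups;
  }

  @Override
  public int partition(String recordKey, int numPartitions) {
    // Stable non-negative hash of the record key -> bucket index,
    // then map buckets onto the available write tasks.
    int fileGroupIdx = (recordKey.hashCode() & Integer.MAX_VALUE) % numFileGroups;
    return fileGroupIdx % numPartitions;
  }
}
```

Applied to the index-item stream with something like `indexItems.partitionCustom(new RliFileGroupPartitioner(n), item -> item.getRecordKey())`, where `getRecordKey` is a hypothetical accessor on the index-item type.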
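### Commit ordering on the coordinator

A sketch of the commit sequence: the MDT is committed first with the partial RLI write statuses collected from the `StreamWrite` tasks, then the data table commit makes the instant visible. All types here are hypothetical stand-ins, not the actual coordinator API:

```java
import java.util.List;

/** Sketch: MDT-first commit ordering with the partial RLI write metadata. */
public class CommitSequencer {

  interface MetadataTableClient {
    void commit(String instantTime, List<WriteStatusMeta> rliWriteStatuses);
  }

  interface DataTableClient {
    void commit(String instantTime, List<WriteStatusMeta> dataWriteStatuses);
  }

  static class WriteStatusMeta { /* file id, record counts, errors, ... */ }

  private final MetadataTableClient mdt;
  private final DataTableClient dataTable;

  CommitSequencer(MetadataTableClient mdt, DataTableClient dataTable) {
    this.mdt = mdt;
    this.dataTable = dataTable;
  }

  /**
   * For exactly-once recovery both status lists are also snapshotted in
   * operator state, mirroring how the data-table metadata is maintained today.
   */
  void commit(String instantTime,
              List<WriteStatusMeta> dataStatuses,
              List<WriteStatusMeta> rliStatuses) {
    mdt.commit(instantTime, rliStatuses);        // 1. MDT first (partial RLI metadata)
    dataTable.commit(instantTime, dataStatuses); // 2. then the data table
  }
}
```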
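### Enabling RLI for a Flink writer

A hedged example of the table options such a pipeline would likely be driven by, assuming the existing metadata-table switch `hoodie.metadata.record.index.enable` is the one reused on the Flink side (the final option name may differ in the patch):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: table options for a Flink streaming writer with RLI enabled. */
public class RliWriteOptions {
  public static Map<String, String> options() {
    Map<String, String> opts = new HashMap<>();
    opts.put("connector", "hudi");
    opts.put("path", "s3://bucket/warehouse/t1"); // hypothetical table path
    opts.put("table.type", "MERGE_ON_READ");
    opts.put("hoodie.metadata.enable", "true");
    // Enables the record-level index in the metadata table; per the design,
    // the MDT compaction sub-pipeline is then applied automatically.
    opts.put("hoodie.metadata.record.index.enable", "true");
    return opts;
  }
}
```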
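### A first layer for the open question

For the benchmark question, the simplest possible first layer would be an in-heap LRU in front of the MDT read path, analogous to RocksDB's block cache. Purely illustrative:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: bounded in-heap LRU over key -> location lookups. */
public class LruIndexCache<K, V> extends LinkedHashMap<K, V> {
  private final int capacity;

  public LruIndexCache(int capacity) {
    super(16, 0.75f, true); // access-order iteration gives LRU eviction
    this.capacity = capacity;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > capacity;
  }
}
```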
