Dear Hudi Community,

I would like to propose Consistent Hashing Indexing to enable a dynamic bucket number, sparing Hudi users from tuning this hyper-parameter.

Currently, the Bucket Index is being landed [1]. It is an effective indexing approach for addressing the performance issue during Upsert. I observed ~3x throughput improvement for Upsert in my local setup compared to the Bloom Filter approach. However, it requires pre-configuring a bucket number when creating the table. As described in [1], this imposes two limitations:

- Due to the one-to-one mapping between buckets and file groups, the size of a single file group may grow without bound. Services like compaction will take longer because of the larger read/write amplification.
- Data skew may arise from imbalanced data distribution, resulting in long-tail reads/writes.

Based on the above observations, supporting a dynamic bucket number is necessary, especially for rapidly changing Hudi tables. Looking at existing systems, Consistent Hashing has been adopted in database systems [2][3]. Its main idea is to turn the "key->bucket" mapping into "key->hash_value->(range mapping)->bucket", constraining the re-hashing process to touch only a few local buckets (e.g., only large file groups) rather than shuffling the whole hash table.

In order to introduce Consistent Hashing to Hudi, we need to consider the following issues:

- Storing hashing metadata, such as range-mapping information. Metadata size and concurrent updates to the metadata should also be considered.
- Splitting & merging criteria. We need to design one or more policies to decide when and how to split & merge buckets. A simple policy would be to split a bucket in the middle of its hash range when its file group reaches a size threshold.
- Supporting concurrent writes & reads. Splitting or merging must not block concurrent writers & readers, and the whole process should be fast enough (e.g., one bucket at a time) to minimize the impact on other operations.
- Integrating the splitting & merging process into existing Hudi table service pipelines.

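To illustrate the "key->hash_value->(range mapping)->bucket" idea and the simple split-in-the-middle policy, here is a minimal Python sketch. It is only an illustration under stated assumptions, not Hudi code: the class name ConsistentBucketMapping, the constant HASH_SPACE, the bucket naming scheme, and the choice of MD5 as the hash function are all hypothetical.

```python
import bisect
import hashlib

# Assumption: a 32-bit hash space; the real design may choose differently.
HASH_SPACE = 2 ** 32


def hash_key(record_key: str) -> int:
    """Map a record key to a value in [0, HASH_SPACE). MD5 is an arbitrary choice."""
    digest = hashlib.md5(record_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


class ConsistentBucketMapping:
    """Hypothetical range mapping: bucket i owns hash values in
    [upper_bounds[i-1] + 1, upper_bounds[i]] (the first bucket starts at 0)."""

    def __init__(self, num_buckets: int):
        step = HASH_SPACE // num_buckets
        self.upper_bounds = [step * (i + 1) - 1 for i in range(num_buckets)]
        self.upper_bounds[-1] = HASH_SPACE - 1  # last bucket covers the remainder
        self.bucket_ids = [f"bucket-{i}" for i in range(num_buckets)]
        self._next_id = num_buckets

    def bucket_for(self, record_key: str) -> str:
        """key -> hash_value -> (range mapping) -> bucket."""
        h = hash_key(record_key)
        # First range whose inclusive upper bound is >= h.
        idx = bisect.bisect_left(self.upper_bounds, h)
        return self.bucket_ids[idx]

    def split(self, bucket_id: str) -> str:
        """Split one bucket at the middle of its hash range.

        Only this bucket's range changes; all other buckets are untouched.
        Returns the id of the newly created bucket (it takes the lower half).
        """
        idx = self.bucket_ids.index(bucket_id)
        lower = self.upper_bounds[idx - 1] + 1 if idx > 0 else 0
        upper = self.upper_bounds[idx]
        if lower == upper:
            raise ValueError("range too small to split")
        mid = lower + (upper - lower) // 2
        new_id = f"bucket-{self._next_id}"
        self._next_id += 1
        # New bucket owns [lower, mid]; the old bucket keeps [mid + 1, upper].
        self.upper_bounds.insert(idx, mid)
        self.bucket_ids.insert(idx, new_id)
        return new_id
```

For example, after `mapping = ConsistentBucketMapping(4)` and `mapping.split("bucket-1")`, keys that previously mapped to any other bucket still map to the same bucket, while keys from "bucket-1" are divided between "bucket-1" and the new bucket. Merging two adjacent buckets would be the inverse operation: removing the boundary between them.
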
I have sketched a prototype design to address the above problems:

- Maintain hashing metadata for each partition (persisted as files), and use instants to manage its multiple versions and concurrent updates.
- Implement a flexible framework that supports different pluggable policies. The splitting plan, which specifies which buckets to split (or merge) and how, will be generated during scheduling (just as compaction does).
- Dual-write will be activated once the writer observes the splitting (or merging) process, upserting records as log files into both the old and new buckets (file groups). Readers can see records once the writer completes, regardless of the splitting process.
- Splitting & merging could be integrated as a sub-task of the Clustering service, because we can view them as a special case of Clustering's goal (i.e., managing file groups based on file size). Although Clustering would need to be modified to handle log files, the bucket index in turn enhances Clustering by allowing concurrent updates.

I would love to hear your thoughts and any feedback about the proposal. I can draft an RFC with a detailed design once we reach an agreement.

[1] https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index
[2] YugabyteDB https://docs.yugabyte.com/latest/architecture/docdb-sharding/sharding/#example
[3] PolarDB-X https://help.aliyun.com/document_detail/316603.html#title-y5n-2i1-5ws

Best,
Yuwei Xiao
