Dear Hudi Community,

I would like to propose Consistent Hashing Indexing to enable a dynamic
bucket number, sparing Hudi users hyper-parameter tuning.

Currently, we have the Bucket Index landing [1]. It is an effective index
approach for addressing the performance issues of Upsert. I observed ~3x
throughput improvement for Upsert in my local setup compared to the Bloom
Filter approach. However, it requires pre-configuring a bucket number when
creating the table. As described in [1], this imposes two limitations:

- Due to the one-to-one mapping between buckets and file groups, the size
of a single file group may grow without bound. Services like compaction
will take longer because of the increased read/write amplification.

- Data skew may arise from an imbalanced data distribution, resulting in
long-tail reads and writes.

Based on the above observations, supporting a dynamic bucket number is
necessary, especially for rapidly growing Hudi tables. Looking at the
market, Consistent Hashing has been adopted in DB systems [2][3]. Its main
idea is to turn the "key -> bucket" mapping into
"key -> hash_value -> (range mapping) -> bucket", constraining the
re-hashing process to touch only a few local buckets (e.g., only the
oversized file groups) rather than shuffling the whole hash table.
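
To make the range mapping concrete, below is a minimal Java sketch under
assumed names (ConsistentHashingSketch, bucketFor, split are invented
here, not existing Hudi code). Bucket boundaries live in a sorted map, so
a split inserts a single new boundary and leaves every other bucket's
mapping intact:

    import java.util.Map;
    import java.util.TreeMap;

    // Toy "key -> hash -> (range mapping) -> bucket" index over [0, MAX].
    public class ConsistentHashingSketch {
      private static final int MAX = Integer.MAX_VALUE;
      // upper bound of each hash range -> bucket (file group) id
      private final TreeMap<Integer, String> ranges = new TreeMap<>();

      public ConsistentHashingSketch(int numBuckets) {
        for (int i = 1; i <= numBuckets; i++) {
          ranges.put((int) ((long) MAX * i / numBuckets), "bucket-" + i);
        }
      }

      public String bucketFor(String recordKey) {
        int hash = recordKey.hashCode() & MAX;  // non-negative hash value
        Map.Entry<Integer, String> e = ranges.ceilingEntry(hash);
        return e.getValue();  // never null: the last boundary is MAX
      }

      // Split the bucket whose range ends at 'upper': one new boundary in
      // the middle re-homes only that bucket's keys; all other buckets
      // keep their ranges and need no data movement.
      public void split(int upper, String newBucketId) {
        Integer lower = ranges.lowerKey(upper);
        int mid = (int) (((lower == null ? 0L : lower) + (long) upper) / 2);
        ranges.put(mid, newBucketId);
      }
    }

The lookup is a single O(log n) ceiling query per record, and a split is a
one-entry mutation of the map, which is what keeps re-hashing local.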

In order to introduce Consistent Hashing to Hudi, we need to consider the
following issues:

- Storing the hashing metadata, such as the range mapping info. Metadata
size and concurrent updates to the metadata should also be considered.

- Splitting & merging criteria. We need to design one (or several)
policies governing when and how to split & merge buckets. A simple policy
would split a bucket in the middle of its hash range once the file group
reaches a size threshold (see the sketch after this list).

- Supporting concurrent writes & reads. The splitting or merging must not
block concurrent writers & readers, and the whole process should be fast
enough (e.g., handling one bucket at a time) to minimize the impact on
other operations.

- Integrating the splitting & merging process into the existing Hudi table
service pipelines.
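
As a strawman for the splitting criteria, a size-threshold policy could
look roughly like the following. Everything here (SplitPolicy, BucketInfo,
SplitPlan, SizeThresholdPolicy) is a hypothetical sketch, not an existing
Hudi interface:

    import java.util.Optional;

    // Pluggable policy deciding whether (and where) a bucket should split.
    interface SplitPolicy {
      Optional<SplitPlan> planSplit(BucketInfo bucket);
    }

    // Describes one bucket: its file group size and its hash range.
    record BucketInfo(String bucketId, long totalBytes,
                      int rangeLower, int rangeUpper) {}

    // The scheduled plan: split 'bucketId' at hash value 'splitPoint'.
    record SplitPlan(String bucketId, int splitPoint) {}

    // Splits a bucket in the middle of its hash range once its file
    // group exceeds a size threshold; otherwise leaves it alone.
    class SizeThresholdPolicy implements SplitPolicy {
      private final long thresholdBytes;

      SizeThresholdPolicy(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
      }

      @Override
      public Optional<SplitPlan> planSplit(BucketInfo b) {
        if (b.totalBytes() <= thresholdBytes) {
          return Optional.empty();
        }
        int mid = (int) (((long) b.rangeLower() + b.rangeUpper()) / 2);
        return Optional.of(new SplitPlan(b.bucketId(), mid));
      }
    }

Keeping the policy behind a small interface like this is what would let us
swap in smarter criteria (e.g., skew-aware split points) later.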

I have sketched a prototype design to address the above problems:

- Maintain hashing metadata for each partition (persisted as files), and
use instants to manage its multi-versioning and concurrent updates.

- Implement a flexible framework for different pluggable policies. The
splitting (or merging) plan, specifying which buckets to split (or merge)
and how, will be generated during scheduling (just like compaction does).

- Dual-write will be activated once the writer observes an ongoing
splitting (or merging) process, upserting records as log files into both
the old and the new buckets (file groups). Readers can see the records as
soon as the write completes, regardless of the splitting progress (see the
routing sketch after this list).

- The splitting & merging could be integrated as a sub-task into the
Clustering service, because we can view them as a special case of
Clustering's goal (i.e., managing file groups based on file size). Though
we would need to modify Clustering to handle log files, the bucket index
in turn enhances Clustering by allowing concurrent updates.
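
To illustrate the dual-write routing, here is a hedged sketch; the
HashingMetadata interface and every method on it are assumptions made up
for illustration, not existing Hudi APIs:

    import java.util.Arrays;
    import java.util.List;

    // Minimal view of the per-partition hashing metadata needed to route
    // a record while a split may be in flight.
    interface HashingMetadata {
      String bucketFor(String recordKey);    // normal routing
      boolean isSplitting(String bucketId);  // is a split in progress?
      String childBucketFor(String recordKey, String bucketId);
    }

    class DualWriteRouter {
      // Every file group the record must be upserted into (as log files).
      // During a split, the record goes to both the old bucket and the
      // child bucket that will own its hash range afterwards, so readers
      // see it as soon as the write commits, whichever metadata version
      // they resolve.
      static List<String> targets(String recordKey, HashingMetadata meta) {
        String bucket = meta.bucketFor(recordKey);
        return meta.isSplitting(bucket)
            ? Arrays.asList(bucket, meta.childBucketFor(recordKey, bucket))
            : Arrays.asList(bucket);
      }
    }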


I would love to hear your thoughts and any feedback on the proposal. I can
draft an RFC with a detailed design once we reach agreement.

[1] https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index

[2] YugabyteDB
https://docs.yugabyte.com/latest/architecture/docdb-sharding/sharding/#example

[3] PolarDB-X
https://help.aliyun.com/document_detail/316603.html#title-y5n-2i1-5ws



Best,

Yuwei Xiao
