This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/asf-site by this push:
     new 72b01a53d3d [HUDI-6330][DOCS] Update user doc to show how to use consistent bucket index for Flink engine (#10977)
72b01a53d3d is described below

commit 72b01a53d3d22a51e9210b4b69f368e7388821e4
Author: Jing Zhang <beyond1...@gmail.com>
AuthorDate: Tue Apr 9 00:18:41 2024 +0800

    [HUDI-6330][DOCS] Update user doc to show how to use consistent bucket index for Flink engine (#10977)
---
 website/docs/sql_dml.md            | 80 ++++++++++++++++++++++++++++++++++++--
 website/releases/release-0.14.0.md |  4 +-
 2 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/website/docs/sql_dml.md b/website/docs/sql_dml.md
index 90576dcb0e0..edb63730b13 100644
--- a/website/docs/sql_dml.md
+++ b/website/docs/sql_dml.md
@@ -323,12 +323,15 @@ In the below example, we have two streaming ingestion pipelines that concurrentl
 pipeline is responsible for the compaction and cleaning table services, while the other pipeline is
 just for data ingestion.
 
-```sql
+In order to commit the dataset, checkpointing needs to be enabled. Here is an example configuration for flink-conf.yaml:
+```yaml
 -- set the interval as 30 seconds
 execution.checkpointing.interval: 30000
 state.backend: rocksdb
+```
 
--- This is a datagen source that can generates records continuously
+```sql
+-- This is a datagen source that can generate records continuously
 CREATE TABLE sourceT (
   uuid varchar(20),
   name varchar(10),
@@ -349,7 +352,7 @@ CREATE TABLE t1(
   `partition` varchar(20)
 ) WITH (
   'connector' = 'hudi',
-  'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+  'path' = '${work_path}/hudi-demo/t1',
   'table.type' = 'MERGE_ON_READ',
   'index.type' = 'BUCKET',
   'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
@@ -365,7 +368,7 @@ CREATE TABLE t1_2(
   `partition` varchar(20)
 ) WITH (
   'connector' = 'hudi',
-  'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
+  'path' = '${work_path}/hudi-demo/t1',
   'table.type' = 'MERGE_ON_READ',
   'index.type' = 'BUCKET',
   'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
@@ -390,3 +393,72 @@ and `clean.async.enabled` options are used to disable the compaction and cleanin
 This is done to ensure that the compaction and cleaning services are not executed twice for the
 same table.
 
+### Consistent hashing index (Experimental)
+
+We have introduced the Consistent Hashing Index since the [0.13.0 release](/releases/release-0.13.0#consistent-hashing-index). In comparison to the static hashing index ([Bucket Index](/releases/release-0.11.0#bucket-index)), the consistent hashing index offers dynamic scalability of data buckets for the writer.
+You can find the [RFC](https://github.com/apache/hudi/blob/master/rfc/rfc-42/rfc-42.md) for the design of this feature.
+In the 0.13.X releases, the Consistent Hashing Index is supported only for the Spark engine. Since [release 0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), the index is also supported for the Flink engine.
+
+To utilize this feature, configure the option `index.type` as `BUCKET` and set `hoodie.index.bucket.engine` to `CONSISTENT_HASHING`.
+When enabling the consistent hashing index, it's important to enable clustering scheduling within the writer. During this process, the writer will perform dual writes for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness, it is strongly recommended to execute clustering as quickly as possible.
+
+In the below example, we will create a datagen source and do streaming ingestion into a Hudi table with the consistent bucket index. In order to commit the dataset, checkpointing needs to be enabled. Here is an example configuration for flink-conf.yaml:
+```yaml
+# set the interval as 30 seconds
+execution.checkpointing.interval: 30000
+state.backend: rocksdb
+```
+
+```sql
+-- This is a datagen source that can generate records continuously
+CREATE TABLE sourceT (
+  uuid varchar(20),
+  name varchar(10),
+  age int,
+  ts timestamp(3),
+  `partition` as 'par1'
+) WITH (
+  'connector' = 'datagen',
+  'rows-per-second' = '200'
+);
+
+-- Create the hudi table with the consistent bucket index
+CREATE TABLE t1(
+  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
+  name VARCHAR(10),
+  age INT,
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = '${work_path}/hudi-demo/hudiT',
+  'table.type' = 'MERGE_ON_READ',
+  'index.type' = 'BUCKET',
+  'clustering.schedule.enabled' = 'true',
+  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
+  'hoodie.clustering.plan.strategy.class' = 'org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy',
+  'hoodie.clustering.execution.strategy.class' = 'org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy',
+  'hoodie.bucket.index.num.buckets' = '8',
+  'hoodie.bucket.index.max.num.buckets' = '128',
+  'hoodie.bucket.index.min.num.buckets' = '8',
+  'hoodie.bucket.index.split.threshold' = '1.5',
+  'write.tasks' = '2'
+);
+
+-- submit the pipeline
+insert into t1 select * from sourceT;
+
+select * from t1 limit 20;
+```
+
+:::caution
+The consistent hashing index is supported for the Flink engine since [release 0.14.0](/releases/release-0.14.0#consistent-hashing-index-support), and as of 0.14.0 it has the following limitations:
+
+- The index is supported only for MOR tables. This limitation also exists when using the Spark engine.
+- It does not work with the metadata table enabled. This limitation also exists when using the Spark engine.
+- The consistent hashing index does not work with bulk-insert on the Flink engine yet; please use the simple bucket index or the Spark engine for bulk-insert pipelines.
+- The resize plan generated by the Flink engine only supports merging small file groups; file splitting is not supported yet.
+- The resize plan should be executed through an offline Spark job; the Flink engine does not support executing the resize plan yet.
+:::
\ No newline at end of file
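The last caution item above points at an offline Spark job for executing the resize (clustering) plan. A minimal sketch of one way to do that, assuming a Spark SQL session with the Hudi Spark bundle on the classpath and the Hudi SQL extensions enabled, and reusing the table name `t1` from the example above; the `run_clustering` call procedure and its parameters may vary between releases, and the `org.apache.hudi.utilities.HoodieClusteringJob` utility submitted via spark-submit is another common way to run clustering offline:

```sql
-- Illustrative sketch only: run clustering for the example table from Spark SQL.
-- Assumes the Hudi Spark bundle is on the classpath and the Hudi SQL extensions are enabled.
CALL run_clustering(table => 't1');
```

Scheduling stays with the Flink writer (`clustering.schedule.enabled` = 'true'), while execution is handed to Spark, which lines up with the `SparkConsistentBucketClusteringExecutionStrategy` configured in the example.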
diff --git a/website/releases/release-0.14.0.md b/website/releases/release-0.14.0.md
index 266db7c0c2b..71b8cc9a934 100644
--- a/website/releases/release-0.14.0.md
+++ b/website/releases/release-0.14.0.md
@@ -289,8 +289,8 @@ In comparison to the static hashing index (BUCKET index), the consistent hashing
 data buckets for the writer.
 
 To utilize this feature, configure the option `index.type` as `BUCKET` and set `hoodie.index.bucket.engine` to
 `CONSISTENT_HASHING`.
-When enabling the consistent hashing index, it's important to activate asynchronous clustering scheduling within the writer.
-The clustering plan should be executed through an offline job. During this process, the writer will perform dual writes
+When enabling the consistent hashing index, it's important to activate clustering scheduling within the writer.
+The clustering plan should be executed through an offline Spark job. During this process, the writer will perform dual writes
 for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness,
 it is strongly recommended to execute clustering as quickly as possible.
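Both flink-conf.yaml snippets in this patch enable checkpointing cluster-wide. A minimal sketch of an alternative, assuming the ingestion pipeline is submitted from the Flink SQL client: the same option can be set per session, with the 30-second value mirroring the yaml examples above.

```sql
-- Enable checkpointing for jobs submitted from this SQL client session
-- (equivalent to `execution.checkpointing.interval: 30000` in flink-conf.yaml).
SET 'execution.checkpointing.interval' = '30s';
```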