This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 834cf4a63ce [HUDI-7317] FlinkTableFactory snatifyCheck should contains index type (#10541) 834cf4a63ce is described below commit 834cf4a63ceb1193025010d3f8f0cc065e23c46d Author: xuzifu666 <x...@zepp.com> AuthorDate: Mon Jan 22 13:29:29 2024 +0800 [HUDI-7317] FlinkTableFactory snatifyCheck should contains index type (#10541) Co-authored-by: xuyu <11161...@vivo.com> --- .../org/apache/hudi/table/HoodieTableFactory.java | 12 +++++++++++ .../apache/hudi/table/TestHoodieTableFactory.java | 25 ++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 6751083a5cd..68642b39da8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -22,6 +22,7 @@ import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; @@ -176,6 +177,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab */ private void sanityCheck(Configuration conf, ResolvedSchema schema) { checkTableType(conf); + checkIndexType(conf); if (!OptionsResolver.isAppendMode(conf)) { checkRecordKey(conf, schema); @@ -183,6 +185,16 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab StreamerUtil.checkPreCombineKey(conf, schema.getColumnNames()); } + /** + * Validate the index type. + */ + private void checkIndexType(Configuration conf) { + String indexType = conf.get(FlinkOptions.INDEX_TYPE); + if (!StringUtils.isNullOrEmpty(indexType)) { + HoodieIndexConfig.INDEX_TYPE.checkValues(indexType); + } + } + /** * Validate the table type. */ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 64145abd5bb..6469fb5c634 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -191,6 +191,31 @@ public class TestHoodieTableFactory { assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6)); } + @Test + void testIndexTypeCheck() { + ResolvedSchema schema = SchemaBuilder.instance() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + + // Index type unset. The default value will be ok + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema, "f2"); + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext1)); + + // Invalid index type will throw exception + this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET_AA"); + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema, "f2"); + assertThrows(IllegalArgumentException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2)); + + // Valid index type will be ok + this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET"); + final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema, "f2"); + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3)); + } + @Test void testTableTypeCheck() { ResolvedSchema schema = SchemaBuilder.instance()