This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 834cf4a63ce [HUDI-7317] FlinkTableFactory sanityCheck should contain 
index type (#10541)
834cf4a63ce is described below

commit 834cf4a63ceb1193025010d3f8f0cc065e23c46d
Author: xuzifu666 <x...@zepp.com>
AuthorDate: Mon Jan 22 13:29:29 2024 +0800

    [HUDI-7317] FlinkTableFactory sanityCheck should contain index type 
(#10541)
    
    Co-authored-by: xuyu <11161...@vivo.com>
---
 .../org/apache/hudi/table/HoodieTableFactory.java  | 12 +++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 6751083a5cd..68642b39da8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -176,6 +177,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    */
   private void sanityCheck(Configuration conf, ResolvedSchema schema) {
     checkTableType(conf);
+    checkIndexType(conf);
 
     if (!OptionsResolver.isAppendMode(conf)) {
       checkRecordKey(conf, schema);
@@ -183,6 +185,16 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     StreamerUtil.checkPreCombineKey(conf, schema.getColumnNames());
   }
 
+  /**
+   * Validate the index type: a non-empty value is passed to HoodieIndexConfig.INDEX_TYPE.checkValues.
+   */
+  private void checkIndexType(Configuration conf) {
+    String indexType = conf.get(FlinkOptions.INDEX_TYPE);
+    if (!StringUtils.isNullOrEmpty(indexType)) {
+      HoodieIndexConfig.INDEX_TYPE.checkValues(indexType);
+    }
+  }
+
   /**
    * Validate the table type.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 64145abd5bb..6469fb5c634 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -191,6 +191,31 @@ public class TestHoodieTableFactory {
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @Test
+  void testIndexTypeCheck() {
+    ResolvedSchema schema = SchemaBuilder.instance()
+            .field("f0", DataTypes.INT().notNull())
+            .field("f1", DataTypes.VARCHAR(20))
+            .field("f2", DataTypes.TIMESTAMP(3))
+            .field("ts", DataTypes.TIMESTAMP(3))
+            .primaryKey("f0")
+            .build();
+
+    // Index type not set explicitly: creating the sink must not throw
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema, "f2");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+    // An unrecognized index type must be rejected with IllegalArgumentException
+    this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET_AA");
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema, "f2");
+    assertThrows(IllegalArgumentException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+    // A recognized index type ("BUCKET"): creating the sink must not throw
+    this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET");
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, 
schema, "f2");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext3));
+  }
+
   @Test
   void testTableTypeCheck() {
     ResolvedSchema schema = SchemaBuilder.instance()

Reply via email to