This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a5af80  [HUDI-1818] Validate required fields for Flink HoodieTable 
(#2930)
7a5af80 is described below

commit 7a5af806cfe54474014db5c70641c0c6269fff03
Author: hiscat <46845236+mylanpan...@users.noreply.github.com>
AuthorDate: Tue May 11 11:11:19 2021 +0800

    [HUDI-1818] Validate required fields for Flink HoodieTable (#2930)
---
 .../org/apache/hudi/table/HoodieTableFactory.java  | 31 +++++++++++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 46 ++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1566aa6..02ab280 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -40,9 +40,11 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Hoodie data source/sink factory.
@@ -59,6 +61,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
 
     Configuration conf = (Configuration) helper.getOptions();
     TableSchema schema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    validateRequiredFields(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), 
context.getCatalogTable(), schema);
 
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
@@ -75,6 +78,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     TableSchema schema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+    validateRequiredFields(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), 
context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
@@ -98,6 +102,33 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   //  Utilities
   // -------------------------------------------------------------------------
 
+  /** Validates required options, e.g. the record key and pre-combine key.
+   *
+   * @param conf The table options
+   * @param schema The table schema
+   */
+  private void validateRequiredFields(Configuration conf, TableSchema schema) {
+    List<String> fields = 
Arrays.stream(schema.getFieldNames()).collect(Collectors.toList());
+
+    // validate record key in pk absence.
+    if (!schema.getPrimaryKey().isPresent()) {
+      Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","))
+          .filter(field -> !fields.contains(field))
+          .findAny()
+          .ifPresent(e -> {
+            throw new ValidationException("The " + e + " field does not exist 
in the table schema. "
+                + "Please define a primary key or modify the 
hoodie.datasource.write.recordkey.field option.");
+          });
+    }
+
+    // validate pre combine key
+    String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
+    if (!fields.contains(preCombineField)) {
+      throw new ValidationException("The " + preCombineField + " field does 
not exist in the table schema. "
+          + "Please check the write.precombine.field option.");
+    }
+  }
+
   /**
    * Setup the config options based on the table definition, for e.g the table 
name, primary key.
    *
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index b7f4429..d7ec693 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -44,7 +45,9 @@ import java.util.Objects;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test cases for {@link HoodieTableFactory}.
@@ -75,6 +78,43 @@ public class TestHoodieTableFactory {
   }
 
   @Test
+  void testRequiredOptionsForSource() {
+    // miss pk and pre combine key will throw exception
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .build();
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    assertThrows(ValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext1));
+    assertThrows(ValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+    // given the pk and miss the pre combine key will throw exception
+    TableSchema schema2 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema2, "f2");
+    assertThrows(ValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext2));
+    assertThrows(ValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+    // given pk and pre combine key will be ok
+    TableSchema schema3 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    final MockContext sourceContext3 = MockContext.getInstance(this.conf, 
schema3, "f2");
+
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSource(sourceContext3));
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext3));
+  }
+
+  @Test
   void testInferAvroSchemaForSource() {
     // infer the schema if not specified
     final HoodieTableSource tableSource1 =
@@ -99,6 +139,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
@@ -113,6 +154,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0", "f1")
         .build();
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema2, "f2");
@@ -137,6 +179,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     // set up new retains commits that is less than min archive commits
@@ -183,6 +226,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     final MockContext sinkContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
@@ -197,6 +241,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20).notNull())
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0", "f1")
         .build();
     final MockContext sinkContext2 = MockContext.getInstance(this.conf, 
schema2, "f2");
@@ -221,6 +266,7 @@ public class TestHoodieTableFactory {
         .field("f0", DataTypes.INT().notNull())
         .field("f1", DataTypes.VARCHAR(20))
         .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
         .primaryKey("f0")
         .build();
     // set up new retains commits that is less than min archive commits

Reply via email to