This is an automated email from the ASF dual-hosted git repository. garyli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 7a5af80 [HUDI-1818] Validate required fields for Flink HoodieTable (#2930) 7a5af80 is described below commit 7a5af806cfe54474014db5c70641c0c6269fff03 Author: hiscat <46845236+mylanpan...@users.noreply.github.com> AuthorDate: Tue May 11 11:11:19 2021 +0800 [HUDI-1818] Validate required fields for Flink HoodieTable (#2930) --- .../org/apache/hudi/table/HoodieTableFactory.java | 31 +++++++++++++++ .../apache/hudi/table/TestHoodieTableFactory.java | 46 ++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 1566aa6..02ab280 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -40,9 +40,11 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Hoodie data source/sink factory. 
@@ -59,6 +61,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab Configuration conf = (Configuration) helper.getOptions(); TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + validateRequiredFields(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> @@ -75,6 +78,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab public DynamicTableSink createDynamicTableSink(Context context) { Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + validateRequiredFields(conf, schema); setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); return new HoodieTableSink(conf, schema); } @@ -98,6 +102,33 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab // Utilities // ------------------------------------------------------------------------- + /** Validate required options, e.g. record key and pre-combine key. + * + * @param conf The table options + * @param schema The table schema + */ + private void validateRequiredFields(Configuration conf, TableSchema schema) { + List<String> fields = Arrays.stream(schema.getFieldNames()).collect(Collectors.toList()); + + // validate record key in pk absence. + if (!schema.getPrimaryKey().isPresent()) { + Arrays.stream(conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",")) + .filter(field -> !fields.contains(field)) + .findAny() + .ifPresent(e -> { + throw new ValidationException("The " + e + " field does not exist in the table schema." 
+ + " Please define primary key or modify hoodie.datasource.write.recordkey.field option."); + }); + } + + // validate pre combine key + String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD); + if (!fields.contains(preCombineField)) { + throw new ValidationException("The " + preCombineField + " field does not exist in the table schema." + + " Please check write.precombine.field option."); + } + } + /** * Setup the config options based on the table definition, for e.g the table name, primary key. * diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index b7f4429..d7ec693 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -44,7 +45,9 @@ import java.util.Objects; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; /** * Test cases for {@link HoodieTableFactory}. 
@@ -75,6 +78,43 @@ public class TestHoodieTableFactory { } @Test + void testRequiredOptionsForSource() { + // miss pk and pre combine key will throw exception + TableSchema schema1 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .build(); + final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); + assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext1)); + assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext1)); + + // given the pk and miss the pre combine key will throw exception + TableSchema schema2 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2"); + assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSource(sourceContext2)); + assertThrows(ValidationException.class, () -> new HoodieTableFactory().createDynamicTableSink(sourceContext2)); + + // given pk and pre combine key will be ok + TableSchema schema3 = TableSchema.builder() + .field("f0", DataTypes.INT().notNull()) + .field("f1", DataTypes.VARCHAR(20)) + .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) + .primaryKey("f0") + .build(); + final MockContext sourceContext3 = MockContext.getInstance(this.conf, schema3, "f2"); + + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext3)); + assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext3)); + } + + @Test void testInferAvroSchemaForSource() { // infer the schema if not specified final HoodieTableSource tableSource1 = @@ -99,6 +139,7 @@ public class TestHoodieTableFactory { .field("f0", 
DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); @@ -113,6 +154,7 @@ public class TestHoodieTableFactory { .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0", "f1") .build(); final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema2, "f2"); @@ -137,6 +179,7 @@ public class TestHoodieTableFactory { .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); // set up new retains commits that is less than min archive commits @@ -183,6 +226,7 @@ public class TestHoodieTableFactory { .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); @@ -197,6 +241,7 @@ public class TestHoodieTableFactory { .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20).notNull()) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0", "f1") .build(); final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema2, "f2"); @@ -221,6 +266,7 @@ public class TestHoodieTableFactory { .field("f0", DataTypes.INT().notNull()) .field("f1", DataTypes.VARCHAR(20)) .field("f2", DataTypes.TIMESTAMP(3)) + .field("ts", DataTypes.TIMESTAMP(3)) .primaryKey("f0") .build(); // set up new retains commits that is less than min archive commits