This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 1ad0e9560e [HUDI-4529] Tweak some default config options for flink (#6287) 1ad0e9560e is described below commit 1ad0e9560e4805b682fe661d78f2ad0f2fa1025b Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Wed Aug 17 14:20:36 2022 +0800 [HUDI-4529] Tweak some default config options for flink (#6287) --- .../apache/hudi/configuration/FlinkOptions.java | 10 +++---- .../apache/hudi/streamer/FlinkStreamerConfig.java | 6 ++-- .../org/apache/hudi/table/HoodieTableFactory.java | 34 ++++++++++------------ .../apache/hudi/table/TestHoodieTableFactory.java | 19 +++++++++--- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 164106a4e8..3638113288 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -22,13 +22,13 @@ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; @@ -287,7 +287,7 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions .key("write.payload.class") .stringType() - .defaultValue(OverwriteWithLatestAvroPayload.class.getName()) + .defaultValue(EventTimeAvroPayload.class.getName()) .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n" + "This will render any value set for the option in-effective"); @@ -718,7 +718,7 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions .key("hive_sync.mode") .stringType() - .defaultValue("jdbc") + .defaultValue("hms") .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'"); public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions @@ -754,7 +754,7 @@ public class FlinkOptions extends HoodieConfig { public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions .key("hive_sync.partition_extractor_class") .stringType() - .defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName()) + .defaultValue(MultiPartKeysValueExtractor.class.getName()) .withDescription("Tool to extract the partition value from HDFS path, " + "default 'SlashEncodedDayPartitionValueExtractor'"); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java index 3ba1c6230f..3447a23851 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; -import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.util.FlinkStateBackendConverter; import org.apache.hudi.util.StreamerUtil; @@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration { public String hiveSyncPartitionFields = ""; @Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, " - + "default 'SlashEncodedDayPartitionValueExtractor'") - public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName(); + + "default 'MultiPartKeysValueExtractor'") + public String hiveSyncPartitionExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName(); @Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false") public Boolean hiveSyncAssumeDatePartition = false; diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 51bbb2dc87..1cf66ea343 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -19,12 +19,10 @@ package org.apache.hudi.table; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; -import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieValidationException; -import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; @@ -38,6 +36,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -71,7 +70,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); - setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); + setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema); Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() -> new ValidationException("Option [path] should not be empty."))); @@ -90,7 +89,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab "Option [path] should not be empty."); ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); sanityCheck(conf, schema); - setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema); + setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema); return new HoodieTableSink(conf, schema); } @@ -154,35 +153,30 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option."); } - } else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) { - // if precombine field is specified but payload clazz is default, - // use DefaultHoodieRecordPayload to make sure the precombine field is always taken for - // comparing. - conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName()); } } /** - * Sets up the config options based on the table definition, for e.g the table name, primary key. + * Sets up the config options based on the table definition, for e.g, the table name, primary key. * - * @param conf The configuration to setup - * @param tableName The table name + * @param conf The configuration to set up + * @param tablePath The table path * @param table The catalog table * @param schema The physical schema */ private static void setupConfOptions( Configuration conf, - String tableName, + ObjectIdentifier tablePath, CatalogTable table, ResolvedSchema schema) { // table name - conf.setString(FlinkOptions.TABLE_NAME.key(), tableName); + conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName()); // hoodie key about options setupHoodieKeyOptions(conf, table); // compaction options setupCompactionOptions(conf); // hive options - setupHiveOptions(conf); + setupHiveOptions(conf, tablePath); // read options setupReadOptions(conf); // write options @@ -309,10 +303,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab /** * Sets up the hive options from the table definition. */ - private static void setupHiveOptions(Configuration conf) { - if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING) - && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) { - conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName()); + private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) { + if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) { + conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName()); + } + if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) { + conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName()); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index f27ab4ca53..f7a35e57f2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.hive.MultiPartKeysValueExtractor; -import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; @@ -240,15 +239,21 @@ public class TestHoodieTableFactory { final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1); final Configuration conf1 = tableSource1.getConf(); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1")); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1")); assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName())); // set up hive style partitioning is true. + this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2"); + this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2"); this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true); final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2); final Configuration conf2 = tableSource2.getConf(); - assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName())); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2")); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2")); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName())); } @Test @@ -430,15 +435,21 @@ public class TestHoodieTableFactory { final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1); final Configuration conf1 = tableSink1.getConf(); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1")); + assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1")); assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName())); // set up hive style partitioning is true. + this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2"); + this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2"); this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true); final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2"); final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2); final Configuration conf2 = tableSink2.getConf(); - assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName())); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2")); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2")); + assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName())); } @Test @@ -542,7 +553,7 @@ public class TestHoodieTableFactory { @Override public ObjectIdentifier getObjectIdentifier() { - return ObjectIdentifier.of("hudi", "default", "t1"); + return ObjectIdentifier.of("hudi", "db1", "t1"); } @Override