This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ad0e9560e [HUDI-4529] Tweak some default config options for flink (#6287)
1ad0e9560e is described below

commit 1ad0e9560e4805b682fe661d78f2ad0f2fa1025b
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Wed Aug 17 14:20:36 2022 +0800

    [HUDI-4529] Tweak some default config options for flink (#6287)
---
 .../apache/hudi/configuration/FlinkOptions.java    | 10 +++----
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  6 ++--
 .../org/apache/hudi/table/HoodieTableFactory.java  | 34 ++++++++++------------
 .../apache/hudi/table/TestHoodieTableFactory.java  | 19 +++++++++---
 4 files changed, 38 insertions(+), 31 deletions(-)

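In short: this commit flips three Flink defaults (write.payload.class:
OverwriteWithLatestAvroPayload to EventTimeAvroPayload; hive_sync.mode:
jdbc to hms; hive_sync.partition_extractor_class:
SlashEncodedDayPartitionValueExtractor to MultiPartKeysValueExtractor) and
teaches HoodieTableFactory to derive hive_sync.db and hive_sync.table from
the catalog identifier when they are unset. Jobs that relied on the old
defaults can pin them explicitly; a minimal sketch, not part of this
commit, with keys and class names taken from the diff below:

    import org.apache.flink.configuration.Configuration;

    public class LegacyFlinkDefaults {
      // Re-apply the pre-HUDI-4529 defaults for a single job.
      public static Configuration pin(Configuration conf) {
        conf.setString("write.payload.class",
            "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
        conf.setString("hive_sync.mode", "jdbc");
        conf.setString("hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor");
        return conf;
      }
    }
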
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 164106a4e8..3638113288 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -22,13 +22,13 @@ import org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringP
 import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -287,7 +287,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> PAYLOAD_CLASS_NAME = ConfigOptions
       .key("write.payload.class")
       .stringType()
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(EventTimeAvroPayload.class.getName())
       .withDescription("Payload class used. Override this, if you like to roll 
your own merge logic, when upserting/inserting.\n"
           + "This will render any value set for the option in-effective");
 
@@ -718,7 +718,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
       .key("hive_sync.mode")
       .stringType()
-      .defaultValue("jdbc")
+      .defaultValue("hms")
       .withDescription("Mode to choose for Hive ops. Valid values are hms, 
jdbc and hiveql, default 'jdbc'");
 
   public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
@@ -754,7 +754,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME = ConfigOptions
       .key("hive_sync.partition_extractor_class")
       .stringType()
-      .defaultValue(SlashEncodedDayPartitionValueExtractor.class.getCanonicalName())
+      .defaultValue(MultiPartKeysValueExtractor.class.getName())
       .withDescription("Tool to extract the partition value from HDFS path, "
           + "default 'SlashEncodedDayPartitionValueExtractor'");
 
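Of the changes above, the payload switch is the most behavior-visible: with
EventTimeAvroPayload the record carrying the larger event-time (precombine)
value wins a merge even if it arrives late, while the old
OverwriteWithLatestAvroPayload kept the latest-arriving record. The hms
sync mode talks to the Hive metastore client directly instead of requiring
a HiveServer2 JDBC endpoint. Note that the withDescription() texts still
say "default 'jdbc'" and "default 'SlashEncodedDayPartitionValueExtractor'";
only the default values changed in this commit. A toy illustration of the
ordering rule, with hypothetical field names, not Hudi API:

    public class EventTimeMergeSketch {
      record Rec(String key, long ts, String val) {}

      // Event-time semantics: the larger ts wins, regardless of arrival order.
      static Rec merge(Rec current, Rec incoming) {
        return incoming.ts() >= current.ts() ? incoming : current;
      }

      public static void main(String[] args) {
        Rec first = new Rec("k1", 100L, "newer event, written first");
        Rec late = new Rec("k1", 90L, "older event, written last");
        // Arrival-order semantics would keep `late`; event-time keeps `first`.
        System.out.println(merge(first, late).val());
      }
    }
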
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 3ba1c6230f..3447a23851 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.util.FlinkStateBackendConverter;
 import org.apache.hudi.util.StreamerUtil;
@@ -321,8 +321,8 @@ public class FlinkStreamerConfig extends Configuration {
   public String hiveSyncPartitionFields = "";
 
   @Parameter(names = {"--hive-sync-partition-extractor-class"}, description = "Tool to extract the partition value from HDFS path, "
-      + "default 'SlashEncodedDayPartitionValueExtractor'")
-  public String hiveSyncPartitionExtractorClass = SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
+      + "default 'MultiPartKeysValueExtractor'")
+  public String hiveSyncPartitionExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
 
   @Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
   public Boolean hiveSyncAssumeDatePartition = false;
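
The streamer change mirrors FlinkOptions: the
--hive-sync-partition-extractor-class flag now defaults to
MultiPartKeysValueExtractor. A quick sanity-check sketch, assuming only the
field and classes shown in this diff:

    import org.apache.hudi.hive.MultiPartKeysValueExtractor;
    import org.apache.hudi.streamer.FlinkStreamerConfig;

    public class StreamerDefaultCheck {
      public static void main(String[] args) {
        FlinkStreamerConfig cfg = new FlinkStreamerConfig();
        // After this commit the default extractor is the multi-part one.
        System.out.println(cfg.hiveSyncPartitionExtractorClass
            .equals(MultiPartKeysValueExtractor.class.getCanonicalName()));
      }
    }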
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 51bbb2dc87..1cf66ea343 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,12 +19,10 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
-import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
-import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -38,6 +36,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -71,7 +70,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
     Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
-    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
+    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
 
     Path path = new Path(conf.getOptional(FlinkOptions.PATH).orElseThrow(() ->
         new ValidationException("Option [path] should not be empty.")));
@@ -90,7 +89,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
         "Option [path] should not be empty.");
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
-    setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
+    setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema);
     return new HoodieTableSink(conf, schema);
   }
 
@@ -154,35 +153,30 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
         throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema."
             + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
       }
-    } else if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.PAYLOAD_CLASS_NAME)) {
-      // if precombine field is specified but payload clazz is default,
-      // use DefaultHoodieRecordPayload to make sure the precombine field is always taken for
-      // comparing.
-      conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
     }
   }
 
   /**
-   * Sets up the config options based on the table definition, for e.g the table name, primary key.
+   * Sets up the config options based on the table definition, for e.g, the table name, primary key.
    *
-   * @param conf      The configuration to setup
-   * @param tableName The table name
+   * @param conf      The configuration to set up
+   * @param tablePath The table path
    * @param table     The catalog table
    * @param schema    The physical schema
    */
   private static void setupConfOptions(
       Configuration conf,
-      String tableName,
+      ObjectIdentifier tablePath,
       CatalogTable table,
       ResolvedSchema schema) {
     // table name
-    conf.setString(FlinkOptions.TABLE_NAME.key(), tableName);
+    conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
     // hoodie key about options
     setupHoodieKeyOptions(conf, table);
     // compaction options
     setupCompactionOptions(conf);
     // hive options
-    setupHiveOptions(conf);
+    setupHiveOptions(conf, tablePath);
     // read options
     setupReadOptions(conf);
     // write options
@@ -309,10 +303,12 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   /**
    * Sets up the hive options from the table definition.
    */
-  private static void setupHiveOptions(Configuration conf) {
-    if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
-        && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME)) {
-      conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, MultiPartKeysValueExtractor.class.getName());
+  private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) {
+    if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
+      conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
+    }
+    if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
+      conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
     }
   }
 
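Net effect of the new setupHiveOptions: hive_sync.db and hive_sync.table
now follow the Flink catalog identifier instead of falling back to global
defaults. A minimal usage sketch mirroring the logic added above (the
db1/t1 values match the test changes below):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.hudi.configuration.FlinkOptions;

    public class HiveSyncNameDefaults {
      // Same fallback rule as setupHiveOptions: fill in the database/table
      // names only when the user has not set them explicitly.
      static void apply(Configuration conf, ObjectIdentifier tablePath) {
        if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
          conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName());
        }
        if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
          conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName());
        }
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        apply(conf, ObjectIdentifier.of("hudi", "db1", "t1"));
        System.out.println(conf.getString(FlinkOptions.HIVE_SYNC_DB));  // db1
      }
    }
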
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index f27ab4ca53..f7a35e57f2 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
-import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
@@ -240,15 +239,21 @@ public class TestHoodieTableFactory {
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, schema1, "f2");
     final HoodieTableSource tableSource1 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext1);
     final Configuration conf1 = tableSource1.getConf();
+    assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+    assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
     assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
 
     // set up hive style partitioning is true.
+    this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
+    this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
     this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
 
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, schema1, "f2");
     final HoodieTableSource tableSource2 = (HoodieTableSource) new HoodieTableFactory().createDynamicTableSource(sourceContext2);
     final Configuration conf2 = tableSource2.getConf();
-    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
   }
 
   @Test
@@ -430,15 +435,21 @@ public class TestHoodieTableFactory {
     final MockContext sinkContext1 = MockContext.getInstance(this.conf, schema1, "f2");
     final HoodieTableSink tableSink1 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext1);
     final Configuration conf1 = tableSink1.getConf();
+    assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_DB), is("db1"));
+    assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t1"));
     assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
 
     // set up hive style partitioning is true.
+    this.conf.setString(FlinkOptions.HIVE_SYNC_DB, "db2");
+    this.conf.setString(FlinkOptions.HIVE_SYNC_TABLE, "t2");
     this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
 
     final MockContext sinkContext2 = MockContext.getInstance(this.conf, schema1, "f2");
     final HoodieTableSink tableSink2 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(sinkContext2);
     final Configuration conf2 = tableSink2.getConf();
-    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_DB), is("db2"));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_TABLE), is("t2"));
+    assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME), is(MultiPartKeysValueExtractor.class.getName()));
   }
 
   @Test
@@ -542,7 +553,7 @@ public class TestHoodieTableFactory {
 
     @Override
     public ObjectIdentifier getObjectIdentifier() {
-      return ObjectIdentifier.of("hudi", "default", "t1");
+      return ObjectIdentifier.of("hudi", "db1", "t1");
     }
 
     @Override
