This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c2197a  [HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (#2443)
8c2197a is described below

commit 8c2197ae5e9c139e488a33f5a507b79bfa2f6f27
Author: liujinhui <965147...@qq.com>
AuthorDate: Thu Feb 25 23:09:32 2021 +0800

    [HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (#2443)
    
    Co-authored-by: Sivabalan Narayanan <sivab...@uber.com>
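    
    For context, a minimal sketch of how a writer would opt into the new
    behavior through the Spark datasource. This is illustrative only: it
    assumes an existing DataFrame `df`, and the table name, key fields, and
    basePath are hypothetical placeholders (a real job would also set the
    usual hive_sync database/table/jdbc options):
    
        val basePath = "/tmp/hudi/trips"  // hypothetical location
        df.write.format("hudi")
          .option("hoodie.table.name", "trips")
          .option("hoodie.datasource.write.recordkey.field", "uuid")
          .option("hoodie.datasource.write.precombine.field", "ts")
          .option("hoodie.datasource.hive_sync.enable", "true")
          // option added by this commit; defaults to "false". When "true",
          // a hive sync failure is logged instead of failing the ingest.
          .option("hoodie.datasource.hive_sync.ignore_exceptions", "true")
          .mode("append")
          .save(basePath)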
---
 .../main/java/org/apache/hudi/DataSourceUtils.java |  2 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  1 +
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |  4 ++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 74 +++++++++++++---------
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 22 +++++++
 6 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 18c51e3..632a155 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -293,6 +293,8 @@ public class DataSourceUtils {
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
     hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
+    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
     hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 965b35c..4b8e97c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -347,6 +347,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
+  val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
   val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
 
@@ -365,6 +366,7 @@ object DataSourceWriteOptions {
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
+  val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
   val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f5ba6c8..ef28191 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -374,6 +374,7 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
     hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
     hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
+    hiveSyncConfig.ignoreExceptions = parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
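
A note on the wiring above: `parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)` resolves to false when the key is absent, which matches the new DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false". A self-contained Scala illustration of that idiom:

    val key = "hoodie.datasource.hive_sync.ignore_exceptions"
    Map(key -> "true").get(key).exists(r => r.toBoolean)         // true
    Map.empty[String, String].get(key).exists(r => r.toBoolean)  // false: absent key keeps the default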
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index dd9d483..0063d15 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -76,6 +76,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--auto-create-database"}, description = "Auto create 
hive database")
   public Boolean autoCreateDatabase = true;
 
+  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive 
exceptions")
+  public Boolean ignoreExceptions = false;
+
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` 
suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
 
@@ -130,6 +133,7 @@ public class HiveSyncConfig implements Serializable {
         + ", usePreApacheInputFormat=" + usePreApacheInputFormat
         + ", useJdbc=" + useJdbc
         + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", ignoreExceptions=" + ignoreExceptions
         + ", skipROSuffix=" + skipROSuffix
         + ", help=" + help
         + ", supportTimestamp=" + supportTimestamp
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 47d4500..bbda97e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -58,56 +58,72 @@ public class HiveSyncTool extends AbstractSyncTool {
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
   private final HiveSyncConfig cfg;
-  private final HoodieHiveClient hoodieHiveClient;
-  private final String snapshotTableName;
-  private final Option<String> roTableTableName;
+  private HoodieHiveClient hoodieHiveClient = null;
+  private String snapshotTableName = null;
+  private Option<String> roTableName = null;
 
   public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     super(configuration.getAllProperties(), fs);
-    this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+
+    try {
+      this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+    } catch (RuntimeException e) {
+      if (cfg.ignoreExceptions) {
+        LOG.error("Got runtime exception when hive syncing, but continuing as 
ignoreExceptions config is set ", e);
+      } else {
+        throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
+      }
+    }
+
     this.cfg = cfg;
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
       cfg.partitionFields = new ArrayList<>();
     }
-    switch (hoodieHiveClient.getTableType()) {
-      case COPY_ON_WRITE:
-        this.snapshotTableName = cfg.tableName;
-        this.roTableTableName = Option.empty();
-        break;
-      case MERGE_ON_READ:
-        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
-            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
-        break;
-      default:
-        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-        throw new InvalidTableException(hoodieHiveClient.getBasePath());
-    }
-  }
-
-  @Override
-  public void syncHoodieTable() {
-    try {
+    if (hoodieHiveClient != null) {
       switch (hoodieHiveClient.getTableType()) {
         case COPY_ON_WRITE:
-          syncHoodieTable(snapshotTableName, false);
+          this.snapshotTableName = cfg.tableName;
+          this.roTableName = Option.empty();
           break;
         case MERGE_ON_READ:
-          // sync a RO table for MOR
-          syncHoodieTable(roTableTableName.get(), false);
-          // sync a RT table for MOR
-          syncHoodieTable(snapshotTableName, true);
+          this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+              Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
           break;
         default:
           LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
           throw new InvalidTableException(hoodieHiveClient.getBasePath());
       }
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      if (hoodieHiveClient != null) {
+        switch (hoodieHiveClient.getTableType()) {
+          case COPY_ON_WRITE:
+            syncHoodieTable(snapshotTableName, false);
+            break;
+          case MERGE_ON_READ:
+            // sync a RO table for MOR
+            syncHoodieTable(roTableName.get(), false);
+            // sync a RT table for MOR
+            syncHoodieTable(snapshotTableName, true);
+            break;
+          default:
+            LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+            throw new InvalidTableException(hoodieHiveClient.getBasePath());
+        }
+      }
     } catch (RuntimeException re) {
       throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
     } finally {
-      hoodieHiveClient.close();
+      if (hoodieHiveClient != null) {
+        hoodieHiveClient.close();
+      }
     }
   }
 
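
To make the constructor change above concrete, here is a minimal sketch of driving the tool programmatically (Scala; the database/table names, basePath, and jdbcUrl are hypothetical, and the public config fields follow HiveSyncConfig as shown earlier in this patch):

    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

    val cfg = new HiveSyncConfig()
    cfg.databaseName = "default"                  // hypothetical target database
    cfg.tableName = "trips"                       // hypothetical target table
    cfg.basePath = "/tmp/hudi/trips"              // hypothetical hudi base path
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000"  // hypothetical hive server
    cfg.ignoreExceptions = true                   // the new flag from this commit

    val hiveConf = new HiveConf()
    val fs = FileSystem.get(hiveConf)
    // With ignoreExceptions set, a failed HoodieHiveClient connection is only
    // logged, the client stays null, and syncHoodieTable() becomes a no-op
    // instead of throwing HoodieHiveSyncException.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()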
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 8a1ea4f..c38a6ed 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -613,4 +614,25 @@ public class TestHiveSyncTool {
         "The last commit that was sycned should be 103");
   }
 
+  @Test
+  public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
+    HiveTestUtil.hiveSyncConfig.useJdbc = true;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 5, false);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+    // Let's do the sync
+
+    HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
+    syncToolConfig.ignoreExceptions = true;
+    syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999","9031");
+    HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should still not exist, since the sync failure was ignored");
+  }
+
 }
