This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c2197a  [HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (#2443)
8c2197a is described below

commit 8c2197ae5e9c139e488a33f5a507b79bfa2f6f27
Author: liujinhui <965147...@qq.com>
AuthorDate: Thu Feb 25 23:09:32 2021 +0800

    [HUDI-1269] Make whether the failure of connect hive affects hudi ingest process configurable (#2443)

    Co-authored-by: Sivabalan Narayanan <sivab...@uber.com>
---
 .../main/java/org/apache/hudi/DataSourceUtils.java |  2 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  2 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  1 +
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |  4 ++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 74 +++++++++++++---------
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 22 +++++++
 6 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 18c51e3..632a155 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -293,6 +293,8 @@ public class DataSourceUtils {
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
     hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
+    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
     hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 965b35c..4b8e97c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -347,6 +347,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
+  val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
   val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"
 
@@ -365,6 +366,7 @@ object DataSourceWriteOptions {
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
+  val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
   val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f5ba6c8..ef28191 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -374,6 +374,7 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
     hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
     hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
+    hiveSyncConfig.ignoreExceptions = parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index dd9d483..0063d15 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -76,6 +76,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
   public Boolean autoCreateDatabase = true;
 
+  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+  public Boolean ignoreExceptions = false;
+
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;
 
@@ -130,6 +133,7 @@ public class HiveSyncConfig implements Serializable {
         + ", usePreApacheInputFormat=" + usePreApacheInputFormat
         + ", useJdbc=" + useJdbc
         + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", ignoreExceptions=" + ignoreExceptions
         + ", skipROSuffix=" + skipROSuffix
         + ", help=" + help
         + ", supportTimestamp=" + supportTimestamp
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 47d4500..bbda97e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -58,56 +58,72 @@ public class HiveSyncTool extends AbstractSyncTool {
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
   private final HiveSyncConfig cfg;
-  private final HoodieHiveClient hoodieHiveClient;
-  private final String snapshotTableName;
-  private final Option<String> roTableTableName;
+  private HoodieHiveClient hoodieHiveClient = null;
+  private String snapshotTableName = null;
+  private Option<String> roTableName = null;
 
   public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     super(configuration.getAllProperties(), fs);
-    this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+
+    try {
+      this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+    } catch (RuntimeException e) {
+      if (cfg.ignoreExceptions) {
+        LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
+      } else {
+        throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
+      }
+    }
+
     this.cfg = cfg;
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
       cfg.partitionFields = new ArrayList<>();
     }
-    switch (hoodieHiveClient.getTableType()) {
-      case COPY_ON_WRITE:
-        this.snapshotTableName = cfg.tableName;
-        this.roTableTableName = Option.empty();
-        break;
-      case MERGE_ON_READ:
-        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
-            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
-        break;
-      default:
-        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-        throw new InvalidTableException(hoodieHiveClient.getBasePath());
-    }
-  }
-
-  @Override
-  public void syncHoodieTable() {
-    try {
+    if (hoodieHiveClient != null) {
       switch (hoodieHiveClient.getTableType()) {
         case COPY_ON_WRITE:
-          syncHoodieTable(snapshotTableName, false);
+          this.snapshotTableName = cfg.tableName;
+          this.roTableName = Option.empty();
          break;
         case MERGE_ON_READ:
-          // sync a RO table for MOR
-          syncHoodieTable(roTableTableName.get(), false);
-          // sync a RT table for MOR
-          syncHoodieTable(snapshotTableName, true);
+          this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+              Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
           break;
         default:
           LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
           throw new InvalidTableException(hoodieHiveClient.getBasePath());
       }
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      if (hoodieHiveClient != null) {
+        switch (hoodieHiveClient.getTableType()) {
+          case COPY_ON_WRITE:
+            syncHoodieTable(snapshotTableName, false);
+            break;
+          case MERGE_ON_READ:
+            // sync a RO table for MOR
+            syncHoodieTable(roTableName.get(), false);
+            // sync a RT table for MOR
+            syncHoodieTable(snapshotTableName, true);
+            break;
+          default:
+            LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+            throw new InvalidTableException(hoodieHiveClient.getBasePath());
+        }
+      }
     } catch (RuntimeException re) {
       throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
     } finally {
-      hoodieHiveClient.close();
+      if (hoodieHiveClient != null) {
+        hoodieHiveClient.close();
+      }
     }
   }
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 8a1ea4f..c38a6ed 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -613,4 +614,25 @@ public class TestHiveSyncTool {
         "The last commit that was sycned should be 103");
   }
 
+  @Test
+  public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
+    HiveTestUtil.hiveSyncConfig.useJdbc = true;
+    String instantTime = "100";
+    HiveTestUtil.createCOWTable(instantTime, 5, false);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+    // Lets do the sync
+
+    HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
+    syncToolConfig.ignoreExceptions = true;
+    syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999","9031");
+    HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+  }
+
 }
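For readers who want to try the new knob from the Spark datasource path, here is a minimal
sketch (not part of the commit). The key "hoodie.datasource.hive_sync.ignore_exceptions"
comes from this change; the table name, paths, and the other write options shown are the
ordinary Hudi datasource settings, assumed here purely for illustration.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object IgnoreHiveSyncFailures {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("hudi-ignore-hive-sync-failures")
          .master("local[2]")
          .getOrCreate()
        import spark.implicits._

        // A toy single-row dataset; any DataFrame with a record key,
        // partition field, and ordering field works the same way.
        val df = Seq(("id1", "2021-02-25", 100L)).toDF("uuid", "dt", "ts")

        df.write.format("hudi")
          .option("hoodie.table.name", "test_table")
          .option("hoodie.datasource.write.recordkey.field", "uuid")
          .option("hoodie.datasource.write.partitionpath.field", "dt")
          .option("hoodie.datasource.write.precombine.field", "ts")
          .option("hoodie.datasource.hive_sync.enable", "true")
          // New in this commit: when set, a failed Hive connection is logged
          // and ingestion continues; the default "false" keeps the previous
          // fail-fast behavior.
          .option("hoodie.datasource.hive_sync.ignore_exceptions", "true")
          .mode(SaveMode.Append)
          .save("/tmp/hudi/test_table")
      }
    }

The standalone HiveSyncTool exposes the equivalent switch as the --ignore-exceptions
command-line flag added to HiveSyncConfig above.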