This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 8c02e90a9b [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
8c02e90a9b is described below

commit 8c02e90a9bc36bdd1f9aa115b792c0dc57ae7868
Author: Qi Ji <qjq...@users.noreply.github.com>
AuthorDate: Wed Aug 17 00:53:46 2022 +0800

    [HUDI-4354] Add --force-empty-sync flag to deltastreamer (#6027)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java     |  2 +-
 .../deltastreamer/HoodieDeltaStreamer.java          |  7 ++++++-
 .../functional/TestHoodieDeltaStreamer.java         | 21 +++++++++++++++++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index f3d9af3150..3c1d1f5ef1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -630,7 +630,7 @@ public class DeltaSync implements Serializable {
           scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
         }
 
-        if (!isEmpty) {
+        if (!isEmpty || cfg.forceEmptyMetaSync) {
           runMetaSync();
         }
       } else {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index a22a3581ae..fe7576cc80 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -305,6 +305,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
     public Boolean enableMetaSync = false;
 
+    @Parameter(names = {"--force-empty-sync"}, description = "Force syncing meta even on empty commit")
+    public Boolean forceEmptyMetaSync = false;
+
     @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
     public String syncClientToolClassNames = HiveSyncTool.class.getName();
 
@@ -443,6 +446,7 @@ public class HoodieDeltaStreamer implements Serializable {
               && Objects.equals(filterDupes, config.filterDupes)
               && Objects.equals(enableHiveSync, config.enableHiveSync)
               && Objects.equals(enableMetaSync, config.enableMetaSync)
+              && Objects.equals(forceEmptyMetaSync, config.forceEmptyMetaSync)
               && Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
               && Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
               && Objects.equals(maxPendingClustering, config.maxPendingClustering)
@@ -468,7 +472,7 @@ public class HoodieDeltaStreamer implements Serializable {
               baseFileFormat, propsFilePath, configs, sourceClassName,
               sourceOrderingField, payloadClassName, schemaProviderClassName,
               transformerClassNames, sourceLimit, operation, filterDupes,
-              enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
+              enableHiveSync, enableMetaSync, forceEmptyMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
               continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
               deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
               compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
@@ -494,6 +498,7 @@ public class HoodieDeltaStreamer implements Serializable {
               + ", filterDupes=" + filterDupes
               + ", enableHiveSync=" + enableHiveSync
               + ", enableMetaSync=" + enableMetaSync
+              + ", forceEmptyMetaSync=" + forceEmptyMetaSync
               + ", syncClientToolClassNames=" + syncClientToolClassNames
               + ", maxPendingCompactions=" + maxPendingCompactions
               + ", maxPendingClustering=" + maxPendingClustering
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 850b0d1d60..88948b0385 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -2169,6 +2169,27 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertFalse(tableFields.contains("partition_path"));
   }
 
+  @Test
+  public void testForceEmptyMetaSync() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_force_empty_meta_sync";
+
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    cfg.sourceLimit = 0;
+    cfg.allowCommitOnNoCheckpointChange = true;
+    cfg.enableMetaSync = true;
+    cfg.forceEmptyMetaSync = true;
+
+    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
+
+    // make sure hive table is present
+    HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
+    hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
+    HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
+    final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
+    assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " should exist");
+  }
+
   class TestDeltaSync extends DeltaSync {
 
     public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,