This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 5b220703567 [HUDI-6060] Added a config to backup instants before deletion during rollbacks and restores. (#8430) 5b220703567 is described below commit 5b22070356799e7470e0999781f9168c4e5ebcc6 Author: Prashant Wason <pwa...@uber.com> AuthorDate: Wed May 31 07:12:39 2023 -0700 [HUDI-6060] Added a config to backup instants before deletion during rollbacks and restores. (#8430) --- .../org/apache/hudi/config/HoodieWriteConfig.java | 28 ++++++++++++++++ .../rollback/BaseRollbackActionExecutor.java | 39 ++++++++++++++++++++++ .../TestCopyOnWriteRollbackActionExecutor.java | 36 ++++++++++++++++++++ .../table/timeline/HoodieActiveTimeline.java | 14 ++++++++ 4 files changed, 117 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index da72151601d..9a7ee2fbaa7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -698,6 +698,16 @@ public class HoodieWriteConfig extends HoodieConfig { + "will not print any configuration which contains the configured filter. For example with " + "a configured filter `ssl`, value for config `ssl.trustore.location` would be masked."); + public static final ConfigProperty<Boolean> ROLLBACK_INSTANT_BACKUP_ENABLED = ConfigProperty + .key("hoodie.rollback.instant.backup.enabled") + .defaultValue(false) + .withDocumentation("Backup instants removed during rollback and restore (useful for debugging)"); + + public static final ConfigProperty<String> ROLLBACK_INSTANT_BACKUP_DIRECTORY = ConfigProperty + .key("hoodie.rollback.instant.backup.dir") + .defaultValue(".rollback_backup") + .withDocumentation("Path where instants being rolled back are copied. If not absolute path then a directory relative to .hoodie folder is created."); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -2489,6 +2499,14 @@ public class HoodieWriteConfig extends HoodieConfig { return tableServiceManagerConfig.isTableServiceManagerEnabled(); } + public boolean shouldBackupRollbacks() { + return getBoolean(ROLLBACK_INSTANT_BACKUP_ENABLED); + } + + public String getRollbackBackupDirectory() { + return getString(ROLLBACK_INSTANT_BACKUP_DIRECTORY); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); @@ -2961,6 +2979,16 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withRollbackBackupEnabled(boolean rollbackBackupEnabled) { + writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_ENABLED, String.valueOf(rollbackBackupEnabled)); + return this; + } + + public Builder withRollbackBackupDirectory(String backupDir) { + writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_DIRECTORY, backupDir); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 86b58b85dbc..3e887503db4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient; @@ -45,6 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -211,6 +213,8 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE validateRollbackCommitSequence(); } + backupRollbackInstantsIfNeeded(); + try { List<HoodieRollbackStat> stats = executeRollback(hoodieRollbackPlan); LOG.info("Rolled back inflight instant " + instantTimeToRollback); @@ -297,4 +301,39 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE BootstrapIndex.getBootstrapIndex(table.getMetaClient()).dropIndex(); } } + + private void backupRollbackInstantsIfNeeded() { + if (!config.shouldBackupRollbacks()) { + // Backup not required + return; + } + + Path backupDir = new Path(config.getRollbackBackupDirectory()); + if (!backupDir.isAbsolute()) { + // Path specified is relative to the meta directory + backupDir = new Path(table.getMetaClient().getMetaPath(), config.getRollbackBackupDirectory()); + } + + // Determine the instants to back up + HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); + List<HoodieInstant> instantsToBackup = new ArrayList<>(3); + instantsToBackup.add(instantToRollback); + if (instantToRollback.isCompleted()) { + instantsToBackup.add(HoodieTimeline.getInflightInstant(instantToRollback, table.getMetaClient())); + instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback)); + } + if (instantToRollback.isInflight()) { + instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback)); + } + + for (HoodieInstant instant : instantsToBackup) { + try { + activeTimeline.copyInstant(instant, backupDir); + LOG.info(String.format("Copied instant %s to backup dir %s during rollback at %s", instant, backupDir, instantTime)); + } catch (HoodieIOException e) { + // Ignoring error in backing up + LOG.warn("Failed to backup rollback instant: " + e.getMessage()); + } + } + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index b265f860be3..b0eae3b83ec 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -39,6 +39,7 @@ import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.testutils.Assertions; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -328,4 +329,39 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, commitInstant.getTimestamp()).doesMarkerDirExist()); } + + @Test + public void testRollbackBackup() throws Exception { + final String p1 = "2015/03/16"; + final String p2 = "2015/03/17"; + final String p3 = "2016/03/15"; + // Let's create some commit files and parquet files + HoodieTestTable testTable = HoodieTestTable.of(metaClient) + .withPartitionMetaFiles(p1, p2, p3) + .addCommit("001") + .withBaseFilesInPartition(p1, "id11") + .withBaseFilesInPartition(p2, "id12") + .withLogFile(p1, "id11", 3) + .addCommit("002") + .withBaseFilesInPartition(p1, "id21") + .withBaseFilesInPartition(p2, "id22"); + + HoodieTable table = this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build()); + HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002"); + + // Create the rollback plan and perform the rollback + BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor = + new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false, + table.getConfig().shouldRollbackUsingMarkers()); + copyOnWriteRollbackPlanActionExecutor.execute(); + + CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", + needRollBackInstant, true, false); + copyOnWriteRollbackActionExecutor.execute(); + + // Completed and inflight instants should have been backed up + Path backupDir = new Path(metaClient.getMetaPath(), table.getConfig().getRollbackBackupDirectory()); + assertTrue(fs.exists(new Path(backupDir, testTable.getCommitFilePath("002").getName()))); + assertTrue(fs.exists(new Path(backupDir, testTable.getInflightCommitFilePath("002").getName()))); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index fb33da5ec45..e8c94e474fa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -31,6 +31,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -807,4 +808,17 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { public HoodieActiveTimeline reload() { return new HoodieActiveTimeline(metaClient); } + + public void copyInstant(HoodieInstant instant, Path dstDir) { + Path srcPath = new Path(metaClient.getMetaPath(), instant.getFileName()); + Path dstPath = new Path(dstDir, instant.getFileName()); + try { + FileSystem srcFs = srcPath.getFileSystem(metaClient.getHadoopConf()); + FileSystem dstFs = dstPath.getFileSystem(metaClient.getHadoopConf()); + dstFs.mkdirs(dstDir); + FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, srcFs.getConf()); + } catch (IOException e) { + throw new HoodieIOException("Could not copy instant from " + srcPath + " to " + dstPath, e); + } + } }