This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b220703567 [HUDI-6060] Added a config to backup instants before deletion during rollbacks and restores. (#8430)
5b220703567 is described below

commit 5b22070356799e7470e0999781f9168c4e5ebcc6
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Wed May 31 07:12:39 2023 -0700

    [HUDI-6060] Added a config to backup instants before deletion during rollbacks and restores. (#8430)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 28 ++++++++++++++++
 .../rollback/BaseRollbackActionExecutor.java       | 39 ++++++++++++++++++++++
 .../TestCopyOnWriteRollbackActionExecutor.java     | 36 ++++++++++++++++++++
 .../table/timeline/HoodieActiveTimeline.java       | 14 ++++++++
 4 files changed, 117 insertions(+)
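
The new options can be set through the write config builder added in this
commit. A minimal sketch of opting in (the table path and the rest of the
builder setup are illustrative, not part of this change):

    // Keep a copy of each instant under <table>/.hoodie/.rollback_backup
    // before a rollback or restore deletes it from the active timeline.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")                      // hypothetical table path
        .withRollbackBackupEnabled(true)                  // hoodie.rollback.instant.backup.enabled
        .withRollbackBackupDirectory(".rollback_backup")  // hoodie.rollback.instant.backup.dir (default)
        .build();

A relative backup directory is resolved against the table's .hoodie folder;
an absolute path is used as-is.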

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index da72151601d..9a7ee2fbaa7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -698,6 +698,16 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "will not print any configuration which contains the configured 
filter. For example with "
           + "a configured filter `ssl`, value for config 
`ssl.trustore.location` would be masked.");
 
+  public static final ConfigProperty<Boolean> ROLLBACK_INSTANT_BACKUP_ENABLED = ConfigProperty
+          .key("hoodie.rollback.instant.backup.enabled")
+          .defaultValue(false)
+          .withDocumentation("Backup instants removed during rollback and restore (useful for debugging)");
+
+  public static final ConfigProperty<String> ROLLBACK_INSTANT_BACKUP_DIRECTORY = ConfigProperty
+          .key("hoodie.rollback.instant.backup.dir")
+          .defaultValue(".rollback_backup")
+          .withDocumentation("Path where instants being rolled back are copied. If not absolute path then a directory relative to .hoodie folder is created.");
+
   private ConsistencyGuardConfig consistencyGuardConfig;
   private FileSystemRetryConfig fileSystemRetryConfig;
 
@@ -2489,6 +2499,14 @@ public class HoodieWriteConfig extends HoodieConfig {
     return tableServiceManagerConfig.isTableServiceManagerEnabled();
   }
 
+  public boolean shouldBackupRollbacks() {
+    return getBoolean(ROLLBACK_INSTANT_BACKUP_ENABLED);
+  }
+
+  public String getRollbackBackupDirectory() {
+    return getString(ROLLBACK_INSTANT_BACKUP_DIRECTORY);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2961,6 +2979,16 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRollbackBackupEnabled(boolean rollbackBackupEnabled) {
+      writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_ENABLED, String.valueOf(rollbackBackupEnabled));
+      return this;
+    }
+
+    public Builder withRollbackBackupDirectory(String backupDir) {
+      writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_DIRECTORY, backupDir);
+      return this;
+    }
+
     protected void setDefaults() {
      writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 86b58b85dbc..3e887503db4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.rollback;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -211,6 +213,8 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE
       validateRollbackCommitSequence();
     }
 
+    backupRollbackInstantsIfNeeded();
+
     try {
       List<HoodieRollbackStat> stats = executeRollback(hoodieRollbackPlan);
       LOG.info("Rolled back inflight instant " + instantTimeToRollback);
@@ -297,4 +301,39 @@ public abstract class BaseRollbackActionExecutor<T, I, K, O> extends BaseActionE
       BootstrapIndex.getBootstrapIndex(table.getMetaClient()).dropIndex();
     }
   }
+
+  private void backupRollbackInstantsIfNeeded() {
+    if (!config.shouldBackupRollbacks()) {
+      // Backup not required
+      return;
+    }
+
+    Path backupDir = new Path(config.getRollbackBackupDirectory());
+    if (!backupDir.isAbsolute()) {
+      // Path specified is relative to the meta directory
+      backupDir = new Path(table.getMetaClient().getMetaPath(), config.getRollbackBackupDirectory());
+    }
+
+    // Determine the instants to back up
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    List<HoodieInstant> instantsToBackup = new ArrayList<>(3);
+    instantsToBackup.add(instantToRollback);
+    if (instantToRollback.isCompleted()) {
+      instantsToBackup.add(HoodieTimeline.getInflightInstant(instantToRollback, table.getMetaClient()));
+      instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback));
+    }
+    if (instantToRollback.isInflight()) {
+      instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback));
+    }
+
+    for (HoodieInstant instant : instantsToBackup) {
+      try {
+        activeTimeline.copyInstant(instant, backupDir);
+        LOG.info(String.format("Copied instant %s to backup dir %s during rollback at %s", instant, backupDir, instantTime));
+      } catch (HoodieIOException e) {
+        // Ignoring error in backing up
+        LOG.warn("Failed to backup rollback instant: " + e.getMessage());
+      }
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index b265f860be3..b0eae3b83ec 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.testutils.Assertions;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -328,4 +329,39 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
 
     assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, commitInstant.getTimestamp()).doesMarkerDirExist());
   }
+
+  @Test
+  public void testRollbackBackup() throws Exception {
+    final String p1 = "2015/03/16";
+    final String p2 = "2015/03/17";
+    final String p3 = "2016/03/15";
+    // Let's create some commit files and parquet files
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient)
+        .withPartitionMetaFiles(p1, p2, p3)
+        .addCommit("001")
+        .withBaseFilesInPartition(p1, "id11")
+        .withBaseFilesInPartition(p2, "id12")
+        .withLogFile(p1, "id11", 3)
+        .addCommit("002")
+        .withBaseFilesInPartition(p1, "id21")
+        .withBaseFilesInPartition(p2, "id22");
+
+    HoodieTable table = this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build());
+    HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
+
+    // Create the rollback plan and perform the rollback
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false,
+        table.getConfig().shouldRollbackUsingMarkers());
+    copyOnWriteRollbackPlanActionExecutor.execute();
+
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003",
+        needRollBackInstant, true, false);
+    copyOnWriteRollbackActionExecutor.execute();
+
+    // Completed and inflight instants should have been backed up
+    Path backupDir = new Path(metaClient.getMetaPath(), table.getConfig().getRollbackBackupDirectory());
+    assertTrue(fs.exists(new Path(backupDir, testTable.getCommitFilePath("002").getName())));
+    assertTrue(fs.exists(new Path(backupDir, testTable.getInflightCommitFilePath("002").getName())));
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index fb33da5ec45..e8c94e474fa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -31,6 +31,7 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -807,4 +808,17 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   public HoodieActiveTimeline reload() {
     return new HoodieActiveTimeline(metaClient);
   }
+
+  public void copyInstant(HoodieInstant instant, Path dstDir) {
+    Path srcPath = new Path(metaClient.getMetaPath(), instant.getFileName());
+    Path dstPath = new Path(dstDir, instant.getFileName());
+    try {
+      FileSystem srcFs = srcPath.getFileSystem(metaClient.getHadoopConf());
+      FileSystem dstFs = dstPath.getFileSystem(metaClient.getHadoopConf());
+      dstFs.mkdirs(dstDir);
+      FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, srcFs.getConf());
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not copy instant from " + srcPath + " 
to " + dstPath, e);
+    }
+  }
 }
