This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebased
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 29c228a85a21e5faae0235057dd90f12dba5d58b
Author: asolomon <[email protected]>
AuthorDate: Tue Sep 9 03:18:01 2025 +0530

    [HBASE-29520] Utilize Backed-up Bulkloaded Files in Incremental Backup (#7246)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
 .../backup/impl/AbstractPitrRestoreHandler.java    |  2 +-
 .../hadoop/hbase/backup/impl/BackupCommands.java   |  2 +-
 .../backup/impl/IncrementalTableBackupClient.java  | 23 +++++++-
 .../ContinuousBackupReplicationEndpoint.java       | 24 ++------
 .../hadoop/hbase/backup/util/BackupUtils.java      | 13 +++++
 .../apache/hadoop/hbase/backup/TestBackupBase.java | 13 +++++
 .../hbase/backup/TestBackupDeleteWithCleanup.java  |  2 +-
 .../hadoop/hbase/backup/TestContinuousBackup.java  |  6 --
 .../TestIncrementalBackupWithContinuous.java       | 67 ++++++++++++----------
 .../hbase/backup/impl/TestBackupCommands.java      |  2 +-
 .../TestContinuousBackupReplicationEndpoint.java   |  5 +-
 11 files changed, 95 insertions(+), 64 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index 8072277bf68..048ed882fe8 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -21,8 +21,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINU
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 0b02a5edd89..a30530a98fc 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -49,8 +49,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKE
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 
 import java.io.IOException;
 import java.net.URI;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 4d39d11a36d..8a893994616 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,9 +19,10 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 
 import java.io.IOException;
 import java.net.URI;
@@ -170,6 +171,26 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
       Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
 
+      // For continuous backup: bulkload files are copied from the backup directory defined by
+      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of from the source cluster.
+      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
+        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
+        Path bulkLoadBackupPath =
+          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
+        Path bulkLoadDir = new Path(bulkLoadBackupPath,
+          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
+        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
+        Path fullBulkLoadBackupPath =
+          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+        if (backupFs.exists(fullBulkLoadBackupPath)) {
+          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
+          p = fullBulkLoadBackupPath;
+        } else {
+          LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
+        }
+      }
+
       String srcTableQualifier = srcTable.getQualifierAsString();
       String srcTableNs = srcTable.getNamespaceAsString();
       Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
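
The hunk above resolves each bulkloaded HFile against the continuous-backup
layout <wal-backup-dir>/<BULKLOAD_FILES_DIR>/<yyyy-MM-dd>/<namespace>/<table>/<region>/<family>/<file>,
and keeps the source-cluster path as a fallback when the file is absent. A
minimal standalone sketch of that path arithmetic follows; the literal
"bulkload-files" directory name and all sample arguments are assumptions for
illustration, not taken from the patch:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class BulkloadBackupPathSketch {
      // Same contract as BackupUtils.formatToDateString introduced below.
      static String formatToDateString(long millis) {
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
        f.setTimeZone(TimeZone.getTimeZone("UTC"));
        return f.format(new Date(millis));
      }

      // Mirrors the Path arithmetic in the hunk above; "bulkload-files"
      // stands in for the BULKLOAD_FILES_DIR constant (assumed value).
      static String backupBulkloadPath(String backupRootDir, long bulkLoadTs,
          String ns, String table, String region, String family, String file) {
        return String.join("/", backupRootDir, "bulkload-files",
          formatToDateString(bulkLoadTs), ns, table, region, family, file);
      }

      public static void main(String[] args) {
        // 1757376000000L is 2025-09-09T00:00:00Z.
        System.out.println(backupBulkloadPath("hdfs://backup/walDir",
          1757376000000L, "default", "usertable", "region-abc", "f1", "hfile-001"));
        // -> hdfs://backup/walDir/bulkload-files/2025-09-09/default/usertable/region-abc/f1/hfile-001
      }
    }
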
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 69c445c484d..19624d04c23 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -21,11 +21,8 @@ import com.google.errorprone.annotations.RestrictedApi;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -41,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -94,7 +92,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
 
   public static final long ONE_DAY_IN_MILLISECONDS = TimeUnit.DAYS.toMillis(1);
   public static final String WAL_FILE_PREFIX = "wal_file.";
-  public static final String DATE_FORMAT = "yyyy-MM-dd";
 
   @Override
   public void init(Context context) throws IOException {
@@ -330,7 +327,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
-    String dayDirectoryName = formatToDateString(dayInMillis);
+    String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis);
 
     FileSystem fs = backupFileSystemManager.getBackupFs();
     Path walsDir = backupFileSystemManager.getWalsDir();
@@ -408,7 +405,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
       LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
         bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
     }
-    String dayDirectoryName = formatToDateString(dayInMillis);
+    String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis);
     Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
     try {
       backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
@@ -446,7 +443,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
       } catch (IOException e) {
         throw new BulkLoadUploadException(
           String.format("%s Failed to copy bulk load file %s to %s on day %s",
-            Utils.logPeerId(peerId), file, destPath, formatToDateString(dayInMillis)),
+            Utils.logPeerId(peerId), file, destPath, BackupUtils.formatToDateString(dayInMillis)),
           e);
       }
     }
@@ -495,19 +492,6 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     }
   }
 
-  /**
-   * Convert dayInMillis to "yyyy-MM-dd" format
-   */
-  @RestrictedApi(
-      explanation = "Package-private for test visibility only. Do not use outside tests.",
-      link = "",
-      allowedOnPath = "(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)")
-  String formatToDateString(long dayInMillis) {
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    return dateFormat.format(new Date(dayInMillis));
-  }
-
   private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
     FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
     Path rootDir = CommonFSUtils.getRootDir(conf);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index d8a033a1f07..f76fa735321 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -24,14 +24,17 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URLDecoder;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TimeZone;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import org.apache.hadoop.conf.Configuration;
@@ -88,6 +91,7 @@ public final class BackupUtils {
   private static final Logger LOG = LoggerFactory.getLogger(BackupUtils.class);
   public static final String LOGNAME_SEPARATOR = ".";
   public static final int MILLISEC_IN_HOUR = 3600000;
+  public static final String DATE_FORMAT = "yyyy-MM-dd";
 
   private BackupUtils() {
     throw new AssertionError("Instantiating utility class...");
@@ -983,4 +987,13 @@ public final class BackupUtils {
     return admin.listReplicationPeers().stream()
       .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER));
   }
+
+  /**
+   * Convert dayInMillis to "yyyy-MM-dd" format
+   */
+  public static String formatToDateString(long dayInMillis) {
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return dateFormat.format(new Date(dayInMillis));
+  }
 }
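
The relocated helper pins SimpleDateFormat to UTC, so a given timestamp maps
to the same day directory regardless of the JVM default time zone; creating a
new SimpleDateFormat per call also sidesteps that class's lack of thread
safety. A small sketch of the day-boundary behavior (the timestamp and zone
below are illustrative):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class UtcDayBoundarySketch {
      public static void main(String[] args) {
        long ts = 86_399_999L; // 1970-01-01T23:59:59.999Z
        SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd");
        utc.setTimeZone(TimeZone.getTimeZone("UTC"));
        System.out.println(utc.format(new Date(ts))); // 1970-01-01, on every host

        SimpleDateFormat local = new SimpleDateFormat("yyyy-MM-dd");
        local.setTimeZone(TimeZone.getTimeZone("Asia/Kolkata")); // UTC+5:30
        System.out.println(local.format(new Date(ts))); // 1970-01-02: a different day directory
      }
    }
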
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 4fb9f209b76..aae88c1bb9a 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -17,10 +17,13 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
 import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
@@ -299,6 +303,9 @@ public class TestBackupBase {
     conf1.set(CONF_BACKUP_MAX_WAL_SIZE, "10240");
     conf1.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10");
     conf1.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
+    conf1.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    conf1.set(REPLICATION_CLUSTER_ID, "clusterId1");
+    conf1.setBoolean(IGNORE_EMPTY_FILES, true);
 
     if (secure) {
       // set the always on security provider
@@ -566,6 +573,12 @@ public class TestBackupBase {
     }
   }
 
+  BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
+    BackupInfo newestBackup = backups.get(0);
+    return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
+      newestBackup.getBackupId());
+  }
+
   void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
     if (
       admin.listReplicationPeers().stream()
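
With getLatestBackupManifest hoisted into TestBackupBase, any subclass can
assert against the newest backup's manifest. A hypothetical test fragment
(assuming BackupManifest exposes its covered tables via getTableList, as used
elsewhere in hbase-backup; the table name is illustrative):

    List<BackupInfo> backups = table.getBackupHistory();
    BackupManifest manifest = getLatestBackupManifest(backups);
    assertTrue(manifest.getTableList().contains(tableName));
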
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 07c9110072b..8bd2fe4cc78 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -21,8 +21,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINU
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
index 0cc34ed63eb..2fdfa8b73f8 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -271,12 +271,6 @@ public class TestContinuousBackup extends TestBackupBase {
     }
   }
 
-  BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
-    BackupInfo newestBackup = backups.get(0);
-    return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
-      newestBackup.getBackupId());
-  }
-
   private void verifyTableInBackupSystemTable(TableName table) throws IOException {
     try (BackupSystemTable backupTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
       Map<TableName, Long> tableBackupMap = backupTable.getContinuousBackupTableSet();
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 0978ff3ebef..e67e50ebee3 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
@@ -48,6 +49,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -57,7 +60,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 
 @Category(LargeTests.class)
-public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
+public class TestIncrementalBackupWithContinuous extends TestBackupBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -67,11 +70,31 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
 
   private static final int ROWS_IN_BULK_LOAD = 100;
+  private static final String backupWalDirName = "TestContinuousBackupWalDir";
+
+  @Before
+  public void beforeTest() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+  }
+
+  @After
+  public void afterTest() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    FileSystem fs = FileSystem.get(conf1);
+    if (fs.exists(backupWalDir)) {
+      fs.delete(backupWalDir, true);
+    }
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin());
+  }
 
   @Test
   public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception {
-    LOG.info("Testing incremental backup with continuous backup");
-    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
     Table t1 = TEST_UTIL.createTable(tableName, famName);
@@ -80,18 +103,13 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       int before = table.getBackupHistory().size();
 
       // Run continuous backup
-      String[] args = buildBackupArgs("full", new TableName[] { tableName }, true);
-      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
-      assertEquals("Full Backup should succeed", 0, ret);
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup1));
 
       // Verify backup history increased and all the backups are succeeded
       LOG.info("Verify backup history increased and all the backups are succeeded");
       List<BackupInfo> backups = table.getBackupHistory();
       assertEquals("Backup history should increase", before + 1, backups.size());
-      for (BackupInfo data : List.of(backups.get(0))) {
-        String backupId = data.getBackupId();
-        assertTrue(checkSucceeded(backupId));
-      }
 
       // Verify backup manifest contains the correct tables
       LOG.info("Verify backup manifest contains the correct tables");
@@ -105,42 +123,34 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       // Run incremental backup
       LOG.info("Run incremental backup now");
       before = table.getBackupHistory().size();
-      args = buildBackupArgs("incremental", new TableName[] { tableName }, false);
-      ret = ToolRunner.run(conf1, new BackupDriver(), args);
-      assertEquals("Incremental Backup should succeed", 0, ret);
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup2));
       LOG.info("Incremental backup completed");
 
       // Verify backup history increased and all the backups are succeeded
       backups = table.getBackupHistory();
-      String incrementalBackupid = null;
       assertEquals("Backup history should increase", before + 1, backups.size());
-      for (BackupInfo data : List.of(backups.get(0))) {
-        String backupId = data.getBackupId();
-        incrementalBackupid = backupId;
-        assertTrue(checkSucceeded(backupId));
-      }
 
       TEST_UTIL.truncateTable(tableName);
+
       // Restore incremental backup
       TableName[] tables = new TableName[] { tableName };
       BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
-      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
-        tables, tables, true));
+      client.restore(
+        BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
 
       assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
-    } finally {
-      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
 
   @Test
   public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception {
-    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName1 = TableName.valueOf("table_" + methodName);
     TEST_UTIL.createTable(tableName1, famName);
-    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
 
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
       // The test starts with no data, and no bulk loaded rows.
       int expectedRowCount = 0;
       assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
@@ -157,7 +167,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
       assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
       loadTable(TEST_UTIL.getConnection().getTable(tableName1));
-      Thread.sleep(10000);
+      Thread.sleep(15000);
 
       performBulkLoad("bulkPostIncr", methodName, tableName1);
       assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
@@ -177,14 +187,11 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
       client.restore(
         BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
       assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
-    } finally {
-      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
 
   @Test
   public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
-    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
     String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName1 = TableName.valueOf("table_" + methodName);
     TEST_UTIL.createTable(tableName1, famName);
@@ -228,8 +235,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
         new TableName[] { restoredTable }, restoreTs, null);
       int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
       assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret);
-    } finally {
-      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
     }
   }
 
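The rewritten tests now drive backups through the TestBackupBase helpers
rather than through ToolRunner and BackupDriver, and the per-test try/finally
configuration juggling moved into the @Before/@After methods. Condensed from
the hunks above, the common flow is (a sketch, not a complete test):

    // Full backup with continuous backup enabled (final argument), then an
    // incremental backup, then restore the incremental image and verify rows.
    String full = backupTables(BackupType.FULL, List.of(tableName), BACKUP_ROOT_DIR, true);
    assertTrue(checkSucceeded(full));

    String incr = backupTables(BackupType.INCREMENTAL, List.of(tableName), BACKUP_ROOT_DIR, true);
    assertTrue(checkSucceeded(incr));

    TableName[] tables = new TableName[] { tableName };
    BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
    client.restore(
      BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incr, false, tables, tables, true));
    assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
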
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index be14227e4cc..15ab2b2bdbe 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -22,8 +22,8 @@ import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDire
 import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 8f8e83dbda6..cc9200882e3 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -26,10 +26,10 @@ import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplica
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
 import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup;
+import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -497,7 +498,7 @@ public class TestContinuousBackupReplicationEndpoint {
         firstAttempt = false;
         try {
           // Construct destination path and create a partial file
-          String dayDirectoryName = formatToDateString(dayInMillis);
+          String dayDirectoryName = BackupUtils.formatToDateString(dayInMillis);
           BackupFileSystemManager backupFileSystemManager =
             new BackupFileSystemManager("peer1", conf, conf.get(CONF_BACKUP_ROOT_DIR));
           Path bulkloadDir =
