This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebased
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0bff7ebf7eb1fd087da4dd6416f99371b26ea74b
Author: vinayak hegde <[email protected]>
AuthorDate: Tue Mar 4 22:27:44 2025 +0530

    HBASE-29025: Enhance the full backup command to support Continuous Backup (#6710)
    
    * HBASE-29025: Enhance the full backup command to support continuous backup
    
    * add new check for full backup command regards to continuous backup flag
    
    * minor fixes
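    
    Illustrative usage (a sketch based on the options and tests added below;
    the backup root and table name are hypothetical): the first full backup
    with continuous WAL replication enabled would be invoked as
    
        hbase backup create full hdfs://host:8020/backup-root -t ns1:table1 -cb
    
    and requires hbase.backup.continuous.wal.dir to point at a WAL backup
    directory; the backup fails if that property is unset.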
---
 .../apache/hadoop/hbase/backup/BackupDriver.java   |   6 +-
 .../org/apache/hadoop/hbase/backup/BackupInfo.java |  12 +
 .../apache/hadoop/hbase/backup/BackupRequest.java  |  14 +
 .../hbase/backup/BackupRestoreConstants.java       |  12 +
 .../hadoop/hbase/backup/impl/BackupAdminImpl.java  |   3 +-
 .../hadoop/hbase/backup/impl/BackupCommands.java   |  63 ++++-
 .../hadoop/hbase/backup/impl/BackupManager.java    |  18 +-
 .../hbase/backup/impl/BackupSystemTable.java       |  94 +++++++
 .../hbase/backup/impl/FullTableBackupClient.java   | 245 +++++++++++++----
 .../hbase/backup/impl/TableBackupClient.java       |   2 +-
 .../hadoop/hbase/backup/TestContinuousBackup.java  | 302 +++++++++++++++++++++
 .../src/main/protobuf/Backup.proto                 |   1 +
 12 files changed, 707 insertions(+), 65 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index d55a280b4aa..e096bbee161 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hbase.backup;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_ENABLE_CONTINUOUS_BACKUP;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -159,7 +162,8 @@ public class BackupDriver extends AbstractHBaseTool {
     addOptWithArg(OPTION_PATH, OPTION_PATH_DESC);
     addOptWithArg(OPTION_KEEP, OPTION_KEEP_DESC);
     addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_DESC);
-
+    addOptNoArg(OPTION_ENABLE_CONTINUOUS_BACKUP, LONG_OPTION_ENABLE_CONTINUOUS_BACKUP,
+      OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
   }
 
   @Override
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index f0dc10b8361..862a9cbad10 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -71,6 +71,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
    */
   public enum BackupPhase {
     REQUEST,
+    SETUP_WAL_REPLICATION,
     SNAPSHOT,
     PREPARE_INCREMENTAL,
     SNAPSHOTCOPY,
@@ -170,6 +171,8 @@ public class BackupInfo implements Comparable<BackupInfo> {
    */
   private boolean noChecksumVerify;
 
+  private boolean continuousBackupEnabled;
+
   public BackupInfo() {
     backupTableInfoMap = new HashMap<>();
   }
@@ -185,6 +188,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
     }
     this.startTs = 0;
     this.completeTs = 0;
+    this.continuousBackupEnabled = false;
   }
 
   public int getWorkers() {
@@ -592,4 +596,12 @@ public class BackupInfo implements Comparable<BackupInfo> {
     Long otherTS = Long.valueOf(o.getBackupId().substring(o.getBackupId().lastIndexOf("_") + 1));
     return thisTS.compareTo(otherTS);
   }
+
+  public void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
+    this.continuousBackupEnabled = continuousBackupEnabled;
+  }
+
+  public boolean isContinuousBackupEnabled() {
+    return this.continuousBackupEnabled;
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
index aa2d5b44259..822c84c57c0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
@@ -75,6 +75,11 @@ public final class BackupRequest {
       return this;
     }
 
+    public Builder withContinuousBackupEnabled(boolean continuousBackupEnabled) {
+      request.setContinuousBackupEnabled(continuousBackupEnabled);
+      return this;
+    }
+
     public BackupRequest build() {
       return request;
     }
@@ -89,6 +94,7 @@ public final class BackupRequest {
   private boolean noChecksumVerify = false;
   private String backupSetName;
   private String yarnPoolName;
+  private boolean continuousBackupEnabled;
 
   private BackupRequest() {
   }
@@ -163,4 +169,12 @@ public final class BackupRequest {
   public void setYarnPoolName(String yarnPoolName) {
     this.yarnPoolName = yarnPoolName;
   }
+
+  private void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
+    this.continuousBackupEnabled = continuousBackupEnabled;
+  }
+
+  public boolean isContinuousBackupEnabled() {
+    return this.continuousBackupEnabled;
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
index 57454d40217..f5c49adb696 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
@@ -99,6 +99,11 @@ public interface BackupRestoreConstants {
   String OPTION_YARN_QUEUE_NAME_DESC = "Yarn queue name to run backup create command on";
   String OPTION_YARN_QUEUE_NAME_RESTORE_DESC = "Yarn queue name to run backup restore command on";
 
+  String OPTION_ENABLE_CONTINUOUS_BACKUP = "cb";
+  String LONG_OPTION_ENABLE_CONTINUOUS_BACKUP = "continuous-backup-enabled";
+  String OPTION_ENABLE_CONTINUOUS_BACKUP_DESC =
+    "Flag indicating that the full backup is part of a continuous backup 
process.";
+
   String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
   String BACKUP_CONFIG_STRING = BackupRestoreConstants.BACKUP_ENABLE_KEY + "=true\n"
@@ -126,6 +131,13 @@ public interface BackupRestoreConstants {
 
   String BACKUPID_PREFIX = "backup_";
 
+  String CONTINUOUS_BACKUP_REPLICATION_PEER = "continuous_backup_replication_peer";
+
+  String DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT =
+    "org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint";
+
+  String CONF_CONTINUOUS_BACKUP_WAL_DIR = "hbase.backup.continuous.wal.dir";
+
   enum BackupCommand {
     CREATE,
     CANCEL,
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index c36b398e5e8..1e745c69cda 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -581,7 +581,8 @@ public class BackupAdminImpl implements BackupAdmin {
     request = builder.withBackupType(request.getBackupType()).withTableList(tableList)
       .withTargetRootDir(request.getTargetRootDir()).withBackupSetName(request.getBackupSetName())
       .withTotalTasks(request.getTotalTasks()).withBandwidthPerTasks((int) request.getBandwidth())
-      .withNoChecksumVerify(request.getNoChecksumVerify()).build();
+      .withNoChecksumVerify(request.getNoChecksumVerify())
+      .withContinuousBackupEnabled(request.isContinuousBackupEnabled()).build();
 
     TableBackupClient client;
     try {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 66694f4384f..ab9ca1c4ed2 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDW
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -45,6 +47,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -339,14 +342,64 @@ public final class BackupCommands {
 
       boolean ignoreChecksum = cmdline.hasOption(OPTION_IGNORECHECKSUM);
 
+      BackupType backupType = BackupType.valueOf(args[1].toUpperCase());
+      List<TableName> tableNameList = null;
+      if (tables != null) {
+        tableNameList = Lists.newArrayList(BackupUtils.parseTableNames(tables));
+      }
+      boolean continuousBackup = cmdline.hasOption(OPTION_ENABLE_CONTINUOUS_BACKUP);
+      if (continuousBackup && !BackupType.FULL.equals(backupType)) {
+        System.out.println("ERROR: Continuous backup can Only be specified for 
Full Backup");
+        printUsage();
+        throw new IOException(INCORRECT_USAGE);
+      }
+
+      /*
+       * The `continuousBackup` flag is specified only during the first full backup to initiate
+       * continuous WAL replication. After that, it is redundant because the tables are already set
+       * up for continuous backup. If the `continuousBackup` flag is not explicitly enabled, we need
+       * to determine the backup mode based on the current state of the specified tables:
+       * - If all the specified tables are already part of continuous backup, we treat the request
+       *   as a continuous backup request and proceed accordingly (since these tables are already
+       *   continuously backed up, no additional setup is needed).
+       * - If none of the specified tables are part of continuous backup, we treat the request as a
+       *   normal full backup without continuous backup.
+       * - If the request includes a mix of tables, some with continuous backup enabled and others
+       *   without, we cannot determine a clear backup strategy, so we throw an error.
+       * If all tables are already in continuous backup mode, we explicitly set the
+       * `continuousBackup` flag to `true` so that the request is processed using the continuous
+       * backup approach rather than the normal full backup flow.
+       */
+      if (!continuousBackup && tableNameList != null && 
!tableNameList.isEmpty()) {
+        try (BackupSystemTable backupSystemTable = new 
BackupSystemTable(conn)) {
+          Set<TableName> continuousBackupTableSet =
+            backupSystemTable.getContinuousBackupTableSet().keySet();
+
+          boolean allTablesInContinuousBackup = 
continuousBackupTableSet.containsAll(tableNameList);
+          boolean noTablesInContinuousBackup =
+            
tableNameList.stream().noneMatch(continuousBackupTableSet::contains);
+
+          // Ensure that all tables are either fully in continuous backup or 
not at all
+          if (!allTablesInContinuousBackup && !noTablesInContinuousBackup) {
+            System.err
+              .println("ERROR: Some tables are already in continuous backup, 
while others are not. "
+                + "Cannot mix both in a single request.");
+            printUsage();
+            throw new IOException(INCORRECT_USAGE);
+          }
+
+          // If all tables are already in continuous backup, enable the flag
+          if (allTablesInContinuousBackup) {
+            continuousBackup = true;
+          }
+        }
+      }
+
       try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
         BackupRequest.Builder builder = new BackupRequest.Builder();
-        BackupRequest request = builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
-          .withTableList(
-            tables != null ? Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
+        BackupRequest request = builder.withBackupType(backupType).withTableList(tableNameList)
           .withTargetRootDir(targetBackupDir).withTotalTasks(workers)
           .withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
-          .withBackupSetName(setName).build();
+          .withBackupSetName(setName).withContinuousBackupEnabled(continuousBackup).build();
         String backupId = admin.backupTables(request);
         System.out.println("Backup session " + backupId + " finished. Status: SUCCESS");
       } catch (IOException e) {
@@ -400,6 +453,8 @@ public final class BackupCommands {
       options.addOption(OPTION_YARN_QUEUE_NAME, true, OPTION_YARN_QUEUE_NAME_DESC);
       options.addOption(OPTION_DEBUG, false, OPTION_DEBUG_DESC);
       options.addOption(OPTION_IGNORECHECKSUM, false, OPTION_IGNORECHECKSUM_DESC);
+      options.addOption(OPTION_ENABLE_CONTINUOUS_BACKUP, false,
+        OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
 
       HelpFormatter helpFormatter = new HelpFormatter();
       helpFormatter.setLeftPadding(2);
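
Illustrative behavior of the new check above (a sketch; hdfs://backup-root, t1, and t2 are hypothetical, with only t1 previously backed up using -cb):

  hbase backup create full hdfs://backup-root -t t1     # treated as continuous
  hbase backup create full hdfs://backup-root -t t2     # plain full backup
  hbase backup create full hdfs://backup-root -t t1,t2  # rejected: mixed modes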
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 810af8f032c..8b17e93868b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -199,8 +199,8 @@ public class BackupManager implements Closeable {
    * @throws BackupException exception
    */
   public BackupInfo createBackupInfo(String backupId, BackupType type, List<TableName> tableList,
-    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify)
-    throws BackupException {
+    String targetRootDir, int workers, long bandwidth, boolean noChecksumVerify,
+    boolean continuousBackupEnabled) throws BackupException {
     if (targetRootDir == null) {
       throw new BackupException("Wrong backup request parameter: target backup root directory");
     }
@@ -238,6 +238,7 @@ public class BackupManager implements Closeable {
     backupInfo.setBandwidth(bandwidth);
     backupInfo.setWorkers(workers);
     backupInfo.setNoChecksumVerify(noChecksumVerify);
+    backupInfo.setContinuousBackupEnabled(continuousBackupEnabled);
     return backupInfo;
   }
 
@@ -427,4 +428,17 @@ public class BackupManager implements Closeable {
   public Connection getConnection() {
     return conn;
   }
+
+  /**
+   * Adds a set of tables to the global continuous backup set. Only tables that do not already have
+   * continuous backup enabled will be updated.
+   * @param tables         set of tables to add to continuous backup
+   * @param startTimestamp timestamp indicating when continuous backup started for newly added
+   *                       tables
+   * @throws IOException if an error occurs while updating the backup system table
+   */
+  public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
+    throws IOException {
+    systemTable.addContinuousBackupTableSet(tables, startTimestamp);
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index f2ddcf5e757..752f448a301 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -169,6 +169,7 @@ public final class BackupSystemTable implements Closeable {
   private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
 
   private final static String INCR_BACKUP_SET = "incrbackupset:";
+  private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
   private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
   private final static String RS_LOG_TS_PREFIX = "rslogts:";
 
@@ -892,6 +893,37 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /**
+   * Retrieves the current set of tables covered by continuous backup along with the timestamp
+   * indicating when continuous backup started for each table.
+   * @return a map where the key is the table name and the value is the timestamp representing the
+   *         start time of continuous backup for that table.
+   * @throws IOException if an I/O error occurs while accessing the backup system table.
+   */
+  public Map<TableName, Long> getContinuousBackupTableSet() throws IOException {
+    LOG.trace("Retrieving continuous backup table set from the backup system table.");
+    Map<TableName, Long> tableMap = new TreeMap<>();
+
+    try (Table systemTable = connection.getTable(tableName)) {
+      Get getOperation = createGetForContinuousBackupTableSet();
+      Result result = systemTable.get(getOperation);
+
+      if (result.isEmpty()) {
+        return tableMap;
+      }
+
+      // Extract table names and timestamps from the result cells
+      List<Cell> cells = result.listCells();
+      for (Cell cell : cells) {
+        TableName tableName = TableName.valueOf(CellUtil.cloneQualifier(cell));
+        long timestamp = Bytes.toLong(CellUtil.cloneValue(cell));
+        tableMap.put(tableName, timestamp);
+      }
+    }
+
+    return tableMap;
+  }
+
   /**
    * Add tables to global incremental backup set
    * @param tables     set of tables
@@ -913,6 +945,34 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /**
+   * Add tables to the global continuous backup set. Only updates tables that are not already in the
+   * continuous backup set.
+   * @param tables         set of tables to add
+   * @param startTimestamp timestamp indicating when continuous backup started
+   * @throws IOException if an error occurs while updating the backup system table
+   */
+  public void addContinuousBackupTableSet(Set<TableName> tables, long startTimestamp)
+    throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Add continuous backup table set to backup system table. tables ["
+        + StringUtils.join(tables, " ") + "]");
+    }
+    if (LOG.isDebugEnabled()) {
+      tables.forEach(table -> LOG.debug(Objects.toString(table)));
+    }
+
+    // Get existing continuous backup tables
+    Map<TableName, Long> existingTables = getContinuousBackupTableSet();
+
+    try (Table table = connection.getTable(tableName)) {
+      Put put = createPutForContinuousBackupTableSet(tables, existingTables, startTimestamp);
+      if (!put.isEmpty()) {
+        table.put(put);
+      }
+    }
+  }
+
   /**
    * Deletes incremental backup set for a backup destination
    * @param backupRoot backup root
@@ -1241,6 +1301,18 @@ public final class BackupSystemTable implements Closeable {
     return get;
   }
 
+  /**
+   * Creates a Get operation to retrieve the continuous backup table set from the backup system
+   * table.
+   * @return a Get operation for retrieving the table set
+   */
+  private Get createGetForContinuousBackupTableSet() throws IOException {
+    Get get = new Get(rowkey(CONTINUOUS_BACKUP_SET));
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    get.readVersions(1);
+    return get;
+  }
+
   /**
    * Creates Put to store incremental backup table set
    * @param tables tables
@@ -1255,6 +1327,28 @@ public final class BackupSystemTable implements Closeable {
     return put;
   }
 
+  /**
+   * Creates a Put operation to store the continuous backup table set. Only includes tables that are
+   * not already in the set.
+   * @param tables         tables to add
+   * @param existingTables tables that already have continuous backup enabled
+   * @param startTimestamp timestamp indicating when continuous backup started
+   * @return put operation
+   */
+  private Put createPutForContinuousBackupTableSet(Set<TableName> tables,
+    Map<TableName, Long> existingTables, long startTimestamp) {
+    Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
+
+    for (TableName table : tables) {
+      if (!existingTables.containsKey(table)) {
+        put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
+          Bytes.toBytes(startTimestamp));
+      }
+    }
+
+    return put;
+  }
+
   /**
    * Creates Delete for incremental backup table set
    * @param backupRoot backup root
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 2293fd4f814..de47e8f3391 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -17,16 +17,24 @@
  */
 package org.apache.hadoop.hbase.backup.impl;
 
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyJob;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -37,7 +45,13 @@ import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -61,9 +75,9 @@ public class FullTableBackupClient extends TableBackupClient {
   /**
    * Do snapshot copy.
    * @param backupInfo backup info
-   * @throws Exception exception
+   * @throws IOException exception
    */
-  protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
+  protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
     LOG.info("Snapshot copy is starting.");
 
     // set overall backup phase: snapshot_copy
@@ -131,72 +145,22 @@ public class FullTableBackupClient extends TableBackupClient {
   @Override
   public void execute() throws IOException {
     try (Admin admin = conn.getAdmin()) {
-      // Begin BACKUP
       beginBackup(backupManager, backupInfo);
-      String savedStartCode;
-      boolean firstBackup;
-      // do snapshot for full table backup
-
-      savedStartCode = backupManager.readBackupStartCode();
-      firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
-      if (firstBackup) {
-        // This is our first backup. Let's put some marker to system table so that we can hold the
-        // logs while we do the backup.
-        backupManager.writeBackupStartCode(0L);
-      }
-      // We roll log here before we do the snapshot. It is possible there is duplicate data
-      // in the log that is already in the snapshot. But if we do it after the snapshot, we
-      // could have data loss.
-      // A better approach is to do the roll log on each RS in the same global procedure as
-      // the snapshot.
-      LOG.info("Execute roll log procedure for full backup ...");
 
       // Gather the bulk loads being tracked by the system, which can be deleted (since their data
       // will be part of the snapshot being taken). We gather this list before taking the actual
       // snapshots for the same reason as the log rolls.
       List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
 
-      BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
-
-      newTimestamps = backupManager.readRegionServerLastLogRollResult();
-
-      // SNAPSHOT_TABLES:
-      backupInfo.setPhase(BackupPhase.SNAPSHOT);
-      for (TableName tableName : tableList) {
-        String snapshotName = "snapshot_" + Long.toString(EnvironmentEdgeManager.currentTime())
-          + "_" + tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();
-
-        snapshotTable(admin, tableName, snapshotName);
-        backupInfo.setSnapshotName(tableName, snapshotName);
+      if (backupInfo.isContinuousBackupEnabled()) {
+        handleContinuousBackup(admin);
+      } else {
+        handleNonContinuousBackup(admin);
       }
 
-      // SNAPSHOT_COPY:
-      // do snapshot copy
-      LOG.debug("snapshot copy for " + backupId);
-      snapshotCopy(backupInfo);
-      // Updates incremental backup table set
-      backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
-
-      // BACKUP_COMPLETE:
-      // set overall backup status: complete. Here we make sure to complete the backup.
-      // After this checkpoint, even if entering cancel process, will let the backup finished
-      backupInfo.setState(BackupState.COMPLETE);
-      // The table list in backupInfo is good for both full backup and incremental backup.
-      // For incremental backup, it contains the incremental backup table set.
-      backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
-
-      Map<TableName, Map<String, Long>> newTableSetTimestampMap =
-        backupManager.readLogTimestampMap();
-
-      backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
-      Long newStartCode =
-        BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
-      backupManager.writeBackupStartCode(newStartCode);
-
       backupManager
         .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());
 
-      // backup complete
       completeBackup(conn, backupInfo, BackupType.FULL, conf);
     } catch (Exception e) {
       failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
@@ -205,6 +169,175 @@ public class FullTableBackupClient extends TableBackupClient {
     }
   }
 
+  private void handleContinuousBackup(Admin admin) throws IOException {
+    backupInfo.setPhase(BackupInfo.BackupPhase.SETUP_WAL_REPLICATION);
+    long startTimestamp = startContinuousWALBackup(admin);
+
+    performBackupSnapshots(admin);
+
+    backupManager.addContinuousBackupTableSet(backupInfo.getTables(), startTimestamp);
+
+    // set overall backup status: complete. Here we make sure to complete the backup.
+    // After this checkpoint, even if entering cancel process, will let the backup finished
+    backupInfo.setState(BackupState.COMPLETE);
+
+    if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
+      System.out.println("NOTE: Bulkload replication is not enabled. "
+        + "Bulk loaded files will not be backed up as part of continuous backup. "
+        + "To ensure bulk loaded files are included in the backup, please enable bulkload replication "
+        + "(hbase.replication.bulkload.enabled=true) and configure other necessary settings "
+        + "to properly enable bulkload replication.");
+    }
+  }
+
+  private void handleNonContinuousBackup(Admin admin) throws IOException {
+    initializeBackupStartCode(backupManager);
+    performLogRoll();
+    performBackupSnapshots(admin);
+    backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
+
+    // set overall backup status: complete. Here we make sure to complete the backup.
+    // After this checkpoint, even if entering cancel process, will let the backup finished
+    backupInfo.setState(BackupState.COMPLETE);
+
+    updateBackupMetadata();
+  }
+
+  private void initializeBackupStartCode(BackupManager backupManager) throws IOException {
+    String savedStartCode;
+    boolean firstBackup;
+    // do snapshot for full table backup
+    savedStartCode = backupManager.readBackupStartCode();
+    firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) == 0L;
+    if (firstBackup) {
+      // This is our first backup. Let's put some marker to system table so that we can hold the
+      // logs while we do the backup.
+      backupManager.writeBackupStartCode(0L);
+    }
+  }
+
+  private void performLogRoll() throws IOException {
+    // We roll log here before we do the snapshot. It is possible there is duplicate data
+    // in the log that is already in the snapshot. But if we do it after the snapshot, we
+    // could have data loss.
+    // A better approach is to do the roll log on each RS in the same global procedure as
+    // the snapshot.
+    LOG.info("Execute roll log procedure for full backup ...");
+    BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf);
+    newTimestamps = backupManager.readRegionServerLastLogRollResult();
+  }
+
+  private void performBackupSnapshots(Admin admin) throws IOException {
+    backupInfo.setPhase(BackupPhase.SNAPSHOT);
+    performSnapshots(admin);
+    LOG.debug("Performing snapshot copy for backup ID: {}", 
backupInfo.getBackupId());
+    snapshotCopy(backupInfo);
+  }
+
+  private void performSnapshots(Admin admin) throws IOException {
+    backupInfo.setPhase(BackupPhase.SNAPSHOT);
+
+    for (TableName tableName : tableList) {
+      String snapshotName = String.format("snapshot_%d_%s_%s", 
EnvironmentEdgeManager.currentTime(),
+        tableName.getNamespaceAsString(), tableName.getQualifierAsString());
+      snapshotTable(admin, tableName, snapshotName);
+      backupInfo.setSnapshotName(tableName, snapshotName);
+    }
+  }
+
+  private void updateBackupMetadata() throws IOException {
+    // The table list in backupInfo is good for both full backup and incremental backup.
+    // For incremental backup, it contains the incremental backup table set.
+    backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+    Map<TableName, Map<String, Long>> timestampMap = backupManager.readLogTimestampMap();
+    backupInfo.setTableSetTimestampMap(timestampMap);
+    Long newStartCode = BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(timestampMap));
+    backupManager.writeBackupStartCode(newStartCode);
+  }
+
+  private long startContinuousWALBackup(Admin admin) throws IOException {
+    enableTableReplication(admin);
+    if (continuousBackupReplicationPeerExists(admin)) {
+      updateContinuousBackupReplicationPeer(admin);
+    } else {
+      addContinuousBackupReplicationPeer(admin);
+    }
+    LOG.info("Continuous WAL Backup setup completed.");
+    return EnvironmentEdgeManager.getDelegate().currentTime();
+  }
+
+  private void enableTableReplication(Admin admin) throws IOException {
+    for (TableName table : tableList) {
+      TableDescriptor tableDescriptor = admin.getDescriptor(table);
+      TableDescriptorBuilder tableDescriptorBuilder =
+        TableDescriptorBuilder.newBuilder(tableDescriptor);
+
+      for (ColumnFamilyDescriptor cfDescriptor : tableDescriptor.getColumnFamilies()) {
+        if (cfDescriptor.getScope() != REPLICATION_SCOPE_GLOBAL) {
+          ColumnFamilyDescriptor newCfDescriptor = ColumnFamilyDescriptorBuilder
+            .newBuilder(cfDescriptor).setScope(REPLICATION_SCOPE_GLOBAL).build();
+
+          tableDescriptorBuilder.modifyColumnFamily(newCfDescriptor);
+        }
+      }
+
+      admin.modifyTable(tableDescriptorBuilder.build());
+      LOG.info("Enabled Global replication scope for table: {}", table);
+    }
+  }
+
+  private void updateContinuousBackupReplicationPeer(Admin admin) throws IOException {
+    Map<TableName, List<String>> tableMap = tableList.stream()
+      .collect(Collectors.toMap(tableName -> tableName, tableName -> new ArrayList<>()));
+
+    try {
+      admin.appendReplicationPeerTableCFs(CONTINUOUS_BACKUP_REPLICATION_PEER, tableMap);
+      LOG.info("Updated replication peer {} with table and column family map.",
+        CONTINUOUS_BACKUP_REPLICATION_PEER);
+    } catch (ReplicationException e) {
+      LOG.error("Error while updating the replication peer: {}. Error: {}",
+        CONTINUOUS_BACKUP_REPLICATION_PEER, e.getMessage(), e);
+      throw new IOException("Error while updating the continuous backup 
replication peer.", e);
+    }
+  }
+
+  private void addContinuousBackupReplicationPeer(Admin admin) throws IOException {
+    String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+
+    if (backupWalDir == null || backupWalDir.isEmpty()) {
+      String errorMsg = "WAL Directory is not specified for continuous backup.";
+      LOG.error(errorMsg);
+      throw new IOException(errorMsg);
+    }
+
+    Map<String, String> additionalArgs = new HashMap<>();
+    additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
+    additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupWalDir);
+
+    Map<TableName, List<String>> tableMap = tableList.stream()
+      .collect(Collectors.toMap(tableName -> tableName, tableName -> new 
ArrayList<>()));
+
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setReplicationEndpointImpl(DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT)
+      .setReplicateAllUserTables(false).setTableCFsMap(tableMap).putAllConfiguration(additionalArgs)
+      .build();
+
+    try {
+      admin.addReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER, peerConfig, true);
+      LOG.info("Successfully added replication peer with ID: {}",
+        CONTINUOUS_BACKUP_REPLICATION_PEER);
+    } catch (IOException e) {
+      LOG.error("Failed to add replication peer with ID: {}. Error: {}",
+        CONTINUOUS_BACKUP_REPLICATION_PEER, e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  private boolean continuousBackupReplicationPeerExists(Admin admin) throws IOException {
+    return admin.listReplicationPeers().stream()
+      .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER));
+  }
+
   protected void snapshotTable(Admin admin, TableName tableName, String snapshotName)
     throws IOException {
     int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY, DEFAULT_BACKUP_MAX_ATTEMPTS);
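
A note on the bulkload warning printed by handleContinuousBackup above: bulk-loaded files are only carried into continuous backup when bulkload replication is on, which per that message means at least the following source-cluster setting (further replication configuration may also be needed):

  hbase.replication.bulkload.enabled=true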
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 30c27f01faa..9e31ca409ad 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -92,7 +92,7 @@ public abstract class TableBackupClient {
     this.fs = CommonFSUtils.getCurrentFileSystem(conf);
     backupInfo = backupManager.createBackupInfo(backupId, request.getBackupType(), tableList,
       request.getTargetRootDir(), request.getTotalTasks(), request.getBandwidth(),
-      request.getNoChecksumVerify());
+      request.getNoChecksumVerify(), request.isContinuousBackupEnabled());
     if (tableList == null || tableList.isEmpty()) {
       this.tableList = new ArrayList<>(backupInfo.getTables());
     }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
new file mode 100644
index 00000000000..fe44ebf420d
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestContinuousBackup extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestContinuousBackup.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestContinuousBackup.class);
+
+  String backupWalDirName = "TestContinuousBackupWalDir";
+
+  @Before
+  public void beforeTest() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    FileSystem fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+  }
+
+  @After
+  public void afterTest() throws IOException {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    FileSystem fs = FileSystem.get(conf1);
+
+    if (fs.exists(backupWalDir)) {
+      fs.delete(backupWalDir, true);
+    }
+
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin());
+  }
+
+  @Test
+  public void testContinuousBackupWithFullBackup() throws Exception {
+    LOG.info("Testing successful continuous backup with full backup");
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName, "cf");
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Run backup
+      String[] args = buildBackupArgs("full", new TableName[] { tableName }, true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Backup should succeed", 0, ret);
+
+      // Verify backup history increased and all the backups are succeeded
+      List<BackupInfo> backups = table.getBackupHistory();
+      assertEquals("Backup history should increase", before + 1, 
backups.size());
+      for (BackupInfo data : List.of(backups.get(0))) {
+        String backupId = data.getBackupId();
+        assertTrue(checkSucceeded(backupId));
+      }
+
+      // Verify backup manifest contains the correct tables
+      BackupManifest manifest = getLatestBackupManifest(backups);
+      assertEquals("Backup should contain the expected tables", 
Sets.newHashSet(tableName),
+        new HashSet<>(manifest.getTableList()));
+    }
+
+    // Verify replication peer subscription
+    verifyReplicationPeerSubscription(tableName);
+
+    // Verify table is registered in Backup System Table
+    verifyTableInBackupSystemTable(tableName);
+  }
+
+  @Test
+  public void testContinuousBackupForMultipleTables() throws Exception {
+    LOG.info("Test continuous backup for multiple tables");
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, "cf");
+    TableName tableName2 = TableName.valueOf("table_" + methodName + "2");
+    TEST_UTIL.createTable(tableName2, "cf");
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Create full backup for table1
+      String[] args = buildBackupArgs("full", new TableName[] { tableName1 }, 
true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Backup should succeed", 0, ret);
+
+      // Create full backup for table2
+      args = buildBackupArgs("full", new TableName[] { tableName2 }, true);
+      ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Backup should succeed", 0, ret);
+
+      // Verify backup history increased and all the backups are succeeded
+      List<BackupInfo> backups = table.getBackupHistory();
+      assertEquals("Backup history should increase", before + 2, 
backups.size());
+      for (BackupInfo data : List.of(backups.get(0), backups.get(1))) {
+        String backupId = data.getBackupId();
+        assertTrue(checkSucceeded(backupId));
+      }
+
+      // Verify backup manifest contains the correct tables
+      BackupManifest manifest = getLatestBackupManifest(backups);
+      assertEquals("Backup should contain the expected tables", 
Sets.newHashSet(tableName2),
+        new HashSet<>(manifest.getTableList()));
+    }
+
+    // Verify replication peer subscription for each table
+    verifyReplicationPeerSubscription(tableName1);
+    verifyReplicationPeerSubscription(tableName2);
+
+    // Verify tables are registered in Backup System Table
+    verifyTableInBackupSystemTable(tableName1);
+    verifyTableInBackupSystemTable(tableName2);
+  }
+
+  @Test
+  public void testInvalidBackupScenarioWithContinuousEnabled() throws Exception {
+    LOG.info("Testing invalid backup scenario with continuous backup enabled");
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, "cf");
+    TableName tableName2 = TableName.valueOf("table_" + methodName + "2");
+    TEST_UTIL.createTable(tableName2, "cf");
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Create full backup for table1 with continuous backup enabled
+      String[] args = buildBackupArgs("full", new TableName[] { tableName1 }, 
true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Backup should succeed", 0, ret);
+
+      // Create full backup for table2 without continuous backup enabled
+      args = buildBackupArgs("full", new TableName[] { tableName2 }, false);
+      ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertEquals("Backup should succeed", 0, ret);
+
+      // Attempt full backup for both tables without continuous backup enabled (should fail)
+      args = buildBackupArgs("full", new TableName[] { tableName1, tableName2 }, false);
+      ret = ToolRunner.run(conf1, new BackupDriver(), args);
+      assertTrue("Backup should fail due to mismatch in continuous backup 
settings", ret != 0);
+
+      // Verify backup history size is unchanged after the failed backup
+      int after = table.getBackupHistory().size();
+      assertEquals("Backup history should remain unchanged on failure", before 
+ 2, after);
+    }
+  }
+
+  @Test
+  public void testContinuousBackupWithWALDirNotSpecified() throws Exception {
+    LOG.info("Testing that continuous backup fails when WAL directory is not 
specified");
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName, "cf");
+
+    conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    LOG.info("CONF_CONTINUOUS_BACKUP_WAL_DIR: {}", 
conf1.get(CONF_CONTINUOUS_BACKUP_WAL_DIR));
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Run full backup without specifying WAL directory (invalid scenario)
+      String[] args = buildBackupArgs("full", new TableName[] { tableName }, 
true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+
+      assertTrue("Backup should fail when WAL directory is not specified", ret 
!= 0);
+
+      List<BackupInfo> backups = table.getBackupHistory();
+      int after = backups.size();
+      assertEquals("Backup history should increase", before + 1, after);
+
+      // last backup should be a failure
+      assertFalse(checkSucceeded(backups.get(0).getBackupId()));
+    }
+  }
+
+  @Test
+  public void testContinuousBackupWithIncrementalBackup() throws Exception {
+    LOG.info("Testing that continuous backup cannot be enabled with 
incremental backup");
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName, "cf");
+
+    try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = table.getBackupHistory().size();
+
+      // Run incremental backup with continuous backup flag (invalid scenario)
+      String[] args = buildBackupArgs("incremental", new TableName[] { 
tableName }, true);
+      int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+
+      assertTrue("Backup should fail when using continuous backup with 
incremental mode", ret != 0);
+
+      // Backup history should remain unchanged
+      int after = table.getBackupHistory().size();
+      assertEquals("Backup history should remain unchanged on failure", 
before, after);
+    }
+  }
+
+  private void verifyReplicationPeerSubscription(TableName table) throws IOException {
+    try (Admin admin = TEST_UTIL.getAdmin()) {
+      ReplicationPeerDescription peerDesc = admin.listReplicationPeers().stream()
+        .filter(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst()
+        .orElseThrow(() -> new AssertionError("Replication peer not found"));
+
+      assertTrue("Table should be subscribed to the replication peer",
+        peerDesc.getPeerConfig().getTableCFsMap().containsKey(table));
+    }
+  }
+
+  private String[] buildBackupArgs(String backupType, TableName[] tables,
+    boolean continuousEnabled) {
+    String tableNames =
+      Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+    if (continuousEnabled) {
+      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-t", 
tableNames,
+        "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
+    } else {
+      return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-t", 
tableNames };
+    }
+  }
+
+  private BackupManifest getLatestBackupManifest(List<BackupInfo> backups) 
throws IOException {
+    BackupInfo newestBackup = backups.get(0);
+    return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
+      newestBackup.getBackupId());
+  }
+
+  private void verifyTableInBackupSystemTable(TableName table) throws IOException {
+    try (BackupSystemTable backupTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      Map<TableName, Long> tableBackupMap = backupTable.getContinuousBackupTableSet();
+
+      assertTrue("Table is missing in the continuous backup table set",
+        tableBackupMap.containsKey(table));
+
+      assertTrue("Timestamp for table should be greater than 0", 
tableBackupMap.get(table) > 0);
+    }
+  }
+
+  private void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
+    if (
+      admin.listReplicationPeers().stream()
+        .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
+    ) {
+      admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+      admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+    }
+  }
+
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index a114001ba50..95a29867325 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -118,5 +118,6 @@ message BackupInfo {
     SNAPSHOTCOPY = 3;
     INCREMENTAL_COPY = 4;
     STORE_MANIFEST = 5;
+    SETUP_WAL_REPLICATION = 6;
   }
 }
