This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new fcee6f7e680 HBASE-29025: Enhance the full backup command to support
Continuous Backup (#6710)
fcee6f7e680 is described below
commit fcee6f7e680984279d04d508cda6ce433a677689
Author: vinayak hegde <[email protected]>
AuthorDate: Tue Mar 4 22:27:44 2025 +0530
HBASE-29025: Enhance the full backup command to support Continuous Backup
(#6710)
* HBASE-29025: Enhance the full backup command to support continuous backup
* add new check for full backup command regards to continuous backup flag
* minor fixes
---
.../apache/hadoop/hbase/backup/BackupDriver.java | 6 +-
.../org/apache/hadoop/hbase/backup/BackupInfo.java | 12 +
.../apache/hadoop/hbase/backup/BackupRequest.java | 14 +
.../hbase/backup/BackupRestoreConstants.java | 12 +
.../hadoop/hbase/backup/impl/BackupAdminImpl.java | 3 +-
.../hadoop/hbase/backup/impl/BackupCommands.java | 63 ++++-
.../hadoop/hbase/backup/impl/BackupManager.java | 18 +-
.../hbase/backup/impl/BackupSystemTable.java | 94 +++++++
.../hbase/backup/impl/FullTableBackupClient.java | 255 +++++++++++++----
.../hbase/backup/impl/TableBackupClient.java | 2 +-
.../hadoop/hbase/backup/TestContinuousBackup.java | 302 +++++++++++++++++++++
.../src/main/protobuf/Backup.proto | 1 +
12 files changed, 713 insertions(+), 69 deletions(-)
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
index d55a280b4aa..e096bbee161 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupDriver.java
@@ -17,11 +17,14 @@
*/
package org.apache.hadoop.hbase.backup;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.LONG_OPTION_ENABLE_CONTINUOUS_BACKUP;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -159,7 +162,8 @@ public class BackupDriver extends AbstractHBaseTool {
addOptWithArg(OPTION_PATH, OPTION_PATH_DESC);
addOptWithArg(OPTION_KEEP, OPTION_KEEP_DESC);
addOptWithArg(OPTION_YARN_QUEUE_NAME, OPTION_YARN_QUEUE_NAME_DESC);
-
+ addOptNoArg(OPTION_ENABLE_CONTINUOUS_BACKUP,
LONG_OPTION_ENABLE_CONTINUOUS_BACKUP,
+ OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
}
@Override
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index f0dc10b8361..862a9cbad10 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -71,6 +71,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
*/
public enum BackupPhase {
REQUEST,
+ SETUP_WAL_REPLICATION,
SNAPSHOT,
PREPARE_INCREMENTAL,
SNAPSHOTCOPY,
@@ -170,6 +171,8 @@ public class BackupInfo implements Comparable<BackupInfo> {
*/
private boolean noChecksumVerify;
+ private boolean continuousBackupEnabled;
+
public BackupInfo() {
backupTableInfoMap = new HashMap<>();
}
@@ -185,6 +188,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
}
this.startTs = 0;
this.completeTs = 0;
+ this.continuousBackupEnabled = false;
}
public int getWorkers() {
@@ -592,4 +596,12 @@ public class BackupInfo implements Comparable<BackupInfo> {
Long otherTS =
Long.valueOf(o.getBackupId().substring(o.getBackupId().lastIndexOf("_") + 1));
return thisTS.compareTo(otherTS);
}
+
+ public void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
+ this.continuousBackupEnabled = continuousBackupEnabled;
+ }
+
+ public boolean isContinuousBackupEnabled() {
+ return this.continuousBackupEnabled;
+ }
}
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
index aa2d5b44259..822c84c57c0 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRequest.java
@@ -75,6 +75,11 @@ public final class BackupRequest {
return this;
}
+ public Builder withContinuousBackupEnabled(boolean
continuousBackupEnabled) {
+ request.setContinuousBackupEnabled(continuousBackupEnabled);
+ return this;
+ }
+
public BackupRequest build() {
return request;
}
@@ -89,6 +94,7 @@ public final class BackupRequest {
private boolean noChecksumVerify = false;
private String backupSetName;
private String yarnPoolName;
+ private boolean continuousBackupEnabled;
private BackupRequest() {
}
@@ -163,4 +169,12 @@ public final class BackupRequest {
public void setYarnPoolName(String yarnPoolName) {
this.yarnPoolName = yarnPoolName;
}
+
+ private void setContinuousBackupEnabled(boolean continuousBackupEnabled) {
+ this.continuousBackupEnabled = continuousBackupEnabled;
+ }
+
+ public boolean isContinuousBackupEnabled() {
+ return this.continuousBackupEnabled;
+ }
}
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
index 30a5674eb02..5d35c8bc3fa 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreConstants.java
@@ -96,6 +96,11 @@ public interface BackupRestoreConstants {
String OPTION_YARN_QUEUE_NAME_DESC = "Yarn queue name to run backup create
command on";
String OPTION_YARN_QUEUE_NAME_RESTORE_DESC = "Yarn queue name to run backup
restore command on";
+ String OPTION_ENABLE_CONTINUOUS_BACKUP = "cb";
+ String LONG_OPTION_ENABLE_CONTINUOUS_BACKUP = "continuous-backup-enabled";
+ String OPTION_ENABLE_CONTINUOUS_BACKUP_DESC =
+ "Flag indicating that the full backup is part of a continuous backup
process.";
+
String JOB_NAME_CONF_KEY = "mapreduce.job.name";
String BACKUP_CONFIG_STRING =
@@ -122,6 +127,13 @@ public interface BackupRestoreConstants {
String BACKUPID_PREFIX = "backup_";
+ String CONTINUOUS_BACKUP_REPLICATION_PEER =
"continuous_backup_replication_peer";
+
+ String DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT =
+
"org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint";
+
+ String CONF_CONTINUOUS_BACKUP_WAL_DIR = "hbase.backup.continuous.wal.dir";
+
enum BackupCommand {
CREATE,
CANCEL,
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index c36b398e5e8..1e745c69cda 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -581,7 +581,8 @@ public class BackupAdminImpl implements BackupAdmin {
request =
builder.withBackupType(request.getBackupType()).withTableList(tableList)
.withTargetRootDir(request.getTargetRootDir()).withBackupSetName(request.getBackupSetName())
.withTotalTasks(request.getTotalTasks()).withBandwidthPerTasks((int)
request.getBandwidth())
- .withNoChecksumVerify(request.getNoChecksumVerify()).build();
+ .withNoChecksumVerify(request.getNoChecksumVerify())
+
.withContinuousBackupEnabled(request.isContinuousBackupEnabled()).build();
TableBackupClient client;
try {
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 66694f4384f..ab9ca1c4ed2 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -22,6 +22,8 @@ import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDW
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_DEBUG_DESC;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_IGNORECHECKSUM_DESC;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_KEEP;
@@ -45,6 +47,7 @@ import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_
import java.io.IOException;
import java.net.URI;
import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -339,14 +342,64 @@ public final class BackupCommands {
boolean ignoreChecksum = cmdline.hasOption(OPTION_IGNORECHECKSUM);
+ BackupType backupType = BackupType.valueOf(args[1].toUpperCase());
+ List<TableName> tableNameList = null;
+ if (tables != null) {
+ tableNameList =
Lists.newArrayList(BackupUtils.parseTableNames(tables));
+ }
+ boolean continuousBackup =
cmdline.hasOption(OPTION_ENABLE_CONTINUOUS_BACKUP);
+ if (continuousBackup && !BackupType.FULL.equals(backupType)) {
+ System.out.println("ERROR: Continuous backup can Only be specified for
Full Backup");
+ printUsage();
+ throw new IOException(INCORRECT_USAGE);
+ }
+
+ /*
+ * The `continuousBackup` flag is specified only during the first full
backup to initiate
+ * continuous WAL replication. After that, it is redundant because the
tables are already set
+ * up for continuous backup. If the `continuousBackup` flag is not
explicitly enabled, we need
+ * to determine the backup mode based on the current state of the
specified tables: - If all
+ * the specified tables are already part of continuous backup, we treat
the request as a
+ * continuous backup request and proceed accordingly (since these tables
are already
+ * continuously backed up, no additional setup is needed). - If none of
the specified tables
+ * are part of continuous backup, we treat the request as a normal full
backup without
+ * continuous backup. - If the request includes a mix of tables—some
with continuous backup
+ * enabled and others without—we cannot determine a clear backup
strategy. In this case, we
+ * throw an error. If all tables are already in continuous backup mode,
we explicitly set the
+ * `continuousBackup` flag to `true` so that the request is processed
using the continuous
+ * backup approach rather than the normal full backup flow.
+ */
+ if (!continuousBackup && tableNameList != null &&
!tableNameList.isEmpty()) {
+ try (BackupSystemTable backupSystemTable = new
BackupSystemTable(conn)) {
+ Set<TableName> continuousBackupTableSet =
+ backupSystemTable.getContinuousBackupTableSet().keySet();
+
+ boolean allTablesInContinuousBackup =
continuousBackupTableSet.containsAll(tableNameList);
+ boolean noTablesInContinuousBackup =
+
tableNameList.stream().noneMatch(continuousBackupTableSet::contains);
+
+ // Ensure that all tables are either fully in continuous backup or
not at all
+ if (!allTablesInContinuousBackup && !noTablesInContinuousBackup) {
+ System.err
+ .println("ERROR: Some tables are already in continuous backup,
while others are not. "
+ + "Cannot mix both in a single request.");
+ printUsage();
+ throw new IOException(INCORRECT_USAGE);
+ }
+
+ // If all tables are already in continuous backup, enable the flag
+ if (allTablesInContinuousBackup) {
+ continuousBackup = true;
+ }
+ }
+ }
+
try (BackupAdminImpl admin = new BackupAdminImpl(conn)) {
BackupRequest.Builder builder = new BackupRequest.Builder();
- BackupRequest request =
builder.withBackupType(BackupType.valueOf(args[1].toUpperCase()))
- .withTableList(
- tables != null ?
Lists.newArrayList(BackupUtils.parseTableNames(tables)) : null)
+ BackupRequest request =
builder.withBackupType(backupType).withTableList(tableNameList)
.withTargetRootDir(targetBackupDir).withTotalTasks(workers)
.withBandwidthPerTasks(bandwidth).withNoChecksumVerify(ignoreChecksum)
- .withBackupSetName(setName).build();
+
.withBackupSetName(setName).withContinuousBackupEnabled(continuousBackup).build();
String backupId = admin.backupTables(request);
System.out.println("Backup session " + backupId + " finished. Status:
SUCCESS");
} catch (IOException e) {
@@ -400,6 +453,8 @@ public final class BackupCommands {
options.addOption(OPTION_YARN_QUEUE_NAME, true,
OPTION_YARN_QUEUE_NAME_DESC);
options.addOption(OPTION_DEBUG, false, OPTION_DEBUG_DESC);
options.addOption(OPTION_IGNORECHECKSUM, false,
OPTION_IGNORECHECKSUM_DESC);
+ options.addOption(OPTION_ENABLE_CONTINUOUS_BACKUP, false,
+ OPTION_ENABLE_CONTINUOUS_BACKUP_DESC);
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setLeftPadding(2);
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 41dc300abfa..a836ac1e09e 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -193,8 +193,8 @@ public class BackupManager implements Closeable {
* @throws BackupException exception
*/
public BackupInfo createBackupInfo(String backupId, BackupType type,
List<TableName> tableList,
- String targetRootDir, int workers, long bandwidth, boolean
noChecksumVerify)
- throws BackupException {
+ String targetRootDir, int workers, long bandwidth, boolean
noChecksumVerify,
+ boolean continuousBackupEnabled) throws BackupException {
if (targetRootDir == null) {
throw new BackupException("Wrong backup request parameter: target backup
root directory");
}
@@ -232,6 +232,7 @@ public class BackupManager implements Closeable {
backupInfo.setBandwidth(bandwidth);
backupInfo.setWorkers(workers);
backupInfo.setNoChecksumVerify(noChecksumVerify);
+ backupInfo.setContinuousBackupEnabled(continuousBackupEnabled);
return backupInfo;
}
@@ -422,4 +423,17 @@ public class BackupManager implements Closeable {
public Connection getConnection() {
return conn;
}
+
+ /**
+ * Adds a set of tables to the global continuous backup set. Only tables
that do not already have
+ * continuous backup enabled will be updated.
+ * @param tables set of tables to add to continuous backup
+ * @param startTimestamp timestamp indicating when continuous backup started
for newly added
+ * tables
+ * @throws IOException if an error occurs while updating the backup system
table
+ */
+ public void addContinuousBackupTableSet(Set<TableName> tables, long
startTimestamp)
+ throws IOException {
+ systemTable.addContinuousBackupTableSet(tables, startTimestamp);
+ }
}
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 2ec6c6adbd4..cfe2e5b80a8 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -168,6 +168,7 @@ public final class BackupSystemTable implements Closeable {
private final static byte[] ACTIVE_SESSION_NO = Bytes.toBytes("no");
private final static String INCR_BACKUP_SET = "incrbackupset:";
+ private final static String CONTINUOUS_BACKUP_SET = "continuousbackupset";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:";
@@ -1025,6 +1026,37 @@ public final class BackupSystemTable implements
Closeable {
}
}
+ /**
+ * Retrieves the current set of tables covered by continuous backup along
with the timestamp
+ * indicating when continuous backup started for each table.
+ * @return a map where the key is the table name and the value is the
timestamp representing the
+ * start time of continuous backup for that table.
+ * @throws IOException if an I/O error occurs while accessing the backup
system table.
+ */
+ public Map<TableName, Long> getContinuousBackupTableSet() throws IOException
{
+ LOG.trace("Retrieving continuous backup table set from the backup system
table.");
+ Map<TableName, Long> tableMap = new TreeMap<>();
+
+ try (Table systemTable = connection.getTable(tableName)) {
+ Get getOperation = createGetForContinuousBackupTableSet();
+ Result result = systemTable.get(getOperation);
+
+ if (result.isEmpty()) {
+ return tableMap;
+ }
+
+ // Extract table names and timestamps from the result cells
+ List<Cell> cells = result.listCells();
+ for (Cell cell : cells) {
+ TableName tableName = TableName.valueOf(CellUtil.cloneQualifier(cell));
+ long timestamp = Bytes.toLong(CellUtil.cloneValue(cell));
+ tableMap.put(tableName, timestamp);
+ }
+ }
+
+ return tableMap;
+ }
+
/**
* Add tables to global incremental backup set
* @param tables set of tables
@@ -1046,6 +1078,34 @@ public final class BackupSystemTable implements
Closeable {
}
}
+ /**
+ * Add tables to the global continuous backup set. Only updates tables that
are not already in the
+ * continuous backup set.
+ * @param tables set of tables to add
+ * @param startTimestamp timestamp indicating when continuous backup started
+ * @throws IOException if an error occurs while updating the backup system
table
+ */
+ public void addContinuousBackupTableSet(Set<TableName> tables, long
startTimestamp)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Add continuous backup table set to backup system table.
tables ["
+ + StringUtils.join(tables, " ") + "]");
+ }
+ if (LOG.isDebugEnabled()) {
+ tables.forEach(table -> LOG.debug(Objects.toString(table)));
+ }
+
+ // Get existing continuous backup tables
+ Map<TableName, Long> existingTables = getContinuousBackupTableSet();
+
+ try (Table table = connection.getTable(tableName)) {
+ Put put = createPutForContinuousBackupTableSet(tables, existingTables,
startTimestamp);
+ if (!put.isEmpty()) {
+ table.put(put);
+ }
+ }
+ }
+
/**
* Deletes incremental backup set for a backup destination
* @param backupRoot backup root
@@ -1374,6 +1434,18 @@ public final class BackupSystemTable implements
Closeable {
return get;
}
+ /**
+ * Creates a Get operation to retrieve the continuous backup table set from
the backup system
+ * table.
+ * @return a Get operation for retrieving the table set
+ */
+ private Get createGetForContinuousBackupTableSet() throws IOException {
+ Get get = new Get(rowkey(CONTINUOUS_BACKUP_SET));
+ get.addFamily(BackupSystemTable.META_FAMILY);
+ get.readVersions(1);
+ return get;
+ }
+
/**
* Creates Put to store incremental backup table set
* @param tables tables
@@ -1388,6 +1460,28 @@ public final class BackupSystemTable implements
Closeable {
return put;
}
+ /**
+ * Creates a Put operation to store the continuous backup table set. Only
includes tables that are
+ * not already in the set.
+ * @param tables tables to add
+ * @param existingTables tables that already have continuous backup enabled
+ * @param startTimestamp timestamp indicating when continuous backup started
+ * @return put operation
+ */
+ private Put createPutForContinuousBackupTableSet(Set<TableName> tables,
+ Map<TableName, Long> existingTables, long startTimestamp) {
+ Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
+
+ for (TableName table : tables) {
+ if (!existingTables.containsKey(table)) {
+ put.addColumn(BackupSystemTable.META_FAMILY,
Bytes.toBytes(table.getNameAsString()),
+ Bytes.toBytes(startTimestamp));
+ }
+ }
+
+ return put;
+ }
+
/**
* Creates Delete for incremental backup table set
* @param backupRoot backup root
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index c4017e8c1a1..d71c6ce6b4d 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -17,16 +17,25 @@
*/
package org.apache.hadoop.hbase.backup.impl;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_ATTEMPTS_PAUSE_MS;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_BACKUP_MAX_ATTEMPTS;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -38,7 +47,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -62,9 +77,9 @@ public class FullTableBackupClient extends TableBackupClient {
/**
* Do snapshot copy.
* @param backupInfo backup info
- * @throws Exception exception
+ * @throws IOException exception
*/
- protected void snapshotCopy(BackupInfo backupInfo) throws Exception {
+ protected void snapshotCopy(BackupInfo backupInfo) throws IOException {
LOG.info("Snapshot copy is starting.");
// set overall backup phase: snapshot_copy
@@ -132,67 +147,14 @@ public class FullTableBackupClient extends
TableBackupClient {
@Override
public void execute() throws IOException {
try (Admin admin = conn.getAdmin()) {
- // Begin BACKUP
beginBackup(backupManager, backupInfo);
- String savedStartCode;
- boolean firstBackup;
- // do snapshot for full table backup
-
- savedStartCode = backupManager.readBackupStartCode();
- firstBackup = savedStartCode == null || Long.parseLong(savedStartCode)
== 0L;
- if (firstBackup) {
- // This is our first backup. Let's put some marker to system table so
that we can hold the
- // logs while we do the backup.
- backupManager.writeBackupStartCode(0L);
- }
- // We roll log here before we do the snapshot. It is possible there is
duplicate data
- // in the log that is already in the snapshot. But if we do it after the
snapshot, we
- // could have data loss.
- // A better approach is to do the roll log on each RS in the same global
procedure as
- // the snapshot.
- LOG.info("Execute roll log procedure for full backup ...");
-
- Map<String, String> props = new HashMap<>();
- props.put("backupRoot", backupInfo.getBackupRootDir());
-
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
- LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
-
- newTimestamps = backupManager.readRegionServerLastLogRollResult();
-
- // SNAPSHOT_TABLES:
- backupInfo.setPhase(BackupPhase.SNAPSHOT);
- for (TableName tableName : tableList) {
- String snapshotName = "snapshot_" +
Long.toString(EnvironmentEdgeManager.currentTime())
- + "_" + tableName.getNamespaceAsString() + "_" +
tableName.getQualifierAsString();
-
- snapshotTable(admin, tableName, snapshotName);
- backupInfo.setSnapshotName(tableName, snapshotName);
+
+ if (backupInfo.isContinuousBackupEnabled()) {
+ handleContinuousBackup(admin);
+ } else {
+ handleNonContinuousBackup(admin);
}
- // SNAPSHOT_COPY:
- // do snapshot copy
- LOG.debug("snapshot copy for " + backupId);
- snapshotCopy(backupInfo);
- // Updates incremental backup table set
- backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
-
- // BACKUP_COMPLETE:
- // set overall backup status: complete. Here we make sure to complete
the backup.
- // After this checkpoint, even if entering cancel process, will let the
backup finished
- backupInfo.setState(BackupState.COMPLETE);
- // The table list in backupInfo is good for both full backup and
incremental backup.
- // For incremental backup, it contains the incremental backup table set.
- backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(),
newTimestamps);
-
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
- backupManager.readLogTimestampMap();
-
- backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
- Long newStartCode =
-
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
- backupManager.writeBackupStartCode(newStartCode);
-
- // backup complete
completeBackup(conn, backupInfo, BackupType.FULL, conf);
} catch (Exception e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected
BackupException : ",
@@ -201,6 +163,179 @@ public class FullTableBackupClient extends
TableBackupClient {
}
}
+ private void handleContinuousBackup(Admin admin) throws IOException {
+ backupInfo.setPhase(BackupInfo.BackupPhase.SETUP_WAL_REPLICATION);
+ long startTimestamp = startContinuousWALBackup(admin);
+
+ performBackupSnapshots(admin);
+
+ backupManager.addContinuousBackupTableSet(backupInfo.getTables(),
startTimestamp);
+
+ // set overall backup status: complete. Here we make sure to complete the
backup.
+ // After this checkpoint, even if entering cancel process, will let the
backup finished
+ backupInfo.setState(BackupState.COMPLETE);
+
+ if (!conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
+ System.out.println("NOTE: Bulkload replication is not enabled. "
+ + "Bulk loaded files will not be backed up as part of continuous
backup. "
+ + "To ensure bulk loaded files are included in the backup, please
enable bulkload replication "
+ + "(hbase.replication.bulkload.enabled=true) and configure other
necessary settings "
+ + "to properly enable bulkload replication.");
+ }
+ }
+
+ private void handleNonContinuousBackup(Admin admin) throws IOException {
+ initializeBackupStartCode(backupManager);
+ performLogRoll(admin);
+ performBackupSnapshots(admin);
+ backupManager.addIncrementalBackupTableSet(backupInfo.getTables());
+
+ // set overall backup status: complete. Here we make sure to complete the
backup.
+ // After this checkpoint, even if entering cancel process, will let the
backup finished
+ backupInfo.setState(BackupState.COMPLETE);
+
+ updateBackupMetadata();
+ }
+
+ private void initializeBackupStartCode(BackupManager backupManager) throws
IOException {
+ String savedStartCode;
+ boolean firstBackup;
+ // do snapshot for full table backup
+ savedStartCode = backupManager.readBackupStartCode();
+ firstBackup = savedStartCode == null || Long.parseLong(savedStartCode) ==
0L;
+ if (firstBackup) {
+ // This is our first backup. Let's put some marker to system table so
that we can hold the
+ // logs while we do the backup.
+ backupManager.writeBackupStartCode(0L);
+ }
+ }
+
+ private void performLogRoll(Admin admin) throws IOException {
+ // We roll log here before we do the snapshot. It is possible there is
duplicate data
+ // in the log that is already in the snapshot. But if we do it after the
snapshot, we
+ // could have data loss.
+ // A better approach is to do the roll log on each RS in the same global
procedure as
+ // the snapshot.
+ LOG.info("Execute roll log procedure for full backup ...");
+ Map<String, String> props = new HashMap<>();
+ props.put("backupRoot", backupInfo.getBackupRootDir());
+
admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
+ LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
+
+ newTimestamps = backupManager.readRegionServerLastLogRollResult();
+ }
+
+ private void performBackupSnapshots(Admin admin) throws IOException {
+ backupInfo.setPhase(BackupPhase.SNAPSHOT);
+ performSnapshots(admin);
+ LOG.debug("Performing snapshot copy for backup ID: {}",
backupInfo.getBackupId());
+ snapshotCopy(backupInfo);
+ }
+
+ private void performSnapshots(Admin admin) throws IOException {
+ backupInfo.setPhase(BackupPhase.SNAPSHOT);
+
+ for (TableName tableName : tableList) {
+ String snapshotName = String.format("snapshot_%d_%s_%s",
EnvironmentEdgeManager.currentTime(),
+ tableName.getNamespaceAsString(), tableName.getQualifierAsString());
+ snapshotTable(admin, tableName, snapshotName);
+ backupInfo.setSnapshotName(tableName, snapshotName);
+ }
+ }
+
+ private void updateBackupMetadata() throws IOException {
+ // The table list in backupInfo is good for both full backup and
incremental backup.
+ // For incremental backup, it contains the incremental backup table set.
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(),
newTimestamps);
+ Map<TableName, Map<String, Long>> timestampMap =
backupManager.readLogTimestampMap();
+ backupInfo.setTableSetTimestampMap(timestampMap);
+ Long newStartCode =
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(timestampMap));
+ backupManager.writeBackupStartCode(newStartCode);
+ }
+
+ private long startContinuousWALBackup(Admin admin) throws IOException {
+ enableTableReplication(admin);
+ if (continuousBackupReplicationPeerExists(admin)) {
+ updateContinuousBackupReplicationPeer(admin);
+ } else {
+ addContinuousBackupReplicationPeer(admin);
+ }
+ LOG.info("Continuous WAL Backup setup completed.");
+ return EnvironmentEdgeManager.getDelegate().currentTime();
+ }
+
+ private void enableTableReplication(Admin admin) throws IOException {
+ for (TableName table : tableList) {
+ TableDescriptor tableDescriptor = admin.getDescriptor(table);
+ TableDescriptorBuilder tableDescriptorBuilder =
+ TableDescriptorBuilder.newBuilder(tableDescriptor);
+
+ for (ColumnFamilyDescriptor cfDescriptor :
tableDescriptor.getColumnFamilies()) {
+ if (cfDescriptor.getScope() != REPLICATION_SCOPE_GLOBAL) {
+ ColumnFamilyDescriptor newCfDescriptor =
ColumnFamilyDescriptorBuilder
+
.newBuilder(cfDescriptor).setScope(REPLICATION_SCOPE_GLOBAL).build();
+
+ tableDescriptorBuilder.modifyColumnFamily(newCfDescriptor);
+ }
+ }
+
+ admin.modifyTable(tableDescriptorBuilder.build());
+ LOG.info("Enabled Global replication scope for table: {}", table);
+ }
+ }
+
+ private void updateContinuousBackupReplicationPeer(Admin admin) throws
IOException {
+ Map<TableName, List<String>> tableMap = tableList.stream()
+ .collect(Collectors.toMap(tableName -> tableName, tableName -> new
ArrayList<>()));
+
+ try {
+ admin.appendReplicationPeerTableCFs(CONTINUOUS_BACKUP_REPLICATION_PEER,
tableMap);
+ LOG.info("Updated replication peer {} with table and column family map.",
+ CONTINUOUS_BACKUP_REPLICATION_PEER);
+ } catch (ReplicationException e) {
+ LOG.error("Error while updating the replication peer: {}. Error: {}",
+ CONTINUOUS_BACKUP_REPLICATION_PEER, e.getMessage(), e);
+ throw new IOException("Error while updating the continuous backup
replication peer.", e);
+ }
+ }
+
+ private void addContinuousBackupReplicationPeer(Admin admin) throws
IOException {
+ String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+
+ if (backupWalDir == null || backupWalDir.isEmpty()) {
+ String errorMsg = "WAL Directory is not specified for continuous
backup.";
+ LOG.error(errorMsg);
+ throw new IOException(errorMsg);
+ }
+
+ Map<String, String> additionalArgs = new HashMap<>();
+ additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
+ additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupWalDir);
+
+ Map<TableName, List<String>> tableMap = tableList.stream()
+ .collect(Collectors.toMap(tableName -> tableName, tableName -> new
ArrayList<>()));
+
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+
.setReplicationEndpointImpl(DEFAULT_CONTINUOUS_BACKUP_REPLICATION_ENDPOINT)
+
.setReplicateAllUserTables(false).setTableCFsMap(tableMap).putAllConfiguration(additionalArgs)
+ .build();
+
+ try {
+ admin.addReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER, peerConfig,
true);
+ LOG.info("Successfully added replication peer with ID: {}",
+ CONTINUOUS_BACKUP_REPLICATION_PEER);
+ } catch (IOException e) {
+ LOG.error("Failed to add replication peer with ID: {}. Error: {}",
+ CONTINUOUS_BACKUP_REPLICATION_PEER, e.getMessage(), e);
+ throw e;
+ }
+ }
+
+ private boolean continuousBackupReplicationPeerExists(Admin admin) throws
IOException {
+ return admin.listReplicationPeers().stream()
+ .anyMatch(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER));
+ }
+
protected void snapshotTable(Admin admin, TableName tableName, String
snapshotName)
throws IOException {
int maxAttempts = conf.getInt(BACKUP_MAX_ATTEMPTS_KEY,
DEFAULT_BACKUP_MAX_ATTEMPTS);
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
index 30c27f01faa..9e31ca409ad 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/TableBackupClient.java
@@ -92,7 +92,7 @@ public abstract class TableBackupClient {
this.fs = CommonFSUtils.getCurrentFileSystem(conf);
backupInfo = backupManager.createBackupInfo(backupId,
request.getBackupType(), tableList,
request.getTargetRootDir(), request.getTotalTasks(),
request.getBandwidth(),
- request.getNoChecksumVerify());
+ request.getNoChecksumVerify(), request.isContinuousBackupEnabled());
if (tableList == null || tableList.isEmpty()) {
this.tableList = new ArrayList<>(backupInfo.getTables());
}
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
new file mode 100644
index 00000000000..fe44ebf420d
--- /dev/null
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestContinuousBackup extends TestBackupBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestContinuousBackup.class);
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestContinuousBackup.class);
+
+ String backupWalDirName = "TestContinuousBackupWalDir";
+
+ @Before
+ public void beforeTest() throws IOException {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ Path backupWalDir = new Path(root, backupWalDirName);
+ FileSystem fs = FileSystem.get(conf1);
+ fs.mkdirs(backupWalDir);
+ conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+ }
+
+ @After
+ public void afterTest() throws IOException {
+ Path root = TEST_UTIL.getDataTestDirOnTestFS();
+ Path backupWalDir = new Path(root, backupWalDirName);
+ FileSystem fs = FileSystem.get(conf1);
+
+ if (fs.exists(backupWalDir)) {
+ fs.delete(backupWalDir, true);
+ }
+
+ conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ deleteContinuousBackupReplicationPeerIfExists(TEST_UTIL.getAdmin());
+ }
+
+ @Test
+ public void testContinuousBackupWithFullBackup() throws Exception {
+ LOG.info("Testing successful continuous backup with full backup");
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName, "cf");
+
+ try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run backup
+ String[] args = buildBackupArgs("full", new TableName[] { tableName },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups are succeeded
+ List<BackupInfo> backups = table.getBackupHistory();
+ assertEquals("Backup history should increase", before + 1,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ // Verify backup manifest contains the correct tables
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals("Backup should contain the expected tables",
Sets.newHashSet(tableName),
+ new HashSet<>(manifest.getTableList()));
+ }
+
+ // Verify replication peer subscription
+ verifyReplicationPeerSubscription(tableName);
+
+ // Verify table is registered in Backup System Table
+ verifyTableInBackupSystemTable(tableName);
+ }
+
+ @Test
+ public void testContinuousBackupForMultipleTables() throws Exception {
+ LOG.info("Test continuous backup for multiple tables");
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName1 = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName1, "cf");
+ TableName tableName2 = TableName.valueOf("table_" + methodName + "2");
+ TEST_UTIL.createTable(tableName2, "cf");
+
+ try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Create full backup for table1
+ String[] args = buildBackupArgs("full", new TableName[] { tableName1 },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Create full backup for table2
+ args = buildBackupArgs("full", new TableName[] { tableName2 }, true);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups are succeeded
+ List<BackupInfo> backups = table.getBackupHistory();
+ assertEquals("Backup history should increase", before + 2,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0), backups.get(1))) {
+ String backupId = data.getBackupId();
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ // Verify backup manifest contains the correct tables
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals("Backup should contain the expected tables",
Sets.newHashSet(tableName2),
+ new HashSet<>(manifest.getTableList()));
+ }
+
+ // Verify replication peer subscription for each table
+ verifyReplicationPeerSubscription(tableName1);
+ verifyReplicationPeerSubscription(tableName2);
+
+ // Verify tables are registered in Backup System Table
+ verifyTableInBackupSystemTable(tableName1);
+ verifyTableInBackupSystemTable(tableName2);
+ }
+
+ @Test
+ public void testInvalidBackupScenarioWithContinuousEnabled() throws
Exception {
+ LOG.info("Testing invalid backup scenario with continuous backup enabled");
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName1 = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName1, "cf");
+ TableName tableName2 = TableName.valueOf("table_" + methodName + "2");
+ TEST_UTIL.createTable(tableName2, "cf");
+
+ try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Create full backup for table1 with continuous backup enabled
+ String[] args = buildBackupArgs("full", new TableName[] { tableName1 },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Create full backup for table2 without continuous backup enabled
+ args = buildBackupArgs("full", new TableName[] { tableName2 }, false);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Backup should succeed", 0, ret);
+
+ // Attempt full backup for both tables without continuous backup enabled
(should fail)
+ args = buildBackupArgs("full", new TableName[] { tableName1, tableName2
}, false);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertTrue("Backup should fail due to mismatch in continuous backup
settings", ret != 0);
+
+ // Verify backup history size is unchanged after the failed backup
+ int after = table.getBackupHistory().size();
+ assertEquals("Backup history should remain unchanged on failure", before
+ 2, after);
+ }
+ }
+
+ @Test
+ public void testContinuousBackupWithWALDirNotSpecified() throws Exception {
+ LOG.info("Testing that continuous backup fails when WAL directory is not
specified");
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName, "cf");
+
+ conf1.unset(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ LOG.info("CONF_CONTINUOUS_BACKUP_WAL_DIR: {}",
conf1.get(CONF_CONTINUOUS_BACKUP_WAL_DIR));
+
+ try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run full backup without specifying WAL directory (invalid scenario)
+ String[] args = buildBackupArgs("full", new TableName[] { tableName },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+
+ assertTrue("Backup should fail when WAL directory is not specified", ret
!= 0);
+
+ List<BackupInfo> backups = table.getBackupHistory();
+ int after = backups.size();
+ assertEquals("Backup history should increase", before + 1, after);
+
+ // last backup should be a failure
+ assertFalse(checkSucceeded(backups.get(0).getBackupId()));
+ }
+ }
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackup() throws Exception {
+ LOG.info("Testing that continuous backup cannot be enabled with
incremental backup");
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName, "cf");
+
+ try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run incremental backup with continuous backup flag (invalid scenario)
+ String[] args = buildBackupArgs("incremental", new TableName[] {
tableName }, true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+
+ assertTrue("Backup should fail when using continuous backup with
incremental mode", ret != 0);
+
+ // Backup history should remain unchanged
+ int after = table.getBackupHistory().size();
+ assertEquals("Backup history should remain unchanged on failure",
before, after);
+ }
+ }
+
+ private void verifyReplicationPeerSubscription(TableName table) throws
IOException {
+ try (Admin admin = TEST_UTIL.getAdmin()) {
+ ReplicationPeerDescription peerDesc =
admin.listReplicationPeers().stream()
+ .filter(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst()
+ .orElseThrow(() -> new AssertionError("Replication peer not found"));
+
+ assertTrue("Table should be subscribed to the replication peer",
+ peerDesc.getPeerConfig().getTableCFsMap().containsKey(table));
+ }
+ }
+
+ private String[] buildBackupArgs(String backupType, TableName[] tables,
+ boolean continuousEnabled) {
+ String tableNames =
+
Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
+
+ if (continuousEnabled) {
+ return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-t",
tableNames,
+ "-" + OPTION_ENABLE_CONTINUOUS_BACKUP };
+ } else {
+ return new String[] { "create", backupType, BACKUP_ROOT_DIR, "-t",
tableNames };
+ }
+ }
+
+ private BackupManifest getLatestBackupManifest(List<BackupInfo> backups)
throws IOException {
+ BackupInfo newestBackup = backups.get(0);
+ return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
+ newestBackup.getBackupId());
+ }
+
+ private void verifyTableInBackupSystemTable(TableName table) throws
IOException {
+ try (BackupSystemTable backupTable = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ Map<TableName, Long> tableBackupMap =
backupTable.getContinuousBackupTableSet();
+
+ assertTrue("Table is missing in the continuous backup table set",
+ tableBackupMap.containsKey(table));
+
+ assertTrue("Timestamp for table should be greater than 0",
tableBackupMap.get(table) > 0);
+ }
+ }
+
+ private void deleteContinuousBackupReplicationPeerIfExists(Admin admin)
throws IOException {
+ if (
+ admin.listReplicationPeers().stream()
+ .anyMatch(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
+ ) {
+ admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ }
+ }
+
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index a114001ba50..95a29867325 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -118,5 +118,6 @@ message BackupInfo {
SNAPSHOTCOPY = 3;
INCREMENTAL_COPY = 4;
STORE_MANIFEST = 5;
+ SETUP_WAL_REPLICATION = 6;
}
}