This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new d19b33532ee HBASE-28990 Modify Incremental Backup for Continuous Backup (#6788)
d19b33532ee is described below
commit d19b33532ee8ad84859da733ba051d2d080d6a47
Author: asolomon <[email protected]>
AuthorDate: Sat Jun 21 00:32:14 2025 +0530
HBASE-28990 Modify Incremental Backup for Continuous Backup (#6788)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Signed-off-by: Andor Molnár <[email protected]>
Reviewed by: Kota-SH <[email protected]>
Reviewed by: Vinayak Hegde <[email protected]>
Reviewed by: Kevin Geiszler <[email protected]>
---
.../org/apache/hadoop/hbase/backup/BackupInfo.java | 14 ++
.../hadoop/hbase/backup/impl/BackupAdminImpl.java | 58 +++--
.../hbase/backup/impl/FullTableBackupClient.java | 8 +
.../backup/impl/IncrementalTableBackupClient.java | 165 ++++++++++---
.../apache/hadoop/hbase/backup/TestBackupBase.java | 29 ++-
.../hadoop/hbase/backup/TestBackupDescribe.java | 1 +
.../hadoop/hbase/backup/TestContinuousBackup.java | 15 +-
.../TestIncrementalBackupWithContinuous.java | 254 +++++++++++++++++++++
8 files changed, 480 insertions(+), 64 deletions(-)
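For context, here is a minimal, hedged sketch (not part of this patch) of how a client could drive the new flow through the BackupAdmin API. The builder methods and the BackupAdminImpl constructor are the ones exercised by the tests below; the Configuration, backup root directory, and table name "t1" are assumed for illustration only.

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupAdmin;
    import org.apache.hadoop.hbase.backup.BackupRequest;
    import org.apache.hadoop.hbase.backup.BackupType;
    import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ContinuousThenIncrementalSketch {
      // Assumed inputs: a cluster Configuration and an existing backup root directory.
      static void run(Configuration conf, String backupRootDir) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
          BackupAdmin admin = new BackupAdminImpl(conn)) {
          // Full backup with continuous backup enabled; this seeds the continuous backup table set.
          BackupRequest full = new BackupRequest.Builder().withBackupType(BackupType.FULL)
            .withTableList(Arrays.asList(TableName.valueOf("t1"))).withTargetRootDir(backupRootDir)
            .withContinuousBackupEnabled(true).build();
          String fullBackupId = admin.backupTables(full);

          // Later incremental backup of the same table: with continuous backup enabled, the patch
          // below reads WALs from the continuous-backup WAL directory between the previous backup
          // timestamp and the committed WAL timestamp, instead of relying on log rolls.
          BackupRequest incremental = new BackupRequest.Builder().withBackupType(BackupType.INCREMENTAL)
            .withTableList(Arrays.asList(TableName.valueOf("t1"))).withTargetRootDir(backupRootDir)
            .withContinuousBackupEnabled(true).build();
          String incrementalBackupId = admin.backupTables(incremental);
        }
      }
    }

The CLI path used in TestContinuousBackup and TestIncrementalBackupWithContinuous exercises the same request flags through BackupDriver.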
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index 862a9cbad10..0997aec19ec 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -124,6 +124,11 @@ public class BackupInfo implements Comparable<BackupInfo> {
*/
private long completeTs;
+ /**
+ * Committed WAL timestamp for incremental backup
+ */
+ private long incrCommittedWalTs;
+
/**
* Total bytes of incremental logs copied
*/
@@ -293,6 +298,14 @@ public class BackupInfo implements Comparable<BackupInfo> {
this.completeTs = endTs;
}
+ public long getIncrCommittedWalTs() {
+ return incrCommittedWalTs;
+ }
+
+ public void setIncrCommittedWalTs(long timestamp) {
+ this.incrCommittedWalTs = timestamp;
+ }
+
public long getTotalBytesCopied() {
return totalBytesCopied;
}
@@ -549,6 +562,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
sb.append("{");
sb.append("ID=" + backupId).append(",");
sb.append("Type=" + getType()).append(",");
+ sb.append("IsContinuous=" + isContinuousBackupEnabled()).append(",");
sb.append("Tables=" + getTableListAsString()).append(",");
sb.append("State=" + getState()).append(",");
Calendar cal = Calendar.getInstance();
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
index e94389d6938..1e91258ba6c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupAdminImpl.java
@@ -859,28 +859,47 @@ public class BackupAdminImpl implements BackupAdmin {
String backupId = BackupRestoreConstants.BACKUPID_PREFIX + EnvironmentEdgeManager.currentTime();
if (type == BackupType.INCREMENTAL) {
- Set<TableName> incrTableSet;
- try (BackupSystemTable table = new BackupSystemTable(conn)) {
- incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
- }
+ if (request.isContinuousBackupEnabled()) {
+ Set<TableName> continuousBackupTableSet;
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ continuousBackupTableSet = table.getContinuousBackupTableSet().keySet();
+ }
+ if (continuousBackupTableSet.isEmpty()) {
+ String msg = "Continuous backup table set contains no tables. "
+ + "You need to run Continuous backup first "
+ (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
+ throw new IOException(msg);
+ }
+ if (!continuousBackupTableSet.containsAll(tableList)) {
+ String extraTables = StringUtils.join(tableList, ",");
+ String msg = "Some tables (" + extraTables + ") haven't gone through
Continuous backup. "
+ + "Perform Continuous backup on " + extraTables + " first, then
retry the command";
+ throw new IOException(msg);
+ }
+ } else {
+ Set<TableName> incrTableSet;
+ try (BackupSystemTable table = new BackupSystemTable(conn)) {
+ incrTableSet = table.getIncrementalBackupTableSet(targetRootDir);
+ }
- if (incrTableSet.isEmpty()) {
- String msg =
- "Incremental backup table set contains no tables. " + "You need to
run full backup first "
+ if (incrTableSet.isEmpty()) {
+ String msg = "Incremental backup table set contains no tables. "
+ + "You need to run full backup first "
+ (tableList != null ? "on " + StringUtils.join(tableList, ",") : "");
- throw new IOException(msg);
- }
- if (tableList != null) {
- tableList.removeAll(incrTableSet);
- if (!tableList.isEmpty()) {
- String extraTables = StringUtils.join(tableList, ",");
- String msg = "Some tables (" + extraTables + ") haven't gone through
full backup. "
- + "Perform full backup on " + extraTables + " first, " + "then
retry the command";
throw new IOException(msg);
}
+ if (tableList != null) {
+ tableList.removeAll(incrTableSet);
+ if (!tableList.isEmpty()) {
+ String extraTables = StringUtils.join(tableList, ",");
+ String msg = "Some tables (" + extraTables + ") haven't gone
through full backup. "
+ + "Perform full backup on " + extraTables + " first, then retry
the command";
+ throw new IOException(msg);
+ }
+ }
+ tableList = Lists.newArrayList(incrTableSet);
}
- tableList = Lists.newArrayList(incrTableSet);
}
if (tableList != null && !tableList.isEmpty()) {
for (TableName table : tableList) {
@@ -907,7 +926,12 @@ public class BackupAdminImpl implements BackupAdmin {
}
}
if (nonExistingTableList != null) {
- if (type == BackupType.INCREMENTAL) {
+ // Non-continuous incremental backup is controlled by the 'incremental backup table set'
+ // and not by the user-provided backup table list. This is an optimization to avoid copying
+ // the same set of WALs for incremental backups of different tables at different times
+ // (HBASE-14038). Since continuous incremental backup and full backup back up the
+ // user-provided table list, we should inform the user about non-existent input table(s).
+ if (type == BackupType.INCREMENTAL && !request.isContinuousBackupEnabled()) {
// Update incremental backup set
tableList = excludeNonExistingTables(tableList, nonExistingTableList);
} else {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 53f2f96b47e..3735817f487 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -149,12 +149,20 @@ public class FullTableBackupClient extends TableBackupClient {
try (Admin admin = conn.getAdmin()) {
beginBackup(backupManager, backupInfo);
+ // Gather the bulk loads being tracked by the system, which can be deleted (since their data
+ // will be part of the snapshot being taken). We gather this list before taking the actual
+ // snapshots for the same reason as the log rolls.
+ List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList);
+
if (backupInfo.isContinuousBackupEnabled()) {
handleContinuousBackup(admin);
} else {
handleNonContinuousBackup(admin);
}
+ backupManager
+ .deleteBulkLoadedRows(bulkLoadsToDelete.stream().map(BulkLoad::getRowKey).toList());
+
completeBackup(conn, backupInfo, BackupType.FULL, conf);
} catch (Exception e) {
failBackup(conn, backupInfo, backupManager, e, "Unexpected BackupException : ",
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 52f824c5dda..de97694f22f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -17,18 +17,28 @@
*/
package org.apache.hadoop.hbase.backup.impl;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -48,6 +58,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -60,6 +71,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@@ -262,9 +274,19 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// case PREPARE_INCREMENTAL:
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
- LOG.debug("For incremental backup, current table set is "
- + backupManager.getIncrementalBackupTableSet());
- newTimestamps = ((IncrementalBackupManager)
backupManager).getIncrBackupLogFileMap();
+ // Non-continuous incremental backup is controlled by the 'incremental backup table set'
+ // and not by the user-provided backup table list. This is an optimization to avoid copying
+ // the same set of WALs for incremental backups of different tables at different times
+ // (HBASE-14038)
+ // Continuous incremental backup backs up the user-provided table list/set
+ Set<TableName> currentTableSet;
+ if (backupInfo.isContinuousBackupEnabled()) {
+ currentTableSet = backupInfo.getTables();
+ } else {
+ currentTableSet = backupManager.getIncrementalBackupTableSet();
+ newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
+ }
+ LOG.debug("For incremental backup, the current table set is {}",
currentTableSet);
} catch (Exception e) {
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, "Unexpected Exception : ",
@@ -291,21 +313,24 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// set overall backup status: complete. Here we make sure to complete the backup.
// After this checkpoint, even if entering cancel process, will let the backup finished
try {
- // Set the previousTimestampMap which is before this current log roll to the manifest.
- Map<TableName, Map<String, Long>> previousTimestampMap = backupManager.readLogTimestampMap();
- backupInfo.setIncrTimestampMap(previousTimestampMap);
-
- // The table list in backupInfo is good for both full backup and incremental backup.
- // For incremental backup, it contains the incremental backup table set.
- backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
-
- Map<TableName, Map<String, Long>> newTableSetTimestampMap =
- backupManager.readLogTimestampMap();
-
- backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
- Long newStartCode =
- BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
- backupManager.writeBackupStartCode(newStartCode);
+ if (!backupInfo.isContinuousBackupEnabled()) {
+ // Set the previousTimestampMap which is before this current log roll to the manifest.
+ Map<TableName, Map<String, Long>> previousTimestampMap =
+ backupManager.readLogTimestampMap();
+ backupInfo.setIncrTimestampMap(previousTimestampMap);
+
+ // The table list in backupInfo is good for both full backup and incremental backup.
+ // For incremental backup, it contains the incremental backup table set.
+ backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
+
+ Map<TableName, Map<String, Long>> newTableSetTimestampMap =
+ backupManager.readLogTimestampMap();
+
+ backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
+ Long newStartCode =
+ BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
+ backupManager.writeBackupStartCode(newStartCode);
+ }
List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
@@ -362,23 +387,88 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
protected void convertWALsToHFiles() throws IOException {
- // get incremental backup file list and prepare parameters for DistCp
- List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
- // Get list of tables in incremental backup set
- Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
- // filter missing files out (they have been copied by previous backups)
- incrBackupFileList = filterMissingFiles(incrBackupFileList);
- List<String> tableList = new ArrayList<String>();
- for (TableName table : tableSet) {
- // Check if table exists
- if (tableExists(table, conn)) {
- tableList.add(table.getNameAsString());
- } else {
- LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ long previousBackupTs = 0L;
+ if (backupInfo.isContinuousBackupEnabled()) {
+ Set<TableName> tableSet = backupInfo.getTables();
+ List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
+ for (TableName table : tableSet) {
+ for (BackupInfo backup : backupInfos) {
+ // find previous backup for this table
+ if (backup.getTables().contains(table)) {
+ LOG.info("Found previous backup of type {} with id {} for table
{}", backup.getType(),
+ backup.getBackupId(), table.getNameAsString());
+ List<String> walBackupFileList;
+ if (backup.getType() == BackupType.FULL) {
+ previousBackupTs = backup.getStartTs();
+ } else {
+ previousBackupTs = backup.getIncrCommittedWalTs();
+ }
+ walBackupFileList = getBackupLogs(previousBackupTs);
+ walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
+ previousBackupTs);
+ break;
+ }
+ }
}
+ } else {
+ // get incremental backup file list and prepare parameters for DistCp
+ List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
+ // Get list of tables in incremental backup set
+ Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
+ // filter missing files out (they have been copied by previous backups)
+ incrBackupFileList = filterMissingFiles(incrBackupFileList);
+ List<String> tableList = new ArrayList<String>();
+ for (TableName table : tableSet) {
+ // Check if table exists
+ if (tableExists(table, conn)) {
+ tableList.add(table.getNameAsString());
+ } else {
+ LOG.warn("Table " + table + " does not exists. Skipping in WAL
converter");
+ }
+ }
+ walToHFiles(incrBackupFileList, tableList, previousBackupTs);
}
- walToHFiles(incrBackupFileList, tableList);
+ }
+ private List<String> getBackupLogs(long startTs) throws IOException {
+ // get log files from backup dir
+ String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (Strings.isNullOrEmpty(walBackupDir)) {
+ throw new IOException(
+ "Incremental backup requires the WAL backup directory " +
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+ List<String> resultLogFiles = new ArrayList<>();
+ Path walBackupPath = new Path(walBackupDir);
+ FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
+ FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+ for (FileStatus dayDir : dayDirs) {
+ if (!dayDir.isDirectory()) {
+ continue; // Skip files, only process directories
+ }
+
+ String dirName = dayDir.getPath().getName();
+ try {
+ Date dirDate = dateFormat.parse(dirName);
+ long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+ long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+ if (dirEndTime >= startTs) {
+ Path dirPath = dayDir.getPath();
+ FileStatus[] logs = backupFs.listStatus(dirPath);
+ for (FileStatus log : logs) {
+ String filepath = log.getPath().toString();
+ LOG.debug("Found WAL file: {}", filepath);
+ resultLogFiles.add(filepath);
+ }
+ }
+ } catch (ParseException e) {
+ LOG.warn("Skipping invalid directory name: " + dirName, e);
+ }
+ }
+ return resultLogFiles;
}
protected boolean tableExists(TableName table, Connection conn) throws IOException {
@@ -387,7 +477,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
- protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
+ protected void walToHFiles(List<String> dirPaths, List<String> tableList, long previousBackupTs)
+ throws IOException {
Tool player = new WALPlayer();
// Player reads all files in arbitrary directory structure and creates
@@ -401,6 +492,14 @@ public class IncrementalTableBackupClient extends TableBackupClient {
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
+ if (backupInfo.isContinuousBackupEnabled()) {
+ conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
+ // committedWALsTs is needed only for Incremental backups with continuous backup
+ // since these do not depend on log roll ts
+ long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
+ backupInfo.setIncrCommittedWalTs(committedWALsTs);
+ conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+ }
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
try {
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 582c7d1ae54..f6e7590661a 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.backup;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
@@ -414,15 +415,31 @@ public class TestBackupBase {
return request;
}
+ protected BackupRequest createBackupRequest(BackupType type, List<TableName> tables, String path,
+ boolean noChecksumVerify, boolean continuousBackupEnabled) {
+ BackupRequest.Builder builder = new BackupRequest.Builder();
+ BackupRequest request = builder.withBackupType(type).withTableList(tables)
+ .withTargetRootDir(path).withNoChecksumVerify(noChecksumVerify)
+ .withContinuousBackupEnabled(continuousBackupEnabled).build();
+ return request;
+ }
+
protected String backupTables(BackupType type, List<TableName> tables, String path)
throws IOException {
+ return backupTables(type, tables, path, false);
+ }
+
+ protected String backupTables(BackupType type, List<TableName> tables, String path,
+ boolean isContinuousBackup) throws IOException {
Connection conn = null;
BackupAdmin badmin = null;
String backupId;
try {
conn = ConnectionFactory.createConnection(conf1);
badmin = new BackupAdminImpl(conn);
- BackupRequest request = createBackupRequest(type, new ArrayList<>(tables), path);
+
+ BackupRequest request =
+ createBackupRequest(type, new ArrayList<>(tables), path, false, isContinuousBackup);
backupId = badmin.backupTables(request);
} finally {
if (badmin != null) {
@@ -554,4 +571,14 @@ public class TestBackupBase {
LOG.debug(Objects.toString(it.next().getPath()));
}
}
+
+ void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
+ if (
+ admin.listReplicationPeers().stream()
+ .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
+ ) {
+ admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
+ }
+ }
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
index 7ce039fd666..6084dc730ee 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDescribe.java
@@ -94,6 +94,7 @@ public class TestBackupDescribe extends TestBackupBase {
System.setOut(new PrintStream(baos));
String[] args = new String[] { "describe", backupId };
+
// Run backup
int ret = ToolRunner.run(conf1, new BackupDriver(), args);
assertTrue(ret == 0);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
index fe44ebf420d..0cc34ed63eb 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -259,8 +259,7 @@ public class TestContinuousBackup extends TestBackupBase {
}
}
- private String[] buildBackupArgs(String backupType, TableName[] tables,
- boolean continuousEnabled) {
+ String[] buildBackupArgs(String backupType, TableName[] tables, boolean continuousEnabled) {
String tableNames =
Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
@@ -272,7 +271,7 @@ public class TestContinuousBackup extends TestBackupBase {
}
}
- private BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
+ BackupManifest getLatestBackupManifest(List<BackupInfo> backups) throws IOException {
BackupInfo newestBackup = backups.get(0);
return HBackupFileSystem.getManifest(conf1, new Path(BACKUP_ROOT_DIR),
newestBackup.getBackupId());
@@ -289,14 +288,4 @@ public class TestContinuousBackup extends TestBackupBase {
}
}
- private void deleteContinuousBackupReplicationPeerIfExists(Admin admin) throws IOException {
- if (
- admin.listReplicationPeers().stream()
- .anyMatch(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER))
- ) {
- admin.disableReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
- admin.removeReplicationPeer(CONTINUOUS_BACKUP_REPLICATION_PEER);
- }
- }
-
}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
new file mode 100644
index 00000000000..79d1df645b9
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestIncrementalBackupWithContinuous.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
+
+ private byte[] ROW = Bytes.toBytes("row1");
+ private final byte[] FAMILY = Bytes.toBytes("family");
+ private final byte[] COLUMN = Bytes.toBytes("col");
+ private static final int ROWS_IN_BULK_LOAD = 100;
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackupSuccess() throws Exception {
+ LOG.info("Testing incremental backup with continuous backup");
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName = TableName.valueOf("table_" + methodName);
+ Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+
+ try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = table.getBackupHistory().size();
+
+ // Run continuous backup
+ String[] args = buildBackupArgs("full", new TableName[] { tableName },
true);
+ int ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Full Backup should succeed", 0, ret);
+
+ // Verify backup history increased and all the backups succeeded
+ LOG.info("Verify backup history increased and all the backups succeeded");
+ List<BackupInfo> backups = table.getBackupHistory();
+ assertEquals("Backup history should increase", before + 1,
backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ // Verify backup manifest contains the correct tables
+ LOG.info("Verify backup manifest contains the correct tables");
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals("Backup should contain the expected tables",
Sets.newHashSet(tableName),
+ new HashSet<>(manifest.getTableList()));
+
+ Put p = new Put(ROW);
+ p.addColumn(FAMILY, COLUMN, COLUMN);
+ t1.put(p);
+ Thread.sleep(5000);
+
+ // Run incremental backup
+ LOG.info("Run incremental backup now");
+ before = table.getBackupHistory().size();
+ args = buildBackupArgs("incremental", new TableName[] { tableName },
false);
+ ret = ToolRunner.run(conf1, new BackupDriver(), args);
+ assertEquals("Incremental Backup should succeed", 0, ret);
+ LOG.info("Incremental backup completed");
+
+ // Verify backup history increased and all the backups succeeded
+ backups = table.getBackupHistory();
+ String incrementalBackupid = null;
+ assertEquals("Backup history should increase", before + 1, backups.size());
+ for (BackupInfo data : List.of(backups.get(0))) {
+ String backupId = data.getBackupId();
+ incrementalBackupid = backupId;
+ assertTrue(checkSucceeded(backupId));
+ }
+
+ TEST_UTIL.truncateTable(tableName);
+ // Restore incremental backup
+ TableName[] tables = new TableName[] { tableName };
+ BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
+ tables, tables, true));
+
+ verifyTable(t1);
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+ }
+ }
+
+ @Test
+ public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ // The test starts with some data, and no bulk loaded rows.
+ int expectedRowCount = NB_ROWS_IN_BATCH;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
+
+ // Bulk loads aren't tracked if the table isn't backed up yet
+ performBulkLoad("bulk1", methodName);
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+
+ // Create a backup, bulk loads are now being tracked
+ String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup1));
+
+ loadTable(TEST_UTIL.getConnection().getTable(table1));
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ performBulkLoad("bulk2", methodName);
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
+
+ // Creating an incremental backup clears the bulk loads
+ performBulkLoad("bulk4", methodName);
+ performBulkLoad("bulk5", methodName);
+ performBulkLoad("bulk6", methodName);
+ expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
+ String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup2));
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+ int rowCountAfterBackup2 = expectedRowCount;
+
+ // Doing another bulk load, to check that this data will disappear after a restore operation
+ performBulkLoad("bulk7", methodName);
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
+ assertEquals(1, bulkloadsTemp.size());
+ BulkLoad bulk7 = bulkloadsTemp.get(0);
+
+ // Doing a restore. Overwriting the table implies clearing the bulk loads,
+ // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
+ // associated with backup 3 (loading of full backup, loading of incremental backup).
+ BackupAdmin client = getBackupAdmin();
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
+ new TableName[] { table1 }, new TableName[] { table1 }, true));
+ assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
+ List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
+ assertEquals(3, bulkLoads.size());
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+ }
+ }
+
+ private void verifyTable(Table t1) throws IOException {
+ Get g = new Get(ROW);
+ Result r = t1.get(g);
+ assertEquals(1, r.size());
+ assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
+ }
+
+ private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
+ Path hfilePath =
+ new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
+
+ HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
+ Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
+
+ listFiles(fs, baseDirectory, baseDirectory);
+
+ Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+ assertFalse(result.isEmpty());
+ }
+
+ private static Set<String> listFiles(final FileSystem fs, final Path root, final Path dir)
+ throws IOException {
+ Set<String> files = new HashSet<>();
+ FileStatus[] list = CommonFSUtils.listStatus(fs, dir);
+ if (list != null) {
+ for (FileStatus fstat : list) {
+ if (fstat.isDirectory()) {
+ LOG.info("Found directory {}", Objects.toString(fstat.getPath()));
+ files.addAll(listFiles(fs, root, fstat.getPath()));
+ } else {
+ LOG.info("Found file {}", Objects.toString(fstat.getPath()));
+ String file = fstat.getPath().makeQualified(fs).toString();
+ files.add(file);
+ }
+ }
+ }
+ return files;
+ }
+
+ protected static void loadTable(Table table) throws Exception {
+ Put p; // load NB_ROWS_IN_BATCH rows into the given table
+ for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
+ p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ table.put(p);
+ }
+ }
+}