This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new 51934cf345a HBASE-29656 Scan WALs to identify bulkload operations for incremental backup (#7400)
51934cf345a is described below
commit 51934cf345ac6c0ccdac53b16ac62bd795d5e940
Author: asolomon <[email protected]>
AuthorDate: Tue Oct 28 01:55:24 2025 +0700
HBASE-29656 Scan WALs to identify bulkload operations for incremental backup (#7400)
* Scan WALs to identify bulkload operations for incremental backup
* Update unit test
* Info log
* Minor test fix
* Address review comments
* Spotless apply
* Addressed review comment
* spotless
* Remove log
* Retrigger CI
---------
Co-authored-by: Ankit Solomon <[email protected]>
---
.../apache/hadoop/hbase/backup/BackupObserver.java | 10 +-
.../backup/impl/AbstractPitrRestoreHandler.java | 72 +------
.../backup/impl/IncrementalTableBackupClient.java | 214 +++++++++------------
.../backup/mapreduce/BulkLoadCollectorJob.java | 2 +-
.../hadoop/hbase/backup/util/BackupUtils.java | 73 +++++++
.../apache/hadoop/hbase/backup/TestBackupBase.java | 4 +-
.../TestIncrementalBackupWithContinuous.java | 8 +-
7 files changed, 182 insertions(+), 201 deletions(-)
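For orientation before the patch itself: with continuous backup enabled, incremental backups now discover bulk-load HFiles by scanning the backed-up WALs through the new BackupUtils.collectBulkFiles / getValidWalDirs helpers instead of reading bulkload rows from the backup system table. A minimal caller sketch against the signature added below; the table name, timestamps, and output path are illustrative placeholders, not values from this patch:

  // Sketch only: exercises the new BackupUtils.collectBulkFiles entry point.
  // Table, time range, and paths are hypothetical.
  import java.util.ArrayList;
  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.util.BackupUtils;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class CollectBulkFilesExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        TableName table = TableName.valueOf("demo_table");        // hypothetical table
        long startTs = 0L;                                         // previous backup timestamp
        long endTs = System.currentTimeMillis();                   // committed WAL timestamp
        Path output = new Path("/tmp/bulkload-collector-output");  // scratch dir for the collector
        // An empty WAL-dir list asks collectBulkFiles to derive the day directories
        // itself from CONF_CONTINUOUS_BACKUP_WAL_DIR via getValidWalDirs().
        List<Path> bulkloadFiles = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
          output, new ArrayList<String>());
        bulkloadFiles.forEach(p -> System.out.println("bulk-load HFile: " + p));
      }
    }
  }

Passing the empty list is the route AbstractPitrRestoreHandler takes below; IncrementalTableBackupClient instead passes the per-table WAL directory list it gathers in convertWALsToHFiles.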
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 392e2771091..c506d6dc6ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -96,12 +96,18 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+ Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();
- if (fullyBackedUpTables.contains(tableName)) {
+ // Tables in continuousBackupTableSet do not rely on BackupSystemTable but rather
+ // scan the WAL backup directory to identify bulkload operations (HBASE-29656)
+ if (
+ fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
+ ) {
tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
} else {
if (LOG.isTraceEnabled()) {
- LOG.trace("Table {} has not gone through full backup - skipping.",
tableName);
+ LOG.trace("Table {} has either not gone through full backup or is "
+ + "part of continuousBackupTableSet - skipping", tableName);
}
}
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index ce6c4d4dc68..3f31255d60f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,25 +20,16 @@ package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -47,7 +38,6 @@ import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -342,8 +332,8 @@ public abstract class AbstractPitrRestoreHandler {
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
- List<Path> bulkloadFiles =
- collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
+ List<Path> bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
+ startTime, endTime, new Path(restoreRootDir), new ArrayList<String>());
if (bulkloadFiles.isEmpty()) {
LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
@@ -380,7 +370,7 @@ public abstract class AbstractPitrRestoreHandler {
sourceTable, targetTable, startTime, endTime, walDirPath);
List<String> validDirs =
- getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+ BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
if (validDirs.isEmpty()) {
LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
endTime);
@@ -390,62 +380,6 @@ public abstract class AbstractPitrRestoreHandler {
executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
}
- private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
- long endTime, Path restoreRootDir) throws IOException {
-
- String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
- Path walDirPath = new Path(walBackupDir);
- LOG.info(
- "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
- sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);
-
- List<String> validDirs =
- getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
- if (validDirs.isEmpty()) {
- LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
- startTime, endTime);
- return Collections.emptyList();
- }
-
- String walDirsCsv = String.join(",", validDirs);
-
- return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
- walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
- }
-
- /**
- * Fetches valid WAL directories based on the given time range.
- */
- private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
- long endTime) throws IOException {
- FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
- FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-
- List<String> validDirs = new ArrayList<>();
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-
- for (FileStatus dayDir : dayDirs) {
- if (!dayDir.isDirectory()) {
- continue; // Skip files, only process directories
- }
-
- String dirName = dayDir.getPath().getName();
- try {
- Date dirDate = dateFormat.parse(dirName);
- long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
- long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
- // Check if this day's WAL files overlap with the required time range
- if (dirEndTime >= startTime && dirStartTime <= endTime) {
- validDirs.add(dayDir.getPath().toString());
- }
- } catch (ParseException e) {
- LOG.warn("Skipping invalid directory name: {}", dirName, e);
- }
- }
- return validDirs;
- }
-
/**
* Executes WAL replay using WALPlayer.
*/
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index c2aa0aa17fd..1bd3621b294 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,27 +19,19 @@ package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TimeZone;
+import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -86,6 +78,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);
+ private static final String BULKLOAD_COLLECTOR_OUTPUT = "bulkload-collector-output";
protected IncrementalTableBackupClient() {
}
@@ -137,89 +130,89 @@ public class IncrementalTableBackupClient extends TableBackupClient {
* the backup is marked as complete.
* @param tablesToBackup list of tables to be backed up
*/
- protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
+ protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+ Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
+ throws IOException {
Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
- List<BulkLoad> bulkLoads;
- if (backupInfo.isContinuousBackupEnabled()) {
- bulkLoads =
- backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
- } else {
- bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
- }
+ List<BulkLoad> bulkLoads = new ArrayList<>();
+
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
} catch (URISyntaxException use) {
throw new IOException("Unable to get FileSystem", use);
}
+
Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
- for (BulkLoad bulkLoad : bulkLoads) {
- TableName srcTable = bulkLoad.getTableName();
- MergeSplitBulkloadInfo bulkloadInfo =
- toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
- String regionName = bulkLoad.getRegion();
- String fam = bulkLoad.getColumnFamily();
- String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+ if (!backupInfo.isContinuousBackupEnabled()) {
+ bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+ for (BulkLoad bulkLoad : bulkLoads) {
+ TableName srcTable = bulkLoad.getTableName();
+ if (!tablesToBackup.contains(srcTable)) {
+ LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+ continue;
+ }
+
+ MergeSplitBulkloadInfo bulkloadInfo =
+ toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+ String regionName = bulkLoad.getRegion();
+ String fam = bulkLoad.getColumnFamily();
+ String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+ Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+ Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+ String srcTableQualifier = srcTable.getQualifierAsString();
+ String srcTableNs = srcTable.getNamespaceAsString();
+ Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+ if (!tgtFs.mkdirs(tgtFam)) {
+ throw new IOException("couldn't create " + tgtFam);
+ }
- if (!tablesToBackup.contains(srcTable)) {
- LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
- continue;
- }
- Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
- Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
- // For continuous backup: bulkload files are copied from backup
directory defined by
- // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
- String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
- if (backupInfo.isContinuousBackupEnabled() &&
!Strings.isNullOrEmpty(backupRootDir)) {
- String dayDirectoryName =
BackupUtils.formatToDateString(bulkLoad.getTimestamp());
- Path bulkLoadBackupPath =
- new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR +
dayDirectoryName);
- Path bulkLoadDir = new Path(bulkLoadBackupPath,
- srcTable.getNamespaceAsString() + Path.SEPARATOR +
srcTable.getNameAsString());
- FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
- Path fullBulkLoadBackupPath =
- new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam +
Path.SEPARATOR + filename);
- if (backupFs.exists(fullBulkLoadBackupPath)) {
- LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
- p = fullBulkLoadBackupPath;
- } else {
- LOG.warn("Backup bulkload file not found {}",
fullBulkLoadBackupPath);
+ Path tgt = new Path(tgtFam, filename);
+ Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable,
regionName, fam);
+ Path archive = new Path(archiveDir, filename);
+
+ if (fs.exists(p)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("found bulk hfile {} in {} for {}",
bulkLoad.getHfilePath(), p.getParent(),
+ srcTableQualifier);
+ LOG.trace("copying {} to {}", p, tgt);
+ }
+ bulkloadInfo.addActiveFile(p.toString());
+ } else if (fs.exists(archive)) {
+ LOG.debug("copying archive {} to {}", archive, tgt);
+ bulkloadInfo.addArchiveFiles(archive.toString());
}
}
- String srcTableQualifier = srcTable.getQualifierAsString();
- String srcTableNs = srcTable.getNamespaceAsString();
- Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
- + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
- if (!tgtFs.mkdirs(tgtFam)) {
- throw new IOException("couldn't create " + tgtFam);
+ for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+ mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+ bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
}
- Path tgt = new Path(tgtFam, filename);
+ } else {
+ // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+ Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+ for (TableName table : tablesToBackup) {
+ long startTs = tablesToPrevBackupTs.getOrDefault(table, 0L);
+ long endTs = backupInfo.getIncrCommittedWalTs();
+ List<String> walDirs = tablesToWALFileList.getOrDefault(table, new ArrayList<String>());
- Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
- Path archive = new Path(archiveDir, filename);
+ List<Path> bulkloadPaths = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
+ collectorOutput, walDirs);
- if (fs.exists(p)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("found bulk hfile {} in {} for {}",
bulkLoad.getHfilePath(), p.getParent(),
- srcTableQualifier);
- LOG.trace("copying {} to {}", p, tgt);
+ List<String> bulkLoadFiles =
+ bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
+
+ if (bulkLoadFiles.isEmpty()) {
+ LOG.info("No bulk-load files found for table {}", table);
+ continue;
}
- bulkloadInfo.addActiveFile(p.toString());
- } else if (fs.exists(archive)) {
- LOG.debug("copying archive {} to {}", archive, tgt);
- bulkloadInfo.addArchiveFiles(archive.toString());
- }
- }
- for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
- mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
- bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
+ mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
+ }
}
-
return bulkLoads;
}
@@ -306,11 +299,20 @@ public class IncrementalTableBackupClient extends TableBackupClient {
*/
@Override
public void execute() throws IOException, ColumnFamilyMismatchException {
+ // tablesToWALFileList and tablesToPrevBackupTs are needed for "continuous" Incremental backup
+ Map<TableName, List<String>> tablesToWALFileList = new HashMap<>();
+ Map<TableName, Long> tablesToPrevBackupTs = new HashMap<>();
try {
Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);
// case PREPARE_INCREMENTAL:
+ if (backupInfo.isContinuousBackupEnabled()) {
+ // committedWALsTs is needed only for Incremental backups with continuous backup
+ // since these do not depend on log roll ts
+ long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
+ backupInfo.setIncrCommittedWalTs(committedWALsTs);
+ }
beginBackup(backupManager, backupInfo);
backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
// Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
@@ -339,7 +341,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
setupRegionLocator();
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
- convertWALsToHFiles();
+ convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());
} catch (Exception e) {
@@ -371,7 +373,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
backupManager.writeBackupStartCode(newStartCode);
}
- List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
+ List<BulkLoad> bulkLoads =
+ handleBulkLoad(backupInfo.getTableNames(), tablesToWALFileList, tablesToPrevBackupTs);
// backup complete
completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
@@ -425,10 +428,19 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
- protected void convertWALsToHFiles() throws IOException {
+ protected void convertWALsToHFiles(Map<TableName, List<String>> tablesToWALFileList,
+ Map<TableName, Long> tablesToPrevBackupTs) throws IOException {
long previousBackupTs = 0L;
+ long currentBackupTs = 0L;
if (backupInfo.isContinuousBackupEnabled()) {
+ String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (Strings.isNullOrEmpty(walBackupDir)) {
+ throw new IOException(
+ "Incremental backup requires the WAL backup directory " +
CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+ Path walBackupPath = new Path(walBackupDir);
Set<TableName> tableSet = backupInfo.getTables();
+ currentBackupTs = backupInfo.getIncrCommittedWalTs();
List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
for (TableName table : tableSet) {
for (BackupInfo backup : backupInfos) {
@@ -442,7 +454,10 @@ public class IncrementalTableBackupClient extends TableBackupClient {
} else {
previousBackupTs = backup.getIncrCommittedWalTs();
}
- walBackupFileList = getBackupLogs(previousBackupTs);
+ walBackupFileList =
+ BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs);
+ tablesToWALFileList.put(table, walBackupFileList);
+ tablesToPrevBackupTs.put(table, previousBackupTs);
walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
previousBackupTs);
break;
@@ -469,47 +484,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
- private List<String> getBackupLogs(long startTs) throws IOException {
- // get log files from backup dir
- String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
- if (Strings.isNullOrEmpty(walBackupDir)) {
- throw new IOException(
- "Incremental backup requires the WAL backup directory " +
CONF_CONTINUOUS_BACKUP_WAL_DIR);
- }
- List<String> resultLogFiles = new ArrayList<>();
- Path walBackupPath = new Path(walBackupDir);
- FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
- FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
- SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
- dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
- for (FileStatus dayDir : dayDirs) {
- if (!dayDir.isDirectory()) {
- continue; // Skip files, only process directories
- }
-
- String dirName = dayDir.getPath().getName();
- try {
- Date dirDate = dateFormat.parse(dirName);
- long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
- long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
- if (dirEndTime >= startTs) {
- Path dirPath = dayDir.getPath();
- FileStatus[] logs = backupFs.listStatus(dirPath);
- for (FileStatus log : logs) {
- String filepath = log.getPath().toString();
- LOG.debug("Found WAL file: {}", filepath);
- resultLogFiles.add(filepath);
- }
- }
- } catch (ParseException e) {
- LOG.warn("Skipping invalid directory name: " + dirName, e);
- }
- }
- return resultLogFiles;
- }
-
protected boolean tableExists(TableName table, Connection conn) throws IOException {
try (Admin admin = conn.getAdmin()) {
return admin.tableExists(table);
@@ -533,11 +507,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
conf.set(JOB_NAME_CONF_KEY, jobname);
if (backupInfo.isContinuousBackupEnabled()) {
conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
- // committedWALsTs is needed only for Incremental backups with continuous backup
- // since these do not depend on log roll ts
- long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
- backupInfo.setIncrCommittedWalTs(committedWALsTs);
- conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+ conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));
}
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
index b752c7f78e0..cf19d262221 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
@@ -75,7 +75,7 @@ public class BulkLoadCollectorJob extends Configured implements Tool {
public BulkLoadCollectorJob() {
}
- protected BulkLoadCollectorJob(final Configuration c) {
+ public BulkLoadCollectorJob(final Configuration c) {
super(c);
}
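A note on the one-line change above: widening the Configuration constructor to public lets code outside the mapreduce package construct the collector job with a prepared configuration. A hedged sketch of such a caller; the argument array is whatever the job's Tool contract expects and is not spelled out in this patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob;
  import org.apache.hadoop.util.ToolRunner;

  public class RunCollectorExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Constructing with an explicit conf is what the widened visibility enables.
      int exitCode = ToolRunner.run(conf, new BulkLoadCollectorJob(conf), args);
      System.exit(exitCode);
    }
  }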
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index bf309104775..28bbfcf254a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hbase.backup.util;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLDecoder;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -46,6 +50,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
@@ -78,6 +83,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
@@ -945,4 +951,71 @@ public final class BackupUtils {
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.format(new Date(dayInMillis));
}
+
+ /**
+ * Fetches bulkload filepaths based on the given time range from backup WAL directory.
+ */
+ public static List<Path> collectBulkFiles(Connection conn, TableName sourceTable,
+ TableName targetTable, long startTime, long endTime, Path restoreRootDir, List<String> walDirs)
+ throws IOException {
+
+ if (walDirs.isEmpty()) {
+ String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ if (Strings.isNullOrEmpty(walBackupDir)) {
+ throw new IOException(
+ "WAL backup directory is not configured " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ }
+ Path walDirPath = new Path(walBackupDir);
+ walDirs =
+ BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+ }
+
+ if (walDirs.isEmpty()) {
+ LOG.warn("No valid WAL directories found for range {} - {}. Skipping
bulk-file collection.",
+ startTime, endTime);
+ return Collections.emptyList();
+ }
+
+ LOG.info(
+ "Starting WAL bulk-file collection for source: {}, target: {}, time
range: {} - {}, WAL "
+ + "backup dir: {}, restore root: {}",
+ sourceTable, targetTable, startTime, endTime, walDirs, restoreRootDir);
+ String walDirsCsv = String.join(",", walDirs);
+
+ return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+ walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
+ }
+
+ /**
+ * Fetches valid WAL directories based on the given time range.
+ */
+ public static List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+ long endTime) throws IOException {
+ FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+ FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+ List<String> validDirs = new ArrayList<>();
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+ for (FileStatus dayDir : dayDirs) {
+ if (!dayDir.isDirectory()) {
+ continue; // Skip files, only process directories
+ }
+
+ String dirName = dayDir.getPath().getName();
+ try {
+ Date dirDate = dateFormat.parse(dirName);
+ long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+ long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+ // Check if this day's WAL files overlap with the required time range
+ if (dirEndTime >= startTime && dirStartTime <= endTime) {
+ validDirs.add(dayDir.getPath().toString());
+ }
+ } catch (ParseException e) {
+ LOG.warn("Skipping invalid directory name: {}", dirName, e);
+ }
+ }
+ return validDirs;
+ }
}
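As a reading aid for getValidWalDirs above: a day directory under WALS_DIR is kept whenever its day window overlaps [startTime, endTime]. A small self-contained sketch of that predicate, assuming DATE_FORMAT is a day-granularity pattern such as "yyyy-MM-dd" (the constant itself is defined elsewhere in BackupUtils and not shown in this patch):

  import java.text.SimpleDateFormat;

  public class WalDayOverlapExample {
    // Same name as the constant imported from ContinuousBackupReplicationEndpoint; value assumed.
    private static final long ONE_DAY_IN_MILLISECONDS = 24L * 60 * 60 * 1000;

    public static void main(String[] args) throws Exception {
      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); // assumed DATE_FORMAT
      long dirStartTime = dateFormat.parse("2025-10-28").getTime();     // start of that day
      long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1;     // last millisecond of that day

      long startTime = dirStartTime + 6L * 60 * 60 * 1000;              // requested range: 06:00 ...
      long endTime = dirStartTime + 8L * 60 * 60 * 1000;                // ... to 08:00, same day

      // The same check getValidWalDirs applies to decide whether to keep WALs/2025-10-28
      boolean keep = dirEndTime >= startTime && dirStartTime <= endTime;
      System.out.println("include day directory? " + keep);             // prints: true
    }
  }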
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 159514bd45b..e32e1b8f920 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -175,7 +175,7 @@ public class TestBackupBase {
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
- convertWALsToHFiles();
+ convertWALsToHFiles(new HashMap<>(), new HashMap<>());
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());
failStageIf(Stage.stage_2);
@@ -200,7 +200,7 @@ public class TestBackupBase {
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode);
- handleBulkLoad(backupInfo.getTableNames());
+ handleBulkLoad(backupInfo.getTableNames(), new HashMap<>(), new HashMap<>());
failStageIf(Stage.stage_4);
// backup complete
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 54f3842f463..72867da95f1 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -163,20 +163,18 @@ public class TestIncrementalBackupWithContinuous extends TestBackupBase {
performBulkLoad("bulkPreIncr", methodName, tableName1);
expectedRowCount += ROWS_IN_BULK_LOAD;
assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
- assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+ assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
loadTable(TEST_UTIL.getConnection().getTable(tableName1));
Thread.sleep(15000);
performBulkLoad("bulkPostIncr", methodName, tableName1);
- assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+ assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
// Incremental backup
String backup2 =
backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup2));
-
- // bulkPostIncr Bulkload entry should not be deleted post incremental backup
- assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+ assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
TEST_UTIL.truncateTable(tableName1);
// Restore incremental backup