This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28957 by this push:
     new 51934cf345a HBASE-29656 Scan WALs to identify bulkload operations for incremental backup (#7400)
51934cf345a is described below

commit 51934cf345ac6c0ccdac53b16ac62bd795d5e940
Author: asolomon <[email protected]>
AuthorDate: Tue Oct 28 01:55:24 2025 +0700

    HBASE-29656 Scan WALs to identify bulkload operations for incremental backup (#7400)
    
    * Scan WALs to identify bulkload operations for incremental backup
    
    * Update unit test
    
    * Info log
    
    * Minor test fix
    
    * Address review comments
    
    * Spotless apply
    
    * Addressed review comment
    
    * spotless
    
    * Remove log
    
    * Retrigger CI
    
    ---------
    
    Co-authored-by: Ankit Solomon <[email protected]>
---
 .../apache/hadoop/hbase/backup/BackupObserver.java |  10 +-
 .../backup/impl/AbstractPitrRestoreHandler.java    |  72 +------
 .../backup/impl/IncrementalTableBackupClient.java  | 214 +++++++++------------
 .../backup/mapreduce/BulkLoadCollectorJob.java     |   2 +-
 .../hadoop/hbase/backup/util/BackupUtils.java      |  73 +++++++
 .../apache/hadoop/hbase/backup/TestBackupBase.java |   4 +-
 .../TestIncrementalBackupWithContinuous.java       |   8 +-
 7 files changed, 182 insertions(+), 201 deletions(-)
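
For orientation before the diff: a minimal sketch (not part of this commit) of how the two
public helpers added to BackupUtils below are meant to be combined; the connection, table,
timestamps, and restore directory are hypothetical placeholders.

    // Sketch only: assumes a configured CONF_CONTINUOUS_BACKUP_WAL_DIR and an open Connection.
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
    import org.apache.hadoop.hbase.backup.util.BackupUtils;
    import org.apache.hadoop.hbase.client.Connection;

    public class BulkLoadScanSketch {
      static List<Path> bulkFilesFor(Connection conn, TableName table, long startTs, long endTs,
        Path restoreRootDir) throws IOException {
        Configuration conf = conn.getConfiguration();
        // Day directories under the WAL backup root whose dates overlap [startTs, endTs]
        Path walBackupRoot =
          new Path(conf.get(BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR));
        List<String> walDirs = BackupUtils.getValidWalDirs(conf, walBackupRoot, startTs, endTs);
        // Scan those backed-up WALs for bulkload events and collect the referenced HFile paths
        return BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs, restoreRootDir,
          walDirs);
      }
    }

Passing a pre-computed walDirs list (as the incremental backup client does) skips the directory
scan; an empty list makes collectBulkFiles derive the directories itself.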

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
index 392e2771091..c506d6dc6ae 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java
@@ -96,12 +96,18 @@ public class BackupObserver implements RegionCoprocessor, RegionObserver {
     try (Connection connection = ConnectionFactory.createConnection(cfg);
       BackupSystemTable tbl = new BackupSystemTable(connection)) {
       Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
+      Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();
 
-      if (fullyBackedUpTables.contains(tableName)) {
+      // Tables in continuousBackupTableSet do not rely on BackupSystemTable but rather
+      // scan on WAL backup directory to identify bulkload operation HBASE-29656
+      if (
+        fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
+      ) {
         tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
       } else {
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
+          LOG.trace("Table {} has either not gone through full backup or is "
+            + "part of continuousBackupTableSet - skipping", tableName);
         }
       }
     }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index ce6c4d4dc68..3f31255d60f 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,25 +20,16 @@ package org.apache.hadoop.hbase.backup.impl;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -47,7 +38,6 @@ import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
 import org.apache.hadoop.hbase.backup.RestoreJob;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -342,8 +332,8 @@ public abstract class AbstractPitrRestoreHandler {
 
     RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
 
-    List<Path> bulkloadFiles =
-      collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
+    List<Path> bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
+      startTime, endTime, new Path(restoreRootDir), new ArrayList<String>());
 
     if (bulkloadFiles.isEmpty()) {
       LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
@@ -380,7 +370,7 @@ public abstract class AbstractPitrRestoreHandler {
       sourceTable, targetTable, startTime, endTime, walDirPath);
 
     List<String> validDirs =
-      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+      BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
     if (validDirs.isEmpty()) {
       LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
         endTime);
@@ -390,62 +380,6 @@ public abstract class AbstractPitrRestoreHandler {
     executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
   }
 
-  private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
-    long endTime, Path restoreRootDir) throws IOException {
-
-    String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    Path walDirPath = new Path(walBackupDir);
-    LOG.info(
-      "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
-      sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);
-
-    List<String> validDirs =
-      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
-    if (validDirs.isEmpty()) {
-      LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
-        startTime, endTime);
-      return Collections.emptyList();
-    }
-
-    String walDirsCsv = String.join(",", validDirs);
-
-    return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
-      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
-  }
-
-  /**
-   * Fetches valid WAL directories based on the given time range.
-   */
-  private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
-    long endTime) throws IOException {
-    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
-    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-
-    List<String> validDirs = new ArrayList<>();
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-
-    for (FileStatus dayDir : dayDirs) {
-      if (!dayDir.isDirectory()) {
-        continue; // Skip files, only process directories
-      }
-
-      String dirName = dayDir.getPath().getName();
-      try {
-        Date dirDate = dateFormat.parse(dirName);
-        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
-        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
-        // Check if this day's WAL files overlap with the required time range
-        if (dirEndTime >= startTime && dirStartTime <= endTime) {
-          validDirs.add(dayDir.getPath().toString());
-        }
-      } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: {}", dirName, e);
-      }
-    }
-    return validDirs;
-  }
-
   /**
    * Executes WAL replay using WALPlayer.
    */
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index c2aa0aa17fd..1bd3621b294 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,27 +19,19 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TimeZone;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -86,6 +78,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
 @InterfaceAudience.Private
 public class IncrementalTableBackupClient extends TableBackupClient {
   private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);
+  private static final String BULKLOAD_COLLECTOR_OUTPUT = "bulkload-collector-output";
 
   protected IncrementalTableBackupClient() {
   }
@@ -137,89 +130,89 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    * the backup is marked as complete.
    * @param tablesToBackup list of tables to be backed up
    */
-  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
+  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
+    Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
+    throws IOException {
     Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
-    List<BulkLoad> bulkLoads;
-    if (backupInfo.isContinuousBackupEnabled()) {
-      bulkLoads =
-        backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
-    } else {
-      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
-    }
+    List<BulkLoad> bulkLoads = new ArrayList<>();
+
     FileSystem tgtFs;
     try {
       tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
     } catch (URISyntaxException use) {
       throw new IOException("Unable to get FileSystem", use);
     }
+
     Path rootdir = CommonFSUtils.getRootDir(conf);
     Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
 
-    for (BulkLoad bulkLoad : bulkLoads) {
-      TableName srcTable = bulkLoad.getTableName();
-      MergeSplitBulkloadInfo bulkloadInfo =
-        toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
-      String regionName = bulkLoad.getRegion();
-      String fam = bulkLoad.getColumnFamily();
-      String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+    if (!backupInfo.isContinuousBackupEnabled()) {
+      bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+      for (BulkLoad bulkLoad : bulkLoads) {
+        TableName srcTable = bulkLoad.getTableName();
+        if (!tablesToBackup.contains(srcTable)) {
+          LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
+          continue;
+        }
+
+        MergeSplitBulkloadInfo bulkloadInfo =
+          toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
+        String regionName = bulkLoad.getRegion();
+        String fam = bulkLoad.getColumnFamily();
+        String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
+        Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
+        Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
+        String srcTableQualifier = srcTable.getQualifierAsString();
+        String srcTableNs = srcTable.getNamespaceAsString();
+        Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+          + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
+        if (!tgtFs.mkdirs(tgtFam)) {
+          throw new IOException("couldn't create " + tgtFam);
+        }
 
-      if (!tablesToBackup.contains(srcTable)) {
-        LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
-        continue;
-      }
-      Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
-      Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-
-      // For continuous backup: bulkload files are copied from backup directory defined by
-      // CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
-      String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-      if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
-        String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
-        Path bulkLoadBackupPath =
-          new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
-        Path bulkLoadDir = new Path(bulkLoadBackupPath,
-          srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
-        FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
-        Path fullBulkLoadBackupPath =
-          new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
-        if (backupFs.exists(fullBulkLoadBackupPath)) {
-          LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
-          p = fullBulkLoadBackupPath;
-        } else {
-          LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
+        Path tgt = new Path(tgtFam, filename);
+        Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, 
regionName, fam);
+        Path archive = new Path(archiveDir, filename);
+
+        if (fs.exists(p)) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
+              srcTableQualifier);
+            LOG.trace("copying {} to {}", p, tgt);
+          }
+          bulkloadInfo.addActiveFile(p.toString());
+        } else if (fs.exists(archive)) {
+          LOG.debug("copying archive {} to {}", archive, tgt);
+          bulkloadInfo.addArchiveFiles(archive.toString());
         }
       }
 
-      String srcTableQualifier = srcTable.getQualifierAsString();
-      String srcTableNs = srcTable.getNamespaceAsString();
-      Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
-        + Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
-      if (!tgtFs.mkdirs(tgtFam)) {
-        throw new IOException("couldn't create " + tgtFam);
+      for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
+        mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
+          bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
       }
-      Path tgt = new Path(tgtFam, filename);
+    } else {
+      // Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
+      Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
+      for (TableName table : tablesToBackup) {
+        long startTs = tablesToPrevBackupTs.getOrDefault(table, 0L);
+        long endTs = backupInfo.getIncrCommittedWalTs();
+        List<String> walDirs = tablesToWALFileList.getOrDefault(table, new ArrayList<String>());
 
-      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
-      Path archive = new Path(archiveDir, filename);
+        List<Path> bulkloadPaths = BackupUtils.collectBulkFiles(conn, table, table, startTs, endTs,
+          collectorOutput, walDirs);
 
-      if (fs.exists(p)) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
-            srcTableQualifier);
-          LOG.trace("copying {} to {}", p, tgt);
+        List<String> bulkLoadFiles =
+          bulkloadPaths.stream().map(Path::toString).collect(Collectors.toList());
+
+        if (bulkLoadFiles.isEmpty()) {
+          LOG.info("No bulk-load files found for table {}", table);
+          continue;
         }
-        bulkloadInfo.addActiveFile(p.toString());
-      } else if (fs.exists(archive)) {
-        LOG.debug("copying archive {} to {}", archive, tgt);
-        bulkloadInfo.addArchiveFiles(archive.toString());
-      }
-    }
 
-    for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
-      mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
-        bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
+        mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
+      }
     }
-
     return bulkLoads;
   }
 
@@ -306,11 +299,20 @@ public class IncrementalTableBackupClient extends TableBackupClient {
    */
   @Override
   public void execute() throws IOException, ColumnFamilyMismatchException {
+    // tablesToWALFileList and tablesToPrevBackupTs are needed for "continuous" Incremental backup
+    Map<TableName, List<String>> tablesToWALFileList = new HashMap<>();
+    Map<TableName, Long> tablesToPrevBackupTs = new HashMap<>();
     try {
       Map<TableName, String> tablesToFullBackupIds = getFullBackupIds();
       verifyCfCompatibility(backupInfo.getTables(), tablesToFullBackupIds);
 
       // case PREPARE_INCREMENTAL:
+      if (backupInfo.isContinuousBackupEnabled()) {
+        // committedWALsTs is needed only for Incremental backups with continuous backup
+        // since these do not depend on log roll ts
+        long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
+        backupInfo.setIncrCommittedWalTs(committedWALsTs);
+      }
       beginBackup(backupManager, backupInfo);
       backupInfo.setPhase(BackupPhase.PREPARE_INCREMENTAL);
       // Non-continuous Backup incremental backup is controlled by 'incremental backup table set'
@@ -339,7 +341,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
       setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-      convertWALsToHFiles();
+      convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
       incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
         backupInfo.getBackupRootDir());
     } catch (Exception e) {
@@ -371,7 +373,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         backupManager.writeBackupStartCode(newStartCode);
       }
 
-      List<BulkLoad> bulkLoads = handleBulkLoad(backupInfo.getTableNames());
+      List<BulkLoad> bulkLoads =
+        handleBulkLoad(backupInfo.getTableNames(), tablesToWALFileList, tablesToPrevBackupTs);
 
       // backup complete
       completeBackup(conn, backupInfo, BackupType.INCREMENTAL, conf);
@@ -425,10 +428,19 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
 
-  protected void convertWALsToHFiles() throws IOException {
+  protected void convertWALsToHFiles(Map<TableName, List<String>> tablesToWALFileList,
+    Map<TableName, Long> tablesToPrevBackupTs) throws IOException {
     long previousBackupTs = 0L;
+    long currentBackupTs = 0L;
     if (backupInfo.isContinuousBackupEnabled()) {
+      String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (Strings.isNullOrEmpty(walBackupDir)) {
+        throw new IOException(
+          "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      }
+      Path walBackupPath = new Path(walBackupDir);
       Set<TableName> tableSet = backupInfo.getTables();
+      currentBackupTs = backupInfo.getIncrCommittedWalTs();
       List<BackupInfo> backupInfos = backupManager.getBackupHistory(true);
       for (TableName table : tableSet) {
         for (BackupInfo backup : backupInfos) {
@@ -442,7 +454,10 @@ public class IncrementalTableBackupClient extends TableBackupClient {
             } else {
               previousBackupTs = backup.getIncrCommittedWalTs();
             }
-            walBackupFileList = getBackupLogs(previousBackupTs);
+            walBackupFileList =
+              BackupUtils.getValidWalDirs(conf, walBackupPath, previousBackupTs, currentBackupTs);
+            tablesToWALFileList.put(table, walBackupFileList);
+            tablesToPrevBackupTs.put(table, previousBackupTs);
             walToHFiles(walBackupFileList, Arrays.asList(table.getNameAsString()),
               previousBackupTs);
             break;
@@ -469,47 +484,6 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
   }
 
-  private List<String> getBackupLogs(long startTs) throws IOException {
-    // get log files from backup dir
-    String walBackupDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    if (Strings.isNullOrEmpty(walBackupDir)) {
-      throw new IOException(
-        "Incremental backup requires the WAL backup directory " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
-    }
-    List<String> resultLogFiles = new ArrayList<>();
-    Path walBackupPath = new Path(walBackupDir);
-    FileSystem backupFs = FileSystem.get(walBackupPath.toUri(), conf);
-    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-    for (FileStatus dayDir : dayDirs) {
-      if (!dayDir.isDirectory()) {
-        continue; // Skip files, only process directories
-      }
-
-      String dirName = dayDir.getPath().getName();
-      try {
-        Date dirDate = dateFormat.parse(dirName);
-        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
-        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
-
-        if (dirEndTime >= startTs) {
-          Path dirPath = dayDir.getPath();
-          FileStatus[] logs = backupFs.listStatus(dirPath);
-          for (FileStatus log : logs) {
-            String filepath = log.getPath().toString();
-            LOG.debug("Found WAL file: {}", filepath);
-            resultLogFiles.add(filepath);
-          }
-        }
-      } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: " + dirName, e);
-      }
-    }
-    return resultLogFiles;
-  }
-
   protected boolean tableExists(TableName table, Connection conn) throws IOException {
     try (Admin admin = conn.getAdmin()) {
       return admin.tableExists(table);
@@ -533,11 +507,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     conf.set(JOB_NAME_CONF_KEY, jobname);
     if (backupInfo.isContinuousBackupEnabled()) {
       conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
-      // committedWALsTs is needed only for Incremental backups with continuous backup
-      // since these do not depend on log roll ts
-      long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
-      backupInfo.setIncrCommittedWalTs(committedWALsTs);
-      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(committedWALsTs));
+      conf.set(WALInputFormat.END_TIME_KEY, Long.toString(backupInfo.getIncrCommittedWalTs()));
     }
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
index b752c7f78e0..cf19d262221 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
@@ -75,7 +75,7 @@ public class BulkLoadCollectorJob extends Configured implements Tool {
   public BulkLoadCollectorJob() {
   }
 
-  protected BulkLoadCollectorJob(final Configuration c) {
+  public BulkLoadCollectorJob(final Configuration c) {
     super(c);
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index bf309104775..28bbfcf254a 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hbase.backup.util;
 
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URLDecoder;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.ServerName;
@@ -78,6 +83,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
 
@@ -945,4 +951,71 @@ public final class BackupUtils {
     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     return dateFormat.format(new Date(dayInMillis));
   }
+
+  /**
+   * Fetches bulkload filepaths based on the given time range from backup WAL directory.
+   */
+  public static List<Path> collectBulkFiles(Connection conn, TableName sourceTable,
+    TableName targetTable, long startTime, long endTime, Path restoreRootDir, List<String> walDirs)
+    throws IOException {
+
+    if (walDirs.isEmpty()) {
+      String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      if (Strings.isNullOrEmpty(walBackupDir)) {
+        throw new IOException(
+          "WAL backup directory is not configured " + CONF_CONTINUOUS_BACKUP_WAL_DIR);
+      }
+      Path walDirPath = new Path(walBackupDir);
+      walDirs =
+        BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    }
+
+    if (walDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
+        startTime, endTime);
+      return Collections.emptyList();
+    }
+
+    LOG.info(
+      "Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL "
+        + "backup dir: {}, restore root: {}",
+      sourceTable, targetTable, startTime, endTime, walDirs, restoreRootDir);
+    String walDirsCsv = String.join(",", walDirs);
+
+    return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
+  }
+
+  /**
+   * Fetches valid WAL directories based on the given time range.
+   */
+  public static List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
+    long endTime) throws IOException {
+    FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
+    FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
+
+    List<String> validDirs = new ArrayList<>();
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (FileStatus dayDir : dayDirs) {
+      if (!dayDir.isDirectory()) {
+        continue; // Skip files, only process directories
+      }
+
+      String dirName = dayDir.getPath().getName();
+      try {
+        Date dirDate = dateFormat.parse(dirName);
+        long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
+        long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
+
+        // Check if this day's WAL files overlap with the required time range
+        if (dirEndTime >= startTime && dirStartTime <= endTime) {
+          validDirs.add(dayDir.getPath().toString());
+        }
+      } catch (ParseException e) {
+        LOG.warn("Skipping invalid directory name: {}", dirName, e);
+      }
+    }
+    return validDirs;
+  }
 }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 159514bd45b..e32e1b8f920 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -175,7 +175,7 @@ public class TestBackupBase {
         // copy out the table and region info files for each table
         BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
         // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
-        convertWALsToHFiles();
+        convertWALsToHFiles(new HashMap<>(), new HashMap<>());
         incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
           backupInfo.getBackupRootDir());
         failStageIf(Stage.stage_2);
@@ -200,7 +200,7 @@ public class TestBackupBase {
           BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
         backupManager.writeBackupStartCode(newStartCode);
 
-        handleBulkLoad(backupInfo.getTableNames());
+        handleBulkLoad(backupInfo.getTableNames(), new HashMap<>(), new HashMap<>());
         failStageIf(Stage.stage_4);
 
         // backup complete
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 54f3842f463..72867da95f1 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -163,20 +163,18 @@ public class TestIncrementalBackupWithContinuous extends TestBackupBase {
       performBulkLoad("bulkPreIncr", methodName, tableName1);
       expectedRowCount += ROWS_IN_BULK_LOAD;
       assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
-      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
       loadTable(TEST_UTIL.getConnection().getTable(tableName1));
       Thread.sleep(15000);
 
       performBulkLoad("bulkPostIncr", methodName, tableName1);
-      assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
       // Incremental backup
       String backup2 =
         backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup2));
-
-      // bulkPostIncr Bulkload entry should not be deleted post incremental backup
-      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
 
       TEST_UTIL.truncateTable(tableName1);
       // Restore incremental backup

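For reference, a small worked example (hypothetical timestamps, not part of the commit) of the
day-directory overlap rule that BackupUtils.getValidWalDirs applies when selecting backed-up WAL
directories: a day directory is kept when its one-day span intersects the requested range.

    import java.time.Instant;

    public class OverlapExample {
      static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

      // Same check as in getValidWalDirs: [dayStart, dayStart + 1 day) overlaps [startTime, endTime]
      static boolean overlaps(long dayStartMs, long startTime, long endTime) {
        long dayEndMs = dayStartMs + ONE_DAY_MS - 1;
        return dayEndMs >= startTime && dayStartMs <= endTime;
      }

      public static void main(String[] args) {
        long day = Instant.parse("2025-10-27T00:00:00Z").toEpochMilli();
        long start = Instant.parse("2025-10-27T18:00:00Z").toEpochMilli();
        long end = Instant.parse("2025-10-28T02:00:00Z").toEpochMilli();
        System.out.println(overlaps(day, start, end));                  // true: 2025-10-27 overlaps
        System.out.println(overlaps(day + ONE_DAY_MS, start, end));     // true: 2025-10-28 overlaps
        System.out.println(overlaps(day + 2 * ONE_DAY_MS, start, end)); // false: 2025-10-29 is outside
      }
    }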