This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebased
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 26f51a05430732b31f5ccb18d7ee60148841bc04
Author: vinayak hegde <[email protected]>
AuthorDate: Thu Sep 25 22:27:31 2025 +0530

    HBASE-29521: Update Restore Command to Handle Bulkloaded Files (#7300)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    Signed-off-by: Andor Molnár <[email protected]>
    Reviewed by: Kevin Geiszler <[email protected]>
    Reviewed by: Kota-SH <[email protected]>
---
 .../hbase/backup/PointInTimeRestoreDriver.java     |  13 +-
 .../hbase/backup/PointInTimeRestoreRequest.java    |  24 ++
 .../backup/impl/AbstractPitrRestoreHandler.java    |  88 ++++-
 .../hadoop/hbase/backup/impl/BackupCommands.java   |   2 +-
 .../backup/impl/IncrementalTableBackupClient.java  |   4 +-
 .../backup/mapreduce/BulkLoadCollectorJob.java     | 399 +++++++++++++++++++++
 .../backup/mapreduce/MapReduceRestoreJob.java      |   9 +-
 .../replication/BackupFileSystemManager.java       |  71 ----
 .../ContinuousBackupReplicationEndpoint.java       |   2 +
 .../hbase/backup/util/BackupFileSystemManager.java | 154 ++++++++
 .../hbase/backup/util/BulkFilesCollector.java      | 226 ++++++++++++
 .../{replication => util}/BulkLoadProcessor.java   |  50 ++-
 .../apache/hadoop/hbase/backup/PITRTestUtil.java   |  36 ++
 .../hbase/backup/TestBackupDeleteWithCleanup.java  |   6 +-
 .../hbase/backup/TestPointInTimeRestore.java       |  26 +-
 .../hbase/backup/impl/TestBackupCommands.java      |   4 +-
 .../backup/mapreduce/TestBulkLoadCollectorJob.java | 220 ++++++++++++
 .../TestBulkLoadCollectorJobIntegration.java       | 252 +++++++++++++
 .../TestContinuousBackupReplicationEndpoint.java   |   5 +-
 .../backup/util/TestBackupFileSystemManager.java   | 164 +++++++++
 .../TestBulkLoadProcessor.java                     |  68 +++-
 .../hadoop/hbase/mapreduce/WALInputFormat.java     |   2 +-
 22 files changed, 1714 insertions(+), 111 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
index abdf52f1430..19159eeba92 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreDriver.java
@@ -27,6 +27,7 @@ import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_TO_DA
 
 import java.net.URI;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -101,9 +102,15 @@ public class PointInTimeRestoreDriver extends 
AbstractRestoreDriver {
         return -5;
       }
 
-      PointInTimeRestoreRequest pointInTimeRestoreRequest = new 
PointInTimeRestoreRequest.Builder()
-        
.withBackupRootDir(backupRootDir).withCheck(check).withFromTables(fromTables)
-        
.withToTables(toTables).withOverwrite(isOverwrite).withToDateTime(endTime).build();
+      // TODO: Currently hardcoding keepOriginalSplits=false and 
restoreRootDir via tmp dir.
+      // These should come from user input (same issue exists in normal 
restore).
+      // Expose them as configurable options in future.
+      PointInTimeRestoreRequest pointInTimeRestoreRequest =
+        new 
PointInTimeRestoreRequest.Builder().withBackupRootDir(backupRootDir).withCheck(check)
+          
.withFromTables(fromTables).withToTables(toTables).withOverwrite(isOverwrite)
+          
.withToDateTime(endTime).withKeepOriginalSplits(false).withRestoreRootDir(
+            BackupUtils.getTmpRestoreOutputDir(FileSystem.get(conf), 
conf).toString())
+          .build();
 
       client.pointInTimeRestore(pointInTimeRestoreRequest);
     } catch (Exception e) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
index f2462a1cfd1..d7f69c05b68 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/PointInTimeRestoreRequest.java
@@ -27,25 +27,33 @@ import org.apache.yetus.audience.InterfaceAudience;
 public final class PointInTimeRestoreRequest {
 
   private final String backupRootDir;
+  private final String restoreRootDir;
   private final boolean check;
   private final TableName[] fromTables;
   private final TableName[] toTables;
   private final boolean overwrite;
   private final long toDateTime;
+  private final boolean isKeepOriginalSplits;
 
   private PointInTimeRestoreRequest(Builder builder) {
     this.backupRootDir = builder.backupRootDir;
+    this.restoreRootDir = builder.restoreRootDir;
     this.check = builder.check;
     this.fromTables = builder.fromTables;
     this.toTables = builder.toTables;
     this.overwrite = builder.overwrite;
     this.toDateTime = builder.toDateTime;
+    this.isKeepOriginalSplits = builder.isKeepOriginalSplits;
   }
 
   public String getBackupRootDir() {
     return backupRootDir;
   }
 
+  public String getRestoreRootDir() {
+    return restoreRootDir;
+  }
+
   public boolean isCheck() {
     return check;
   }
@@ -66,19 +74,30 @@ public final class PointInTimeRestoreRequest {
     return toDateTime;
   }
 
+  public boolean isKeepOriginalSplits() {
+    return isKeepOriginalSplits;
+  }
+
   public static class Builder {
     private String backupRootDir;
+    private String restoreRootDir;
     private boolean check = false;
     private TableName[] fromTables;
     private TableName[] toTables;
     private boolean overwrite = false;
     private long toDateTime;
+    private boolean isKeepOriginalSplits;
 
     public Builder withBackupRootDir(String backupRootDir) {
       this.backupRootDir = backupRootDir;
       return this;
     }
 
+    public Builder withRestoreRootDir(String restoreRootDir) {
+      this.restoreRootDir = restoreRootDir;
+      return this;
+    }
+
     public Builder withCheck(boolean check) {
       this.check = check;
       return this;
@@ -104,6 +123,11 @@ public final class PointInTimeRestoreRequest {
       return this;
     }
 
+    public Builder withKeepOriginalSplits(boolean isKeepOriginalSplits) {
+      this.isKeepOriginalSplits = isKeepOriginalSplits;
+      return this;
+    }
+
     public PointInTimeRestoreRequest build() {
       return new PointInTimeRestoreRequest(this);
     }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index d7ffbb58093..ce6c4d4dc68 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.backup.impl;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
@@ -30,6 +30,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -41,9 +42,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreJob;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -305,6 +309,63 @@ public abstract class AbstractPitrRestoreHandler {
 
     backupAdmin.restore(restoreRequest);
     replayWal(sourceTable, targetTable, backupMetadata.getStartTs(), endTime);
+
+    reBulkloadFiles(sourceTable, targetTable, backupMetadata.getStartTs(), 
endTime,
+      request.isKeepOriginalSplits(), request.getRestoreRootDir());
+  }
+
+  /**
+   * Re-applies/re-bulkloads store files discovered from WALs into the target 
table.
+   * <p>
+   * <b>Note:</b> this method re-uses the same {@link RestoreJob} MapReduce 
job that we originally
+   * implemented for performing full and incremental backup restores. The MR 
job (obtained via
+   * {@link BackupRestoreFactory#getRestoreJob(Configuration)}) is used here 
to perform an HFile
+   * bulk-load of the discovered store files into {@code targetTable}.
+   * @param sourceTable        source table name (used for locating bulk files 
and logging)
+   * @param targetTable        destination table to bulk-load the HFiles into
+   * @param startTime          start of WAL range (ms)
+   * @param endTime            end of WAL range (ms)
+   * @param keepOriginalSplits pass-through flag to control whether original 
region splits are
+   *                           preserved
+   * @param restoreRootDir     local/DFS path under which temporary and output 
dirs are created
+   * @throws IOException on IO or job failure
+   */
+  private void reBulkloadFiles(TableName sourceTable, TableName targetTable, 
long startTime,
+    long endTime, boolean keepOriginalSplits, String restoreRootDir) throws 
IOException {
+
+    Configuration conf = HBaseConfiguration.create(conn.getConfiguration());
+    conf.setBoolean(RestoreJob.KEEP_ORIGINAL_SPLITS_KEY, keepOriginalSplits);
+
+    String walBackupDir = 
conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    Path walDirPath = new Path(walBackupDir);
+    conf.set(RestoreJob.BACKUP_ROOT_PATH_KEY, walDirPath.toString());
+
+    RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
+
+    List<Path> bulkloadFiles =
+      collectBulkFiles(sourceTable, targetTable, startTime, endTime, new 
Path(restoreRootDir));
+
+    if (bulkloadFiles.isEmpty()) {
+      LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping 
bulkload restore.",
+        sourceTable, startTime, endTime);
+      return;
+    }
+
+    Path[] pathsArray = bulkloadFiles.toArray(new Path[0]);
+
+    try {
+      // Use the existing RestoreJob MR job (the same MapReduce job used for 
full/incremental
+      // restores)
+      // to perform the HFile bulk-load of the discovered store files into 
`targetTable`.
+      restoreService.run(pathsArray, new TableName[] { sourceTable }, new 
Path(restoreRootDir),
+        new TableName[] { targetTable }, false);
+      LOG.info("Re-bulkload completed for {}", targetTable);
+    } catch (Exception e) {
+      String errorMessage =
+        String.format("Re-bulkload failed for %s: %s", targetTable, 
e.getMessage());
+      LOG.error(errorMessage, e);
+      throw new IOException(errorMessage, e);
+    }
   }
 
   /**
@@ -329,6 +390,29 @@ public abstract class AbstractPitrRestoreHandler {
     executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
   }
 
+  private List<Path> collectBulkFiles(TableName sourceTable, TableName 
targetTable, long startTime,
+    long endTime, Path restoreRootDir) throws IOException {
+
+    String walBackupDir = 
conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+    Path walDirPath = new Path(walBackupDir);
+    LOG.info(
+      "Starting WAL bulk-file collection for source: {}, target: {}, time 
range: {} - {}, WAL backup dir: {}, restore root: {}",
+      sourceTable, targetTable, startTime, endTime, walDirPath, 
restoreRootDir);
+
+    List<String> validDirs =
+      getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
+    if (validDirs.isEmpty()) {
+      LOG.warn("No valid WAL directories found for range {} - {}. Skipping 
bulk-file collection.",
+        startTime, endTime);
+      return Collections.emptyList();
+    }
+
+    String walDirsCsv = String.join(",", validDirs);
+
+    return 
BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
+      walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, 
endTime);
+  }
+
   /**
    * Fetches valid WAL directories based on the given time range.
    */
@@ -356,7 +440,7 @@ public abstract class AbstractPitrRestoreHandler {
           validDirs.add(dayDir.getPath().toString());
         }
       } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: " + dirName, e);
+        LOG.warn("Skipping invalid directory name: {}", dirName, e);
       }
     }
     return validDirs;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index a30530a98fc..f70bf627d17 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
 import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 8a893994616..a78f6c929c4 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 
 import java.io.IOException;
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
new file mode 100644
index 00000000000..b752c7f78e0
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLES_KEY;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLE_MAP_KEY;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MapReduce job that scans WAL backups and extracts referenced bulk-load 
store-file paths.
+ * <p>
+ * This job is intended to be used when you want a list of HFiles / 
store-files referenced by WAL
+ * bulk-load descriptors. It emits a de-duplicated list of full paths (one per 
line) by default
+ * using the {@link DedupReducer}.
+ * </p>
+ * <p>
+ * Usage (CLI):
+ * {@code BulkLoadCollector <WAL inputdir> <bulk-files-output-dir> [<tables> 
[<tableMappings>]]}
+ * </p>
+ */
+@InterfaceAudience.Private
+public class BulkLoadCollectorJob extends Configured implements Tool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkLoadCollectorJob.class);
+
+  public static final String NAME = "BulkLoadCollector";
+  public static final String DEFAULT_REDUCERS = "1";
+
+  public BulkLoadCollectorJob() {
+  }
+
+  protected BulkLoadCollectorJob(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * Mapper that extracts relative bulk-load paths from a WAL entry (via 
{@code BulkLoadProcessor}),
+   * resolves them to full paths (via
+   * {@code BackupFileSystemManager#resolveBulkLoadFullPath(Path, Path)}), and 
emits each full path
+   * as the map key (Text). Uses the same table-filtering semantics as other 
WAL mappers: if no
+   * tables are configured, all tables are processed; otherwise only the 
configured table set is
+   * processed. Map output: (Text fullPathString, NullWritable)
+   */
+  public static class BulkLoadCollectorMapper extends Mapper<WALKey, WALEdit, 
Text, NullWritable> {
+    private final Map<TableName, TableName> tables = new TreeMap<>();
+    private final Text out = new Text();
+
+    @Override
+    protected void map(WALKey key, WALEdit value, Context context)
+      throws IOException, InterruptedException {
+      if (key == null) {
+        if (LOG.isTraceEnabled()) LOG.trace("map: received null WALKey, 
skipping");
+        return;
+      }
+      if (value == null) {
+        if (LOG.isTraceEnabled())
+          LOG.trace("map: received null WALEdit for table={}, skipping", 
safeTable(key));
+        return;
+      }
+
+      TableName tname = key.getTableName();
+
+      // table filtering
+      if (!(tables.isEmpty() || tables.containsKey(tname))) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("map: skipping table={} because it is not in configured 
table list", tname);
+        }
+        return;
+      }
+
+      // Extract relative store-file paths referenced by this WALEdit.
+      // Delegates parsing to BulkLoadProcessor so parsing logic is 
centralized.
+      List<Path> relativePaths = BulkLoadProcessor.processBulkLoadFiles(key, 
value);
+      if (relativePaths.isEmpty()) return;
+
+      // Determine WAL input path for this split (used to compute date/prefix 
for full path)
+      Path walInputPath;
+      try {
+        walInputPath =
+          new Path(((WALInputFormat.WALSplit) 
context.getInputSplit()).getLogFileName());
+      } catch (ClassCastException cce) {
+        String splitClass =
+          (context.getInputSplit() == null) ? "null" : 
context.getInputSplit().getClass().getName();
+        LOG.warn(
+          "map: unexpected InputSplit type (not WALSplit) - cannot determine 
WAL input path; context.getInputSplit() class={}",
+          splitClass, cce);
+        throw new IOException("Unexpected InputSplit type: expected WALSplit 
but got " + splitClass,
+          cce);
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("map: walInputPath={} table={} relativePathsCount={}", 
walInputPath, tname,
+          relativePaths.size());
+      }
+
+      // Build full path for each relative path and emit it.
+      for (Path rel : relativePaths) {
+        Path full = 
BackupFileSystemManager.resolveBulkLoadFullPath(walInputPath, rel);
+        out.set(full.toString());
+        context.write(out, NullWritable.get());
+        context.getCounter("BulkCollector", "StoreFilesEmitted").increment(1);
+      }
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException {
+      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
+      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tableMap == null) {
+        tableMap = tablesToUse;
+      }
+      if (tablesToUse == null) {
+        // user requested all tables; tables map remains empty to indicate 
"all"
+        return;
+      }
+
+      if (tablesToUse.length != tableMap.length) {
+        throw new IOException("Incorrect table mapping specified.");
+      }
+
+      int i = 0;
+      for (String table : tablesToUse) {
+        TableName from = TableName.valueOf(table);
+        TableName to = TableName.valueOf(tableMap[i++]);
+        tables.put(from, to);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("setup: configuring mapping {} -> {}", from, to);
+        }
+      }
+    }
+
+    private String safeTable(WALKey key) {
+      try {
+        return key == null ? "<null>" : key.getTableName().toString();
+      } catch (Exception e) {
+        return "<error>";
+      }
+    }
+  }
+
+  /**
+   * Reducer that deduplicates full-path keys emitted by the mappers. It 
writes each unique key
+   * exactly once. Reduce input: (Text fullPathString, Iterable<NullWritable>) 
Reduce output: (Text
+   * fullPathString, NullWritable)
+   */
+  public static class DedupReducer extends Reducer<Text, NullWritable, Text, 
NullWritable> {
+    @Override
+    protected void reduce(Text key, Iterable<NullWritable> values, Context ctx)
+      throws IOException, InterruptedException {
+      // Write the unique path once.
+      ctx.write(key, NullWritable.get());
+    }
+  }
+
+  /**
+   * Create and configure a Job instance for bulk-file collection.
+   * @param args CLI args expected to be: inputDirs bulkFilesOut [tables] 
[tableMap]
+   * @throws IOException on misconfiguration
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+
+    setupTime(conf, WALInputFormat.START_TIME_KEY);
+    setupTime(conf, WALInputFormat.END_TIME_KEY);
+
+    if (args == null || args.length < 2) {
+      throw new IOException(
+        "Usage: <WAL inputdir> <bulk-files-output-dir> [<tables> 
[<tableMappings>]]");
+    }
+
+    String inputDirs = args[0];
+    String bulkFilesOut = args[1];
+
+    // tables are optional (args[2])
+    String[] tables = (args.length == 2) ? new String[] {} : 
args[2].split(",");
+    String[] tableMap;
+    if (args.length > 3) {
+      tableMap = args[3].split(",");
+      if (tableMap.length != tables.length) {
+        throw new IOException("The same number of tables and mapping must be 
provided.");
+      }
+    } else {
+      // if no mapping is specified, map each table to itself
+      tableMap = tables;
+    }
+
+    LOG.info("createSubmittableJob: inputDirs='{}' bulkFilesOut='{}' tables={} 
tableMap={}",
+      inputDirs, bulkFilesOut, String.join(",", tables), String.join(",", 
tableMap));
+
+    conf.setStrings(TABLES_KEY, tables);
+    conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+
+    // create and return the actual Job configured for bulk-file discovery
+    return BulkLoadCollectorJob.createSubmittableJob(conf, inputDirs, 
bulkFilesOut);
+  }
+
+  /**
+   * Low-level job wiring. Creates the Job instance and sets input, mapper, 
reducer and output.
+   * @param conf         configuration used for the job
+   * @param inputDirs    WAL input directories (comma-separated)
+   * @param bulkFilesOut output directory to write discovered full-paths
+   * @throws IOException on invalid args
+   */
+  private static Job createSubmittableJob(Configuration conf, String 
inputDirs, String bulkFilesOut)
+    throws IOException {
+    if (bulkFilesOut == null || bulkFilesOut.isEmpty()) {
+      throw new IOException("bulkFilesOut (output dir) must be provided.");
+    }
+    if (inputDirs == null || inputDirs.isEmpty()) {
+      throw new IOException("inputDirs (WAL input dir) must be provided.");
+    }
+
+    Job job = Job.getInstance(conf, NAME + "_" + 
EnvironmentEdgeManager.currentTime());
+    job.setJarByClass(BulkLoadCollectorJob.class);
+
+    // Input: use same WALInputFormat used by WALPlayer so we parse WALs 
consistently
+    job.setInputFormatClass(WALInputFormat.class);
+    FileInputFormat.setInputDirRecursive(job, true);
+    FileInputFormat.setInputPaths(job, inputDirs);
+
+    // Mapper: extract and emit full bulk-load file paths (Text, NullWritable)
+    job.setMapperClass(BulkLoadCollectorMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NullWritable.class);
+
+    // Reducer: deduplicate the full-path keys
+    job.setReducerClass(DedupReducer.class);
+    // default to a single reducer (single deduped file); callers can set 
mapreduce.job.reduces
+    int reducers = conf.getInt("mapreduce.job.reduces", 
Integer.parseInt(DEFAULT_REDUCERS));
+    job.setNumReduceTasks(reducers);
+
+    // Output: write plain text lines (one path per line)
+    job.setOutputFormatClass(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(bulkFilesOut));
+
+    LOG.info("createSubmittableJob: created job name='{}' reducers={}", 
job.getJobName(), reducers);
+
+    String codecCls = WALCellCodec.getWALCellCodecClass(conf).getName();
+    try {
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+        Class.forName(codecCls));
+    } catch (Exception e) {
+      throw new IOException("Cannot determine wal codec class " + codecCls, e);
+    }
+    return job;
+  }
+
+  /**
+   * Parse a time option. Supports the user-friendly ISO-like format
+   * {@code yyyy-MM-dd'T'HH:mm:ss.SS} or milliseconds since epoch. If the 
option is not present,
+   * this method is a no-op.
+   * @param conf   configuration containing option
+   * @param option key to read (e.g. WALInputFormat.START_TIME_KEY)
+   * @throws IOException on parse failure
+   */
+  private void setupTime(Configuration conf, String option) throws IOException 
{
+    String val = conf.get(option);
+    if (val == null) {
+      return;
+    }
+    long ms;
+    try {
+      // first try to parse in user-friendly form
+      ms = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
+    } catch (ParseException pe) {
+      try {
+        // then see if a number (milliseconds) was specified
+        ms = Long.parseLong(val);
+      } catch (NumberFormatException nfe) {
+        throw new IOException(
+          option + " must be specified either in the form 
2001-02-20T16:35:06.99 "
+            + "or as number of milliseconds");
+      }
+    }
+    conf.setLong(option, ms);
+  }
+
+  /**
+   * CLI entry point.
+   * @param args job arguments (see {@link #usage(String)})
+   * @throws Exception on job failure
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new 
BulkLoadCollectorJob(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+
+    Job job = createSubmittableJob(args);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+
+  /**
+   * Print usage/help for the BulkLoadCollectorJob CLI/driver.
+   * <p>
+   *
+   * <pre>
+   * args layout:
+   *   args[0] = input directory (required)
+   *   args[1] = output directory (required)
+   *   args[2] = tables (comma-separated) (optional)
+   *   args[3] = tableMappings (comma-separated) (optional; must match tables 
length)
+   * </pre>
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && !errorMsg.isEmpty()) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+
+    System.err.println(
+      "Usage: " + NAME + " <WAL inputdir> <bulk-files-output-dir> [<tables> 
[<tableMappings>]]");
+    System.err.println(
+      "  <WAL inputdir>             directory of WALs to scan (comma-separated 
list accepted)");
+    System.err.println(
+      "  <bulk-files-output-dir>    directory to write discovered store-file 
paths (output)");
+    System.err.println(
+      "  <tables>                   optional comma-separated list of tables to 
include; if omitted, all tables are processed");
+    System.err.println(
+      "  <tableMappings>            optional comma-separated list of mapped 
target tables; must match number of tables");
+
+    System.err.println();
+    System.err.println("Time range options (either milliseconds or 
yyyy-MM-dd'T'HH:mm:ss.SS):");
+    System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
+
+    System.err.println();
+    System.err.println("Configuration alternatives (can be provided via -D):");
+    System.err
+      .println("  -D" + TABLES_KEY + "=<comma-separated-tables>         
(alternative to arg[2])");
+    System.err
+      .println("  -D" + TABLE_MAP_KEY + "=<comma-separated-mappings>     
(alternative to arg[3])");
+    System.err.println(
+      "  -Dmapreduce.job.reduces=<N>                            (number of 
reducers; default 1)");
+    System.err.println();
+
+    System.err.println("Performance hints:");
+    System.err.println("  For large inputs consider disabling speculative 
execution:");
+    System.err
+      .println("    -Dmapreduce.map.speculative=false 
-Dmapreduce.reduce.speculative=false");
+
+    System.err.println();
+    System.err.println("Example:");
+    System.err.println(
+      "  " + NAME + " /wals/input /out/bulkfiles ns:tbl1,ns:tbl2 
ns:tbl1_mapped,ns:tbl2_mapped");
+  }
+}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 7a2fce4c418..4711cba4668 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -61,11 +61,10 @@ public class MapReduceRestoreJob implements RestoreJob {
     String dirs = StringUtils.join(dirPaths, ",");
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Restore " + (fullBackupRestore ? "full" : "incremental")
-        + " backup from directory " + dirs + " from hbase tables "
-        + StringUtils.join(tableNames, 
BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND)
-        + " to tables "
-        + StringUtils.join(newTableNames, 
BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
+      LOG.debug("Restore {} from directory {} from hbase tables {} to tables 
{}",
+        fullBackupRestore ? "full backup" : "incremental backup / bulkload 
files (as part of PITR)",
+        dirs, StringUtils.join(tableNames, 
BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND),
+        StringUtils.join(newTableNames, 
BackupRestoreConstants.TABLENAME_DELIMITER_IN_COMMAND));
     }
 
     for (int i = 0; i < tableNames.length; i++) {
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
deleted file mode 100644
index 225d3217276..00000000000
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.backup.replication;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) and
- * bulk-loaded files within the specified backup root directory.
- */
-@InterfaceAudience.Private
-public class BackupFileSystemManager {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BackupFileSystemManager.class);
-
-  public static final String WALS_DIR = "WALs";
-  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
-  private final String peerId;
-  private final FileSystem backupFs;
-  private final Path backupRootDir;
-  private final Path walsDir;
-  private final Path bulkLoadFilesDir;
-
-  public BackupFileSystemManager(String peerId, Configuration conf, String 
backupRootDirStr)
-    throws IOException {
-    this.peerId = peerId;
-    this.backupRootDir = new Path(backupRootDirStr);
-    this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
-    this.walsDir = createDirectory(WALS_DIR);
-    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
-  }
-
-  private Path createDirectory(String dirName) throws IOException {
-    Path dirPath = new Path(backupRootDir, dirName);
-    backupFs.mkdirs(dirPath);
-    LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
-    return dirPath;
-  }
-
-  public Path getWalsDir() {
-    return walsDir;
-  }
-
-  public Path getBulkLoadFilesDir() {
-    return bulkLoadFilesDir;
-  }
-
-  public FileSystem getBackupFs() {
-    return backupFs;
-  }
-}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 19624d04c23..19cd2733af7 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -38,7 +38,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java
new file mode 100644
index 00000000000..a616eb69e47
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.replication.Utils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) and
+ * bulk-loaded files within the specified backup root directory.
+ */
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+  /** Directory name, relative to the backup root, used for WAL files. */
+  public static final String WALS_DIR = "WALs";
+  /** Directory name, relative to the backup root, used for bulk-loaded files. */
+  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+  private final String peerId;
+  private final FileSystem backupFs;
+  private final Path backupRootDir;
+  private final Path walsDir;
+  private final Path bulkLoadFilesDir;
+
+  /**
+   * Resolves the file system of the backup root and creates the {@link #WALS_DIR} and
+   * {@link #BULKLOAD_FILES_DIR} directories beneath it.
+   * @param peerId           peer id; used in this class only to prefix log messages
+   * @param conf             configuration used to obtain the backup file system
+   * @param backupRootDirStr backup root directory, parsed as a {@link Path}
+   * @throws IOException if the file system lookup or a directory creation fails
+   */
+  public BackupFileSystemManager(String peerId, Configuration conf, String 
backupRootDirStr)
+    throws IOException {
+    this.peerId = peerId;
+    this.backupRootDir = new Path(backupRootDirStr);
+    this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+    this.walsDir = createDirectory(WALS_DIR);
+    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
+  }
+
+  /**
+   * Creates {@code <backupRootDir>/<dirName>} via {@code mkdirs} and logs the resulting path.
+   * The boolean result of {@code mkdirs} is not inspected.
+   */
+  private Path createDirectory(String dirName) throws IOException {
+    Path dirPath = new Path(backupRootDir, dirName);
+    backupFs.mkdirs(dirPath);
+    LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
+    return dirPath;
+  }
+
+  /** Returns the {@link #WALS_DIR} directory under the backup root. */
+  public Path getWalsDir() {
+    return walsDir;
+  }
+
+  /** Returns the {@link #BULKLOAD_FILES_DIR} directory under the backup root. */
+  public Path getBulkLoadFilesDir() {
+    return bulkLoadFilesDir;
+  }
+
+  /** Returns the file system resolved from the backup root URI. */
+  public FileSystem getBackupFs() {
+    return backupFs;
+  }
+
+  /**
+   * Immutable holder for the two pieces extracted from a WAL path by
+   * {@link #extractWalPathInfo(Path)}: the path preceding the {@link #WALS_DIR} segment and the
+   * name of the date directory that follows it.
+   */
+  public static final class WalPathInfo {
+    private final Path prefixBeforeWALs;
+    private final String dateSegment;
+
+    public WalPathInfo(Path prefixBeforeWALs, String dateSegment) {
+      this.prefixBeforeWALs = prefixBeforeWALs;
+      this.dateSegment = dateSegment;
+    }
+
+    /** Returns the parent of the WALs directory; may be null (see extractWalPathInfo). */
+    public Path getPrefixBeforeWALs() {
+      return prefixBeforeWALs;
+    }
+
+    /** Returns the name of the directory directly under the WALs directory. */
+    public String getDateSegment() {
+      return dateSegment;
+    }
+  }
+
+  /**
+   * Validate that walPath has the expected structure {@code .../WALs/<date>/<wal-file>} and
+   * return WalPathInfo(prefixBeforeWALs, dateSegment).
+   * @param walPath path to a WAL file; must not be null
+   * @return the prefix before the WALs directory and the date segment
+   * @throws IllegalArgumentException if walPath is null
+   * @throws IOException              if the path is not in expected format
+   */
+  public static WalPathInfo extractWalPathInfo(Path walPath) throws 
IOException {
+    if (walPath == null) {
+      throw new IllegalArgumentException("walPath must not be null");
+    }
+
+    Path dateDir = walPath.getParent(); // .../WALs/<date>
+    if (dateDir == null) {
+      throw new IOException("Invalid WAL path: missing date directory. Path: " 
+ walPath);
+    }
+
+    Path walsDir = dateDir.getParent(); // .../WALs
+    if (walsDir == null) {
+      throw new IOException("Invalid WAL path: missing WALs directory. Path: " 
+ walPath);
+    }
+
+    // The grandparent of the WAL file must be named exactly WALS_DIR.
+    String walsDirName = walsDir.getName();
+    if (!WALS_DIR.equals(walsDirName)) {
+      throw new IOException("Invalid WAL path: expected '" + WALS_DIR + "' 
segment but found '"
+        + walsDirName + "'. Path: " + walPath);
+    }
+
+    // Only non-emptiness is checked; the date format itself is not validated here.
+    String dateSegment = dateDir.getName();
+    if (dateSegment == null || dateSegment.isEmpty()) {
+      throw new IOException("Invalid WAL path: date segment is empty. Path: " 
+ walPath);
+    }
+
+    Path prefixBeforeWALs = walsDir.getParent(); // might be null if path is 
like "/WALs/..."
+    return new WalPathInfo(prefixBeforeWALs, dateSegment);
+  }
+
+  /**
+   * Resolve the full bulk-load file path corresponding to a relative 
bulk-load path referenced from
+   * a WAL file path. For a WAL path like: 
/some/prefix/.../WALs/23-08-2025/some-wal-file and a
+   * relative bulk path like: namespace/table/region/family/file, this returns:
+   * 
/some/prefix/.../bulk-load-files/23-08-2025/namespace/table/region/family/file
+   * @param walPath          the Path to the WAL file (must contain the {@link 
#WALS_DIR} segment
+   *                         followed by date)
+   * @param relativeBulkPath the relative bulk-load file Path
+   * @return resolved full Path for the bulk-load file
+   * @throws IOException if the WAL path does not contain the expected segments
+   */
+  public static Path resolveBulkLoadFullPath(Path walPath, Path 
relativeBulkPath)
+    throws IOException {
+    WalPathInfo info = extractWalPathInfo(walPath);
+
+    Path prefixBeforeWALs = info.getPrefixBeforeWALs();
+    String dateSegment = info.getDateSegment();
+
+    Path full; // Build final path:
+               // 
<prefixBeforeWALs>/bulk-load-files/<dateSegment>/<relativeBulkPath>
+    // With no usable prefix the result starts directly at BULKLOAD_FILES_DIR.
+    if (prefixBeforeWALs == null || prefixBeforeWALs.toString().isEmpty()) {
+      full = new Path(BULKLOAD_FILES_DIR, new Path(dateSegment, 
relativeBulkPath));
+    } else {
+      full = new Path(new Path(prefixBeforeWALs, BULKLOAD_FILES_DIR),
+        new Path(dateSegment, relativeBulkPath));
+    }
+    return full;
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
new file mode 100644
index 00000000000..718a662abb7
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkFilesCollector.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.mapreduce.BulkLoadCollectorJob;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.Tool;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to run BulkLoadCollectorJob over a comma-separated list of WAL 
directories and return a
+ * deduplicated list of discovered bulk-load file paths.
+ */
+@InterfaceAudience.Private
+public final class BulkFilesCollector {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BulkFilesCollector.class);
+
+  private BulkFilesCollector() {
+    // static utility class; not instantiable
+  }
+
+  /**
+   * Convenience overload: creates and configures a {@link BulkLoadCollectorJob} internally and
+   * then delegates to the {@link Tool}-based overload.
+   * @param conf           cluster/configuration used to initialize job and access FS; it is
+   *                       copied before the job-specific keys are set
+   * @param walDirsCsv     comma-separated WAL directories
+   * @param restoreRootDir parent path under which temporary output dir will be created
+   * @param sourceTable    source table name (for args/logging)
+   * @param targetTable    target table name (for args/logging)
+   * @param startTime      start time (ms); set as WALInputFormat.START_TIME_KEY only when &gt; 0
+   * @param endTime        end time (ms); set as WALInputFormat.END_TIME_KEY only when &gt; 0
+   * @return deduplicated list of Paths discovered by the collector
+   * @throws IOException on IO or job failure
+   */
+  public static List<Path> collectFromWalDirs(Configuration conf, String walDirsCsv,
+    Path restoreRootDir, TableName sourceTable, TableName targetTable, long startTime, long endTime)
+    throws IOException {
+
+    // prepare job Tool on a copy so the caller's configuration is left untouched
+    Configuration jobConf = new Configuration(conf);
+    if (startTime > 0) {
+      jobConf.setLong(WALInputFormat.START_TIME_KEY, startTime);
+    }
+    if (endTime > 0) {
+      jobConf.setLong(WALInputFormat.END_TIME_KEY, endTime);
+    }
+
+    // ignore empty WAL files by default to make collection robust
+    jobConf.setBoolean(WALPlayer.IGNORE_EMPTY_FILES, true);
+
+    BulkLoadCollectorJob bulkCollector = new BulkLoadCollectorJob();
+    bulkCollector.setConf(jobConf);
+
+    return collectFromWalDirs(conf, walDirsCsv, restoreRootDir, sourceTable, targetTable,
+      bulkCollector);
+  }
+
+  /**
+   * Primary implementation: runs the provided Tool (BulkLoadCollectorJob) with args
+   * "&lt;walDirsCsv&gt; &lt;bulkFilesOut&gt; &lt;sourceTable&gt; &lt;targetTable&gt;" and returns
+   * the deduped list of Paths. The WAL directory argument handed to the Tool is the normalized
+   * list (entries trimmed, empty entries dropped), i.e. exactly the entries that were validated.
+   * The temporary output directory is removed on a best-effort basis before returning.
+   * @param conf           configuration used to access the file systems
+   * @param walDirsCsv     comma-separated WAL directories; every entry must exist
+   * @param restoreRootDir parent path under which the temporary output dir is created
+   * @param sourceTable    source table name, passed to the Tool and used for the temp dir name
+   * @param targetTable    target table name, passed to the Tool
+   * @param bulkCollector  the Tool to run
+   * @return deduplicated list of Paths read from the Tool output
+   * @throws IOException if the input is empty, a WAL directory is missing or cannot be checked,
+   *                     the Tool throws, or the Tool returns a non-zero exit code
+   */
+  public static List<Path> collectFromWalDirs(Configuration conf, String walDirsCsv,
+    Path restoreRootDir, TableName sourceTable, TableName targetTable, Tool bulkCollector)
+    throws IOException {
+
+    if (walDirsCsv == null || walDirsCsv.trim().isEmpty()) {
+      throw new IOException(
+        "walDirsCsv must be a non-empty comma-separated list of WAL directories");
+    }
+
+    List<String> walDirs =
+      Arrays.stream(walDirsCsv.split(",")).map(String::trim).filter(s -> !s.isEmpty()).toList();
+
+    if (walDirs.isEmpty()) {
+      throw new IOException("walDirsCsv did not contain any entries: '" + walDirsCsv + "'");
+    }
+
+    List<String> existing = new ArrayList<>();
+    for (String d : walDirs) {
+      Path p = new Path(d);
+      try {
+        FileSystem fsForPath = p.getFileSystem(conf);
+        if (fsForPath.exists(p)) {
+          existing.add(d);
+        } else {
+          LOG.debug("WAL dir does not exist: {}", d);
+        }
+      } catch (IOException e) {
+        // If getting FS or checking existence fails, treat as missing but log the cause.
+        // The trailing Throwable keeps the stack trace in the log.
+        LOG.warn("Error checking WAL dir {}: {}", d, e.toString(), e);
+      }
+    }
+
+    // If any of the provided walDirs are missing, fail with an informative message.
+    List<String> missing = new ArrayList<>(walDirs);
+    missing.removeAll(existing);
+
+    if (!missing.isEmpty()) {
+      throw new IOException(
+        "Some of the provided WAL paths do not exist: " + String.join(", ", missing));
+    }
+
+    // Create unique temporary output dir under restoreRootDir, e.g.
+    // <restoreRootDir>/_wal_collect_<table_name><ts>
+    final String unique = String.format("_wal_collect_%s%d", sourceTable.getQualifierAsString(),
+      EnvironmentEdgeManager.currentTime());
+    final Path bulkFilesOut = new Path(restoreRootDir, unique);
+
+    FileSystem fs = bulkFilesOut.getFileSystem(conf);
+
+    try {
+      // If bulkFilesOut exists for some reason, delete it.
+      if (fs.exists(bulkFilesOut)) {
+        LOG.info("Temporary bulkload file collect output directory {} already exists - deleting.",
+          bulkFilesOut);
+        fs.delete(bulkFilesOut, true);
+      }
+
+      // Pass the validated, normalized directory list rather than the raw input string so that
+      // stray whitespace or empty entries cannot reach the Tool.
+      final String normalizedWalDirs = String.join(",", walDirs);
+      final String[] args = new String[] { normalizedWalDirs, bulkFilesOut.toString(),
+        sourceTable.getNameAsString(), targetTable.getNameAsString() };
+
+      LOG.info("Running bulk collector Tool with args: {}", (Object) args);
+
+      int exitCode;
+      try {
+        exitCode = bulkCollector.run(args);
+      } catch (Exception e) {
+        LOG.error("Error during BulkLoadCollectorJob for {}: {}", sourceTable, e.getMessage(), e);
+        throw new IOException("Exception during BulkLoadCollectorJob collect", e);
+      }
+
+      if (exitCode != 0) {
+        throw new IOException("Bulk collector Tool returned non-zero exit code: " + exitCode);
+      }
+
+      LOG.info("BulkLoadCollectorJob collect completed successfully for {}", sourceTable);
+
+      // read and dedupe
+      List<Path> results = readBulkFilesListFromOutput(fs, bulkFilesOut);
+      LOG.info("BulkFilesCollector: discovered {} unique bulk-load files", results.size());
+      return results;
+    } finally {
+      // best-effort cleanup; a failure here must not mask the primary outcome
+      try {
+        if (fs.exists(bulkFilesOut)) {
+          boolean deleted = fs.delete(bulkFilesOut, true);
+          if (!deleted) {
+            LOG.warn("Could not delete temporary bulkFilesOut directory {}", bulkFilesOut);
+          } else {
+            LOG.debug("Deleted temporary bulkFilesOut directory {}", bulkFilesOut);
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Exception while deleting temporary bulkload file collect output dir {}: {}",
+          bulkFilesOut, ioe.getMessage(), ioe);
+      }
+    }
+  }
+
+  /**
+   * Reads every file found recursively under bulkFilesOut whose own name does not start with
+   * '_' or '.', collects the trimmed non-empty lines in insertion order without duplicates, and
+   * returns them as Paths. Only the file name is checked for the hidden prefixes; names of
+   * parent directories are not.
+   */
+  private static List<Path> readBulkFilesListFromOutput(FileSystem fs, Path bulkFilesOut)
+    throws IOException {
+    if (!fs.exists(bulkFilesOut)) {
+      LOG.warn("BulkFilesCollector: bulkFilesOut directory does not exist: {}", bulkFilesOut);
+      return new ArrayList<>();
+    }
+
+    RemoteIterator<LocatedFileStatus> it = fs.listFiles(bulkFilesOut, true);
+    Set<String> dedupe = new LinkedHashSet<>();
+
+    while (it.hasNext()) {
+      LocatedFileStatus status = it.next();
+      Path p = status.getPath();
+      String name = p.getName();
+      // skip hidden/system files such as _SUCCESS
+      if (name.startsWith("_") || name.startsWith(".")) {
+        continue;
+      }
+
+      try (FSDataInputStream in = fs.open(p);
+        BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          line = line.trim();
+          if (line.isEmpty()) {
+            continue;
+          }
+          dedupe.add(line);
+        }
+      }
+    }
+
+    List<Path> result = new ArrayList<>(dedupe.size());
+    for (String s : dedupe) {
+      result.add(new Path(s));
+    }
+
+    LOG.info("Collected {} unique bulk-load store files.", result.size());
+    return result;
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
similarity index 63%
rename from 
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
rename to 
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
index 6e1271313bc..4ab8bfb104e 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BulkLoadProcessor.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.backup.replication;
+package org.apache.hadoop.hbase.backup.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -26,19 +26,16 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
 /**
- * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase 
replication.
+ * Processes bulk load files from Write-Ahead Log (WAL) entries.
  * <p>
- * This utility class extracts and constructs the file paths of bulk-loaded 
files based on WAL
- * entries. It processes bulk load descriptors and their associated store 
descriptors to generate
- * the paths for each bulk-loaded file.
- * <p>
- * The class is designed for scenarios where replicable bulk load operations 
need to be parsed and
- * their file paths need to be determined programmatically.
+ * Used by backup/restore and replication flows to discover HFiles referenced 
by bulk-load WALEdits.
+ * Returned {@link Path}s are constructed from the 
namespace/table/region/family/file components.
  * </p>
  */
 @InterfaceAudience.Private
@@ -46,20 +43,41 @@ public final class BulkLoadProcessor {
   private BulkLoadProcessor() {
   }
 
+  /**
+   * Extract bulk-load file {@link Path}s from a list of {@link WAL.Entry}.
+   * @param walEntries list of WAL entries; iterated directly, so it must not be null
+   * @return list of Paths in discovery order; empty list if none
+   * @throws IOException if descriptor parsing fails
+   */
   public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) 
throws IOException {
     List<Path> bulkLoadFilePaths = new ArrayList<>();
 
     for (WAL.Entry entry : walEntries) {
-      WALEdit edit = entry.getEdit();
-      for (Cell cell : edit.getCells()) {
-        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
-          TableName tableName = entry.getKey().getTableName();
-          String namespace = tableName.getNamespaceAsString();
-          String table = tableName.getQualifierAsString();
-          bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, 
table));
-        }
+      // Per-entry extraction is delegated to the (WALKey, WALEdit) overload.
+      bulkLoadFilePaths.addAll(processBulkLoadFiles(entry.getKey(), 
entry.getEdit()));
+    }
+    return bulkLoadFilePaths;
+  }
+
+  /**
+   * Extract bulk-load file {@link Path}s from a single WAL entry.
+   * @param key  WALKey containing table information; if null returns empty list
+   * @param edit WALEdit to scan; if null returns empty list
+   * @return list of Paths referenced by bulk-load descriptor(s) in this edit; may be empty or
+   *         contain duplicates
+   * @throws IOException if descriptor parsing fails
+   */
+  public static List<Path> processBulkLoadFiles(WALKey key, WALEdit edit) throws IOException {
+    List<Path> bulkLoadFilePaths = new ArrayList<>();
+
+    // Honor the documented contract: a missing key or edit yields no paths instead of an NPE.
+    if (key == null || edit == null) {
+      return bulkLoadFilePaths;
+    }
+
+    for (Cell cell : edit.getCells()) {
+      // Only cells carrying the bulk-load marker qualifier hold a bulk-load descriptor.
+      if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+        TableName tableName = key.getTableName();
+        String namespace = tableName.getNamespaceAsString();
+        String table = tableName.getQualifierAsString();
+        bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, table));
       }
     }
+
     return bulkLoadFilePaths;
   }
 
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
index ae26cf96050..24f5237866d 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/PITRTestUtil.java
@@ -28,9 +28,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,4 +112,32 @@ public final class PITRTestUtil {
       return HBaseTestingUtil.countRows(table);
     }
   }
+
+  /**
+   * Creates one HFile named {@code MyHFile} under {@code <outputDir>/<cfName>} through
+   * {@link HFileTestUtil}, after qualifying outputDir against the default file system of conf.
+   * The family is cfName, the qualifier is "qualifier", and 1000 is passed as the row count.
+   * NOTE(review): the two key arguments (cfName + "begin" / cfName + "end") are presumably the
+   * start and end row keys - confirm against HFileTestUtil.createHFile.
+   * @param outputDir directory that will contain the family sub-directory
+   * @param conf      configuration used to obtain the file system and to write the HFile
+   * @param cfName    column family name; also used as the sub-directory name
+   * @throws IOException if the file system cannot be obtained or the HFile cannot be written
+   */
+  public static void generateHFiles(Path outputDir, Configuration conf, String cfName)
+    throws IOException {
+    final FileSystem fileSystem = FileSystem.get(conf);
+    final Path qualifiedOutputDir =
+      outputDir.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
+    final Path hFilePath = new Path(new Path(qualifiedOutputDir, cfName), "MyHFile");
+
+    final byte[] firstKey = Bytes.toBytes(cfName + "begin");
+    final byte[] lastKey = Bytes.toBytes(cfName + "end");
+    final int rowCount = 1000;
+
+    HFileTestUtil.createHFile(conf, fileSystem, hFilePath, Bytes.toBytes(cfName),
+      Bytes.toBytes("qualifier"), firstKey, lastKey, rowCount);
+  }
+
+  /**
+   * Bulk-loads the HFiles under inputDir into tableName with the by-family bulk-load option
+   * switched on for the duration of the call. The option is restored afterwards to whatever the
+   * caller had configured (including "not set") rather than being forced to false, so a shared
+   * test Configuration is not silently altered.
+   * @param tableName table to load into
+   * @param inputDir  directory containing the family sub-directories with HFiles
+   * @param conn      connection used to obtain the table
+   * @param conf      configuration handed to the bulk-load tool; temporarily modified
+   * @throws IOException if the table cannot be obtained or the bulk load fails
+   */
+  public static void bulkLoadHFiles(TableName tableName, Path inputDir, Connection conn,
+    Configuration conf) throws IOException {
+    final String byFamilyKey = BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY;
+    // Remember the raw value (null when unset) so it can be restored exactly.
+    final String previous = conf.get(byFamilyKey);
+    conf.setBoolean(byFamilyKey, true);
+
+    try (Table table = conn.getTable(tableName)) {
+      BulkLoadHFiles loader = new BulkLoadHFilesTool(conf);
+      loader.bulkLoad(table.getName(), inputDir);
+    } finally {
+      if (previous == null) {
+        conf.unset(byFamilyKey);
+      } else {
+        conf.set(byFamilyKey, previous);
+      }
+    }
+  }
 }
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
index 8bd2fe4cc78..c6c6f5e9799 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.backup;
 
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.ToolRunner;
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index a1ce9c97a68..54667752f01 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -36,6 +37,21 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+/**
+ * Integration-style tests for Point-in-Time Restore (PITR).
+ * <p>
+ * These tests exercise the full backup / continuous backup / restore flow:
+ * </p>
+ * <ul>
+ * <li>create backups at multiple historical points in time (via {@code BackupDriver})</li>
+ * <li>exercise WAL-based replication/continuous backup</li>
+ * <li>validate Point-in-Time Restore behavior (successful restores, failure cases)</li>
+ * </ul>
+ * <p>
+ * NOTE: Some tests also create HFiles and perform HBase bulk-loads (HFile -> table) so the
+ * restore flow is validated when bulk-loaded storefiles are present in WALs. This ensures the
+ * BulkLoadCollector/BulkFilesCollector logic (discovering bulk-loaded store files referenced
+ * from WAL bulk-load descriptors) is exercised by the test suite.
+ * </p>
+ */
 @Category(LargeTests.class)
 public class TestPointInTimeRestore extends TestBackupBase {
   @ClassRule
@@ -67,8 +83,8 @@ public class TestPointInTimeRestore extends TestBackupBase {
     // Simulate a backup taken 20 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 20 * 
ONE_DAY_IN_MILLISECONDS);
-    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert 
initial data into
-                                                                   // table1
+    // Insert initial data into table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
 
     // Perform a full backup for table1 with continuous backup enabled
     String[] args =
@@ -80,6 +96,12 @@ public class TestPointInTimeRestore extends TestBackupBase {
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 15 * 
ONE_DAY_IN_MILLISECONDS);
     PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Add more 
data to table1
+
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    PITRTestUtil.generateHFiles(dir, TEST_UTIL.getConfiguration(), 
Bytes.toString(famName));
+    PITRTestUtil.bulkLoadHFiles(table1, dir, TEST_UTIL.getConnection(),
+      TEST_UTIL.getConfiguration());
+
     PITRTestUtil.loadRandomData(TEST_UTIL, table2, famName, 500); // Insert 
data into table2
 
     PITRTestUtil.waitForReplication(); // Ensure replication is complete
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
index 15ab2b2bdbe..30ee495df55 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.backup.impl;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
 import static 
org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java
new file mode 100644
index 00000000000..20295d7e4ea
--- /dev/null
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJob.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for BulkLoadCollectorJob (mapper, reducer and job 
creation/validation).
+ */
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestBulkLoadCollectorJob {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadCollectorJob.class);
+
+  private Configuration conf;
+
+  /** Gives every test method its own freshly created HBase configuration. */
+  @Before
+  public void setUp() {
+    this.conf = HBaseConfiguration.create();
+  }
+
+  @After
+  public void tearDown() {
+    // Intentionally empty: setUp only creates a Configuration object, so nothing is released here.
+  }
+
+  /**
+   * Checks that {@link BulkLoadCollectorJob#createSubmittableJob(String[])} wires the given input
+   * and output directories into the job, and that a textual start time placed in the
+   * configuration ends up stored as epoch milliseconds.
+   */
+  @Test
+  public void testCreateSubmittableJobValid() throws Exception {
+    // Seed a human-readable start time so the driver's time handling has something to convert.
+    final String startTimeText = "2001-02-20T16:35:06.99";
+    conf.set(WALInputFormat.START_TIME_KEY, startTimeText);
+    long expectedStartMillis =
+      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(startTimeText).getTime();
+
+    String walInput = new Path("file:/wals/input").toString();
+    String bulkOutput = new Path("file:/out/bulk").toString();
+    Job submittable =
+      new BulkLoadCollectorJob(conf).createSubmittableJob(new String[] { walInput, bulkOutput });
+
+    // Exactly one input path, equal to the first argument.
+    Path[] configuredInputs = FileInputFormat.getInputPaths(submittable);
+    assertEquals(1, configuredInputs.length);
+    assertEquals(walInput, configuredInputs[0].toString());
+
+    // Output path equal to the second argument.
+    assertEquals(new Path(bulkOutput), FileOutputFormat.getOutputPath(submittable));
+
+    // The value is read back from the Configuration that was handed to the driver.
+    assertEquals(expectedStartMillis, conf.getLong(WALInputFormat.START_TIME_KEY, -1L));
+  }
+
+  /**
+   * Expects {@link BulkLoadCollectorJob#createSubmittableJob(String[])} to reject an argument
+   * array that carries only a single path, signalling the problem with an {@link IOException}.
+   */
+  @Test(expected = IOException.class)
+  public void testCreateSubmittableJob_throwsForInsufficientArgs() throws Exception {
+    String[] tooFewArgs = { "file:/only/one/arg" };
+    // The test passes only if the following call throws IOException.
+    new BulkLoadCollectorJob(conf).createSubmittableJob(tooFewArgs);
+  }
+
+  /**
+   * Expects {@link BulkLoadCollectorJob#createSubmittableJob(String[])} to answer a {@code null}
+   * argument array with an {@link IOException}.
+   */
+  @Test(expected = IOException.class)
+  public void testCreateSubmittableJob_throwsForNullArgs() throws Exception {
+    // The test passes only if the following call throws IOException.
+    new BulkLoadCollectorJob(conf).createSubmittableJob(null);
+  }
+
+  /**
+   * Checks that {@link BulkLoadCollectorJob.BulkLoadCollectorMapper} emits nothing for a WAL
+   * entry belonging to a table that is absent from the configured table list.
+   */
+  @Test
+  public void testMapperIgnoresWhenTableNotInMap() throws Exception {
+    // Restrict the job to one table, ns:allowed, mapped onto itself.
+    Configuration singleTableConf = new Configuration(conf);
+    singleTableConf.setStrings(WALPlayer.TABLES_KEY, "ns:allowed");
+    singleTableConf.setStrings(WALPlayer.TABLE_MAP_KEY, "ns:allowed");
+
+    // The mocked context only has to hand that configuration to mapper.setup().
+    @SuppressWarnings("unchecked")
+    Mapper<WALKey, WALEdit, Text, NullWritable>.Context context = mock(Mapper.Context.class);
+    when(context.getConfiguration()).thenReturn(singleTableConf);
+
+    BulkLoadCollectorJob.BulkLoadCollectorMapper mapper =
+      new BulkLoadCollectorJob.BulkLoadCollectorMapper();
+    mapper.setup(context);
+
+    // A WAL key that names a different table, ns:other.
+    WALKey foreignKey = mock(WALKey.class);
+    when(foreignKey.getTableName()).thenReturn(TableName.valueOf("ns:other"));
+    WALEdit edit = mock(WALEdit.class);
+
+    // Stub the static processor so it yields a path: a mapper that wrongly consulted it would
+    // have something to write, and the verification below would catch that.
+    try (MockedStatic<BulkLoadProcessor> stub = Mockito.mockStatic(BulkLoadProcessor.class)) {
+      stub.when(() -> BulkLoadProcessor.processBulkLoadFiles(any(), any()))
+        .thenReturn(Collections.singletonList(new Path("x")));
+
+      mapper.map(foreignKey, edit, context);
+
+      verify(context, never()).write(any(Text.class), any(NullWritable.class));
+    }
+  }
+
+  /**
+   * Checks that {@link BulkLoadCollectorJob.BulkLoadCollectorMapper} tolerates {@code null}
+   * input.
+   * <p>
+   * One call passes a {@code null} WAL key and another a {@code null} {@link WALEdit}; neither
+   * may result in a write to the context.
+   * </p>
+   * @throws Exception on test failure
+   */
+  @Test
+  public void testMapperHandlesNullKeyOrValue() throws Exception {
+    @SuppressWarnings("unchecked")
+    Mapper<WALKey, WALEdit, Text, NullWritable>.Context context = mock(Mapper.Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    BulkLoadCollectorJob.BulkLoadCollectorMapper mapper =
+      new BulkLoadCollectorJob.BulkLoadCollectorMapper();
+    mapper.setup(context);
+
+    WALEdit someEdit = mock(WALEdit.class);
+    WALKey someKey = mock(WALKey.class);
+    mapper.map(null, someEdit, context); // key missing
+    mapper.map(someKey, null, context); // value missing
+
+    verify(context, never()).write(any(Text.class), any(NullWritable.class));
+  }
+
+  /**
+   * Checks that {@link BulkLoadCollectorJob.DedupReducer} collapses repeated values for one key
+   * into a single output record.
+   */
+  @Test
+  public void testDedupReducerWritesOnce() throws Exception {
+    @SuppressWarnings("unchecked")
+    Reducer<Text, NullWritable, Text, NullWritable>.Context sink = mock(Reducer.Context.class);
+    Text storeFilePath = new Text("/some/path");
+
+    // Three occurrences of the same key arrive at the reducer.
+    NullWritable marker = NullWritable.get();
+    Iterable<NullWritable> duplicates = Arrays.asList(marker, marker, marker);
+
+    new BulkLoadCollectorJob.DedupReducer().reduce(storeFilePath, duplicates, sink);
+
+    // The key must have been written exactly once.
+    verify(sink, times(1)).write(eq(storeFilePath), eq(NullWritable.get()));
+  }
+}
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java
new file mode 100644
index 00000000000..b72f931732a
--- /dev/null
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/mapreduce/TestBulkLoadCollectorJobIntegration.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Integration-like unit test for BulkLoadCollectorJob.
+ * <ol>
+ * <li>Creates a WAL with a BULK_LOAD descriptor (ProtobufLogWriter).</li>
+ * <li>Runs BulkLoadCollectorJob.</li>
+ * <li>Verifies the job emits the expected store-file path.</li>
+ * </ol>
+ */
+@Category({ MapReduceTests.class, LargeTests.class })
+public class TestBulkLoadCollectorJobIntegration {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadCollectorJobIntegration.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBulkLoadCollectorJobIntegration.class);
+  private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static Path hbaseDir;
+  static final TableName tableName = TableName.valueOf(getName());
+  static final RegionInfo info = 
RegionInfoBuilder.newBuilder(tableName).build();
+  private static final byte[] family = Bytes.toBytes("column");
+  private static Path logDir;
+  protected MultiVersionConcurrencyControl mvcc;
+  protected static NavigableMap<byte[], Integer> scopes = new 
TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+  /** Returns the fixed name from which the static {@code tableName} field is built. */
+  private static String getName() {
+    return "TestBulkLoadCollectorJobIntegration";
+  }
+
+  /**
+   * Deletes the HBase root directory when it and the filesystem are known, then creates a fresh
+   * {@link MultiVersionConcurrencyControl} for the WAL keys built by the test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    boolean rootDirKnown = hbaseDir != null && fs != null;
+    if (rootDirKnown) {
+      fs.delete(hbaseDir, true);
+    }
+    mvcc = new MultiVersionConcurrencyControl();
+  }
+
+  /**
+   * Starts a mini DFS cluster with three datanodes and prepares the directories shared by the
+   * tests: the HBase root directory and the WAL directory the collector job reads from.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // TEST_UTIL.getConfiguration() hands back the utility's own Configuration instance, so one
+    // assignment suffices; the former second assignment after cluster start was redundant.
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.replication", 1);
+
+    // Start a mini DFS cluster
+    TEST_UTIL.startMiniDFSCluster(3);
+    fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    hbaseDir = TEST_UTIL.createRootDir();
+
+    // Use a deterministic test WAL directory under the test filesystem
+    logDir = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "WALs/23-11-2024");
+    fs.mkdirs(logDir);
+  }
+
+  /** Removes the HBase root directory, if one was created, and stops the mini DFS cluster. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    boolean rootDirCreated = fs != null && hbaseDir != null;
+    if (rootDirCreated) {
+      fs.delete(hbaseDir, true);
+    }
+    TEST_UTIL.shutdownMiniDFSCluster();
+  }
+
+  /**
+   * Writes a WAL file holding one BULK_LOAD descriptor directly with {@link ProtobufLogWriter}
+   * (no RegionServer needed), runs {@link BulkLoadCollectorJob} over the WAL directory and checks
+   * that the job output mentions the bulk-loaded store file.
+   */
+  @Test
+  public void testBulkLoadCollectorEmitsStoreFilesFromWAL() throws Exception {
+    // Create WAL entry with BULK_LOAD descriptor
+    final String storeFileName = "storefile-abc.hfile";
+    WAL.Entry entry =
+      createBulkLoadWalEntry(info.getEncodedName(), Bytes.toString(family), storeFileName);
+
+    // Sanity check: the processor itself extracts exactly one path from this entry
+    List<Path> relativePaths =
+      BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), entry.getEdit());
+    LOG.debug("BulkLoadProcessor returned {} relative path(s): {}", relativePaths.size(),
+      relativePaths);
+    assertEquals("Expected exactly one relative path from BulkLoadProcessor", 1,
+      relativePaths.size());
+
+    // Build WAL file path and write WAL using ProtobufLogWriter into logDir
+    String walFileName = "wal-" + EnvironmentEdgeManager.currentTime();
+    Path walFilePath = new Path(logDir, walFileName);
+    fs.mkdirs(logDir);
+
+    // try-with-resources closes the writer exactly once; a failing close() is reported through
+    // the catch below instead of being silently dropped.
+    try (FSHLogProvider.Writer writer = new ProtobufLogWriter()) {
+      long blockSize = WALUtil.getWALBlockSize(conf, fs, walFilePath);
+      writer.init(fs, walFilePath, conf, true, blockSize,
+        StreamSlowMonitor.create(conf, walFileName));
+      writer.append(entry);
+      writer.sync(true);
+    } catch (Exception e) {
+      throw new IOException("Failed to write WAL via ProtobufLogWriter", e);
+    }
+
+    // Assert WAL file exists and has content
+    boolean exists = fs.exists(walFilePath);
+    long len = exists ? fs.getFileStatus(walFilePath).getLen() : -1L;
+    assertTrue("WAL file should exist at " + walFilePath, exists);
+    assertTrue("WAL file should have non-zero length, actual=" + len, len > 0);
+
+    // Run the MR job
+    Path outDir = new Path("/tmp/test-bulk-files-output-" + System.currentTimeMillis());
+    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+    try {
+      int res = ToolRunner.run(TEST_UTIL.getConfiguration(),
+        new BulkLoadCollectorJob(TEST_UTIL.getConfiguration()),
+        new String[] { logDir.toString(), outDir.toString() });
+      assertEquals("BulkLoadCollectorJob should exit with code 0", 0, res);
+
+      // Inspect job output
+      assertTrue("Output directory should exist", dfs.exists(outDir));
+
+      List<Path> partFiles = Arrays.stream(dfs.listStatus(outDir)).map(FileStatus::getPath)
+        .filter(p -> p.getName().startsWith("part-")).toList();
+
+      assertFalse("Expect at least one part file in output", partFiles.isEmpty());
+
+      // Read all lines; each file is drained into a list before its stream is closed
+      List<String> lines = partFiles.stream().flatMap(p -> {
+        try (FSDataInputStream in = dfs.open(p);
+          BufferedReader r =
+            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
+          return r.lines().toList().stream();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }).toList();
+
+      assertFalse("Job should have emitted at least one storefile path", lines.isEmpty());
+
+      boolean found = lines.stream().anyMatch(l -> l.contains(storeFileName));
+      assertTrue(
+        "Expected emitted path to contain store file name: " + storeFileName + " ; got: " + lines,
+        found);
+    } finally {
+      // cleanup, also when the job or an assertion above fails
+      dfs.delete(outDir, true);
+    }
+  }
+
+  /**
+   * Builds a WAL entry whose single cell carries a serialized {@code BulkLoadDescriptor}.
+   * @param regionName encoded region name stored in the descriptor
+   * @param family     column family name, also used as the store home directory
+   * @param storeFiles store file names listed in the descriptor
+   * @return the WAL entry wrapping the descriptor cell together with a newly created WAL key
+   */
+  private WAL.Entry createBulkLoadWalEntry(String regionName, String family,
+    String... storeFiles) {
+    WALProtos.StoreDescriptor.Builder store = WALProtos.StoreDescriptor.newBuilder();
+    store.setFamilyName(ByteString.copyFromUtf8(family));
+    store.setStoreHomeDir(family);
+    store.addAllStoreFile(Arrays.asList(storeFiles));
+
+    WALProtos.BulkLoadDescriptor.Builder descriptor = WALProtos.BulkLoadDescriptor.newBuilder();
+    descriptor.setReplicate(true);
+    descriptor.setEncodedRegionName(ByteString.copyFromUtf8(regionName));
+    descriptor.setTableName(ProtobufUtil.toProtoTableName(tableName));
+    descriptor.setBulkloadSeqNum(1000);
+    descriptor.addStores(store);
+    byte[] serializedDescriptor = descriptor.build().toByteArray();
+
+    // The descriptor travels as the value of a METAFAMILY / BULK_LOAD cell.
+    WALEdit edit = new WALEdit();
+    Cell bulkLoadCell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY)
+      .setType(Cell.Type.Put).setRow(new byte[] { 1 }).setFamily(WALEdit.METAFAMILY)
+      .setQualifier(WALEdit.BULK_LOAD).setValue(serializedDescriptor).build();
+    edit.add(bulkLoadCell);
+
+    return new WAL.Entry(getWalKeyImpl(EnvironmentEdgeManager.currentTime(), scopes), edit);
+  }
+
+  /**
+   * Creates a WAL key for this test's region and table.
+   * @param time   timestamp passed through to the key
+   * @param scopes scopes map passed through to the key
+   * @return a key built with this test's {@code mvcc} instance
+   */
+  protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
+    byte[] encodedRegion = info.getEncodedNameAsBytes();
+    return new WALKeyImpl(encodedRegion, tableName, time, mvcc, scopes);
+  }
+}
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index cc9200882e3..78991a463da 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.backup.replication;
 
 import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
-import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_PEER_UUID;
@@ -29,6 +27,8 @@ import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplica
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static 
org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
 import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java
new file mode 100644
index 00000000000..2cca13bf19c
--- /dev/null
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBackupFileSystemManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit tests for {@link BackupFileSystemManager}.
+ */
+@Category(SmallTests.class)
+public class TestBackupFileSystemManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupFileSystemManager.class);
+
+  @Rule
+  public TemporaryFolder tmp = new TemporaryFolder();
+
+  /**
+   * extractWalPathInfo on a WALs directory nested under a prefix, in the shape
+   * /some/prefix/{WALS_DIR}/2025-09-14/wal-000: both the prefix and the date segment are
+   * expected back.
+   */
+  @Test
+  public void testExtractWalPathInfo_withPrefix() throws Exception {
+    String walLocation =
+      "/some/prefix/" + BackupFileSystemManager.WALS_DIR + "/2025-09-14/wal-000";
+    BackupFileSystemManager.WalPathInfo parsed =
+      BackupFileSystemManager.extractWalPathInfo(new Path(walLocation));
+
+    assertNotNull("WalPathInfo should not be null", parsed);
+    // Everything in front of the WALs segment
+    assertEquals("/some/prefix", parsed.getPrefixBeforeWALs().toString());
+    // The directory directly above the WAL file
+    assertEquals("2025-09-14", parsed.getDateSegment());
+  }
+
+  /**
+   * extractWalPathInfo: case where the WALs directory sits directly under the filesystem root, in
+   * the shape /{WALS_DIR}/2025-09-14/some-wal. The prefix before WALs is then expected to be the
+   * root path "/".
+   */
+  @Test
+  public void testExtractWalPathInfo_rootWALs() throws Exception {
+    Path walPath = new Path("/" + BackupFileSystemManager.WALS_DIR + "/2025-09-14/wal-123");
+    BackupFileSystemManager.WalPathInfo info = BackupFileSystemManager.extractWalPathInfo(walPath);
+
+    assertNotNull(info);
+    assertEquals("2025-09-14", info.getDateSegment());
+    assertNotNull("prefixBeforeWALs should not be null for root-style path",
+      info.getPrefixBeforeWALs());
+    // The former check (equals "/" OR non-empty) could never fail for a non-null Path. Pin the
+    // real expectation instead: the parent of "/WALs" in Hadoop Path terms is the root "/".
+    assertEquals("/", info.getPrefixBeforeWALs().toString());
+  }
+
+  /** extractWalPathInfo is expected to reject a null path with an IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testExtractWalPathInfo_nullPath() throws Exception {
+    Path missingPath = null;
+    BackupFileSystemManager.extractWalPathInfo(missingPath);
+  }
+
+  /**
+   * extractWalPathInfo on a bare, single-segment file name ("walfile") that has no date or WALs
+   * directory above it: an IOException is expected.
+   */
+  @Test(expected = IOException.class)
+  public void testExtractWalPathInfo_missingDateDir() throws Exception {
+    BackupFileSystemManager.extractWalPathInfo(new Path("walfile"));
+  }
+
+  /**
+   * extractWalPathInfo on a path whose would-be WALs segment carries a different name, e.g.
+   * /prefix/NOT_WALS/2025/wal: an IOException is expected.
+   */
+  @Test(expected = IOException.class)
+  public void testExtractWalPathInfo_wrongWALsegment() throws Exception {
+    BackupFileSystemManager.extractWalPathInfo(new Path("/prefix/NOT_WALS/2025/wal"));
+  }
+
+  /**
+   * resolveBulkLoadFullPath with a prefixed WAL path: the relative store-file path is expected
+   * under the same prefix, inside the bulk-load-files directory for the WAL's date.
+   */
+  @Test
+  public void testResolveBulkLoadFullPath_withPrefix() throws Exception {
+    final String prefix = "/some/prefix/";
+    final String day = "/2025-08-30/";
+    final String storeFile = "namespace/table/region/family/file1";
+
+    Path resolved = BackupFileSystemManager.resolveBulkLoadFullPath(
+      new Path(prefix + BackupFileSystemManager.WALS_DIR + day + "wal-1"), new Path(storeFile));
+
+    // Same prefix and date, with the WALs segment swapped for the bulk-load-files segment.
+    assertEquals(prefix + BackupFileSystemManager.BULKLOAD_FILES_DIR + day + storeFile,
+      resolved.toString());
+  }
+
+  /**
+   * resolveBulkLoadFullPath when the WALs directory sits directly under root: the result is
+   * expected directly under the root-level bulk-load-files directory for the WAL's date.
+   */
+  @Test
+  public void testResolveBulkLoadFullPath_rootWALs() throws Exception {
+    Path rootLevelWal = new Path("/" + BackupFileSystemManager.WALS_DIR + "/2025-08-30/wal-2");
+    Path resolved =
+      BackupFileSystemManager.resolveBulkLoadFullPath(rootLevelWal, new Path("ns/tbl/r/f"));
+
+    assertEquals("/" + BackupFileSystemManager.BULKLOAD_FILES_DIR + "/2025-08-30/ns/tbl/r/f",
+      resolved.toString());
+  }
+
+  /**
+   * Constructing a BackupFileSystemManager against a local temporary folder is expected to leave
+   * both the WALs directory and the bulk-load-files directory in existence.
+   */
+  @Test
+  public void testConstructorCreatesDirectories() throws Exception {
+    File backupRoot = tmp.newFolder("backupRoot");
+    Configuration conf = HBaseConfiguration.create();
+    BackupFileSystemManager manager =
+      new BackupFileSystemManager("peer-1", conf, backupRoot.getAbsolutePath());
+
+    FileSystem backupFs = manager.getBackupFs();
+    Path walsDir = manager.getWalsDir();
+    Path bulkLoadDir = manager.getBulkLoadFilesDir();
+
+    assertTrue("WALs dir should exist", backupFs.exists(walsDir));
+    assertTrue("bulk-load-files dir should exist", backupFs.exists(bulkLoadDir));
+  }
+}
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
similarity index 69%
rename from 
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
rename to 
hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
index 9837f9e926d..1d3b8ab09ea 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/util/TestBulkLoadProcessor.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.backup.replication;
+package org.apache.hadoop.hbase.backup.util;
 
 import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
 import static org.junit.Assert.assertEquals;
@@ -119,6 +119,18 @@ public class TestBulkLoadProcessor {
     assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
   }
 
+  @Test
+  public void testProcessBulkLoadFiles_validEntry_singleEntryApi() throws 
IOException {
+    WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), 
"region123", true,
+      "cf1", "file1", "file2"); // true: presumably marks the bulk load as replicable -- confirm in helper
+
+    List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), 
entry.getEdit()); // (key, edit) overload instead of passing a list of WAL entries
+
+    assertEquals(2, paths.size()); // one path expected per store file passed to the helper
+    assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+    assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
+  }
+
   /**
    * Verifies that a non-replicable bulk load entry is ignored.
    */
@@ -132,6 +144,16 @@ public class TestBulkLoadProcessor {
     assertTrue(paths.isEmpty());
   }
 
+  @Test
+  public void testProcessBulkLoadFiles_nonReplicableSkipped_singleEntryApi() 
throws IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", 
false, "cf1", "file1"); // false: presumably non-replicable (per the test name) -- confirm in helper
+
+    List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), 
entry.getEdit());
+
+    assertTrue(paths.isEmpty()); // no paths expected for such an entry
+  }
+
   /**
    * Verifies that entries without the BULK_LOAD qualifier are ignored.
    */
@@ -146,6 +168,16 @@ public class TestBulkLoadProcessor {
     assertTrue(paths.isEmpty());
   }
 
+  @Test
+  public void testProcessBulkLoadFiles_noBulkLoadQualifier_singleEntryApi() 
throws IOException {
+    WALEdit edit = new WALEdit(); // nothing is added to the edit, so it carries no bulk-load marker
+    WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", 
"tbl"), 0L, 0L, null);
+
+    List<Path> paths = BulkLoadProcessor.processBulkLoadFiles(key, edit);
+
+    assertTrue(paths.isEmpty()); // an edit without bulk-load data should yield no paths
+  }
+
   /**
    * Verifies that multiple WAL entries with different column families produce 
the correct set of
    * file paths.
@@ -163,4 +195,38 @@ public class TestBulkLoadProcessor {
     assertTrue(paths.stream().anyMatch(p -> 
p.toString().contains("cf1/file1")));
     assertTrue(paths.stream().anyMatch(p -> 
p.toString().contains("cf2/fileA")));
   }
+
+  @Test
+  public void testProcessBulkLoadFiles_multipleFamilies_singleEntryApi() 
throws IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf1", "file1");
+    WAL.Entry entry2 =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf2", "fileA"); // same table and region as entry, different family and file
+
+    List<Path> paths1 = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), 
entry.getEdit());
+    List<Path> paths2 = 
BulkLoadProcessor.processBulkLoadFiles(entry2.getKey(), entry2.getEdit());
+
+    // combine to mimic processing multiple entries
+    paths1.addAll(paths2); // NOTE(review): requires the returned list to be modifiable
+
+    assertEquals(2, paths1.size()); // one path contributed by each entry
+    assertTrue(paths1.stream().anyMatch(p -> 
p.toString().contains("cf1/file1")));
+    assertTrue(paths1.stream().anyMatch(p -> 
p.toString().contains("cf2/fileA")));
+  }
+
+  /**
+   * Sanity check: the list-based API should still work and agree with the single-entry API for
+   * the same entry (ensures delegation/backwards compatibility). Only the result sizes and the
+   * first list-API path are compared here, not full list equality.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_listApi_delegatesToSingle() throws 
IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", 
true, "cf1", "file1");
+
+    List<Path> single = BulkLoadProcessor.processBulkLoadFiles(entry.getKey(), 
entry.getEdit());
+    List<Path> listApi = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertEquals(single.size(), listApi.size()); // NOTE(review): assertEquals(single, listApi) would be stricter
+    
assertTrue(listApi.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+  }
 }
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index b5c1d39a550..b2bda03c44a 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -66,7 +66,7 @@ public class WALInputFormat extends InputFormat<WALKey, 
WALEdit> {
   /**
    * {@link InputSplit} for {@link WAL} files. Each split represent exactly 
one log file.
    */
-  static class WALSplit extends InputSplit implements Writable {
+  public static class WALSplit extends InputSplit implements Writable {
     private String logFileName;
     private long fileSize;
     private long startTime;

Reply via email to