This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-28957 by this push:
     new 7aaf8fbb892 HBASE-29519 Copy Bulkloaded Files in Continuous Backup 
(#7222)
7aaf8fbb892 is described below

commit 7aaf8fbb89220062ccd7cf99b98b7bd9b47a6512
Author: vinayak hegde <[email protected]>
AuthorDate: Wed Aug 20 21:50:00 2025 +0530

    HBASE-29519 Copy Bulkloaded Files in Continuous Backup (#7222)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    Signed-off-by: Andor Molnár <[email protected]>
---
 .../hbase/backup/impl/FullTableBackupClient.java   |  10 +
 .../replication/BackupFileSystemManager.java       |  11 +-
 .../backup/replication/BulkLoadProcessor.java      |  96 ++++++
 .../replication/BulkLoadUploadException.java       |  32 ++
 .../ContinuousBackupReplicationEndpoint.java       | 168 ++++++++++-
 .../backup/replication/TestBulkLoadProcessor.java  | 166 ++++++++++
 .../TestContinuousBackupReplicationEndpoint.java   | 333 ++++++++++++++++++++-
 7 files changed, 801 insertions(+), 15 deletions(-)

diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
index 5b49496d626..2b26aba03fe 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.backup.impl;
 
+import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_ATTEMPTS_PAUSE_MS_KEY;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.BACKUP_MAX_ATTEMPTS_KEY;
@@ -190,6 +191,15 @@ public class FullTableBackupClient extends 
TableBackupClient {
     // set overall backup status: complete. Here we make sure to complete the 
backup.
     // After this checkpoint, even if entering cancel process, will let the 
backup finished
     backupInfo.setState(BackupState.COMPLETE);
+
+    if (!conf.getBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, false)) {
+      System.out.println("WARNING: Bulkload replication is not enabled. "
+        + "Since continuous backup is using HBase replication, bulk loaded 
files won't be backed up as part of continuous backup. "
+        + "To ensure bulk-loaded files are backed up, enable bulkload 
replication "
+        + "(hbase.replication.bulkload.enabled=true) and configure a unique 
cluster ID using "
+        + "hbase.replication.cluster.id. This cluster ID is required by the 
replication framework "
+        + "to uniquely identify clusters, even if continuous backup itself 
does not directly rely on it.");
+    }
   }
 
   private void handleNonContinuousBackup(Admin admin) throws IOException {
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
index 9d1d818c207..225d3217276 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BackupFileSystemManager.java
@@ -26,18 +26,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) files within
- * the specified backup root directory.
+ * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) and
+ * bulk-loaded files within the specified backup root directory.
  */
 @InterfaceAudience.Private
 public class BackupFileSystemManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(BackupFileSystemManager.class);
 
   public static final String WALS_DIR = "WALs";
+  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
   private final String peerId;
   private final FileSystem backupFs;
   private final Path backupRootDir;
   private final Path walsDir;
+  private final Path bulkLoadFilesDir;
 
   public BackupFileSystemManager(String peerId, Configuration conf, String 
backupRootDirStr)
     throws IOException {
@@ -45,6 +47,7 @@ public class BackupFileSystemManager {
     this.backupRootDir = new Path(backupRootDirStr);
     this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
     this.walsDir = createDirectory(WALS_DIR);
+    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
   }
 
   private Path createDirectory(String dirName) throws IOException {
@@ -58,6 +61,10 @@ public class BackupFileSystemManager {
     return walsDir;
   }
 
+  public Path getBulkLoadFilesDir() {
+    return bulkLoadFilesDir;
+  }
+
   public FileSystem getBackupFs() {
     return backupFs;
   }
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
new file mode 100644
index 00000000000..6e1271313bc
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Processes bulk load files from Write-Ahead Log (WAL) entries for HBase 
replication.
+ * <p>
+ * This utility class extracts and constructs the file paths of bulk-loaded 
files based on WAL
+ * entries. It processes bulk load descriptors and their associated store 
descriptors to generate
+ * the paths for each bulk-loaded file.
+ * <p>
+ * The class is designed for scenarios where replicable bulk load operations 
need to be parsed and
+ * their file paths need to be determined programmatically.
+ * </p>
+ */
[email protected]
+public final class BulkLoadProcessor {
+  private BulkLoadProcessor() {
+  }
+
+  public static List<Path> processBulkLoadFiles(List<WAL.Entry> walEntries) 
throws IOException {
+    List<Path> bulkLoadFilePaths = new ArrayList<>();
+
+    for (WAL.Entry entry : walEntries) {
+      WALEdit edit = entry.getEdit();
+      for (Cell cell : edit.getCells()) {
+        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+          TableName tableName = entry.getKey().getTableName();
+          String namespace = tableName.getNamespaceAsString();
+          String table = tableName.getQualifierAsString();
+          bulkLoadFilePaths.addAll(processBulkLoadDescriptor(cell, namespace, 
table));
+        }
+      }
+    }
+    return bulkLoadFilePaths;
+  }
+
+  private static List<Path> processBulkLoadDescriptor(Cell cell, String 
namespace, String table)
+    throws IOException {
+    List<Path> bulkLoadFilePaths = new ArrayList<>();
+    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+
+    if (bld == null || !bld.getReplicate() || bld.getEncodedRegionName() == 
null) {
+      return bulkLoadFilePaths; // Skip if not replicable
+    }
+
+    String regionName = bld.getEncodedRegionName().toStringUtf8();
+    for (WALProtos.StoreDescriptor storeDescriptor : bld.getStoresList()) {
+      bulkLoadFilePaths
+        .addAll(processStoreDescriptor(storeDescriptor, namespace, table, 
regionName));
+    }
+
+    return bulkLoadFilePaths;
+  }
+
+  private static List<Path> processStoreDescriptor(WALProtos.StoreDescriptor 
storeDescriptor,
+    String namespace, String table, String regionName) {
+    List<Path> paths = new ArrayList<>();
+    String columnFamily = storeDescriptor.getFamilyName().toStringUtf8();
+
+    for (String storeFile : storeDescriptor.getStoreFileList()) {
+      paths.add(new Path(namespace,
+        new Path(table, new Path(regionName, new Path(columnFamily, 
storeFile)))));
+    }
+
+    return paths;
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java
new file mode 100644
index 00000000000..91a46c77e31
--- /dev/null
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/BulkLoadUploadException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class BulkLoadUploadException extends IOException {
+  public BulkLoadUploadException(String message) {
+    super(message);
+  }
+
+  public BulkLoadUploadException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 2442e0789a8..69c445c484d 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.backup.replication;
 
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.text.SimpleDateFormat;
@@ -32,9 +34,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -44,6 +49,7 @@ import 
org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.EmptyEntriesPolicy;
 import org.apache.hadoop.hbase.replication.ReplicationResult;
 import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -54,8 +60,8 @@ import org.slf4j.LoggerFactory;
 /**
  * ContinuousBackupReplicationEndpoint is responsible for replicating WAL 
entries to a backup
  * storage. It organizes WAL entries by day and periodically flushes the data, 
ensuring that WAL
- * files do not exceed the configured size. The class includes mechanisms for 
handling the WAL files
- * and ensuring that the replication process is safe.
+ * files do not exceed the configured size. The class includes mechanisms for 
handling the WAL
+ * files, performing bulk load backups, and ensuring that the replication 
process is safe.
  */
 @InterfaceAudience.Private
 public class ContinuousBackupReplicationEndpoint extends 
BaseReplicationEndpoint {
@@ -302,6 +308,7 @@ public class ContinuousBackupReplicationEndpoint extends 
BaseReplicationEndpoint
       for (WAL.Entry entry : walEntries) {
         walWriter.append(entry);
       }
+
       walWriter.sync(true);
     } catch (UncheckedIOException e) {
       String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create 
WAL Writer for " + day;
@@ -309,6 +316,17 @@ public class ContinuousBackupReplicationEndpoint extends 
BaseReplicationEndpoint
         e.getMessage(), e);
       throw new IOException(errorMsg, e);
     }
+
+    List<Path> bulkLoadFiles = 
BulkLoadProcessor.processBulkLoadFiles(walEntries);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Processed {} bulk load files for WAL entries", 
Utils.logPeerId(peerId),
+        bulkLoadFiles.size());
+      LOG.trace("{} Bulk load files: {}", Utils.logPeerId(peerId),
+        
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+    }
+
+    uploadBulkLoadFiles(day, bulkLoadFiles);
   }
 
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
@@ -372,15 +390,159 @@ public class ContinuousBackupReplicationEndpoint extends 
BaseReplicationEndpoint
     }
   }
 
+  @RestrictedApi(
+      explanation = "Package-private for test visibility only. Do not use 
outside tests.",
+      link = "",
+      allowedOnPath = 
"(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)")
+  void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles)
+    throws BulkLoadUploadException {
+    if (bulkLoadFiles.isEmpty()) {
+      LOG.debug("{} No bulk load files to upload for {}", 
Utils.logPeerId(peerId), dayInMillis);
+      return;
+    }
+
+    LOG.debug("{} Starting upload of {} bulk load files", 
Utils.logPeerId(peerId),
+      bulkLoadFiles.size());
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
+        
bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
+    }
+    String dayDirectoryName = formatToDateString(dayInMillis);
+    Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), 
dayDirectoryName);
+    try {
+      backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
+    } catch (IOException e) {
+      throw new BulkLoadUploadException(
+        String.format("%s Failed to create bulkload directory in backupFS: %s",
+          Utils.logPeerId(peerId), bulkloadDir),
+        e);
+    }
+
+    for (Path file : bulkLoadFiles) {
+      Path sourcePath;
+      try {
+        sourcePath = getBulkLoadFileStagingPath(file);
+      } catch (FileNotFoundException fnfe) {
+        throw new BulkLoadUploadException(
+          String.format("%s Bulk load file not found: %s", 
Utils.logPeerId(peerId), file), fnfe);
+      } catch (IOException ioe) {
+        throw new BulkLoadUploadException(
+          String.format("%s Failed to resolve source path for: %s", 
Utils.logPeerId(peerId), file),
+          ioe);
+      }
+
+      Path destPath = new Path(bulkloadDir, file);
+
+      try {
+        LOG.debug("{} Copying bulk load file from {} to {}", 
Utils.logPeerId(peerId), sourcePath,
+          destPath);
+
+        copyWithCleanup(CommonFSUtils.getRootDirFileSystem(conf), sourcePath,
+          backupFileSystemManager.getBackupFs(), destPath, conf);
+
+        LOG.info("{} Bulk load file {} successfully backed up to {}", 
Utils.logPeerId(peerId), file,
+          destPath);
+      } catch (IOException e) {
+        throw new BulkLoadUploadException(
+          String.format("%s Failed to copy bulk load file %s to %s on day %s",
+            Utils.logPeerId(peerId), file, destPath, 
formatToDateString(dayInMillis)),
+          e);
+      }
+    }
+
+    LOG.debug("{} Completed upload of bulk load files", 
Utils.logPeerId(peerId));
+  }
+
+  /**
+   * Copy a file with cleanup logic in case of failure. Always overwrite 
destination to avoid
+   * leaving corrupt partial files.
+   */
+  @RestrictedApi(
+      explanation = "Package-private for test visibility only. Do not use 
outside tests.",
+      link = "",
+      allowedOnPath = 
"(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)")
+  static void copyWithCleanup(FileSystem srcFS, Path src, FileSystem dstFS, 
Path dst,
+    Configuration conf) throws IOException {
+    try {
+      if (dstFS.exists(dst)) {
+        FileStatus srcStatus = srcFS.getFileStatus(src);
+        FileStatus dstStatus = dstFS.getFileStatus(dst);
+
+        if (srcStatus.getLen() == dstStatus.getLen()) {
+          LOG.info("Destination file {} already exists with same length ({}). 
Skipping copy.", dst,
+            dstStatus.getLen());
+          return; // Skip upload
+        } else {
+          LOG.warn(
+            "Destination file {} exists but length differs (src={}, dst={}). " 
+ "Overwriting now.",
+            dst, srcStatus.getLen(), dstStatus.getLen());
+        }
+      }
+
+      // Always overwrite in case previous copy left partial data
+      FileUtil.copy(srcFS, src, dstFS, dst, false, true, conf);
+    } catch (IOException e) {
+      try {
+        if (dstFS.exists(dst)) {
+          dstFS.delete(dst, true);
+          LOG.warn("Deleted partial/corrupt destination file {} after copy 
failure", dst);
+        }
+      } catch (IOException cleanupEx) {
+        LOG.warn("Failed to cleanup destination file {} after copy failure", 
dst, cleanupEx);
+      }
+      throw e;
+    }
+  }
+
   /**
    * Convert dayInMillis to "yyyy-MM-dd" format
    */
-  private String formatToDateString(long dayInMillis) {
+  @RestrictedApi(
+      explanation = "Package-private for test visibility only. Do not use 
outside tests.",
+      link = "",
+      allowedOnPath = 
"(.*/src/test/.*|.*/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java)")
+  String formatToDateString(long dayInMillis) {
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
     dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     return dateFormat.format(new Date(dayInMillis));
   }
 
+  private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) 
throws IOException {
+    FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
+    Path rootDir = CommonFSUtils.getRootDir(conf);
+    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+    Path baseNamespaceDir = new Path(rootDir, baseNSDir);
+    Path hFileArchiveDir =
+      new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, 
baseNSDir));
+
+    LOG.debug("{} Searching for bulk load file: {} in paths: {}, {}", 
Utils.logPeerId(peerId),
+      relativePathFromNamespace, baseNamespaceDir, hFileArchiveDir);
+
+    Path result =
+      findExistingPath(rootFs, baseNamespaceDir, hFileArchiveDir, 
relativePathFromNamespace);
+    LOG.debug("{} Bulk load file found at {}", Utils.logPeerId(peerId), 
result);
+    return result;
+  }
+
+  private static Path findExistingPath(FileSystem rootFs, Path 
baseNamespaceDir,
+    Path hFileArchiveDir, Path filePath) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Checking for bulk load file at: {} and {}", new 
Path(baseNamespaceDir, filePath),
+        new Path(hFileArchiveDir, filePath));
+    }
+
+    for (Path candidate : new Path[] { new Path(baseNamespaceDir, filePath),
+      new Path(hFileArchiveDir, filePath) }) {
+      if (rootFs.exists(candidate)) {
+        return candidate;
+      }
+    }
+
+    throw new FileNotFoundException("Bulk load file not found at either: "
+      + new Path(baseNamespaceDir, filePath) + " or " + new 
Path(hFileArchiveDir, filePath));
+  }
+
   private void shutdownFlushExecutor() {
     if (flushExecutor != null) {
       LOG.info("{} Initiating WAL flush executor shutdown.", 
Utils.logPeerId(peerId));
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
new file mode 100644
index 00000000000..9837f9e926d
--- /dev/null
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestBulkLoadProcessor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.replication;
+
+import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Unit tests for {@link BulkLoadProcessor}.
+ * <p>
+ * These tests validate the extraction of bulk-loaded file paths from WAL 
entries under different
+ * scenarios, including:
+ * <ul>
+ * <li>Valid replicable bulk load entries</li>
+ * <li>Non-replicable bulk load entries</li>
+ * <li>Entries with no bulk load qualifier</li>
+ * <li>Entries containing multiple column families</li>
+ * </ul>
+ */
+@Category({ SmallTests.class })
+public class TestBulkLoadProcessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadProcessor.class);
+
+  /**
+   * Creates a WAL.Entry containing a {@link WALProtos.BulkLoadDescriptor} 
with the given
+   * parameters.
+   * @param tableName  The table name
+   * @param regionName The encoded region name
+   * @param replicate  Whether the bulk load is marked for replication
+   * @param family     Column family name
+   * @param storeFiles One or more store file names to include
+   * @return A WAL.Entry representing the bulk load event
+   */
+  private WAL.Entry createBulkLoadWalEntry(TableName tableName, String 
regionName,
+    boolean replicate, String family, String... storeFiles) {
+
+    // Build StoreDescriptor
+    WALProtos.StoreDescriptor.Builder storeDescBuilder =
+      
WALProtos.StoreDescriptor.newBuilder().setFamilyName(ByteString.copyFromUtf8(family))
+        .setStoreHomeDir(family).addAllStoreFile(Arrays.asList(storeFiles));
+
+    // Build BulkLoadDescriptor
+    WALProtos.BulkLoadDescriptor.Builder bulkDescBuilder = 
WALProtos.BulkLoadDescriptor.newBuilder()
+      
.setReplicate(replicate).setEncodedRegionName(ByteString.copyFromUtf8(regionName))
+      
.setTableName(ProtobufUtil.toProtoTableName(tableName)).setBulkloadSeqNum(1000) 
// Random
+      .addStores(storeDescBuilder);
+
+    byte[] value = bulkDescBuilder.build().toByteArray();
+
+    // Build Cell with BULK_LOAD qualifier
+    Cell cell = 
CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setType(Cell.Type.Put)
+      .setRow(new byte[] { 1 
}).setFamily(METAFAMILY).setQualifier(WALEdit.BULK_LOAD)
+      .setValue(value).build();
+
+    WALEdit edit = new WALEdit();
+    edit.add(cell);
+
+    WALKeyImpl key = new WALKeyImpl(Bytes.toBytes(regionName), // region
+      tableName, 0L, 0L, null);
+
+    return new WAL.Entry(key, edit);
+  }
+
+  /**
+   * Verifies that a valid replicable bulk load WAL entry produces the correct 
number and structure
+   * of file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_validEntry() throws IOException {
+    WAL.Entry entry = createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), 
"region123", true,
+      "cf1", "file1", "file2");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertEquals(2, paths.size());
+    assertTrue(paths.get(0).toString().contains("ns/tbl/region123/cf1/file1"));
+    assertTrue(paths.get(1).toString().contains("ns/tbl/region123/cf1/file2"));
+  }
+
+  /**
+   * Verifies that a non-replicable bulk load entry is ignored.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_nonReplicableSkipped() throws 
IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "region123", 
false, "cf1", "file1");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertTrue(paths.isEmpty());
+  }
+
+  /**
+   * Verifies that entries without the BULK_LOAD qualifier are ignored.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_noBulkLoadQualifier() throws 
IOException {
+    WALEdit edit = new WALEdit();
+    WALKeyImpl key = new WALKeyImpl(new byte[] {}, TableName.valueOf("ns", 
"tbl"), 0L, 0L, null);
+    WAL.Entry entry = new WAL.Entry(key, edit);
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Collections.singletonList(entry));
+
+    assertTrue(paths.isEmpty());
+  }
+
+  /**
+   * Verifies that multiple WAL entries with different column families produce 
the correct set of
+   * file paths.
+   */
+  @Test
+  public void testProcessBulkLoadFiles_multipleFamilies() throws IOException {
+    WAL.Entry entry =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf1", "file1");
+    WAL.Entry entry2 =
+      createBulkLoadWalEntry(TableName.valueOf("ns", "tbl"), "regionXYZ", 
true, "cf2", "fileA");
+
+    List<Path> paths = 
BulkLoadProcessor.processBulkLoadFiles(Arrays.asList(entry, entry2));
+
+    assertEquals(2, paths.size());
+    assertTrue(paths.stream().anyMatch(p -> 
p.toString().contains("cf1/file1")));
+    assertTrue(paths.stream().anyMatch(p -> 
p.toString().contains("cf2/fileA")));
+  }
+}
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index 3919746d3b7..8f8e83dbda6 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.backup.replication;
 
+import static 
org.apache.hadoop.hbase.HConstants.REPLICATION_BULKLOAD_ENABLE_KEY;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_ROOT_DIR;
@@ -27,11 +29,18 @@ import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplica
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.WAL_FILE_PREFIX;
+import static 
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.copyWithCleanup;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
@@ -46,8 +55,10 @@ import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -64,8 +75,11 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.AfterClass;
@@ -73,6 +87,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.MockedStatic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,12 +106,14 @@ public class TestContinuousBackupReplicationEndpoint {
 
   private final String replicationEndpoint = 
ContinuousBackupReplicationEndpoint.class.getName();
   private static final String CF_NAME = "cf";
+  private static final byte[] QUALIFIER = Bytes.toBytes("my-qualifier");
   static FileSystem fs = null;
   static Path root;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     // Set the configuration properties as required
+    conf.setBoolean(REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(REPLICATION_CLUSTER_ID, "clusterId1");
 
     TEST_UTIL.startMiniZKCluster();
@@ -115,7 +132,7 @@ public class TestContinuousBackupReplicationEndpoint {
   }
 
   @Test
-  public void testWALBackup() throws IOException {
+  public void testWALAndBulkLoadFileBackup() throws IOException {
     String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
     TableName tableName = TableName.valueOf("table_" + methodName);
     String peerId = "peerId";
@@ -133,10 +150,15 @@ public class TestContinuousBackupReplicationEndpoint {
     loadRandomData(tableName, 100);
     assertEquals(100, getRowCount(tableName));
 
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    generateHFiles(dir);
+    bulkLoadHFiles(tableName, dir);
+    assertEquals(1100, getRowCount(tableName));
+
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), Map.of(tableName, 100));
+    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
 
     deleteTable(tableName);
   }
@@ -184,7 +206,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), Map.of(table1, 100, table2, 100, 
table3, 50));
+    verifyBackup(backupRootDir.toString(), false, Map.of(table1, 100, table2, 
100, table3, 50));
 
     for (TableName table : List.of(table1, table2, table3)) {
       deleteTable(table);
@@ -242,7 +264,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(20000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), Map.of(tableName, 
getRowCount(tableName)));
+    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 
getRowCount(tableName)));
 
     deleteTable(tableName);
   }
@@ -289,7 +311,7 @@ public class TestContinuousBackupReplicationEndpoint {
     waitForReplication(15000);
     deleteReplicationPeer(peerId);
 
-    verifyBackup(backupRootDir.toString(), Map.of(tableName, 200));
+    verifyBackup(backupRootDir.toString(), false, Map.of(tableName, 200));
 
     // Verify that WALs are stored in two directories, one for each day
     Path walDir = new Path(backupRootDir, WALS_DIR);
@@ -312,6 +334,204 @@ public class TestContinuousBackupReplicationEndpoint {
     deleteTable(tableName);
   }
 
+  /**
+   * Simulates a one-time failure during bulk load file upload. This validates 
that the retry logic
+   * in the replication endpoint works as expected.
+   */
+  @Test
+  public void testBulkLoadFileUploadRetry() throws IOException {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    String peerId = "peerId";
+
+    // Reset static failure flag before test
+    FailingOnceContinuousBackupReplicationEndpoint.reset();
+
+    createTable(tableName);
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+
+    Map<TableName, List<String>> tableMap = new HashMap<>();
+    tableMap.put(tableName, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, tableMap,
+      FailingOnceContinuousBackupReplicationEndpoint.class.getName());
+
+    loadRandomData(tableName, 100);
+    assertEquals(100, getRowCount(tableName));
+
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    generateHFiles(dir);
+    bulkLoadHFiles(tableName, dir);
+    assertEquals(1100, getRowCount(tableName));
+
+    // Replication: first attempt fails, second attempt succeeds
+    waitForReplication(15000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+
+    deleteTable(tableName);
+  }
+
+  /**
+   * Replication endpoint that fails only once on first upload attempt, then 
succeeds on retry.
+   */
+  public static class FailingOnceContinuousBackupReplicationEndpoint
+    extends ContinuousBackupReplicationEndpoint {
+
+    private static boolean failedOnce = false;
+
+    @Override
+    protected void uploadBulkLoadFiles(long dayInMillis, List<Path> 
bulkLoadFiles)
+      throws BulkLoadUploadException {
+      if (!failedOnce) {
+        failedOnce = true;
+        throw new BulkLoadUploadException("Simulated upload failure on first 
attempt");
+      }
+      super.uploadBulkLoadFiles(dayInMillis, bulkLoadFiles);
+    }
+
+    /** Reset failure state for new tests */
+    public static void reset() {
+      failedOnce = false;
+    }
+  }
+
+  /**
+   * Unit test for verifying cleanup of partial files. Simulates a failure 
during
+   * {@link FileUtil#copy(FileSystem, Path, FileSystem, Path, boolean, 
boolean, Configuration)} and
+   * checks that the destination file is deleted.
+   */
+  @Test
+  public void testCopyWithCleanupDeletesPartialFile() throws Exception {
+    FileSystem srcFS = mock(FileSystem.class);
+    FileSystem dstFS = mock(FileSystem.class);
+    Path src = new Path("/src/file");
+    Path dst = new Path("/dst/file");
+    Configuration conf = new Configuration();
+
+    FileStatus srcStatus = mock(FileStatus.class);
+    FileStatus dstStatus = mock(FileStatus.class);
+
+    when(srcFS.getFileStatus(src)).thenReturn(srcStatus);
+    when(dstFS.getFileStatus(dst)).thenReturn(dstStatus);
+
+    // lengths differ -> should attempt to overwrite and then cleanup
+    when(srcStatus.getLen()).thenReturn(200L);
+    when(dstStatus.getLen()).thenReturn(100L);
+
+    // Simulate FileUtil.copy failing
+    try (MockedStatic<FileUtil> mockedFileUtil = mockStatic(FileUtil.class)) {
+      mockedFileUtil.when(
+        () -> FileUtil.copy(eq(srcFS), eq(src), eq(dstFS), eq(dst), eq(false), 
eq(true), eq(conf)))
+        .thenThrow(new IOException("simulated copy failure"));
+
+      // Pretend partial file exists in destination
+      when(dstFS.exists(dst)).thenReturn(true);
+
+      // Run the method under test
+      assertThrows(IOException.class, () -> copyWithCleanup(srcFS, src, dstFS, 
dst, conf));
+
+      // Verify cleanup happened
+      verify(dstFS).delete(dst, true);
+    }
+  }
+
+  /**
+   * Simulates a stale/partial file left behind after a failed bulk load. On 
retry, the stale file
+   * should be overwritten and replication succeeds.
+   */
+  @Test
+  public void testBulkLoadFileUploadWithStaleFileRetry() throws Exception {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName = TableName.valueOf("table_" + methodName);
+    String peerId = "peerId";
+
+    // Reset static failure flag before test
+    PartiallyUploadedBulkloadFileEndpoint.reset();
+
+    createTable(tableName);
+
+    Path backupRootDir = new Path(root, methodName);
+    fs.mkdirs(backupRootDir);
+    conf.set(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
+
+    Map<TableName, List<String>> tableMap = new HashMap<>();
+    tableMap.put(tableName, new ArrayList<>());
+
+    addReplicationPeer(peerId, backupRootDir, tableMap,
+      PartiallyUploadedBulkloadFileEndpoint.class.getName());
+
+    loadRandomData(tableName, 100);
+    assertEquals(100, getRowCount(tableName));
+
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadByFamily");
+    generateHFiles(dir);
+    bulkLoadHFiles(tableName, dir);
+    assertEquals(1100, getRowCount(tableName));
+
+    // first attempt will fail leaving stale file, second attempt should 
overwrite and succeed
+    waitForReplication(15000);
+    deleteReplicationPeer(peerId);
+
+    verifyBackup(backupRootDir.toString(), true, Map.of(tableName, 1100));
+
+    deleteTable(tableName);
+  }
+
+  /**
+   * Replication endpoint that simulates leaving a partial file behind on 
first attempt, then
+   * succeeds on second attempt by overwriting it.
+   */
+  public static class PartiallyUploadedBulkloadFileEndpoint
+    extends ContinuousBackupReplicationEndpoint {
+
+    private static boolean firstAttempt = true;
+
+    @Override
+    protected void uploadBulkLoadFiles(long dayInMillis, List<Path> 
bulkLoadFiles)
+      throws BulkLoadUploadException {
+      if (firstAttempt) {
+        firstAttempt = false;
+        try {
+          // Construct destination path and create a partial file
+          String dayDirectoryName = formatToDateString(dayInMillis);
+          BackupFileSystemManager backupFileSystemManager =
+            new BackupFileSystemManager("peer1", conf, 
conf.get(CONF_BACKUP_ROOT_DIR));
+          Path bulkloadDir =
+            new Path(backupFileSystemManager.getBulkLoadFilesDir(), 
dayDirectoryName);
+
+          FileSystem dstFs = backupFileSystemManager.getBackupFs();
+          if (!dstFs.exists(bulkloadDir)) {
+            dstFs.mkdirs(bulkloadDir);
+          }
+
+          for (Path file : bulkLoadFiles) {
+            Path destPath = new Path(bulkloadDir, file);
+            try (FSDataOutputStream out = dstFs.create(destPath, true)) {
+              out.writeBytes("partial-data"); // simulate incomplete upload
+            }
+          }
+        } catch (IOException e) {
+          throw new BulkLoadUploadException("Simulated failure while creating 
partial file", e);
+        }
+
+        // Fail after leaving partial files behind
+        throw new BulkLoadUploadException("Simulated upload failure on first 
attempt");
+      }
+
+      // Retry succeeds, overwriting stale files
+      super.uploadBulkLoadFiles(dayInMillis, bulkLoadFiles);
+    }
+
+    /** Reset for new tests */
+    public static void reset() {
+      firstAttempt = true;
+    }
+  }
+
   private void createTable(TableName tableName) throws IOException {
     ColumnFamilyDescriptor columnFamilyDescriptor =
       
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF_NAME)).setScope(1).build();
@@ -332,6 +552,12 @@ public class TestContinuousBackupReplicationEndpoint {
 
   private void addReplicationPeer(String peerId, Path backupRootDir,
     Map<TableName, List<String>> tableMap) throws IOException {
+    addReplicationPeer(peerId, backupRootDir, tableMap, replicationEndpoint);
+  }
+
+  private void addReplicationPeer(String peerId, Path backupRootDir,
+    Map<TableName, List<String>> tableMap, String 
customReplicationEndpointImpl)
+    throws IOException {
     Map<String, String> additionalArgs = new HashMap<>();
     additionalArgs.put(CONF_PEER_UUID, UUID.randomUUID().toString());
     additionalArgs.put(CONF_BACKUP_ROOT_DIR, backupRootDir.toString());
@@ -340,7 +566,7 @@ public class TestContinuousBackupReplicationEndpoint {
     additionalArgs.put(CONF_STAGED_WAL_FLUSH_INTERVAL, "10");
 
     ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
-      
.setReplicationEndpointImpl(replicationEndpoint).setReplicateAllUserTables(false)
+      
.setReplicationEndpointImpl(customReplicationEndpointImpl).setReplicateAllUserTables(false)
       .setTableCFsMap(tableMap).putAllConfiguration(additionalArgs).build();
 
     admin.addReplicationPeer(peerId, peerConfig);
@@ -358,6 +584,42 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }
 
+  private void bulkLoadHFiles(TableName tableName, Path inputDir) throws 
IOException {
+    
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 true);
+
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      BulkLoadHFiles loader = new 
BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+      loader.bulkLoad(table.getName(), inputDir);
+    } finally {
+      
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 false);
+    }
+  }
+
+  private void bulkLoadHFiles(TableName tableName, Map<byte[], List<Path>> 
family2Files)
+    throws IOException {
+    
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 true);
+
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      BulkLoadHFiles loader = new 
BulkLoadHFilesTool(TEST_UTIL.getConfiguration());
+      loader.bulkLoad(table.getName(), family2Files);
+    } finally {
+      
TEST_UTIL.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY,
 false);
+    }
+  }
+
+  private void generateHFiles(Path outputDir) throws IOException {
+    String hFileName = "MyHFile";
+    int numRows = 1000;
+    outputDir = outputDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    byte[] from = Bytes.toBytes(CF_NAME + "begin");
+    byte[] to = Bytes.toBytes(CF_NAME + "end");
+
+    Path familyDir = new Path(outputDir, CF_NAME);
+    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, new 
Path(familyDir, hFileName),
+      Bytes.toBytes(CF_NAME), QUALIFIER, from, to, numRows);
+  }
+
   private void waitForReplication(int durationInMillis) {
     LOG.info("Waiting for replication to complete for {} ms", 
durationInMillis);
     try {
@@ -370,12 +632,17 @@ public class TestContinuousBackupReplicationEndpoint {
 
   /**
    * Verifies the backup process by: 1. Checking whether any WAL (Write-Ahead 
Log) files were
-   * generated in the backup directory. 2. Replaying the WAL files to restore 
data and check
-   * consistency by verifying that the restored data matches the expected row 
count for each table.
+   * generated in the backup directory. 2. Checking whether any bulk-loaded 
files were generated in
+   * the backup directory. 3. Replaying the WAL and bulk-loaded files (if 
present) to restore data
+   * and check consistency by verifying that the restored data matches the 
expected row count for
+   * each table.
    */
-  private void verifyBackup(String backupRootDir, Map<TableName, Integer> 
tablesWithExpectedRows)
-    throws IOException {
+  private void verifyBackup(String backupRootDir, boolean hasBulkLoadFiles,
+    Map<TableName, Integer> tablesWithExpectedRows) throws IOException {
     verifyWALBackup(backupRootDir);
+    if (hasBulkLoadFiles) {
+      verifyBulkLoadBackup(backupRootDir);
+    }
 
     for (Map.Entry<TableName, Integer> entry : 
tablesWithExpectedRows.entrySet()) {
       TableName tableName = entry.getKey();
@@ -387,6 +654,21 @@ public class TestContinuousBackupReplicationEndpoint {
 
       replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
 
+      // replay Bulk loaded HFiles if Present
+      try {
+        Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+        if (fs.exists(bulkloadDir)) {
+          FileStatus[] directories = fs.listStatus(bulkloadDir);
+          for (FileStatus dirStatus : directories) {
+            if (dirStatus.isDirectory()) {
+              replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), 
tableName);
+            }
+          }
+        }
+      } catch (Exception e) {
+        fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+      }
+
       assertEquals(expectedRows, getRowCount(tableName));
     }
   }
@@ -412,6 +694,15 @@ public class TestContinuousBackupReplicationEndpoint {
     assertFalse("Expected some WAL files but found none!", walFiles.isEmpty());
   }
 
+  private void verifyBulkLoadBackup(String backupRootDir) throws IOException {
+    Path bulkLoadFilesDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+    assertTrue("BulkLoad Files directory does not exist!", 
fs.exists(bulkLoadFilesDir));
+
+    FileStatus[] bulkLoadFiles = fs.listStatus(bulkLoadFilesDir);
+    assertNotNull("No Bulk load files found!", bulkLoadFiles);
+    assertTrue("Expected some Bulk load files but found none!", 
bulkLoadFiles.length > 0);
+  }
+
   private void replayWALs(String walDir, TableName tableName) {
     WALPlayer player = new WALPlayer();
     try {
@@ -422,6 +713,28 @@ public class TestContinuousBackupReplicationEndpoint {
     }
   }
 
+  private void replayBulkLoadHFilesIfPresent(String bulkLoadDir, TableName 
tableName) {
+    try {
+      Path tableBulkLoadDir = new Path(bulkLoadDir + "/default/" + tableName);
+      if (fs.exists(tableBulkLoadDir)) {
+        RemoteIterator<LocatedFileStatus> fileStatusIterator = 
fs.listFiles(tableBulkLoadDir, true);
+        List<Path> bulkLoadFiles = new ArrayList<>();
+
+        while (fileStatusIterator.hasNext()) {
+          LocatedFileStatus fileStatus = fileStatusIterator.next();
+          Path filePath = fileStatus.getPath();
+
+          if (!fileStatus.isDirectory()) {
+            bulkLoadFiles.add(filePath);
+          }
+        }
+        bulkLoadHFiles(tableName, Map.of(Bytes.toBytes(CF_NAME), 
bulkLoadFiles));
+      }
+    } catch (Exception e) {
+      fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+    }
+  }
+
   private int getRowCount(TableName tableName) throws IOException {
     try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
       return HBaseTestingUtil.countRows(table);

Reply via email to