This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebased
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 716dab83d91d5b9510dde65344a42d21fd473c8a
Author: vinayak hegde <[email protected]>
AuthorDate: Wed Jun 11 19:55:48 2025 +0530

    HBASE-29255: Integrate backup WAL cleanup logic with the delete command (#7007)
    
    * Store bulkload files in daywise bucket as well
    
    * Integrate backup WAL cleanup logic with the delete command
    
    * address the review comments
    
    * address the review comments
    
    * address the review comments
    
    * add more unit tests to cover all cases
    
    * address the review comments
---
 hbase-backup/pom.xml                               |   5 +
 .../hadoop/hbase/backup/impl/BackupCommands.java   | 147 ++++++++++++++++
 .../hbase/backup/impl/BackupSystemTable.java       |  26 +++
 .../ContinuousBackupReplicationEndpoint.java       |  24 ++-
 .../hbase/backup/TestBackupDeleteWithCleanup.java  | 184 +++++++++++++++++++++
 .../hbase/backup/impl/TestBackupCommands.java      | 177 ++++++++++++++++++++
 .../TestContinuousBackupReplicationEndpoint.java   |  20 ++-
 7 files changed, 575 insertions(+), 8 deletions(-)
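
For context (not part of the patch): with this change, a backup delete also prunes the day-wise WAL and bulk-load directories that fall entirely before the start timestamp of the oldest remaining FULL backup, provided a continuous-backup WAL directory is configured. Below is a minimal, hypothetical sketch of how that path is exercised, mirroring the ToolRunner invocation used in the new TestBackupDeleteWithCleanup test; the WAL directory path and backup id are placeholders, not values taken from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.BackupDriver;
    import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
    import org.apache.hadoop.util.ToolRunner;

    // Illustrative sketch only; assumes an HBase client configuration that can
    // reach the cluster and that continuous backup has been set up.
    public class DeleteWithWalCleanupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // WAL cleanup only runs when the continuous-backup WAL directory is set.
        conf.set(BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR,
          "hdfs:///backup/wal-dir"); // placeholder location
        // Force-deleting a backup now also removes WAL/bulk-load day directories
        // older than the oldest remaining FULL backup's start timestamp.
        int ret = ToolRunner.run(conf, new BackupDriver(),
          new String[] { "delete", "-l", "backup_1700000000000", "-fd" }); // placeholder id
        System.exit(ret);
      }
    }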

diff --git a/hbase-backup/pom.xml b/hbase-backup/pom.xml
index 7cac75f88d2..0bb4e4e03ca 100644
--- a/hbase-backup/pom.xml
+++ b/hbase-backup/pom.xml
@@ -182,6 +182,11 @@
       <artifactId>junit-vintage-engine</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
index 804dc7141a1..11b6890ed03 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupCommands.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.backup.impl;
 
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BACKUP_LIST_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_BANDWIDTH;
@@ -47,18 +49,26 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKE
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_WORKERS_DESC;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME;
 import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_YARN_QUEUE_NAME_DESC;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
 
 import java.io.IOException;
 import java.net.URI;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -71,6 +81,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.BackupRestoreConstants.BackupCommand;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager;
 import org.apache.hadoop.hbase.backup.util.BackupSet;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Connection;
@@ -80,6 +91,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.base.Strings;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
@@ -649,6 +661,8 @@ public final class BackupCommands {
       } else if (cmdline.hasOption(OPTION_LIST)) {
         executeDeleteListOfBackups(cmdline, isForceDelete);
       }
+
+      cleanUpUnusedBackupWALs();
     }
 
     private void executeDeleteOlderThan(CommandLine cmdline, boolean isForceDelete)
@@ -876,6 +890,139 @@ public final class BackupCommands {
       return false;
     }
 
+    /**
+     * Cleans up Write-Ahead Logs (WALs) that are no longer required for PITR after a successful
+     * backup deletion.
+     */
+    private void cleanUpUnusedBackupWALs() throws IOException {
+      Configuration conf = getConf() != null ? getConf() : HBaseConfiguration.create();
+      String backupWalDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+
+      if (Strings.isNullOrEmpty(backupWalDir)) {
+        System.out.println("No WAL directory specified for continuous backup. Skipping cleanup.");
+        return;
+      }
+
+      try (BackupSystemTable sysTable = new BackupSystemTable(conn)) {
+        // Get list of tables under continuous backup
+        Map<TableName, Long> continuousBackupTables = sysTable.getContinuousBackupTableSet();
+        if (continuousBackupTables.isEmpty()) {
+          System.out.println("No continuous backups configured. Skipping WAL cleanup.");
+          return;
+        }
+
+        // Find the earliest timestamp after which WALs are still needed
+        long cutoffTimestamp = determineWALCleanupCutoffTime(sysTable);
+        if (cutoffTimestamp == 0) {
+          System.err.println("ERROR: No valid full backup found. Skipping WAL cleanup.");
+          return;
+        }
+
+        // Update metadata before actual cleanup to avoid inconsistencies
+        updateBackupTableStartTimes(sysTable, cutoffTimestamp);
+
+        // Delete WAL files older than cutoff timestamp
+        deleteOldWALFiles(conf, backupWalDir, cutoffTimestamp);
+
+      }
+    }
+
+    /**
+     * Determines the cutoff time for cleaning WAL files.
+     * @param sysTable Backup system table
+     * @return cutoff timestamp or 0 if not found
+     */
+    long determineWALCleanupCutoffTime(BackupSystemTable sysTable) throws IOException {
+      List<BackupInfo> backupInfos = sysTable.getBackupInfos(BackupState.COMPLETE);
+      Collections.reverse(backupInfos); // Start from oldest
+
+      for (BackupInfo backupInfo : backupInfos) {
+        if (BackupType.FULL.equals(backupInfo.getType())) {
+          return backupInfo.getStartTs();
+        }
+      }
+      return 0;
+    }
+
+    /**
+     * Updates the start time for continuous backups if older than cutoff timestamp.
+     * @param sysTable        Backup system table
+     * @param cutoffTimestamp Timestamp before which WALs are no longer needed
+     */
+    void updateBackupTableStartTimes(BackupSystemTable sysTable, long cutoffTimestamp)
+      throws IOException {
+
+      Map<TableName, Long> backupTables = sysTable.getContinuousBackupTableSet();
+      Set<TableName> tablesToUpdate = new HashSet<>();
+
+      for (Map.Entry<TableName, Long> entry : backupTables.entrySet()) {
+        if (entry.getValue() < cutoffTimestamp) {
+          tablesToUpdate.add(entry.getKey());
+        }
+      }
+
+      if (!tablesToUpdate.isEmpty()) {
+        sysTable.updateContinuousBackupTableSet(tablesToUpdate, cutoffTimestamp);
+      }
+    }
+
+    /**
+     * Cleans up old WAL and bulk-loaded files based on the determined cutoff timestamp.
+     */
+    void deleteOldWALFiles(Configuration conf, String backupWalDir, long cutoffTime)
+      throws IOException {
+      System.out.println("Starting WAL cleanup in backup directory: " + backupWalDir
+        + " with cutoff time: " + cutoffTime);
+
+      BackupFileSystemManager manager =
+        new BackupFileSystemManager(CONTINUOUS_BACKUP_REPLICATION_PEER, conf, backupWalDir);
+      FileSystem fs = manager.getBackupFs();
+      Path walDir = manager.getWalsDir();
+      Path bulkloadDir = manager.getBulkLoadFilesDir();
+
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+      System.out.println("Listing directories under: " + walDir);
+
+      FileStatus[] directories = fs.listStatus(walDir);
+
+      for (FileStatus dirStatus : directories) {
+        if (!dirStatus.isDirectory()) {
+          continue; // Skip files, we only want directories
+        }
+
+        Path dirPath = dirStatus.getPath();
+        String dirName = dirPath.getName();
+
+        try {
+          long dayStart = parseDayDirectory(dirName, dateFormat);
+          System.out
+            .println("Checking WAL directory: " + dirName + " (Start Time: " + dayStart + ")");
+
+          // If WAL files of that day are older than cutoff time, delete them
+          if (dayStart + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
+            System.out.println("Deleting outdated WAL directory: " + dirPath);
+            fs.delete(dirPath, true);
+            fs.delete(new Path(bulkloadDir, dirName), true);
+          }
+        } catch (ParseException e) {
+          System.out.println("WARNING: Failed to parse directory name '" + dirName
+            + "'. Skipping. Error: " + e.getMessage());
+        } catch (IOException e) {
+          System.out.println("WARNING: Failed to delete directory '" + dirPath
+            + "'. Skipping. Error: " + e.getMessage());
+        }
+      }
+
+      System.out.println("Completed WAL cleanup for backup directory: " + backupWalDir);
+    }
+
+    private long parseDayDirectory(String dayDir, SimpleDateFormat dateFormat)
+      throws ParseException {
+      return dateFormat.parse(dayDir).getTime();
+    }
+
     @Override
     protected void printUsage() {
       System.out.println(DELETE_CMD_USAGE);
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 3166ddf3ef2..24bd0888f9c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -979,6 +979,32 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
+  /**
+   * Updates the system table with the new start timestamps for continuous backup tables.
+   * @param tablesToUpdate    The set of tables that need their start timestamps updated.
+   * @param newStartTimestamp The new start timestamp to be set.
+   */
+  public void updateContinuousBackupTableSet(Set<TableName> tablesToUpdate, long newStartTimestamp)
+    throws IOException {
+    if (tablesToUpdate == null || tablesToUpdate.isEmpty()) {
+      LOG.warn("No tables provided for updating start timestamps.");
+      return;
+    }
+
+    try (Table table = connection.getTable(tableName)) {
+      Put put = new Put(rowkey(CONTINUOUS_BACKUP_SET));
+
+      for (TableName tableName : tablesToUpdate) {
+        put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(tableName.getNameAsString()),
+          Bytes.toBytes(newStartTimestamp));
+      }
+
+      table.put(put);
+      LOG.info("Successfully updated start timestamps for {} tables in the backup system table.",
+        tablesToUpdate.size());
+    }
+  }
+
   /**
    * Removes tables from the global continuous backup set. Only removes entries that currently exist
    * in the backup system table.
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
index 34fcd76bf9c..eeacc8fbf34 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/replication/ContinuousBackupReplicationEndpoint.java
@@ -23,6 +23,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -304,7 +305,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
         walWriter.append(entry);
       }
       walWriter.sync(true);
-      uploadBulkLoadFiles(bulkLoadFiles);
+      uploadBulkLoadFiles(day, bulkLoadFiles);
     } catch (UncheckedIOException e) {
       String errorMsg = Utils.logPeerId(peerId) + " Failed to get or create WAL Writer for " + day;
       LOG.error("{} Backup failed for day {}. Error: {}", Utils.logPeerId(peerId), day,
@@ -314,9 +315,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
   }
 
   private FSHLogProvider.Writer createWalWriter(long dayInMillis) {
-    // Convert dayInMillis to "yyyy-MM-dd" format
-    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
-    String dayDirectoryName = dateFormat.format(new Date(dayInMillis));
+    String dayDirectoryName = formatToDateString(dayInMillis);
 
     FileSystem fs = backupFileSystemManager.getBackupFs();
     Path walsDir = backupFileSystemManager.getWalsDir();
@@ -376,7 +375,7 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     }
   }
 
-  private void uploadBulkLoadFiles(List<Path> bulkLoadFiles) throws IOException {
+  private void uploadBulkLoadFiles(long dayInMillis, List<Path> bulkLoadFiles) throws IOException {
     LOG.debug("{} Starting upload of {} bulk load files", Utils.logPeerId(peerId),
       bulkLoadFiles.size());
 
@@ -384,9 +383,13 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
       LOG.trace("{} Bulk load files to upload: {}", Utils.logPeerId(peerId),
         bulkLoadFiles.stream().map(Path::toString).collect(Collectors.joining(", ")));
     }
+    String dayDirectoryName = formatToDateString(dayInMillis);
+    Path bulkloadDir = new Path(backupFileSystemManager.getBulkLoadFilesDir(), dayDirectoryName);
+    backupFileSystemManager.getBackupFs().mkdirs(bulkloadDir);
+
     for (Path file : bulkLoadFiles) {
       Path sourcePath = getBulkLoadFileStagingPath(file);
-      Path destPath = new Path(backupFileSystemManager.getBulkLoadFilesDir(), file);
+      Path destPath = new Path(bulkloadDir, file);
 
       try {
         LOG.debug("{} Copying bulk load file from {} to {}", Utils.logPeerId(peerId), sourcePath,
@@ -407,6 +410,15 @@ public class ContinuousBackupReplicationEndpoint extends BaseReplicationEndpoint
     LOG.debug("{} Completed upload of bulk load files", Utils.logPeerId(peerId));
   }
 
+  /**
+   * Convert dayInMillis to "yyyy-MM-dd" format
+   */
+  private String formatToDateString(long dayInMillis) {
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    return dateFormat.format(new Date(dayInMillis));
+  }
+
   private Path getBulkLoadFileStagingPath(Path relativePathFromNamespace) throws IOException {
     FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf);
     Path rootDir = CommonFSUtils.getRootDir(conf);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
new file mode 100644
index 00000000000..6d76ac4e89b
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDeleteWithCleanup.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category(LargeTests.class)
+public class TestBackupDeleteWithCleanup extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupDeleteWithCleanup.class);
+
+  String backupWalDirName = "TestBackupDeleteWithCleanup";
+
+  @Test
+  public void testBackupDeleteWithCleanupLogic() throws Exception {
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+    FileSystem fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+
+    // Step 1: Setup Backup Folders
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    setupBackupFolders(fs, backupWalDir, currentTime);
+
+    // Log the directory structure before cleanup
+    logDirectoryStructure(fs, backupWalDir, "Directory structure BEFORE cleanup:");
+
+    // Step 2: Simulate Backup Creation
+    BackupSystemTable backupSystemTable = new BackupSystemTable(TEST_UTIL.getConnection());
+    backupSystemTable.addContinuousBackupTableSet(Set.of(table1),
+      currentTime - (2 * ONE_DAY_IN_MILLISECONDS));
+
+    EnvironmentEdgeManager
+      .injectEdge(() -> System.currentTimeMillis() - (2 * ONE_DAY_IN_MILLISECONDS));
+    String backupId = fullTableBackup(Lists.newArrayList(table1));
+    assertTrue(checkSucceeded(backupId));
+
+    String anotherBackupId = fullTableBackup(Lists.newArrayList(table1));
+    assertTrue(checkSucceeded(anotherBackupId));
+
+    // Step 3: Run Delete Command
+    int ret =
+      ToolRunner.run(conf1, new BackupDriver(), new String[] { "delete", "-l", backupId, "-fd" });
+    assertEquals(0, ret);
+
+    // Log the directory structure after cleanup
+    logDirectoryStructure(fs, backupWalDir, "Directory structure AFTER cleanup:");
+
+    // Step 4: Verify Cleanup
+    verifyBackupCleanup(fs, backupWalDir, currentTime);
+
+    // Step 5: Verify System Table Update
+    verifySystemTableUpdate(backupSystemTable, currentTime);
+  }
+
+  public static void setupBackupFolders(FileSystem fs, Path backupWalDir, long currentTime)
+    throws IOException {
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+
+    fs.mkdirs(walsDir);
+    fs.mkdirs(bulkLoadDir);
+
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    for (int i = 0; i < 5; i++) {
+      String dateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      fs.mkdirs(new Path(walsDir, dateStr));
+      fs.mkdirs(new Path(bulkLoadDir, dateStr));
+    }
+  }
+
+  private static void verifyBackupCleanup(FileSystem fs, Path backupWalDir, long currentTime)
+    throws IOException {
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+
+    // Expect folders older than 3 days to be deleted
+    for (int i = 3; i < 5; i++) {
+      String oldDateStr = dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      Path walPath = new Path(walsDir, oldDateStr);
+      Path bulkLoadPath = new Path(bulkLoadDir, oldDateStr);
+      assertFalse("Old WAL directory (" + walPath + ") should be deleted, but it exists!",
+        fs.exists(walPath));
+      assertFalse("Old BulkLoad directory (" + bulkLoadPath + ") should be deleted, but it exists!",
+        fs.exists(bulkLoadPath));
+    }
+
+    // Expect folders within the last 3 days to exist
+    for (int i = 0; i < 3; i++) {
+      String recentDateStr =
+        dateFormat.format(new Date(currentTime - (i * ONE_DAY_IN_MILLISECONDS)));
+      Path walPath = new Path(walsDir, recentDateStr);
+      Path bulkLoadPath = new Path(bulkLoadDir, recentDateStr);
+
+      assertTrue("Recent WAL directory (" + walPath + ") should exist, but it is missing!",
+        fs.exists(walPath));
+      assertTrue(
+        "Recent BulkLoad directory (" + bulkLoadPath + ") should exist, but it 
is missing!",
+        fs.exists(bulkLoadPath));
+    }
+  }
+
+  private void verifySystemTableUpdate(BackupSystemTable backupSystemTable, long currentTime)
+    throws IOException {
+    Map<TableName, Long> updatedTables = backupSystemTable.getContinuousBackupTableSet();
+
+    for (Map.Entry<TableName, Long> entry : updatedTables.entrySet()) {
+      long updatedStartTime = entry.getValue();
+
+      // Ensure that the updated start time is not earlier than the expected cutoff time
+      assertTrue("System table update failed!",
+        updatedStartTime >= (currentTime - (3 * ONE_DAY_IN_MILLISECONDS)));
+    }
+  }
+
+  public static void logDirectoryStructure(FileSystem fs, Path dir, String message)
+    throws IOException {
+    System.out.println(message);
+    listDirectory(fs, dir, "  ");
+  }
+
+  public static void listDirectory(FileSystem fs, Path dir, String indent) throws IOException {
+    if (!fs.exists(dir)) {
+      System.out.println(indent + "[Missing] " + dir);
+      return;
+    }
+    FileStatus[] files = fs.listStatus(dir);
+    System.out.println(indent + dir);
+    for (FileStatus file : files) {
+      if (file.isDirectory()) {
+        listDirectory(fs, file.getPath(), indent + "  ");
+      } else {
+        System.out.println(indent + "  " + file.getPath());
+      }
+    }
+  }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
new file mode 100644
index 00000000000..b2ebbd640bb
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/impl/TestBackupCommands.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.logDirectoryStructure;
+import static org.apache.hadoop.hbase.backup.TestBackupDeleteWithCleanup.setupBackupFolders;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.BULKLOAD_FILES_DIR;
+import static org.apache.hadoop.hbase.backup.replication.BackupFileSystemManager.WALS_DIR;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.DATE_FORMAT;
+import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.TestBackupBase;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestBackupCommands extends TestBackupBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupCommands.class);
+
+  String backupWalDirName = "TestBackupWalDir";
+
+  /**
+   * Tests whether determineWALCleanupCutoffTime returns the correct FULL backup start timestamp.
+   */
+  @Test
+  public void testDetermineWALCleanupCutoffTimeOfCleanupCommand() throws IOException {
+    // GIVEN
+    BackupSystemTable sysTable = mock(BackupSystemTable.class);
+
+    BackupInfo full1 = new BackupInfo();
+    full1.setType(BackupType.FULL);
+    full1.setStartTs(1111L);
+    full1.setState(BackupInfo.BackupState.COMPLETE);
+
+    BackupInfo inc = new BackupInfo();
+    inc.setType(BackupType.INCREMENTAL);
+    inc.setStartTs(2222L);
+    inc.setState(BackupInfo.BackupState.COMPLETE);
+
+    BackupInfo full2 = new BackupInfo();
+    full2.setType(BackupType.FULL);
+    full2.setStartTs(3333L);
+    full2.setState(BackupInfo.BackupState.COMPLETE);
+
+    // Ordered as newest to oldest, will be reversed in the method
+    List<BackupInfo> backupInfos = List.of(full2, inc, full1);
+    when(sysTable.getBackupInfos(BackupInfo.BackupState.COMPLETE))
+      .thenReturn(new ArrayList<>(backupInfos));
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    long cutoff = command.determineWALCleanupCutoffTime(sysTable);
+
+    // THEN
+    assertEquals("Expected oldest FULL backup timestamp", 1111L, cutoff);
+  }
+
+  @Test
+  public void testUpdateBackupTableStartTimesOfCleanupCommand() throws IOException {
+    // GIVEN
+    BackupSystemTable mockSysTable = mock(BackupSystemTable.class);
+
+    TableName tableA = TableName.valueOf("ns", "tableA");
+    TableName tableB = TableName.valueOf("ns", "tableB");
+    TableName tableC = TableName.valueOf("ns", "tableC");
+
+    long cutoffTimestamp = 1_000_000L;
+
+    // Simulate current table start times
+    Map<TableName, Long> tableSet = Map.of(tableA, 900_000L, // Before cutoff → should be updated
+      tableB, 1_100_000L, // After cutoff → should NOT be updated
+      tableC, 800_000L // Before cutoff → should be updated
+    );
+
+    when(mockSysTable.getContinuousBackupTableSet()).thenReturn(tableSet);
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    command.updateBackupTableStartTimes(mockSysTable, cutoffTimestamp);
+
+    // THEN
+    Set<TableName> expectedUpdated = Set.of(tableA, tableC);
+    verify(mockSysTable).updateContinuousBackupTableSet(expectedUpdated, cutoffTimestamp);
+  }
+
+  @Test
+  public void testDeleteOldWALFilesOfCleanupCommand() throws IOException {
+    // GIVEN
+    Path root = TEST_UTIL.getDataTestDirOnTestFS();
+    Path backupWalDir = new Path(root, backupWalDirName);
+    conf1.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString());
+
+    FileSystem fs = FileSystem.get(conf1);
+    fs.mkdirs(backupWalDir);
+
+    long currentTime = EnvironmentEdgeManager.getDelegate().currentTime();
+    setupBackupFolders(fs, backupWalDir, currentTime); // Create 5 days of WAL/bulk folders
+
+    logDirectoryStructure(fs, backupWalDir, "Before cleanup:");
+
+    // Delete files older than 2 days from current time
+    long cutoffTime = currentTime - (2 * ONE_DAY_IN_MILLISECONDS);
+
+    // WHEN
+    BackupCommands.DeleteCommand command = new BackupCommands.DeleteCommand(conf1, null);
+    command.deleteOldWALFiles(conf1, backupWalDir.toString(), cutoffTime);
+
+    logDirectoryStructure(fs, backupWalDir, "After cleanup:");
+
+    // THEN
+    verifyCleanupOutcome(fs, backupWalDir, currentTime, cutoffTime);
+  }
+
+  private static void verifyCleanupOutcome(FileSystem fs, Path backupWalDir, long currentTime,
+    long cutoffTime) throws IOException {
+    Path walsDir = new Path(backupWalDir, WALS_DIR);
+    Path bulkLoadDir = new Path(backupWalDir, BULKLOAD_FILES_DIR);
+    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    for (int i = 0; i < 5; i++) {
+      long dayTime = currentTime - (i * ONE_DAY_IN_MILLISECONDS);
+      String dayDir = dateFormat.format(new Date(dayTime));
+      Path walPath = new Path(walsDir, dayDir);
+      Path bulkPath = new Path(bulkLoadDir, dayDir);
+
+      if (dayTime + ONE_DAY_IN_MILLISECONDS - 1 < cutoffTime) {
+        assertFalse("Old WAL dir should be deleted: " + walPath, 
fs.exists(walPath));
+        assertFalse("Old BulkLoad dir should be deleted: " + bulkPath, 
fs.exists(bulkPath));
+      } else {
+        assertTrue("Recent WAL dir should exist: " + walPath, 
fs.exists(walPath));
+        assertTrue("Recent BulkLoad dir should exist: " + bulkPath, 
fs.exists(bulkPath));
+      }
+    }
+  }
+}
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
index cd1f758f760..253675f85d9 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/replication/TestContinuousBackupReplicationEndpoint.java
@@ -44,6 +44,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -282,6 +283,7 @@ public class TestContinuousBackupReplicationEndpoint {
     long oneDayBackTime = currentTime - ONE_DAY_IN_MILLISECONDS;
 
     SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     String expectedPrevDayDir = dateFormat.format(new Date(oneDayBackTime));
     String expectedCurrentDayDir = dateFormat.format(new Date(currentTime));
 
@@ -437,8 +439,22 @@ public class TestContinuousBackupReplicationEndpoint {
       assertEquals(0, getRowCount(tableName));
 
       replayWALs(new Path(backupRootDir, WALS_DIR).toString(), tableName);
-      replayBulkLoadHFilesIfPresent(new Path(backupRootDir, BULKLOAD_FILES_DIR).toString(),
-        tableName);
+
+      // replay Bulk loaded HFiles if Present
+      try {
+        Path bulkloadDir = new Path(backupRootDir, BULKLOAD_FILES_DIR);
+        if (fs.exists(bulkloadDir)) {
+          FileStatus[] directories = fs.listStatus(bulkloadDir);
+          for (FileStatus dirStatus : directories) {
+            if (dirStatus.isDirectory()) {
+              replayBulkLoadHFilesIfPresent(dirStatus.getPath().toString(), tableName);
+            }
+          }
+        }
+      } catch (Exception e) {
+        fail("Failed to replay BulkLoad HFiles properly: " + e.getMessage());
+      }
+
       assertEquals(expectedRows, getRowCount(tableName));
     }
   }
