This is an automated email from the ASF dual-hosted git repository.

anmolnar pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit dffff296efecd7b9d9ba88dce3ffdda3d6c912db
Author: Kevin Geiszler <[email protected]>
AuthorDate: Mon Mar 30 14:11:42 2026 -0700

    HBASE-29891: Multi-table continuous incremental backup is failing bec… 
(#7891)
    
    * HBASE-29891: Multi-table continuous incremental backup is failing because 
output directory already exists
    
    Change-Id: I710cc8d0d87a299b7782a19d93f28bf6283c2436
    
    * Revert incrementalCopyBulkloadHFiles()
    
    Change-Id: I92c8874bd73449d7341515d6aa06beac014577f6
    
    * Fix spotless error
    
    Change-Id: I8768261ade3a9da11a3c53ba5093c8920ef7a3d7
---
 .../backup/impl/AbstractPitrRestoreHandler.java    |   4 +
 .../backup/impl/IncrementalTableBackupClient.java  | 102 +++++++++++---
 .../hadoop/hbase/backup/util/RestoreTool.java      |  14 +-
 .../TestIncrementalBackupWithContinuous.java       | 154 +++++++++++++++++++--
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |   3 +-
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   |  34 ++++-
 .../hadoop/hbase/mapreduce/TestWALPlayer.java      | 150 ++++++++++++++++++--
 7 files changed, 419 insertions(+), 42 deletions(-)

diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index 3f31255d60f..34accf14f36 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
 import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
 
 import java.io.IOException;
@@ -411,6 +412,9 @@ public abstract class AbstractPitrRestoreHandler {
     conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
     conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
     conf.setBoolean(IGNORE_EMPTY_FILES, true);
+    // HFile output format defaults to false in HFileOutputFormat2, but we are 
explicitly setting
+    // it here just in case
+    conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
     Tool walPlayer = new WALPlayer();
     walPlayer.setConf(conf);
     return walPlayer;
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 8339786f973..345ab4c7a5a 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.impl;
 import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
 import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static 
org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY;
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
 
 import java.io.IOException;
 import java.net.URI;
@@ -33,6 +35,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
 import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Admin;
@@ -251,8 +253,12 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
   private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName 
tn, FileSystem tgtFs)
     throws IOException {
     MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+    Configuration conf = new Configuration(this.conf);
     conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
       getBulkOutputDirForTable(tn).toString());
+    if (backupInfo.isContinuousBackupEnabled()) {
+      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+    }
     player.setConf(conf);
 
     String inputDirs = StringUtils.join(files, ",");
@@ -361,10 +367,26 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
       setupRegionLocator();
       // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
       convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
-      incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
-        backupInfo.getBackupRootDir());
+
+      String[] bulkOutputFiles;
+      String backupDest = backupInfo.getBackupRootDir();
+      if (backupInfo.isContinuousBackupEnabled()) {
+        // For the continuous backup case, the WALs have been converted to 
HFiles in a separate
+        // map-reduce job for each table. In order to prevent MR job failures 
due to HBASE-29891,
+        // these HFiles were sent to a different output directory for each 
table. This means
+        // continuous backups require a list of source directories and a 
different destination
+        // directory when copying HFiles to the incremental backup directory.
+        List<String> uniqueNamespaces = tablesToWALFileList.keySet().stream()
+          .map(TableName::getNamespaceAsString).distinct().toList();
+        bulkOutputFiles = uniqueNamespaces.stream()
+          .map(ns -> new Path(getBulkOutputDir(), 
ns).toString()).toArray(String[]::new);
+        backupDest = backupDest + Path.SEPARATOR + backupId;
+      } else {
+        bulkOutputFiles = new String[] { getBulkOutputDir().toString() };
+      }
+      incrementalCopyHFiles(bulkOutputFiles, backupDest);
     } catch (Exception e) {
-      String msg = "Unexpected exception in incremental-backup: incremental 
copy " + backupId;
+      String msg = "Unexpected exception in incremental-backup: incremental 
copy " + backupId + " ";
       // fail the overall backup and return
       failBackup(conn, backupInfo, backupManager, e, msg, 
BackupType.INCREMENTAL, conf);
       throw new IOException(e);
@@ -418,7 +440,8 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
       System.arraycopy(files, 0, strArr, 0, files.length);
       strArr[strArr.length - 1] = backupDest;
 
-      String jobname = "Incremental_Backup-HFileCopy-" + 
backupInfo.getBackupId();
+      String jobname = "Incremental_Backup-HFileCopy-" + 
backupInfo.getBackupId() + "-"
+        + System.currentTimeMillis();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
@@ -517,23 +540,25 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
   protected void walToHFiles(List<String> dirPaths, List<String> tableList, 
long previousBackupTs)
     throws IOException {
     Tool player = new WALPlayer();
+    Configuration conf = new Configuration(this.conf);
 
     // Player reads all files in arbitrary directory structure and creates
     // a Map task for each file. We use ';' as separator
     // because WAL file names contains ','
     String dirs = StringUtils.join(dirPaths, ';');
-    String jobname = "Incremental_Backup-" + backupId;
+    String jobname = "Incremental_Backup-" + backupId + "-" + 
System.currentTimeMillis();
 
-    Path bulkOutputPath = getBulkOutputDir();
-    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+    setBulkOutputPath(conf, tableList);
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
-    boolean diskBasedSortingEnabledOriginalValue = 
HFileOutputFormat2.diskBasedSortingEnabled(conf);
     conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     if (backupInfo.isContinuousBackupEnabled()) {
       conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
       conf.set(WALInputFormat.END_TIME_KEY, 
Long.toString(backupInfo.getIncrCommittedWalTs()));
+      // We do not want a multi-table HFile format here because continuous 
backups run the WALPlayer
+      // individually on each table in the backup set.
+      conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
     }
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
@@ -548,19 +573,32 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
     } catch (Exception ee) {
       throw new IOException("Can not convert from directory " + dirs
         + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
-    } finally {
-      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
-        diskBasedSortingEnabledOriginalValue);
-      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
-      conf.unset(JOB_NAME_CONF_KEY);
     }
   }
 
+  private void setBulkOutputPath(Configuration conf, List<String> tableList) {
+    Path bulkOutputPath = getBulkOutputDir();
+    if (backupInfo.isContinuousBackupEnabled()) {
+      if (tableList.size() != 1) {
+        // Continuous backups run the WALPlayer job on one table at a time, so 
the list of tables
+        // should have only one element.
+        throw new RuntimeException(
+          "Expected table list to have only one element, but got: " + 
tableList);
+      }
+      bulkOutputPath = 
getTmpBackupDirForTable(TableName.valueOf(tableList.get(0)));
+    }
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+  }
+
   private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn) 
throws IOException {
     Path bulkOutDir = getBulkOutputDirForTable(tn);
 
     if (tgtFs.exists(bulkOutDir)) {
-      conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      conf.setInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+      LOG.debug(
+        "{} has been set to {}. This affects what source files are actually 
copied in the "
+          + "next Incremental copy HFiles job",
+        NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 
conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY));
       Path tgtPath = getTargetDirForTable(tn);
       try {
         RemoteIterator<LocatedFileStatus> locatedFiles = 
tgtFs.listFiles(bulkOutDir, true);
@@ -573,18 +611,40 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
         }
         incrementalCopyHFiles(files.toArray(files.toArray(new String[0])), 
tgtPath.toString());
       } finally {
-        conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+        conf.unset(NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+        LOG.debug("{} has been unset", NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
       }
     }
   }
 
+  /**
+   * Creates a path to the bulk load output directory for a table. This 
directory will look like:
+   * .../backupRoot/.tmp/backupId/namespace/table/data
+   * @param table The table whose HFiles are being bulk loaded
+   * @return A Path object representing the directory
+   */
   protected Path getBulkOutputDirForTable(TableName table) {
+    Path tablePath = getTmpBackupDirForTable(table);
+    return new Path(tablePath, "data");
+  }
+
+  /**
+   * Creates a path to a table's directory within the temporary directory. 
This directory will look
+   * like: .../backupRoot/.tmp/backupId/namespace/table
+   * @param table The table whose HFiles are being bulk loaded
+   * @return A Path object representing the directory
+   */
+  protected Path getTmpBackupDirForTable(TableName table) {
     Path tablePath = getBulkOutputDir();
     tablePath = new Path(tablePath, table.getNamespaceAsString());
-    tablePath = new Path(tablePath, table.getQualifierAsString());
-    return new Path(tablePath, "data");
+    return new Path(tablePath, table.getQualifierAsString());
   }
 
+  /**
+   * Creates a path to a temporary backup directory. This directory will look 
like:
+   * .../backupRoot/.tmp/backupId
+   * @return A Path object representing the directory
+   */
   protected Path getBulkOutputDir() {
     String backupId = backupInfo.getBackupId();
     Path path = new Path(backupInfo.getBackupRootDir());
@@ -593,6 +653,12 @@ public class IncrementalTableBackupClient extends 
TableBackupClient {
     return path;
   }
 
+  /**
+   * Creates a path to a destination directory for bulk loaded HFiles. This 
directory will look
+   * like: .../backupRoot/backupID/namespace/table
+   * @param table The table whose HFiles are being bulk loaded
+   * @return A Path object representing the directory
+   */
   private Path getTargetDirForTable(TableName table) {
     Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR + 
backupInfo.getBackupId());
     path = new Path(path, table.getNamespaceAsString());
diff --git 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 50b47565d74..93cd4b6b732 100644
--- 
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.backup.util;
 
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,9 +35,11 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NamespaceNotFoundException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
@@ -158,11 +162,17 @@ public class RestoreTool {
   public void incrementalRestoreTable(Connection conn, Path tableBackupPath, 
Path[] logDirs,
     TableName[] tableNames, TableName[] newTableNames, String incrBackupId,
     boolean keepOriginalSplits) throws IOException {
-    try (Admin admin = conn.getAdmin()) {
+    try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new 
BackupAdminImpl(conn)) {
       if (tableNames.length != newTableNames.length) {
         throw new IOException("Number of source tables and target tables does 
not match!");
       }
-      FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+      Configuration conf = new Configuration(this.conf);
+      FileSystem fileSys = tableBackupPath.getFileSystem(conf);
+
+      BackupInfo backupInfo = backupAdmin.getBackupInfo(incrBackupId);
+      if (backupInfo.isContinuousBackupEnabled()) {
+        conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+      }
 
       // for incremental backup image, expect the table already created either 
by user or previous
       // full backup. Here, check that all new tables exists
diff --git 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 54a65f9f1a4..2dc52354952 100644
--- 
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -22,11 +22,14 @@ import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
 import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -40,6 +43,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -93,16 +99,18 @@ public class TestIncrementalBackupWithContinuous extends 
TestBackupBase {
     TableName tableName = TableName.valueOf("table_" + methodName);
     Table t1 = TEST_UTIL.createTable(tableName, famName);
 
-    try (BackupSystemTable table = new 
BackupSystemTable(TEST_UTIL.getConnection())) {
-      int before = table.getBackupHistory().size();
+    try (BackupSystemTable backupSystemTable = new 
BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = backupSystemTable.getBackupHistory().size();
 
       // Run continuous backup
+      LOG.info("Running full backup with continuous backup enabled on table: 
{}", tableName);
       String backup1 = backupTables(BackupType.FULL, List.of(tableName), 
BACKUP_ROOT_DIR, true);
+      LOG.info("Full backup complete with ID {} for table: {}", backup1, 
tableName);
       assertTrue(checkSucceeded(backup1));
 
       // Verify backup history increased and all the backups are succeeded
       LOG.info("Verify backup history increased and all the backups are 
succeeded");
-      List<BackupInfo> backups = table.getBackupHistory();
+      List<BackupInfo> backups = backupSystemTable.getBackupHistory();
       assertEquals(before + 1, backups.size(), "Backup history should 
increase");
 
       // Verify backup manifest contains the correct tables
@@ -115,12 +123,12 @@ public class TestIncrementalBackupWithContinuous extends 
TestBackupBase {
       Thread.sleep(10000);
 
       // Run incremental backup
-      LOG.info("Run incremental backup now");
-      before = table.getBackupHistory().size();
+      LOG.info("Run incremental backup now on table: {}", tableName);
+      before = backupSystemTable.getBackupHistory().size();
       String backup2 =
         backupTables(BackupType.INCREMENTAL, List.of(tableName), 
BACKUP_ROOT_DIR, true);
       assertTrue(checkSucceeded(backup2));
-      LOG.info("Incremental backup completed");
+      LOG.info("Incremental backup completed for table: {}", tableName);
 
       // Verify the temporary backup directory was deleted
       Path backupTmpDir = new Path(BACKUP_ROOT_DIR, ".tmp");
@@ -129,18 +137,107 @@ public class TestIncrementalBackupWithContinuous extends 
TestBackupBase {
         "Bulk load output directory " + bulkLoadOutputDir + " should have been 
deleted");
 
       // Verify backup history increased and all the backups are succeeded
-      backups = table.getBackupHistory();
+      backups = backupSystemTable.getBackupHistory();
       assertEquals(before + 1, backups.size(), "Backup history should 
increase");
 
+      String originalTableChecksum = TEST_UTIL.checksumRows(t1);
+
+      LOG.info("Truncating table: {}", tableName);
       TEST_UTIL.truncateTable(tableName);
 
       // Restore incremental backup
       TableName[] tables = new TableName[] { tableName };
       BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      LOG.info("Restoring table: {}", tableName);
+      // In the restore request, the original table is both the "from table" 
and the "to table".
+      // This means the table is being restored "into itself". It is not being 
restored into
+      // separate table.
       client.restore(
         BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, 
tables, tables, true));
 
-      assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
+      LOG.info("Verifying data integrity for restored table: {}", tableName);
+      verifyRestoredTableDataIntegrity(tables[0], originalTableChecksum, 
NB_ROWS_IN_BATCH);
+    }
+  }
+
+  @Test
+  public void testMultiTableContinuousBackupWithIncrementalBackupSuccess() 
throws Exception {
+    String methodName = 
Thread.currentThread().getStackTrace()[1].getMethodName();
+    List<Table> tables = new ArrayList<>();
+    List<TableName> tableNames = new ArrayList<>();
+    tableNames.add(TableName.valueOf("table_" + methodName + "_0"));
+    tableNames.add(TableName.valueOf("table_" + methodName + "_1"));
+    tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_0"));
+    tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_1"));
+    tableNames.add(TableName.valueOf("sameTableNameDifferentNamespace"));
+    tableNames.add(TableName.valueOf("ns3", 
"sameTableNameDifferentNamespace"));
+
+    for (TableName table : tableNames) {
+      LOG.info("Creating table: {}", table);
+      tables.add(TEST_UTIL.createTable(table, famName));
+    }
+
+    try (BackupSystemTable backupSystemTable = new 
BackupSystemTable(TEST_UTIL.getConnection())) {
+      int before = backupSystemTable.getBackupHistory().size();
+
+      // Run continuous backup on multiple tables
+      LOG.info("Running full backup with continuous backup enabled on tables: 
{}", tableNames);
+      String backup1 = backupTables(BackupType.FULL, tableNames, 
BACKUP_ROOT_DIR, true);
+      LOG.info("Full backup complete with ID {} for tables: {}", backup1, 
tableNames);
+      assertTrue(checkSucceeded(backup1));
+
+      // Verify backup history increased and all backups have succeeded
+      LOG.info("Verify backup history increased and all backups have 
succeeded");
+      List<BackupInfo> backups = backupSystemTable.getBackupHistory();
+      assertEquals(before + 1, backups.size(), "Backup history should 
increase");
+
+      // Verify backup manifest contains the correct tables
+      LOG.info("Verify backup manifest contains the correct tables");
+      BackupManifest manifest = getLatestBackupManifest(backups);
+      assertEquals(Sets.newHashSet(tableNames), new 
HashSet<>(manifest.getTableList()),
+        "Backup should contain the expected tables");
+
+      loadTables(tables);
+      Thread.sleep(10000);
+
+      // Run incremental backup
+      LOG.info("Running incremental backup on tables: {}", tableNames);
+      before = backupSystemTable.getBackupHistory().size();
+      String backup2 = backupTables(BackupType.INCREMENTAL, tableNames, 
BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup2));
+      LOG.info("Incremental backup completed with ID {} for tables: {}", 
backup2, tableNames);
+
+      // Verify backup history increased and all the backups are succeeded
+      backups = backupSystemTable.getBackupHistory();
+      assertEquals(before + 1, backups.size(), "Backup history should 
increase");
+
+      // We need to get each table's original row checksum before truncating 
each table
+      LinkedHashMap<TableName, String> originalTableChecksums = new 
LinkedHashMap<>();
+      for (Table table : tables) {
+        LOG.info("Getting row checksum for table: {}", table);
+        originalTableChecksums.put(table.getName(), 
TEST_UTIL.checksumRows(table));
+      }
+
+      for (TableName tableName : tableNames) {
+        LOG.info("Truncating table: {}", tableName);
+        TEST_UTIL.truncateTable(tableName);
+      }
+
+      // Restore incremental backup
+      TableName[] tableNamesArray = tableNames.toArray(new TableName[0]);
+      BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+      LOG.info("Restoring tables: {}", tableNames);
+      // In the restore request, the original tables are both the list of 
"from tables" and the
+      // list of "to tables". This means the tables are being restored "into 
themselves". They are
+      // not being restored into separate tables.
+      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, 
backup2, false,
+        tableNamesArray, tableNamesArray, true));
+
+      for (TableName tableName : originalTableChecksums.keySet()) {
+        LOG.info("Verifying data integrity for restored table: {}", tableName);
+        verifyRestoredTableDataIntegrity(tableName, 
originalTableChecksums.get(tableName),
+          NB_ROWS_IN_BATCH);
+      }
     }
   }
 
@@ -232,4 +329,45 @@ public class TestIncrementalBackupWithContinuous extends 
TestBackupBase {
       table.put(p);
     }
   }
+
+  protected static void loadTables(List<Table> tables) throws Exception {
+    for (Table table : tables) {
+      LOG.info("Loading data into table: {}", table);
+      loadTable(table);
+    }
+  }
+
+  private void verifyRestoredTableDataIntegrity(TableName restoredTableName,
+    String originalTableChecksum, int expectedRowCount) throws Exception {
+    try (Table restoredTable = 
TEST_UTIL.getConnection().getTable(restoredTableName);
+      ResultScanner scanner = restoredTable.getScanner(new Scan())) {
+
+      // Verify the checksum for the original table (before it was truncated) 
matches the checksum
+      // of the restored table.
+      String restoredTableChecksum = TEST_UTIL.checksumRows(restoredTable);
+      assertEquals(originalTableChecksum, restoredTableChecksum,
+        "The restored table's row checksum did not match the original table's 
checksum");
+
+      // Verify the data in the restored table is the same as when it was 
originally loaded
+      // into the table.
+      int count = 0;
+      for (Result result : scanner) {
+        // The data has a numerical match between its row key and value (such 
as rowLoad1 and
+        // value1)
+        // We can use this to ensure a row key has the expected value.
+        String rowKey = Bytes.toString(result.getRow());
+        int index = Integer.parseInt(rowKey.replace("rowLoad", ""));
+
+        // Verify the Value
+        byte[] actualValue = result.getValue(famName, qualName);
+        assertNotNull(actualValue, "Value missing for row key: " + rowKey);
+        String expectedValue = "val" + index;
+        assertEquals(expectedValue, Bytes.toString(actualValue),
+          "Value mismatch for row key: " + rowKey);
+
+        count++;
+      }
+      assertEquals(expectedRowCount, count);
+    }
+  }
 }
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 0e81c95677c..87241d437cd 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -161,8 +161,9 @@ public class HFileOutputFormat2 extends 
FileOutputFormat<ImmutableBytesWritable,
     "hbase.bulkload.locality.sensitive.enabled";
   private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
   static final String OUTPUT_TABLE_NAME_CONF_KEY = 
"hbase.mapreduce.hfileoutputformat.table.name";
-  static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+  public static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
     "hbase.mapreduce.use.multi.table.hfileoutputformat";
+  public static final boolean MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT = 
true;
 
   /**
    * ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. 
We expose this config
diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 4c0b12ef733..cf4397d1052 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT;
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -378,14 +381,35 @@ public class WALPlayer extends Configured implements Tool 
{
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);) {
-        List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
-        for (TableName tableName : tableNames) {
+      try (Connection conn = ConnectionFactory.createConnection(conf)) {
+        if (
+          conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+            MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT)
+        ) {
+          // The HFiles will be output to something like this for each table:
+          // .../BULK_OUTPUT_CONF_KEY/namespace/table/columnFamily
+          List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
+          for (TableName tableName : tableNames) {
+            Table table = conn.getTable(tableName);
+            RegionLocator regionLocator = getRegionLocator(tableName, conf, 
conn);
+            tableInfoList.add(new TableInfo(table.getDescriptor(), 
regionLocator));
+          }
+          MultiTableHFileOutputFormat.configureIncrementalLoad(job, 
tableInfoList);
+        } else {
+          // The HFiles will be output to something like: 
.../BULK_OUTPUT_CONF_KEY/columnFamily
+          // This is useful for scenarios where we are running the WALPlayer 
consecutively on just
+          // one table at a time, and BULK_OUTPUT_CONF_KEY is already set to a 
"namespace/table"
+          // directory path for each table.
+          if (tableNames.size() != 1) {
+            throw new IOException("Expected table names list to have only one 
table since "
+              + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. 
Got the following "
+              + "list of tables instead: " + tableNames);
+          }
+          TableName tableName = tableNames.get(0);
           Table table = conn.getTable(tableName);
           RegionLocator regionLocator = getRegionLocator(tableName, conf, 
conn);
-          tableInfoList.add(new TableInfo(table.getDescriptor(), 
regionLocator));
+          HFileOutputFormat2.configureIncrementalLoad(job, 
table.getDescriptor(), regionLocator);
         }
-        MultiTableHFileOutputFormat.configureIncrementalLoad(job, 
tableInfoList);
       }
       TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
         
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index c4890a9ab4f..65a3598b65c 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static 
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -33,6 +34,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
@@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
@@ -84,28 +87,38 @@ import org.mockito.stubbing.Answer;
 @Tag(LargeTests.TAG)
 public class TestWALPlayer {
 
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] COLUMN1 = Bytes.toBytes("c1");
+  private static final byte[] COLUMN2 = Bytes.toBytes("c2");
+  private static final byte[] ROW = Bytes.toBytes("row");
+
   private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   private static SingleProcessHBaseCluster cluster;
   private static Path rootDir;
   private static Path walRootDir;
-  private static FileSystem fs;
+  private static FileSystem localFs;
   private static FileSystem logFs;
   private static Configuration conf;
+  private static FileSystem hdfs;
+  private static String bulkLoadOutputDir;
 
   @BeforeAll
   public static void beforeClass() throws Exception {
     conf = TEST_UTIL.getConfiguration();
     rootDir = TEST_UTIL.createRootDir();
     walRootDir = TEST_UTIL.createWALRootDir();
-    fs = CommonFSUtils.getRootDirFileSystem(conf);
+    localFs = CommonFSUtils.getRootDirFileSystem(conf);
     logFs = CommonFSUtils.getWALFileSystem(conf);
     cluster = TEST_UTIL.startMiniCluster();
+    hdfs = TEST_UTIL.getTestFileSystem();
+    bulkLoadOutputDir = new Path(new 
Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
+      Path.SEPARATOR + "bulkLoadOutput").toString();
   }
 
   @AfterAll
   public static void afterClass() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
-    fs.delete(rootDir, true);
+    localFs.delete(rootDir, true);
     logFs.delete(walRootDir, true);
   }
 
@@ -233,11 +246,8 @@ public class TestWALPlayer {
     Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
     Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
 
-    // put a row into the first table
-    Put p = new Put(ROW);
-    p.addColumn(FAMILY, COLUMN1, COLUMN1);
-    p.addColumn(FAMILY, COLUMN2, COLUMN2);
-    t1.put(p);
+    putRowIntoTable(t1);
+
     // delete one column
     Delete d = new Delete(ROW);
     d.addColumns(FAMILY, COLUMN1);
@@ -402,6 +412,112 @@ public class TestWALPlayer {
     assertNotEquals("WALPlayer should fail on empty files when not ignored", 
0, exitCode);
   }
 
+  /**
+   * Verifies the HFile output format for WALPlayer has the following 
directory structure when
+   * {@value HFileOutputFormat2#MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY} is set 
to true:<br>
+   * <br>
+   * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily
+   */
+  @Test
+  public void testWALPlayerMultiTableHFileOutputFormat() throws Exception {
+    String namespace = "ns_" + name.getMethodName();
+    
TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(namespace, 
name.getMethodName() + "2");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    putRowIntoTable(t1);
+    putRowIntoTable(t2);
+
+    Configuration multiTableOutputConf = new Configuration(conf);
+    setConfSimilarToIncrementalBackupWALToHFilesMethod(multiTableOutputConf);
+
+    // We are testing this config variable's effect on HFile output for the 
WALPlayer
+    multiTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 
true);
+
+    WALPlayer player = new WALPlayer(multiTableOutputConf);
+    String walInputDir = new 
Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+    String tables = tableName1.getNameAsString() + "," + 
tableName2.getNameAsString();
+
+    ToolRunner.run(multiTableOutputConf, player, new String[] { walInputDir, 
tables });
+
+    assertMultiTableOutputFormatDirStructure(tableName1, "default");
+    assertMultiTableOutputFormatDirStructure(tableName2, namespace);
+
+    hdfs.delete(new Path(bulkLoadOutputDir), true);
+  }
+
+  /**
+   * Verifies the HFile output format for WALPlayer has the following 
directory structure when
+   * {@value HFileOutputFormat2#MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY} is set 
to false:<br>
+   * <br>
+   * .../BULK_OUTPUT_CONF_KEY/columnFamily <br>
+   * <br>
+   * Also verifies an exception occurs when the WALPlayer is run on multiple 
tables at once while
+   * hbase.mapreduce.use.multi.table.hfileoutputformat is set to false.
+   */
+  @Test
+  public void testWALPlayerSingleTableHFileOutputFormat() throws Exception {
+    String namespace = "ns_" + name.getMethodName();
+    
TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(namespace, 
name.getMethodName() + "2");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    putRowIntoTable(t1);
+    putRowIntoTable(t2);
+
+    String bulkLoadOutputDir = new Path(new 
Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
+      Path.SEPARATOR + "bulkLoadOutput").toString();
+
+    Configuration singleTableOutputConf = new Configuration(conf);
+    setConfSimilarToIncrementalBackupWALToHFilesMethod(singleTableOutputConf);
+
+    // We are testing this config variable's effect on HFile output for the 
WALPlayer
+    singleTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, 
false);
+
+    WALPlayer player = new WALPlayer(singleTableOutputConf);
+
+    String walInputDir = new 
Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+    String tables = tableName1.getNameAsString() + "," + 
tableName2.getNameAsString();
+
+    // Expecting a failure here since we are running WALPlayer on multiple 
tables even though the
+    // multi-table HFile output format is disabled
+    try {
+      ToolRunner.run(singleTableOutputConf, player, new String[] { 
walInputDir, tables });
+      fail("Expected a failure to occur due to using WALPlayer with multiple 
tables while having "
+        + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false");
+    } catch (IOException e) {
+      String expectedMsg = "Expected table names list to have only one table 
since "
+        + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the 
following "
+        + "list of tables instead: 
[testWALPlayerSingleTableHFileOutputFormat1, " + namespace
+        + ":testWALPlayerSingleTableHFileOutputFormat2]";
+      assertTrue(e.getMessage().contains(expectedMsg));
+    }
+
+    // Successfully run WALPlayer on just one table while having multi-table 
HFile output format
+    // disabled
+    ToolRunner.run(singleTableOutputConf, player,
+      new String[] { walInputDir, tableName1.getNameAsString() });
+
+    Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, "family");
+    assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable,
+      hdfs.exists(bulkLoadOutputDirForTable));
+
+    hdfs.delete(new Path(bulkLoadOutputDir), true);
+  }
+
+  private void putRowIntoTable(Table table) throws IOException {
+    Put p = new Put(ROW);
+    p.addColumn(FAMILY, COLUMN1, COLUMN1);
+    p.addColumn(FAMILY, COLUMN2, COLUMN2);
+    table.put(p);
+  }
+
   private Path createEmptyWALFile(String walDir) throws IOException {
     FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
     Path inputDir = new Path("/" + walDir);
@@ -413,4 +529,22 @@ public class TestWALPlayer {
 
     return inputDir;
   }
+
+  private void 
setConfSimilarToIncrementalBackupWALToHFilesMethod(Configuration conf) {
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkLoadOutputDir);
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+    conf.set("mapreduce.job.name", name.getMethodName() + "-" + 
System.currentTimeMillis());
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
+  }
+
+  private void assertMultiTableOutputFormatDirStructure(TableName tableName, 
String namespace)
+    throws IOException {
+    Path qualifierAndFamilyDir =
+      new Path(tableName.getQualifierAsString(), new String(FAMILY, 
StandardCharsets.UTF_8));
+    Path namespaceQualifierFamilyDir = new Path(namespace, 
qualifierAndFamilyDir);
+    Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, 
namespaceQualifierFamilyDir);
+    assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable,
+      hdfs.exists(bulkLoadOutputDirForTable));
+  }
 }


Reply via email to