This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new aaaea57890e HBASE-29891: Multi-table continuous incremental backup is
failing bec… (#7891)
aaaea57890e is described below
commit aaaea57890e4b95ab6939ac80d0a6367b41d24b2
Author: Kevin Geiszler <[email protected]>
AuthorDate: Mon Mar 30 14:11:42 2026 -0700
HBASE-29891: Multi-table continuous incremental backup is failing bec…
(#7891)
* HBASE-29891: Multi-table continuous incremental backup is failing because
output directory already exists
Change-Id: I710cc8d0d87a299b7782a19d93f28bf6283c2436
* Revert incrementalCopyBulkloadHFiles()
Change-Id: I92c8874bd73449d7341515d6aa06beac014577f6
* Fix spotless error
Change-Id: I8768261ade3a9da11a3c53ba5093c8920ef7a3d7
---
.../backup/impl/AbstractPitrRestoreHandler.java | 4 +
.../backup/impl/IncrementalTableBackupClient.java | 102 +++++++++++---
.../hadoop/hbase/backup/util/RestoreTool.java | 14 +-
.../TestIncrementalBackupWithContinuous.java | 154 +++++++++++++++++++--
.../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 3 +-
.../apache/hadoop/hbase/mapreduce/WALPlayer.java | 34 ++++-
.../hadoop/hbase/mapreduce/TestWALPlayer.java | 154 +++++++++++++++++++--
7 files changed, 419 insertions(+), 46 deletions(-)
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index 3f31255d60f..34accf14f36 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
import java.io.IOException;
@@ -411,6 +412,9 @@ public abstract class AbstractPitrRestoreHandler {
conf.setLong(WALInputFormat.START_TIME_KEY, startTime);
conf.setLong(WALInputFormat.END_TIME_KEY, endTime);
conf.setBoolean(IGNORE_EMPTY_FILES, true);
+ // The multi-table HFile output format defaults to false in HFileOutputFormat2, but we are
explicitly setting
+ // it here just in case
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
Tool walPlayer = new WALPlayer();
walPlayer.setConf(conf);
return walPlayer;
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 8339786f973..345ab4c7a5a 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupInfo.withState;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
+import static
org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
import java.io.IOException;
import java.net.URI;
@@ -33,6 +35,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.backup.BackupRequest;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
-import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
@@ -251,8 +253,12 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName
tn, FileSystem tgtFs)
throws IOException {
MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
+ Configuration conf = new Configuration(this.conf);
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
getBulkOutputDirForTable(tn).toString());
+ if (backupInfo.isContinuousBackupEnabled()) {
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+ }
player.setConf(conf);
String inputDirs = StringUtils.join(files, ",");
@@ -361,10 +367,26 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
setupRegionLocator();
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles(tablesToWALFileList, tablesToPrevBackupTs);
- incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
- backupInfo.getBackupRootDir());
+
+ String[] bulkOutputFiles;
+ String backupDest = backupInfo.getBackupRootDir();
+ if (backupInfo.isContinuousBackupEnabled()) {
+ // For the continuous backup case, the WALs have been converted to
HFiles in a separate
+ // map-reduce job for each table. In order to prevent MR job failures
due to HBASE-29891,
+ // these HFiles were sent to a different output directory for each
table. This means
+ // continuous backups require a list of source directories and a
different destination
+ // directory when copying HFiles to the incremental backup directory.
+ List<String> uniqueNamespaces = tablesToWALFileList.keySet().stream()
+ .map(TableName::getNamespaceAsString).distinct().toList();
+ bulkOutputFiles = uniqueNamespaces.stream()
+ .map(ns -> new Path(getBulkOutputDir(),
ns).toString()).toArray(String[]::new);
+ backupDest = backupDest + Path.SEPARATOR + backupId;
+ } else {
+ bulkOutputFiles = new String[] { getBulkOutputDir().toString() };
+ }
+ incrementalCopyHFiles(bulkOutputFiles, backupDest);
} catch (Exception e) {
- String msg = "Unexpected exception in incremental-backup: incremental
copy " + backupId;
+ String msg = "Unexpected exception in incremental-backup: incremental
copy " + backupId + " ";
// fail the overall backup and return
failBackup(conn, backupInfo, backupManager, e, msg,
BackupType.INCREMENTAL, conf);
throw new IOException(e);
@@ -418,7 +440,8 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
System.arraycopy(files, 0, strArr, 0, files.length);
strArr[strArr.length - 1] = backupDest;
- String jobname = "Incremental_Backup-HFileCopy-" +
backupInfo.getBackupId();
+ String jobname = "Incremental_Backup-HFileCopy-" +
backupInfo.getBackupId() + "-"
+ + System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
}
@@ -517,23 +540,25 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
protected void walToHFiles(List<String> dirPaths, List<String> tableList,
long previousBackupTs)
throws IOException {
Tool player = new WALPlayer();
+ Configuration conf = new Configuration(this.conf);
// Player reads all files in arbitrary directory structure and creates
// a Map task for each file. We use ';' as separator
// because WAL file names contains ','
String dirs = StringUtils.join(dirPaths, ';');
- String jobname = "Incremental_Backup-" + backupId;
+ String jobname = "Incremental_Backup-" + backupId + "-" +
System.currentTimeMillis();
- Path bulkOutputPath = getBulkOutputDir();
- conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ setBulkOutputPath(conf, tableList);
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
- boolean diskBasedSortingEnabledOriginalValue =
HFileOutputFormat2.diskBasedSortingEnabled(conf);
conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
if (backupInfo.isContinuousBackupEnabled()) {
conf.set(WALInputFormat.START_TIME_KEY, Long.toString(previousBackupTs));
conf.set(WALInputFormat.END_TIME_KEY,
Long.toString(backupInfo.getIncrCommittedWalTs()));
+ // We do not want a multi-table HFile format here because continuous
backups run the WALPlayer
+ // individually on each table in the backup set.
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
}
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
@@ -548,19 +573,32 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
} catch (Exception ee) {
throw new IOException("Can not convert from directory " + dirs
+ " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
- } finally {
- conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
- diskBasedSortingEnabledOriginalValue);
- conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
- conf.unset(JOB_NAME_CONF_KEY);
}
}
+ private void setBulkOutputPath(Configuration conf, List<String> tableList) {
+ Path bulkOutputPath = getBulkOutputDir();
+ if (backupInfo.isContinuousBackupEnabled()) {
+ if (tableList.size() != 1) {
+ // Continuous backups run the WALPlayer job on one table at a time, so
the list of tables
+ // should have only one element.
+ throw new RuntimeException(
+ "Expected table list to have only one element, but got: " +
tableList);
+ }
+ bulkOutputPath =
getTmpBackupDirForTable(TableName.valueOf(tableList.get(0)));
+ }
+ conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
+ }
+
private void incrementalCopyBulkloadHFiles(FileSystem tgtFs, TableName tn)
throws IOException {
Path bulkOutDir = getBulkOutputDirForTable(tn);
if (tgtFs.exists(bulkOutDir)) {
- conf.setInt(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+ conf.setInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 2);
+ LOG.debug(
+ "{} has been set to {}. This affects what source files are actually
copied in the "
+ + "next Incremental copy HFiles job",
+ NUMBER_OF_LEVELS_TO_PRESERVE_KEY,
conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY));
Path tgtPath = getTargetDirForTable(tn);
try {
RemoteIterator<LocatedFileStatus> locatedFiles =
tgtFs.listFiles(bulkOutDir, true);
@@ -573,18 +611,40 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
}
incrementalCopyHFiles(files.toArray(files.toArray(new String[0])),
tgtPath.toString());
} finally {
- conf.unset(MapReduceBackupCopyJob.NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+ conf.unset(NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
+ LOG.debug("{} has been unset", NUMBER_OF_LEVELS_TO_PRESERVE_KEY);
}
}
}
+ /**
+ * Creates a path to the bulk load output directory for a table. This
directory will look like:
+ * .../backupRoot/.tmp/backupId/namespace/table/data
+ * @param table The table whose HFiles are being bulk loaded
+ * @return A Path object representing the directory
+ */
protected Path getBulkOutputDirForTable(TableName table) {
+ Path tablePath = getTmpBackupDirForTable(table);
+ return new Path(tablePath, "data");
+ }
+
+ /**
+ * Creates a path to a table's directory within the temporary directory.
This directory will look
+ * like: .../backupRoot/.tmp/backupId/namespace/table
+ * @param table The table whose HFiles are being bulk loaded
+ * @return A Path object representing the directory
+ */
+ protected Path getTmpBackupDirForTable(TableName table) {
Path tablePath = getBulkOutputDir();
tablePath = new Path(tablePath, table.getNamespaceAsString());
- tablePath = new Path(tablePath, table.getQualifierAsString());
- return new Path(tablePath, "data");
+ return new Path(tablePath, table.getQualifierAsString());
}
+ /**
+ * Creates a path to a temporary backup directory. This directory will look
like:
+ * .../backupRoot/.tmp/backupId
+ * @return A Path object representing the directory
+ */
protected Path getBulkOutputDir() {
String backupId = backupInfo.getBackupId();
Path path = new Path(backupInfo.getBackupRootDir());
@@ -593,6 +653,12 @@ public class IncrementalTableBackupClient extends
TableBackupClient {
return path;
}
+ /**
+ * Creates a path to a destination directory for bulk loaded HFiles. This
directory will look
+ * like: .../backupRoot/backupID/namespace/table
+ * @param table The table whose HFiles are being bulk loaded
+ * @return A Path object representing the directory
+ */
private Path getTargetDirForTable(TableName table) {
Path path = new Path(backupInfo.getBackupRootDir() + Path.SEPARATOR +
backupInfo.getBackupId());
path = new Path(path, table.getNamespaceAsString());
diff --git
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 50b47565d74..93cd4b6b732 100644
---
a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.backup.util;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,9 +35,11 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupRestoreFactory;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreJob;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
@@ -158,11 +162,17 @@ public class RestoreTool {
public void incrementalRestoreTable(Connection conn, Path tableBackupPath,
Path[] logDirs,
TableName[] tableNames, TableName[] newTableNames, String incrBackupId,
boolean keepOriginalSplits) throws IOException {
- try (Admin admin = conn.getAdmin()) {
+ try (Admin admin = conn.getAdmin(); BackupAdminImpl backupAdmin = new
BackupAdminImpl(conn)) {
if (tableNames.length != newTableNames.length) {
throw new IOException("Number of source tables and target tables does
not match!");
}
- FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+ Configuration conf = new Configuration(this.conf);
+ FileSystem fileSys = tableBackupPath.getFileSystem(conf);
+
+ BackupInfo backupInfo = backupAdmin.getBackupInfo(incrBackupId);
+ if (backupInfo.isContinuousBackupEnabled()) {
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+ }
// for incremental backup image, expect the table already created either
by user or previous
// full backup. Here, check that all new tables exists
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 54a65f9f1a4..2dc52354952 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -22,11 +22,14 @@ import static
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
import static
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -40,6 +43,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -93,16 +99,18 @@ public class TestIncrementalBackupWithContinuous extends
TestBackupBase {
TableName tableName = TableName.valueOf("table_" + methodName);
Table t1 = TEST_UTIL.createTable(tableName, famName);
- try (BackupSystemTable table = new
BackupSystemTable(TEST_UTIL.getConnection())) {
- int before = table.getBackupHistory().size();
+ try (BackupSystemTable backupSystemTable = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = backupSystemTable.getBackupHistory().size();
// Run continuous backup
+ LOG.info("Running full backup with continuous backup enabled on table:
{}", tableName);
String backup1 = backupTables(BackupType.FULL, List.of(tableName),
BACKUP_ROOT_DIR, true);
+ LOG.info("Full backup complete with ID {} for table: {}", backup1,
tableName);
assertTrue(checkSucceeded(backup1));
// Verify backup history increased and all the backups are succeeded
LOG.info("Verify backup history increased and all the backups are
succeeded");
- List<BackupInfo> backups = table.getBackupHistory();
+ List<BackupInfo> backups = backupSystemTable.getBackupHistory();
assertEquals(before + 1, backups.size(), "Backup history should
increase");
// Verify backup manifest contains the correct tables
@@ -115,12 +123,12 @@ public class TestIncrementalBackupWithContinuous extends
TestBackupBase {
Thread.sleep(10000);
// Run incremental backup
- LOG.info("Run incremental backup now");
- before = table.getBackupHistory().size();
+ LOG.info("Run incremental backup now on table: {}", tableName);
+ before = backupSystemTable.getBackupHistory().size();
String backup2 =
backupTables(BackupType.INCREMENTAL, List.of(tableName),
BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup2));
- LOG.info("Incremental backup completed");
+ LOG.info("Incremental backup completed for table: {}", tableName);
// Verify the temporary backup directory was deleted
Path backupTmpDir = new Path(BACKUP_ROOT_DIR, ".tmp");
@@ -129,18 +137,107 @@ public class TestIncrementalBackupWithContinuous extends
TestBackupBase {
"Bulk load output directory " + bulkLoadOutputDir + " should have been
deleted");
// Verify backup history increased and all the backups are succeeded
- backups = table.getBackupHistory();
+ backups = backupSystemTable.getBackupHistory();
assertEquals(before + 1, backups.size(), "Backup history should
increase");
+ String originalTableChecksum = TEST_UTIL.checksumRows(t1);
+
+ LOG.info("Truncating table: {}", tableName);
TEST_UTIL.truncateTable(tableName);
// Restore incremental backup
TableName[] tables = new TableName[] { tableName };
BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+ LOG.info("Restoring table: {}", tableName);
+ // In the restore request, the original table is both the "from table"
and the "to table".
+ // This means the table is being restored "into itself". It is not being
restored into
+ // a separate table.
client.restore(
BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
tables, tables, true));
- assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
+ LOG.info("Verifying data integrity for restored table: {}", tableName);
+ verifyRestoredTableDataIntegrity(tables[0], originalTableChecksum,
NB_ROWS_IN_BATCH);
+ }
+ }
+
+ @Test
+ public void testMultiTableContinuousBackupWithIncrementalBackupSuccess()
throws Exception {
+ String methodName =
Thread.currentThread().getStackTrace()[1].getMethodName();
+ List<Table> tables = new ArrayList<>();
+ List<TableName> tableNames = new ArrayList<>();
+ tableNames.add(TableName.valueOf("table_" + methodName + "_0"));
+ tableNames.add(TableName.valueOf("table_" + methodName + "_1"));
+ tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_0"));
+ tableNames.add(TableName.valueOf("ns1", "ns1_table_" + methodName + "_1"));
+ tableNames.add(TableName.valueOf("sameTableNameDifferentNamespace"));
+ tableNames.add(TableName.valueOf("ns3",
"sameTableNameDifferentNamespace"));
+
+ for (TableName table : tableNames) {
+ LOG.info("Creating table: {}", table);
+ tables.add(TEST_UTIL.createTable(table, famName));
+ }
+
+ try (BackupSystemTable backupSystemTable = new
BackupSystemTable(TEST_UTIL.getConnection())) {
+ int before = backupSystemTable.getBackupHistory().size();
+
+ // Run continuous backup on multiple tables
+ LOG.info("Running full backup with continuous backup enabled on tables:
{}", tableNames);
+ String backup1 = backupTables(BackupType.FULL, tableNames,
BACKUP_ROOT_DIR, true);
+ LOG.info("Full backup complete with ID {} for tables: {}", backup1,
tableNames);
+ assertTrue(checkSucceeded(backup1));
+
+ // Verify backup history increased and all backups have succeeded
+ LOG.info("Verify backup history increased and all backups have
succeeded");
+ List<BackupInfo> backups = backupSystemTable.getBackupHistory();
+ assertEquals(before + 1, backups.size(), "Backup history should
increase");
+
+ // Verify backup manifest contains the correct tables
+ LOG.info("Verify backup manifest contains the correct tables");
+ BackupManifest manifest = getLatestBackupManifest(backups);
+ assertEquals(Sets.newHashSet(tableNames), new
HashSet<>(manifest.getTableList()),
+ "Backup should contain the expected tables");
+
+ loadTables(tables);
+ Thread.sleep(10000);
+
+ // Run incremental backup
+ LOG.info("Running incremental backup on tables: {}", tableNames);
+ before = backupSystemTable.getBackupHistory().size();
+ String backup2 = backupTables(BackupType.INCREMENTAL, tableNames,
BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup2));
+ LOG.info("Incremental backup completed with ID {} for tables: {}",
backup2, tableNames);
+
+ // Verify backup history increased and all the backups have succeeded
+ backups = backupSystemTable.getBackupHistory();
+ assertEquals(before + 1, backups.size(), "Backup history should
increase");
+
+ // We need to get each table's original row checksum before truncating
each table
+ LinkedHashMap<TableName, String> originalTableChecksums = new
LinkedHashMap<>();
+ for (Table table : tables) {
+ LOG.info("Getting row checksum for table: {}", table);
+ originalTableChecksums.put(table.getName(),
TEST_UTIL.checksumRows(table));
+ }
+
+ for (TableName tableName : tableNames) {
+ LOG.info("Truncating table: {}", tableName);
+ TEST_UTIL.truncateTable(tableName);
+ }
+
+ // Restore incremental backup
+ TableName[] tableNamesArray = tableNames.toArray(new TableName[0]);
+ BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+ LOG.info("Restoring tables: {}", tableNames);
+ // In the restore request, the original tables are both the list of
"from tables" and the
+ // list of "to tables". This means the tables are being restored "into
themselves". They are
+ // not being restored into separate tables.
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR,
backup2, false,
+ tableNamesArray, tableNamesArray, true));
+
+ for (TableName tableName : originalTableChecksums.keySet()) {
+ LOG.info("Verifying data integrity for restored table: {}", tableName);
+ verifyRestoredTableDataIntegrity(tableName,
originalTableChecksums.get(tableName),
+ NB_ROWS_IN_BATCH);
+ }
}
}
@@ -232,4 +329,45 @@ public class TestIncrementalBackupWithContinuous extends
TestBackupBase {
table.put(p);
}
}
+
+ protected static void loadTables(List<Table> tables) throws Exception {
+ for (Table table : tables) {
+ LOG.info("Loading data into table: {}", table);
+ loadTable(table);
+ }
+ }
+
+ private void verifyRestoredTableDataIntegrity(TableName restoredTableName,
+ String originalTableChecksum, int expectedRowCount) throws Exception {
+ try (Table restoredTable =
TEST_UTIL.getConnection().getTable(restoredTableName);
+ ResultScanner scanner = restoredTable.getScanner(new Scan())) {
+
+ // Verify the checksum for the original table (before it was truncated)
matches the checksum
+ // of the restored table.
+ String restoredTableChecksum = TEST_UTIL.checksumRows(restoredTable);
+ assertEquals(originalTableChecksum, restoredTableChecksum,
+ "The restored table's row checksum did not match the original table's
checksum");
+
+ // Verify the data in the restored table is the same as when it was
originally loaded
+ // into the table.
+ int count = 0;
+ for (Result result : scanner) {
+ // The data has a numerical match between its row key and value (such
as rowLoad1 and
+ // val1)
+ // We can use this to ensure a row key has the expected value.
+ String rowKey = Bytes.toString(result.getRow());
+ int index = Integer.parseInt(rowKey.replace("rowLoad", ""));
+
+ // Verify the Value
+ byte[] actualValue = result.getValue(famName, qualName);
+ assertNotNull(actualValue, "Value missing for row key: " + rowKey);
+ String expectedValue = "val" + index;
+ assertEquals(expectedValue, Bytes.toString(actualValue),
+ "Value mismatch for row key: " + rowKey);
+
+ count++;
+ }
+ assertEquals(expectedRowCount, count);
+ }
+ }
}
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 0e81c95677c..87241d437cd 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -161,8 +161,9 @@ public class HFileOutputFormat2 extends
FileOutputFormat<ImmutableBytesWritable,
"hbase.bulkload.locality.sensitive.enabled";
private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
static final String OUTPUT_TABLE_NAME_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.table.name";
- static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+ public static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";
+ public static final boolean MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT =
true;
/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private.
We expose this config
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 4c0b12ef733..cf4397d1052 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
@@ -378,14 +381,35 @@ public class WALPlayer extends Configured implements Tool
{
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
- try (Connection conn = ConnectionFactory.createConnection(conf);) {
- List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
- for (TableName tableName : tableNames) {
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ if (
+ conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
+ MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_DEFAULT)
+ ) {
+ // The HFiles will be output to something like this for each table:
+ // .../BULK_OUTPUT_CONF_KEY/namespace/table/columnFamily
+ List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
+ for (TableName tableName : tableNames) {
+ Table table = conn.getTable(tableName);
+ RegionLocator regionLocator = getRegionLocator(tableName, conf,
conn);
+ tableInfoList.add(new TableInfo(table.getDescriptor(),
regionLocator));
+ }
+ MultiTableHFileOutputFormat.configureIncrementalLoad(job,
tableInfoList);
+ } else {
+ // The HFiles will be output to something like:
.../BULK_OUTPUT_CONF_KEY/columnFamily
+ // This is useful for scenarios where we are running the WALPlayer
consecutively on just
+ // one table at a time, and BULK_OUTPUT_CONF_KEY is already set to a
"namespace/table"
+ // directory path for each table.
+ if (tableNames.size() != 1) {
+ throw new IOException("Expected table names list to have only one
table since "
+ + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false.
Got the following "
+ + "list of tables instead: " + tableNames);
+ }
+ TableName tableName = tableNames.get(0);
Table table = conn.getTable(tableName);
RegionLocator regionLocator = getRegionLocator(tableName, conf,
conn);
- tableInfoList.add(new TableInfo(table.getDescriptor(),
regionLocator));
+ HFileOutputFormat2.configureIncrementalLoad(job,
table.getDescriptor(), regionLocator);
}
- MultiTableHFileOutputFormat.configureIncrementalLoad(job,
tableInfoList);
}
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
diff --git
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index bbadabab69b..8250300d169 100644
---
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import static
org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
@@ -34,6 +35,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
@@ -90,13 +93,20 @@ public class TestWALPlayer {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALPlayer.class);
+ private static final byte[] FAMILY = Bytes.toBytes("family");
+ private static final byte[] COLUMN1 = Bytes.toBytes("c1");
+ private static final byte[] COLUMN2 = Bytes.toBytes("c2");
+ private static final byte[] ROW = Bytes.toBytes("row");
+
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static SingleProcessHBaseCluster cluster;
private static Path rootDir;
private static Path walRootDir;
- private static FileSystem fs;
+ private static FileSystem localFs;
private static FileSystem logFs;
private static Configuration conf;
+ private static FileSystem hdfs;
+ private static String bulkLoadOutputDir;
@Rule
public TestName name = new TestName();
@@ -106,15 +116,18 @@ public class TestWALPlayer {
conf = TEST_UTIL.getConfiguration();
rootDir = TEST_UTIL.createRootDir();
walRootDir = TEST_UTIL.createWALRootDir();
- fs = CommonFSUtils.getRootDirFileSystem(conf);
+ localFs = CommonFSUtils.getRootDirFileSystem(conf);
logFs = CommonFSUtils.getWALFileSystem(conf);
cluster = TEST_UTIL.startMiniCluster();
+ hdfs = TEST_UTIL.getTestFileSystem();
+ bulkLoadOutputDir = new Path(new
Path(TEST_UTIL.getConfiguration().get("fs.defaultFS")),
+ Path.SEPARATOR + "bulkLoadOutput").toString();
}
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
- fs.delete(rootDir, true);
+ localFs.delete(rootDir, true);
logFs.delete(walRootDir, true);
}
@@ -235,18 +248,11 @@ public class TestWALPlayer {
public void testWALPlayer() throws Exception {
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
- final byte[] FAMILY = Bytes.toBytes("family");
- final byte[] COLUMN1 = Bytes.toBytes("c1");
- final byte[] COLUMN2 = Bytes.toBytes("c2");
- final byte[] ROW = Bytes.toBytes("row");
Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
- // put a row into the first table
- Put p = new Put(ROW);
- p.addColumn(FAMILY, COLUMN1, COLUMN1);
- p.addColumn(FAMILY, COLUMN2, COLUMN2);
- t1.put(p);
+ putRowIntoTable(t1);
+
// delete one column
Delete d = new Delete(ROW);
d.addColumns(FAMILY, COLUMN1);
@@ -411,6 +417,112 @@ public class TestWALPlayer {
assertNotEquals("WALPlayer should fail on empty files when not ignored",
0, exitCode);
}
+  /**
+   * Verifies the HFile output format for WALPlayer has the following directory structure when
+   * {@value HFileOutputFormat2#MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY} is set to true:<br>
+   * <br>
+   * .../BULK_OUTPUT_CONF_KEY/namespace/tableName/columnFamily <br>
+   * <br>
+   * One table lives in the default namespace and one in a custom namespace so both layouts are
+   * checked. The bulk load output directory is shared with the other HFile output format test, so
+   * it is always removed when this test finishes, whether it passed or failed.
+   */
+  @Test
+  public void testWALPlayerMultiTableHFileOutputFormat() throws Exception {
+    String namespace = "ns_" + name.getMethodName();
+    TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    putRowIntoTable(t1);
+    putRowIntoTable(t2);
+
+    Configuration multiTableOutputConf = new Configuration(conf);
+    setConfSimilarToIncrementalBackupWALToHFilesMethod(multiTableOutputConf);
+
+    // We are testing this config variable's effect on HFile output for the WALPlayer
+    multiTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+
+    WALPlayer player = new WALPlayer(multiTableOutputConf);
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+    String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString();
+
+    try {
+      int exitCode =
+        ToolRunner.run(multiTableOutputConf, player, new String[] { walInputDir, tables });
+      assertTrue("WALPlayer should succeed for multiple tables when "
+        + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is true, but exited with " + exitCode,
+        exitCode == 0);
+
+      assertMultiTableOutputFormatDirStructure(tableName1, "default");
+      assertMultiTableOutputFormatDirStructure(tableName2, namespace);
+    } finally {
+      // Clean up even on failure: a leftover output directory would make the next WALPlayer run
+      // against the same directory fail because the output directory already exists.
+      hdfs.delete(new Path(bulkLoadOutputDir), true);
+    }
+  }
+
+  /**
+   * Verifies the HFile output format for WALPlayer has the following directory structure when
+   * {@value HFileOutputFormat2#MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY} is set to false:<br>
+   * <br>
+   * .../BULK_OUTPUT_CONF_KEY/columnFamily <br>
+   * <br>
+   * Also verifies an exception occurs when the WALPlayer is run on multiple tables at once while
+   * {@value HFileOutputFormat2#MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY} is set to false. The bulk
+   * load output directory is shared with the other HFile output format test, so it is always
+   * removed when this test finishes, whether it passed or failed.
+   */
+  @Test
+  public void testWALPlayerSingleTableHFileOutputFormat() throws Exception {
+    String namespace = "ns_" + name.getMethodName();
+    TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
+    final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
+    final TableName tableName2 = TableName.valueOf(namespace, name.getMethodName() + "2");
+    Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
+    Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);
+
+    putRowIntoTable(t1);
+    putRowIntoTable(t2);
+
+    Configuration singleTableOutputConf = new Configuration(conf);
+    setConfSimilarToIncrementalBackupWALToHFilesMethod(singleTableOutputConf);
+
+    // We are testing this config variable's effect on HFile output for the WALPlayer
+    singleTableOutputConf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
+
+    WALPlayer player = new WALPlayer(singleTableOutputConf);
+
+    String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
+      HConstants.HREGION_LOGDIR_NAME).toString();
+    String tables = tableName1.getNameAsString() + "," + tableName2.getNameAsString();
+
+    try {
+      // Expecting a failure here since we are running WALPlayer on multiple tables even though
+      // the multi-table HFile output format is disabled
+      try {
+        ToolRunner.run(singleTableOutputConf, player, new String[] { walInputDir, tables });
+        fail("Expected a failure to occur due to using WALPlayer with multiple tables while "
+          + "having " + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " set to false");
+      } catch (IOException e) {
+        // Built from the TableName objects so the expectation follows any test/table rename
+        String expectedMsg = "Expected table names list to have only one table since "
+          + MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY + " is set to false. Got the following "
+          + "list of tables instead: [" + tableName1.getNameAsString() + ", "
+          + tableName2.getNameAsString() + "]";
+        String actualMsg = e.getMessage();
+        assertTrue("Unexpected failure message: " + actualMsg,
+          actualMsg != null && actualMsg.contains(expectedMsg));
+      }
+
+      // Successfully run WALPlayer on just one table while having multi-table HFile output
+      // format disabled
+      int exitCode = ToolRunner.run(singleTableOutputConf, player,
+        new String[] { walInputDir, tableName1.getNameAsString() });
+      assertTrue("WALPlayer should succeed for a single table, but exited with " + exitCode,
+        exitCode == 0);
+
+      // In single-table mode the column family directory sits directly under the output dir
+      Path bulkLoadOutputDirForTable = new Path(bulkLoadOutputDir, Bytes.toString(FAMILY));
+      assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable,
+        hdfs.exists(bulkLoadOutputDirForTable));
+    } finally {
+      // Clean up even on failure: a leftover output directory would make the next WALPlayer run
+      // against the same directory fail because the output directory already exists.
+      hdfs.delete(new Path(bulkLoadOutputDir), true);
+    }
+  }
+
+  /**
+   * Puts the shared test row into {@code table}: a single {@code ROW} with cells for
+   * {@code COLUMN1} and {@code COLUMN2} under {@code FAMILY}, each cell using its qualifier bytes
+   * as its value.
+   * @param table the table to write the row into
+   * @throws IOException if the put fails
+   */
+  private void putRowIntoTable(Table table) throws IOException {
+    Put rowPut = new Put(ROW)
+      .addColumn(FAMILY, COLUMN1, COLUMN1)
+      .addColumn(FAMILY, COLUMN2, COLUMN2);
+    table.put(rowPut);
+  }
+
private Path createEmptyWALFile(String walDir) throws IOException {
FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
Path inputDir = new Path("/" + walDir);
@@ -422,4 +534,22 @@ public class TestWALPlayer {
return inputDir;
}
+
+  /**
+   * Applies the WALPlayer settings shared by the HFile output format tests to {@code conf}: bulk
+   * output into {@code bulkLoadOutputDir}, ";" as the input files separator, multi-table support
+   * enabled, disk based sorting enabled, and a job name made of the test method name plus the
+   * current time in milliseconds. Going by the method name, these are meant to resemble what the
+   * incremental backup's WAL-to-HFiles step configures (not verifiable from this test alone).
+   * @param conf the configuration to modify in place
+   */
+  private void setConfSimilarToIncrementalBackupWALToHFilesMethod(Configuration conf) {
+    String jobName = name.getMethodName() + "-" + System.currentTimeMillis();
+    conf.set("mapreduce.job.name", jobName);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
+    conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkLoadOutputDir);
+  }
+
+  /**
+   * Asserts that the directory {@code bulkLoadOutputDir/namespace/tableQualifier/family} exists
+   * on {@code hdfs}, where the qualifier comes from {@code tableName} and the family is
+   * {@code FAMILY} decoded as UTF-8.
+   * @param tableName table whose qualifier names the table-level directory
+   * @param namespace name of the namespace-level directory expected above the table directory
+   * @throws IOException if the existence check against the file system fails
+   */
+  private void assertMultiTableOutputFormatDirStructure(TableName tableName, String namespace)
+    throws IOException {
+    String familyName = new String(FAMILY, StandardCharsets.UTF_8);
+    // Walk down from the output root: namespace -> table qualifier -> column family
+    Path namespaceDir = new Path(bulkLoadOutputDir, namespace);
+    Path tableDir = new Path(namespaceDir, tableName.getQualifierAsString());
+    Path bulkLoadOutputDirForTable = new Path(tableDir, familyName);
+    boolean familyDirExists = hdfs.exists(bulkLoadOutputDirForTable);
+    assertTrue("Expected path to exist: " + bulkLoadOutputDirForTable, familyDirExists);
+  }
}