This is an automated email from the ASF dual-hosted git repository.
rmattingly pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 19c7cc7c328 HBASE-29146 Incremental backups can fail due to not cleaning up the MR bulkload output directory (#6747) (#6854)
19c7cc7c328 is described below
commit 19c7cc7c3280630ebbf0ce3da5a1dc10b4d8c62d
Author: Ray Mattingly <[email protected]>
AuthorDate: Tue Mar 25 09:28:18 2025 -0400
HBASE-29146 Incremental backups can fail due to not cleaning up the MR bulkload output directory (#6747) (#6854)
Signed-off-by: Ray Mattingly <[email protected]>
Co-authored-by: Hernan Romer <[email protected]>
Co-authored-by: Hernan Gelaf-Romer <[email protected]>
---
.../backup/impl/IncrementalTableBackupClient.java | 16 +++--
.../hadoop/hbase/backup/TestIncrementalBackup.java | 78 +++++++++++++++++++++-
2 files changed, 84 insertions(+), 10 deletions(-)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 50eceb84996..ab1163aeed8 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -171,14 +171,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
LOG.debug("copying archive {} to {}", archive, tgt);
archiveFiles.add(archive.toString());
}
- mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
- incrementalCopyBulkloadHFiles(tgtFs, srcTable);
+ mergeSplitAndCopyBulkloadedHFiles(activeFiles, archiveFiles, srcTable, tgtFs);
}
return bulkLoads;
}
- private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
- TableName tn) throws IOException {
+ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
+ List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException {
int attempt = 1;
while (!activeFiles.isEmpty()) {
@@ -186,7 +185,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// Active file can be archived during copy operation,
// we need to handle this properly
try {
- mergeSplitBulkloads(activeFiles, tn);
+ mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs);
break;
} catch (IOException e) {
int numActiveFiles = activeFiles.size();
@@ -200,11 +199,12 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
if (!archiveFiles.isEmpty()) {
- mergeSplitBulkloads(archiveFiles, tn);
+ mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs);
}
}
- private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+ private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
+ throws IOException {
MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
getBulkOutputDirForTable(tn).toString());
@@ -226,6 +226,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
throw new IOException(
"Failed to run MapReduceHFileSplitterJob with invalid result: " +
result);
}
+
+ incrementalCopyBulkloadHFiles(tgtFs, tn);
}
private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index a91e6f01a6f..df187f75295 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.After;
import org.junit.Assert;
@@ -101,11 +102,14 @@ public class TestIncrementalBackup extends TestBackupBase {
public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
TEST_UTIL.flush(table1);
TEST_UTIL.flush(table2);
- TEST_UTIL.flush(table1_restore);
TEST_UTIL.truncateTable(table1).close();
TEST_UTIL.truncateTable(table2).close();
- TEST_UTIL.truncateTable(table1_restore).close();
+
+ if (TEST_UTIL.getAdmin().tableExists(table1_restore)) {
+ TEST_UTIL.flush(table1_restore);
+ TEST_UTIL.truncateTable(table1_restore).close();
+ }
TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
try {
@@ -427,6 +431,73 @@ public class TestIncrementalBackup extends TestBackupBase {
}
+ @Test
+ public void TestIncBackupRestoreHandlesArchivedFiles() throws Exception {
+ byte[] fam2 = Bytes.toBytes("f2");
+ TableDescriptor newTable1Desc = TableDescriptorBuilder.newBuilder(table1Desc)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam2).build()).build();
+ TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+ try (Connection conn = ConnectionFactory.createConnection(conf1);
+ BackupAdminImpl admin = new BackupAdminImpl(conn)) {
+ String backupTargetDir = TEST_UTIL.getDataTestDir("backupTarget").toString();
+ BACKUP_ROOT_DIR = new File(backupTargetDir).toURI().toString();
+
+ List<TableName> tables = Lists.newArrayList(table1);
+
+ insertIntoTable(conn, table1, famName, 3, 100);
+ String fullBackupId = takeFullBackup(tables, admin, true);
+ assertTrue(checkSucceeded(fullBackupId));
+
+ insertIntoTable(conn, table1, famName, 4, 100);
+
+ HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+ String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+ // Requires a multi-fam bulkload to ensure we're appropriately handling
+ // multi-file bulkloads
+ Path regionDir = doBulkload(table1, regionName, famName, fam2);
+
+ // archive the files in the region directory
+ Path archiveDir =
+ HFileArchiveUtil.getStoreArchivePath(conf1, table1, regionName, Bytes.toString(famName));
+ TEST_UTIL.getTestFileSystem().mkdirs(archiveDir);
+ RemoteIterator<LocatedFileStatus> iter =
+ TEST_UTIL.getTestFileSystem().listFiles(regionDir, true);
+ List<Path> paths = new ArrayList<>();
+ while (iter.hasNext()) {
+ Path path = iter.next().getPath();
+ if (path.toString().contains("_SeqId_")) {
+ paths.add(path);
+ }
+ }
+ assertTrue(paths.size() > 1);
+ Path path = paths.get(0);
+ String name = path.toString();
+ int startIdx = name.lastIndexOf(Path.SEPARATOR);
+ String filename = name.substring(startIdx + 1);
+ Path archiveFile = new Path(archiveDir, filename);
+ // archive 1 of the files
+ boolean success = TEST_UTIL.getTestFileSystem().rename(path, archiveFile);
+ assertTrue(success);
+ assertTrue(TEST_UTIL.getTestFileSystem().exists(archiveFile));
+ assertFalse(TEST_UTIL.getTestFileSystem().exists(path));
+
+ BackupRequest request =
+ createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR, true);
+ String incrementalBackupId = admin.backupTables(request);
+ assertTrue(checkSucceeded(incrementalBackupId));
+
+ TableName[] fromTable = new TableName[] { table1 };
+ TableName[] toTable = new TableName[] { table1_restore };
+
+ admin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId, false,
+ fromTable, toTable, true));
+
+ int actualRowCount = TEST_UTIL.countRows(table1_restore);
+ int expectedRowCount = TEST_UTIL.countRows(table1);
+ assertEquals(expectedRowCount, actualRowCount);
+ }
+ }
+
private void checkThrowsCFMismatch(IOException ex, List<TableName> tables) {
Throwable cause = Throwables.getRootCause(ex);
assertEquals(cause.getClass(), ColumnFamilyMismatchException.class);
@@ -448,12 +519,13 @@ public class TestIncrementalBackup extends TestBackupBase {
return backupId;
}
- private static void doBulkload(TableName tn, String regionName, byte[]... fams)
+ private static Path doBulkload(TableName tn, String regionName, byte[]... fams)
throws IOException {
Path regionDir = createHFiles(tn, regionName, fams);
Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> results =
BulkLoadHFiles.create(conf1).bulkLoad(tn, regionDir);
assertFalse(results.isEmpty());
+ return regionDir;
}
private static Path createHFiles(TableName tn, String regionName, byte[]... fams)