This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch HBASE-29146-branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 421c5c27c9d4264e043d8e08b7a13bfa083cbcc5
Author: Hernan Romer <[email protected]>
AuthorDate: Mon Mar 24 14:18:38 2025 -0400

    HBASE-29146 Incremental backups can fail due to not cleaning up the MR bulkload output directory (#6747)

    Co-authored-by: Hernan Gelaf-Romer <[email protected]>
    Signed-off-by: Ray Mattingly <[email protected]>
---
 .../backup/impl/IncrementalTableBackupClient.java  | 16 +++--
 .../hadoop/hbase/backup/TestIncrementalBackup.java | 78 +++++++++++++++++++++-
 2 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index 50eceb84996..ab1163aeed8 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -171,14 +171,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         LOG.debug("copying archive {} to {}", archive, tgt);
         archiveFiles.add(archive.toString());
       }
-      mergeSplitBulkloads(activeFiles, archiveFiles, srcTable);
-      incrementalCopyBulkloadHFiles(tgtFs, srcTable);
+      mergeSplitAndCopyBulkloadedHFiles(activeFiles, archiveFiles, srcTable, tgtFs);
     }
     return bulkLoads;
   }
 
-  private void mergeSplitBulkloads(List<String> activeFiles, List<String> archiveFiles,
-    TableName tn) throws IOException {
+  private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
+    List<String> archiveFiles, TableName tn, FileSystem tgtFs) throws IOException {
     int attempt = 1;
 
     while (!activeFiles.isEmpty()) {
@@ -186,7 +185,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       // Active file can be archived during copy operation,
       // we need to handle this properly
       try {
-        mergeSplitBulkloads(activeFiles, tn);
+        mergeSplitAndCopyBulkloadedHFiles(activeFiles, tn, tgtFs);
         break;
       } catch (IOException e) {
         int numActiveFiles = activeFiles.size();
@@ -200,11 +199,12 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     }
 
     if (!archiveFiles.isEmpty()) {
-      mergeSplitBulkloads(archiveFiles, tn);
+      mergeSplitAndCopyBulkloadedHFiles(archiveFiles, tn, tgtFs);
     }
   }
 
-  private void mergeSplitBulkloads(List<String> files, TableName tn) throws IOException {
+  private void mergeSplitAndCopyBulkloadedHFiles(List<String> files, TableName tn, FileSystem tgtFs)
+    throws IOException {
     MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob();
     conf.set(MapReduceHFileSplitterJob.BULK_OUTPUT_CONF_KEY,
       getBulkOutputDirForTable(tn).toString());
@@ -226,6 +226,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       throw new IOException(
         "Failed to run MapReduceHFileSplitterJob with invalid result: " + result);
     }
+
+    incrementalCopyBulkloadHFiles(tgtFs, tn);
   }
 
   private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index a91e6f01a6f..df187f75295 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -101,11 +102,14 @@ public class TestIncrementalBackup extends TestBackupBase {
   public void ensurePreviousBackupTestsAreCleanedUp() throws Exception {
     TEST_UTIL.flush(table1);
     TEST_UTIL.flush(table2);
-    TEST_UTIL.flush(table1_restore);
 
     TEST_UTIL.truncateTable(table1).close();
     TEST_UTIL.truncateTable(table2).close();
-    TEST_UTIL.truncateTable(table1_restore).close();
+
+    if (TEST_UTIL.getAdmin().tableExists(table1_restore)) {
+      TEST_UTIL.flush(table1_restore);
+      TEST_UTIL.truncateTable(table1_restore).close();
+    }
 
     TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(rst -> {
       try {
@@ -427,6 +431,73 @@ public class TestIncrementalBackup extends TestBackupBase {
     }
   }
 
+  @Test
+  public void TestIncBackupRestoreHandlesArchivedFiles() throws Exception {
+    byte[] fam2 = Bytes.toBytes("f2");
+    TableDescriptor newTable1Desc = TableDescriptorBuilder.newBuilder(table1Desc)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam2).build()).build();
+    TEST_UTIL.getAdmin().modifyTable(newTable1Desc);
+    try (Connection conn = ConnectionFactory.createConnection(conf1);
+      BackupAdminImpl admin = new BackupAdminImpl(conn)) {
+      String backupTargetDir = TEST_UTIL.getDataTestDir("backupTarget").toString();
+      BACKUP_ROOT_DIR = new File(backupTargetDir).toURI().toString();
+
+      List<TableName> tables = Lists.newArrayList(table1);
+
+      insertIntoTable(conn, table1, famName, 3, 100);
+      String fullBackupId = takeFullBackup(tables, admin, true);
+      assertTrue(checkSucceeded(fullBackupId));
+
+      insertIntoTable(conn, table1, famName, 4, 100);
+
+      HRegion regionToBulkload = TEST_UTIL.getHBaseCluster().getRegions(table1).get(0);
+      String regionName = regionToBulkload.getRegionInfo().getEncodedName();
+      // Requires a mult-fam bulkload to ensure we're appropriately handling
+      // multi-file bulkloads
+      Path regionDir = doBulkload(table1, regionName, famName, fam2);
+
+      // archive the files in the region directory
+      Path archiveDir =
+        HFileArchiveUtil.getStoreArchivePath(conf1, table1, regionName, Bytes.toString(famName));
+      TEST_UTIL.getTestFileSystem().mkdirs(archiveDir);
+      RemoteIterator<LocatedFileStatus> iter =
+        TEST_UTIL.getTestFileSystem().listFiles(regionDir, true);
+      List<Path> paths = new ArrayList<>();
+      while (iter.hasNext()) {
+        Path path = iter.next().getPath();
+        if (path.toString().contains("_SeqId_")) {
+          paths.add(path);
+        }
+      }
+      assertTrue(paths.size() > 1);
+      Path path = paths.get(0);
+      String name = path.toString();
+      int startIdx = name.lastIndexOf(Path.SEPARATOR);
+      String filename = name.substring(startIdx + 1);
+      Path archiveFile = new Path(archiveDir, filename);
+      // archive 1 of the files
+      boolean success = TEST_UTIL.getTestFileSystem().rename(path, archiveFile);
+      assertTrue(success);
+      assertTrue(TEST_UTIL.getTestFileSystem().exists(archiveFile));
+      assertFalse(TEST_UTIL.getTestFileSystem().exists(path));
+
+      BackupRequest request =
+        createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR, true);
+      String incrementalBackupId = admin.backupTables(request);
+      assertTrue(checkSucceeded(incrementalBackupId));
+
+      TableName[] fromTable = new TableName[] { table1 };
+      TableName[] toTable = new TableName[] { table1_restore };
+
+      admin.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupId, false,
+        fromTable, toTable, true));
+
+      int actualRowCount = TEST_UTIL.countRows(table1_restore);
+      int expectedRowCount = TEST_UTIL.countRows(table1);
+      assertEquals(expectedRowCount, actualRowCount);
+    }
+  }
+
   private void checkThrowsCFMismatch(IOException ex, List<TableName> tables) {
     Throwable cause = Throwables.getRootCause(ex);
     assertEquals(cause.getClass(), ColumnFamilyMismatchException.class);
@@ -448,12 +519,13 @@ public class TestIncrementalBackup extends TestBackupBase {
     return backupId;
   }
 
-  private static void doBulkload(TableName tn, String regionName, byte[]... fams)
+  private static Path doBulkload(TableName tn, String regionName, byte[]... fams)
     throws IOException {
     Path regionDir = createHFiles(tn, regionName, fams);
     Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> results =
       BulkLoadHFiles.create(conf1).bulkLoad(tn, regionDir);
     assertFalse(results.isEmpty());
+    return regionDir;
   }
 
   private static Path createHFiles(TableName tn, String regionName, byte[]... fams)
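
For readers following the fix: the restructured methods above make the copy step run once per MapReduceHFileSplitterJob invocation (first for the active files, then for the archived files), so each run's bulk output directory is consumed before the next run writes into it. The snippet below is only a rough sketch of that "stage, copy, then clear the staging directory" pattern using the stock Hadoop FileSystem/FileUtil API; the class, method, and directory names are made up for illustration and are not the actual HBase backup internals.

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;

  public class StagedCopyExample {
    /**
     * One "split then copy" round for a table: everything written under
     * stagingDir is copied to targetDir and the staging directory is removed,
     * so a later round (e.g. for archived files) starts from an empty directory.
     */
    static void copyAndCleanStaging(Configuration conf, Path stagingDir, Path targetDir)
      throws IOException {
      FileSystem srcFs = stagingDir.getFileSystem(conf);
      FileSystem dstFs = targetDir.getFileSystem(conf);
      if (!srcFs.exists(stagingDir)) {
        return; // nothing was produced for this table
      }
      // Copy the staged HFiles to the backup target, deleting the source on success.
      boolean copied = FileUtil.copy(srcFs, stagingDir, dstFs, targetDir,
        true /* deleteSource */, true /* overwrite */, conf);
      if (!copied) {
        throw new IOException("Failed to copy " + stagingDir + " to " + targetDir);
      }
    }
  }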
