This is an automated email from the ASF dual-hosted git repository.
rmattingly pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c2ac5c85895 HBASE-29631 Fix race condition in IncrementalTableBackupClient when HFiles are archived during backup (#7346) (#7357)
c2ac5c85895 is described below
commit c2ac5c85895e5fbb07387855f05001c6171ce586
Author: Ray Mattingly <[email protected]>
AuthorDate: Fri Oct 3 13:51:00 2025 -0400
HBASE-29631 Fix race condition in IncrementalTableBackupClient when HFiles are archived during backup (#7346) (#7357)
Signed-off-by: Ray Mattingly <[email protected]>
Co-authored-by: Siddharth Khillon <[email protected]>
Co-authored-by: Hernan Romer <[email protected]>
Co-authored-by: skhillon <[email protected]>
---
.../backup/impl/IncrementalTableBackupClient.java | 23 +-
.../backup/TestIncrementalBackupWithBulkLoad.java | 262 +++++++++++++++++++++
2 files changed, 282 insertions(+), 3 deletions(-)
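
For context on the race being fixed: while an incremental backup is collecting bulk-loaded HFiles, a compaction or similar housekeeping can move one of them from the table's data directory into the HFile archive. The old updateFileLists moved the vanished path string into archiveFiles unchanged, so later copy steps pointed at a location that no longer existed. The patch instead rewrites each vanished path to its expected archive location and fails fast if the file is in neither place. A minimal sketch of that mapping (the class and method names here are invented for illustration; the committed logic is in the hunks below):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public final class ArchivePathSketch {
  // Rewrites <root>/data/ns/table/region/cf/hfile to
  // <root>/archive/data/ns/table/region/cf/hfile, failing fast when the file
  // exists in neither location.
  static Path toArchivedPath(Configuration conf, FileSystem fs, String activeFile)
    throws IOException {
    String rootDir = CommonFSUtils.getRootDir(conf).toString();
    // Drop "<rootDir>/" so the table-relative layout can be re-rooted under
    // the archive directory.
    String relative = activeFile.substring(rootDir.length() + 1);
    Path archived = new Path(HFileArchiveUtil.getArchivePath(conf), relative);
    if (!fs.exists(archived)) {
      throw new IOException(String.format(
        "File %s no longer exists, and no archived file %s exists for it", activeFile, archived));
    }
    return archived;
  }
}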
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index ae32a8dbeb5..e5599c8357c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -200,6 +200,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
int numActiveFiles = activeFiles.size();
updateFileLists(activeFiles, archiveFiles);
if (activeFiles.size() < numActiveFiles) {
+ // We've archived some files, delete bulkloads directory
+ // and re-try
+ deleteBulkLoadDirectory();
continue;
}
@@ -242,7 +245,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
incrementalCopyBulkloadHFiles(tgtFs, tn);
}
- private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
+ public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
throws IOException {
List<String> newlyArchived = new ArrayList<>();
@@ -252,9 +255,23 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
- if (newlyArchived.size() > 0) {
+ if (!newlyArchived.isEmpty()) {
+ String rootDir = CommonFSUtils.getRootDir(conf).toString();
+
activeFiles.removeAll(newlyArchived);
- archiveFiles.addAll(newlyArchived);
+ for (String file : newlyArchived) {
+ String archivedFile = file.substring(rootDir.length() + 1);
+ Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
+ archivedFile = archivedFilePath.toString();
+
+ if (!fs.exists(archivedFilePath)) {
+ throw new IOException(String.format(
+ "File %s no longer exists, and no archived file %s exists for it", file, archivedFile));
+ }
+
+ LOG.debug("Archived file {} has been updated", archivedFile);
+ archiveFiles.add(archivedFile);
+ }
}
LOG.debug(newlyArchived.size() + " files have been archived.");
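
The first hunk above plugs this check into a retry loop. Roughly (the loop shape outside the hunk is not visible in the diff and is reconstructed here for illustration; only the deleteBulkLoadDirectory() call and its comment are actually new):

while (true) {
  int numActiveFiles = activeFiles.size();
  updateFileLists(activeFiles, archiveFiles); // moves archived entries over
  if (activeFiles.size() < numActiveFiles) {
    // Files were archived mid-scan; the staged bulk-load directory may still
    // reference them, so discard it and take a fresh pass.
    deleteBulkLoadDirectory();
    continue;
  }
  break; // the active/archived split is stable, safe to proceed with the copy
}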
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
new file mode 100644
index 00000000000..a2ca4304174
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * This test checks whether backups properly track & manage bulk file loads.
+ */
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class);
+
+ private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
+ private static final int ROWS_IN_BULK_LOAD = 100;
+
+ // implement all test cases in 1 test since incremental backup/restore has dependencies
+ @Test
+ public void TestIncBackupDeleteTable() throws Exception {
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ // The test starts with some data, and no bulk loaded rows.
+ int expectedRowCount = NB_ROWS_IN_BATCH;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertTrue(systemTable.readBulkloadRows(ImmutableList.of(table1)).isEmpty());
+
+ // Bulk loads aren't tracked if the table isn't backed up yet
+ performBulkLoad("bulk1");
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+ // Create a backup, bulk loads are now being tracked
+ String backup1 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+ assertTrue(checkSucceeded(backup1));
+ performBulkLoad("bulk2");
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+ // Truncating or deleting a table clears the tracked bulk loads (and all rows)
+ TEST_UTIL.truncateTable(table1).close();
+ expectedRowCount = 0;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+ // Creating a full backup clears the bulk loads (since they are captured in the snapshot)
+ performBulkLoad("bulk3");
+ expectedRowCount = ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+ String backup2 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+ assertTrue(checkSucceeded(backup2));
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+ // Creating an incremental backup clears the bulk loads
+ performBulkLoad("bulk4");
+ performBulkLoad("bulk5");
+ performBulkLoad("bulk6");
+ expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(3, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+ String backup3 =
+ backupTables(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+ assertTrue(checkSucceeded(backup3));
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+ int rowCountAfterBackup3 = expectedRowCount;
+
+ // Doing another bulk load, to check that this data will disappear after a restore operation
+ performBulkLoad("bulk7");
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+ List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(ImmutableList.of(table1));
+ assertEquals(1, bulkloadsTemp.size());
+ BulkLoad bulk7 = bulkloadsTemp.get(0);
+
+ // Doing a restore. Overwriting the table implies clearing the bulk loads,
+ // but the loading of restored data involves bulk loading, so we expect 2 bulk loads
+ // associated with backup 3 (loading of full backup, loading of incremental backup).
+ BackupAdmin client = getBackupAdmin();
+ client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
+ new TableName[] { table1 }, new TableName[] { table1 }, true));
+ assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
+ List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(ImmutableList.of(table1));
+ assertEquals(2, bulkLoads.size());
+ assertFalse(bulkLoads.contains(bulk7));
+
+ // Check that we have data of all expected bulk loads
+ try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
+ assertFalse(containsRowWithKey(restoredTable, "bulk1"));
+ assertFalse(containsRowWithKey(restoredTable, "bulk2"));
+ assertTrue(containsRowWithKey(restoredTable, "bulk3"));
+ assertTrue(containsRowWithKey(restoredTable, "bulk4"));
+ assertTrue(containsRowWithKey(restoredTable, "bulk5"));
+ assertTrue(containsRowWithKey(restoredTable, "bulk6"));
+ assertFalse(containsRowWithKey(restoredTable, "bulk7"));
+ }
+ }
+ }
+
+ private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
+ byte[] data = Bytes.toBytes(rowKey);
+ Get get = new Get(data);
+ Result result = table.get(get);
+ return result.containsColumn(famName, qualName);
+ }
+
+ @Test
+ public void testUpdateFileListsRaceCondition() throws Exception {
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ // Test the race condition where files are archived during incremental backup
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+ String regionName = "region1";
+ String columnFamily = "cf";
+ String filename1 = "hfile1";
+ String filename2 = "hfile2";
+
+ Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+ Path activeFile1 =
+ new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
+ Path activeFile2 =
+ new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
+
+ fs.mkdirs(activeFile1.getParent());
+ fs.create(activeFile1).close();
+ fs.create(activeFile2).close();
+
+ List<String> activeFiles = new ArrayList<>();
+ activeFiles.add(activeFile1.toString());
+ activeFiles.add(activeFile2.toString());
+ List<String> archiveFiles = new ArrayList<>();
+
+ Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
+ regionName, columnFamily);
+ Path archivedFile1 = new Path(archiveDir, filename1);
+ fs.mkdirs(archiveDir);
+ assertTrue("File should be moved to archive", fs.rename(activeFile1,
archivedFile1));
+
+ TestBackupBase.IncrementalTableBackupClientForTest client =
+ new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+ "test_backup_id",
+ createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
+
+ client.updateFileLists(activeFiles, archiveFiles);
+
+ assertEquals("Only one file should remain in active files", 1,
activeFiles.size());
+ assertEquals("File2 should still be in active files",
activeFile2.toString(),
+ activeFiles.get(0));
+ assertEquals("One file should be added to archive files", 1,
archiveFiles.size());
+ assertEquals("Archived file should have correct path",
archivedFile1.toString(),
+ archiveFiles.get(0));
+ systemTable.finishBackupExclusiveOperation();
+ }
+
+ }
+
+ @Test
+ public void testUpdateFileListsMissingArchivedFile() throws Exception {
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+ // Test that IOException is thrown when file doesn't exist in archive location
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+ String regionName = "region2";
+ String columnFamily = "cf";
+ String filename = "missing_file";
+
+ Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+ Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+ Path activeFile =
+ new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
+
+ fs.mkdirs(activeFile.getParent());
+ fs.create(activeFile).close();
+
+ List<String> activeFiles = new ArrayList<>();
+ activeFiles.add(activeFile.toString());
+ List<String> archiveFiles = new ArrayList<>();
+
+ // Delete the file but don't create it in archive location
+ fs.delete(activeFile, false);
+
+ TestBackupBase.IncrementalTableBackupClientForTest client =
+ new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+ "test_backup_id",
+ createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
+
+ // This should throw IOException since file doesn't exist in archive
+ try {
+ client.updateFileLists(activeFiles, archiveFiles);
+ fail("Expected IOException to be thrown");
+ } catch (IOException e) {
+ // Expected
+ }
+ systemTable.finishBackupExclusiveOperation();
+ }
+ }
+
+ private void performBulkLoad(String keyPrefix) throws IOException {
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
+ Path hfilePath =
+ new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
+
+ HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
+ Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
+
+ Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+ assertFalse(result.isEmpty());
+ }
+}
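
A note on the expected-failure check in testUpdateFileListsMissingArchivedFile: the try/fail pattern above works on any JUnit 4 version; on JUnit 4.13+ the same assertion can be written with assertThrows, sketched here as an alternative rather than as part of the commit:

import static org.junit.Assert.assertThrows;

// Equivalent to the try/fail block: updateFileLists must throw IOException
// when an active file has vanished and no archived copy exists for it.
assertThrows(IOException.class, () -> client.updateFileLists(activeFiles, archiveFiles));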