This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch HBASE-29631-branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 45a9ef8df76460d02b8fdbcd5d6aa049468b370d
Author: Ray Mattingly <[email protected]>
AuthorDate: Fri Oct 3 13:51:00 2025 -0400

    HBASE-29631 Fix race condition in IncrementalTableBackupClient when HFiles are archived during backup (#7346) (#7357)
    
    Signed-off-by: Ray Mattingly <[email protected]>
    Co-authored-by: Siddharth Khillon <[email protected]>
    Co-authored-by: Hernan Romer <[email protected]>
    Co-authored-by: skhillon <[email protected]>
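    
    For context, the shape of the fix, as a minimal self-contained sketch (the
    names below are illustrative stand-ins, not the exact HBase internals):
    when the copy loop finds that active HFiles disappeared mid-backup, each
    missing file is mapped to its expected archive location, and the backup
    fails fast if a file exists in neither place.
    
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.function.Predicate;
        import java.util.function.UnaryOperator;
    
        final class ArchiveAwareFileLists {
          // exists: does a path currently exist; toArchive: where the HFile
          // cleaner would have moved it. Both are hypothetical stand-ins.
          static void update(List<String> activeFiles, List<String> archiveFiles,
              Predicate<String> exists, UnaryOperator<String> toArchive) throws IOException {
            List<String> newlyArchived = new ArrayList<>();
            for (String file : activeFiles) {
              if (!exists.test(file)) {
                newlyArchived.add(file);
              }
            }
            activeFiles.removeAll(newlyArchived);
            for (String file : newlyArchived) {
              String archived = toArchive.apply(file);
              if (!exists.test(archived)) {
                // In neither the active nor the archive location: unrecoverable.
                throw new IOException("No active or archived copy of " + file);
              }
              archiveFiles.add(archived);
            }
          }
        }
    
    In the actual patch the mapping is done with CommonFSUtils.getRootDir and
    HFileArchiveUtil.getArchivePath, as shown in the diff below.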
---
 .../backup/impl/IncrementalTableBackupClient.java  |  25 +-
 .../backup/TestIncrementalBackupWithBulkLoad.java  | 262 +++++++++++++++++++++
 2 files changed, 284 insertions(+), 3 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index ae32a8dbeb5..e5599c8357c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -200,6 +200,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         int numActiveFiles = activeFiles.size();
         updateFileLists(activeFiles, archiveFiles);
         if (activeFiles.size() < numActiveFiles) {
+          // Some files were archived while we were copying them; delete the
+          // partially-copied bulkloads directory and retry
+          deleteBulkLoadDirectory();
           continue;
         }
 
@@ -242,7 +245,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     incrementalCopyBulkloadHFiles(tgtFs, tn);
   }
 
-  private void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
+  public void updateFileLists(List<String> activeFiles, List<String> archiveFiles)
     throws IOException {
     List<String> newlyArchived = new ArrayList<>();
 
@@ -252,9 +255,25 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       }
     }
 
-    if (newlyArchived.size() > 0) {
+    if (!newlyArchived.isEmpty()) {
+      String rootDir = CommonFSUtils.getRootDir(conf).toString();
+
       activeFiles.removeAll(newlyArchived);
-      archiveFiles.addAll(newlyArchived);
+      for (String file : newlyArchived) {
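+        // Rebase the file's path from the HBase root dir onto the archive dir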
+        String archivedFile = file.substring(rootDir.length() + 1);
+        Path archivedFilePath = new Path(HFileArchiveUtil.getArchivePath(conf), archivedFile);
+        archivedFile = archivedFilePath.toString();
+
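+        // If the file is in neither the active nor the archive location, fail the backup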
+        if (!fs.exists(archivedFilePath)) {
+          throw new IOException(String.format(
+            "File %s no longer exists, and no archived file %s exists for it", 
file, archivedFile));
+        }
+
+        LOG.debug("Archived file {} has been updated", archivedFile);
+        archiveFiles.add(archivedFile);
+      }
     }
 
     LOG.debug(newlyArchived.size() + " files have been archived.");
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
new file mode 100644
index 00000000000..a2ca4304174
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.impl.BulkLoad;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+
+/**
+ * This test checks whether backups properly track & manage bulk file loads.
+ */
+@Category(LargeTests.class)
+public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIncrementalBackupWithBulkLoad.class);
+
+  private static final String TEST_NAME = TestIncrementalBackupWithBulkLoad.class.getSimpleName();
+  private static final int ROWS_IN_BULK_LOAD = 100;
+
+  // implement all test cases in 1 test since incremental backup/restore has dependencies
+  @Test
+  public void TestIncBackupDeleteTable() throws Exception {
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // The test starts with some data, and no bulk loaded rows.
+      int expectedRowCount = NB_ROWS_IN_BATCH;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertTrue(systemTable.readBulkloadRows(ImmutableList.of(table1)).isEmpty());
+
+      // Bulk loads aren't tracked if the table isn't backed up yet
+      performBulkLoad("bulk1");
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+      // Create a backup, bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+      assertTrue(checkSucceeded(backup1));
+      performBulkLoad("bulk2");
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+      // Truncating or deleting a table clears the tracked bulk loads (and all rows)
+      TEST_UTIL.truncateTable(table1).close();
+      expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+      // Creating a full backup clears the bulk loads (since they are captured in the snapshot)
+      performBulkLoad("bulk3");
+      expectedRowCount = ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(1, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+      String backup2 = backupTables(BackupType.FULL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+      assertTrue(checkSucceeded(backup2));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+
+      // Creating an incremental backup clears the bulk loads
+      performBulkLoad("bulk4");
+      performBulkLoad("bulk5");
+      performBulkLoad("bulk6");
+      expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(3, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+      String backup3 =
+        backupTables(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR);
+      assertTrue(checkSucceeded(backup3));
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      assertEquals(0, systemTable.readBulkloadRows(ImmutableList.of(table1)).size());
+      int rowCountAfterBackup3 = expectedRowCount;
+
+      // Doing another bulk load, to check that this data will disappear after a restore operation
+      performBulkLoad("bulk7");
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
+      List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(ImmutableList.of(table1));
+      assertEquals(1, bulkloadsTemp.size());
+      BulkLoad bulk7 = bulkloadsTemp.get(0);
+
+      // Doing a restore. Overwriting the table implies clearing the bulk loads,
+      // but loading the restored data itself involves bulk loads, so we expect 2 bulk loads
+      // associated with backup 3 (loading of full backup, loading of incremental backup).
+      BackupAdmin client = getBackupAdmin();
+      client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup3, false,
+        new TableName[] { table1 }, new TableName[] { table1 }, true));
+      assertEquals(rowCountAfterBackup3, TEST_UTIL.countRows(table1));
+      List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(ImmutableList.of(table1));
+      assertEquals(2, bulkLoads.size());
+      assertFalse(bulkLoads.contains(bulk7));
+
+      // Check that the restored table contains exactly the expected bulk-loaded rows
+      try (Table restoredTable = TEST_UTIL.getConnection().getTable(table1)) {
+        assertFalse(containsRowWithKey(restoredTable, "bulk1"));
+        assertFalse(containsRowWithKey(restoredTable, "bulk2"));
+        assertTrue(containsRowWithKey(restoredTable, "bulk3"));
+        assertTrue(containsRowWithKey(restoredTable, "bulk4"));
+        assertTrue(containsRowWithKey(restoredTable, "bulk5"));
+        assertTrue(containsRowWithKey(restoredTable, "bulk6"));
+        assertFalse(containsRowWithKey(restoredTable, "bulk7"));
+      }
+    }
+  }
+
+  private boolean containsRowWithKey(Table table, String rowKey) throws IOException {
+    byte[] data = Bytes.toBytes(rowKey);
+    Get get = new Get(data);
+    Result result = table.get(get);
+    return result.containsColumn(famName, qualName);
+  }
+
+  @Test
+  public void testUpdateFileListsRaceCondition() throws Exception {
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // Test the race condition where files are archived during incremental backup
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+      String regionName = "region1";
+      String columnFamily = "cf";
+      String filename1 = "hfile1";
+      String filename2 = "hfile2";
+
+      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+      Path activeFile1 =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename1);
+      Path activeFile2 =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename2);
+
+      fs.mkdirs(activeFile1.getParent());
+      fs.create(activeFile1).close();
+      fs.create(activeFile2).close();
+
+      List<String> activeFiles = new ArrayList<>();
+      activeFiles.add(activeFile1.toString());
+      activeFiles.add(activeFile2.toString());
+      List<String> archiveFiles = new ArrayList<>();
+
+      Path archiveDir = HFileArchiveUtil.getStoreArchivePath(TEST_UTIL.getConfiguration(), table1,
+        regionName, columnFamily);
+      Path archivedFile1 = new Path(archiveDir, filename1);
+      fs.mkdirs(archiveDir);
+      assertTrue("File should be moved to archive", fs.rename(activeFile1, 
archivedFile1));
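+      // activeFile1 now exists only in the archive, simulating the HFile cleaner racing the backup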
+
+      TestBackupBase.IncrementalTableBackupClientForTest client =
+        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+          "test_backup_id",
+          createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
+
+      client.updateFileLists(activeFiles, archiveFiles);
+
+      assertEquals("Only one file should remain in active files", 1, 
activeFiles.size());
+      assertEquals("File2 should still be in active files", 
activeFile2.toString(),
+        activeFiles.get(0));
+      assertEquals("One file should be added to archive files", 1, 
archiveFiles.size());
+      assertEquals("Archived file should have correct path", 
archivedFile1.toString(),
+        archiveFiles.get(0));
+      systemTable.finishBackupExclusiveOperation();
+    }
+  }
+
+  @Test
+  public void testUpdateFileListsMissingArchivedFile() throws Exception {
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+      // Test that IOException is thrown when file doesn't exist in archive location
+      FileSystem fs = TEST_UTIL.getTestFileSystem();
+
+      String regionName = "region2";
+      String columnFamily = "cf";
+      String filename = "missing_file";
+
+      Path rootDir = CommonFSUtils.getRootDir(TEST_UTIL.getConfiguration());
+      Path tableDir = CommonFSUtils.getTableDir(rootDir, table1);
+      Path activeFile =
+        new Path(tableDir, regionName + Path.SEPARATOR + columnFamily + Path.SEPARATOR + filename);
+
+      fs.mkdirs(activeFile.getParent());
+      fs.create(activeFile).close();
+
+      List<String> activeFiles = new ArrayList<>();
+      activeFiles.add(activeFile.toString());
+      List<String> archiveFiles = new ArrayList<>();
+
+      // Delete the file but don't create it in archive location
+      fs.delete(activeFile, false);
+
+      TestBackupBase.IncrementalTableBackupClientForTest client =
+        new TestBackupBase.IncrementalTableBackupClientForTest(TEST_UTIL.getConnection(),
+          "test_backup_id",
+          createBackupRequest(BackupType.INCREMENTAL, ImmutableList.of(table1), BACKUP_ROOT_DIR));
+
+      // This should throw IOException since file doesn't exist in archive
+      try {
+        client.updateFileLists(activeFiles, archiveFiles);
+        fail("Expected IOException to be thrown");
+      } catch (IOException e) {
+        // Expected
+      }
+      systemTable.finishBackupExclusiveOperation();
+    }
+  }
+
+  private void performBulkLoad(String keyPrefix) throws IOException {
+    FileSystem fs = TEST_UTIL.getTestFileSystem();
+    Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(TEST_NAME);
+    Path hfilePath =
+      new Path(baseDirectory, Bytes.toString(famName) + Path.SEPARATOR + "hfile_" + keyPrefix);
+
+    HFileTestUtil.createHFile(TEST_UTIL.getConfiguration(), fs, hfilePath, famName, qualName,
+      Bytes.toBytes(keyPrefix), Bytes.toBytes(keyPrefix + "z"), ROWS_IN_BULK_LOAD);
+
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+      BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+    assertFalse(result.isEmpty());
+  }
+}
