This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch HBASE-28957_rebased
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 3c5c99944f001375c92e30066df967bf10d1acee
Author: asolomon <[email protected]>
AuthorDate: Wed Jul 23 21:49:35 2025 +0530

    HBASE-29310 Handle Bulk Load Operations in Continuous Backup (#7150)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    Reviewed by: Kevin Geiszler <[email protected]>
---
 .../backup/impl/AbstractPitrRestoreHandler.java    | 28 ++++++++++
 .../hbase/backup/impl/BackupImageAdapter.java      | 11 ++++
 .../hbase/backup/impl/BackupInfoAdapter.java       | 11 ++++
 .../hadoop/hbase/backup/impl/BackupManifest.java   | 22 +++++++-
 .../hbase/backup/impl/PitrBackupMetadata.java      |  7 +++
 .../TestIncrementalBackupWithContinuous.java       | 60 ++++++++++++++++++----
 .../hbase/backup/TestPointInTimeRestore.java       |  4 +-
 .../src/main/protobuf/Backup.proto                 |  1 +
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  5 ++
 9 files changed, 135 insertions(+), 14 deletions(-)

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index b2edce6b0fd..8072277bf68 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
 import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -248,6 +249,8 @@ public abstract class AbstractPitrRestoreHandler {
 
         try {
           if (backupAdmin.validateRequest(restoreRequest)) {
+            // Check whether any bulk load entry exists after this backup time and before "endtime"
+            checkBulkLoadAfterBackup(conn, sTableName, backup, endTime);
             return backup;
           }
         } catch (IOException e) {
@@ -259,6 +262,31 @@ public abstract class AbstractPitrRestoreHandler {
     return null;
   }
 
+  /**
+   * Checks whether any bulk load operation occurred for the specified table after the last
+   * successful backup and before the restore time.
+   * @param conn       Active HBase connection
+   * @param sTableName Table for which to check bulk load history
+   * @param backup     Last successful backup before the target recovery time
+   * @param endTime    Target recovery time
+   * @throws IOException if a bulk load entry is found between the backup time and the end time
+   */
+  private void checkBulkLoadAfterBackup(Connection conn, TableName sTableName,
+    PitrBackupMetadata backup, long endTime) throws IOException {
+    try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+      List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(List.of(sTableName));
+      for (BulkLoad load : bulkLoads) {
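+        // A FULL backup covers data up to its start timestamp, while an INCREMENTAL
+        // backup covers WALs committed up to incrCommittedWalTs.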
+        long lastBackupTs = (backup.getType() == BackupType.FULL)
+          ? backup.getStartTs()
+          : backup.getIncrCommittedWalTs();
+        if (lastBackupTs < load.getTimestamp() && load.getTimestamp() < endTime) {
+          throw new IOException("Bulk load operation detected after last 
successful backup for "
+            + "table: " + sTableName);
+        }
+      }
+    }
+  }
+
   /**
    * Determines if the given backup is valid for PITR.
    * <p>
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
index 8b785a0f050..b6d8d4901a2 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl;
 
 import java.util.List;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -57,4 +58,14 @@ public class BackupImageAdapter implements PitrBackupMetadata {
   public String getRootDir() {
     return image.getRootDir();
   }
+
+  @Override
+  public BackupType getType() {
+    return image.getType();
+  }
+
+  @Override
+  public long getIncrCommittedWalTs() {
+    return image.getIncrCommittedWalTs();
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
index 967fae551cb..34d812121e0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.util.List;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -57,4 +58,14 @@ public class BackupInfoAdapter implements PitrBackupMetadata {
   public String getRootDir() {
     return info.getBackupRootDir();
   }
+
+  @Override
+  public BackupType getType() {
+    return info.getType();
+  }
+
+  @Override
+  public long getIncrCommittedWalTs() {
+    return info.getIncrCommittedWalTs();
+  }
 }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index 59ae3857f2e..f35755d2451 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -101,6 +101,11 @@ public class BackupManifest {
         return this;
       }
 
+      Builder withIncrCommittedWalTs(long incrCommittedWalTs) {
+        image.setIncrCommittedWalTs(incrCommittedWalTs);
+        return this;
+      }
+
       BackupImage build() {
         return image;
       }
@@ -115,6 +120,7 @@ public class BackupManifest {
     private long completeTs;
     private ArrayList<BackupImage> ancestors;
     private Map<TableName, Map<String, Long>> incrTimeRanges;
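+    // Timestamp up to which WALs were committed for an incremental backup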
+    private long incrCommittedWalTs;
 
     static Builder newBuilder() {
       return new Builder();
@@ -125,13 +131,14 @@ public class BackupManifest {
     }
 
     private BackupImage(String backupId, BackupType type, String rootDir, List<TableName> tableList,
-      long startTs, long completeTs) {
+      long startTs, long completeTs, long incrCommittedWalTs) {
       this.backupId = backupId;
       this.type = type;
       this.rootDir = rootDir;
       this.tableList = tableList;
       this.startTs = startTs;
       this.completeTs = completeTs;
+      this.incrCommittedWalTs = incrCommittedWalTs;
     }
 
     static BackupImage fromProto(BackupProtos.BackupImage im) {
@@ -139,6 +146,7 @@ public class BackupManifest {
       String rootDir = im.getBackupRootDir();
       long startTs = im.getStartTs();
       long completeTs = im.getCompleteTs();
+      long incrCommittedWalTs = im.getIncrCommittedWalTs();
       List<HBaseProtos.TableName> tableListList = im.getTableListList();
       List<TableName> tableList = new ArrayList<>();
       for (HBaseProtos.TableName tn : tableListList) {
@@ -151,7 +159,8 @@ public class BackupManifest {
         ? BackupType.FULL
         : BackupType.INCREMENTAL;
 
-      BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+      BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs,
+        incrCommittedWalTs);
       for (BackupProtos.BackupImage img : ancestorList) {
         image.addAncestor(fromProto(img));
       }
@@ -170,6 +179,7 @@ public class BackupManifest {
       builder.setBackupId(backupId);
       builder.setCompleteTs(completeTs);
       builder.setStartTs(startTs);
+      builder.setIncrCommittedWalTs(incrCommittedWalTs);
       if (type == BackupType.FULL) {
         builder.setBackupType(BackupProtos.BackupType.FULL);
       } else {
@@ -287,6 +297,14 @@ public class BackupManifest {
       return completeTs;
     }
 
+    public long getIncrCommittedWalTs() {
+      return incrCommittedWalTs;
+    }
+
+    public void setIncrCommittedWalTs(long incrCommittedWalTs) {
+      this.incrCommittedWalTs = incrCommittedWalTs;
+    }
+
     private void setCompleteTs(long completeTs) {
       this.completeTs = completeTs;
     }
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
index dc135ce79c0..3d143b33657 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
 import java.util.List;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -47,4 +48,10 @@ public interface PitrBackupMetadata {
 
   /** Returns Root directory where the backup is stored */
   String getRootDir();
+
+  /** Returns backup type */
+  BackupType getType();
+
+  /** Returns the timestamp up to which WALs were committed for an incremental backup */
+  long getIncrCommittedWalTs();
 }
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 170cc866568..0978ff3ebef 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
 import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -33,16 +34,13 @@ import java.util.Set;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManifest;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -68,8 +66,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
 
-  private byte[] ROW = Bytes.toBytes("row1");
-  private final byte[] COLUMN = Bytes.toBytes("col");
   private static final int ROWS_IN_BULK_LOAD = 100;
 
   @Test
@@ -186,11 +182,55 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
     }
   }
 
-  private void verifyTable(Table t1) throws IOException {
-    Get g = new Get(ROW);
-    Result r = t1.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
+  @Test
+  public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
+    conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+    String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+    TableName tableName1 = TableName.valueOf("table_" + methodName);
+    TEST_UTIL.createTable(tableName1, famName);
+    try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+
+      // The test starts with no data and no bulk-loaded rows.
+      int expectedRowCount = 0;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
+
+      // Create a continuous backup; bulk loads are now being tracked
+      String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup1));
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+      performBulkLoad("bulkPreIncr", methodName, tableName1);
+      expectedRowCount += ROWS_IN_BULK_LOAD;
+      assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(5000);
+
+      // Incremental backup
+      String backup2 =
+        backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+      assertTrue(checkSucceeded(backup2));
+      assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      performBulkLoad("bulkPostIncr", methodName, tableName1);
+      assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+      loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+      Thread.sleep(10000);
+      long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection());
+
+      // Expect the restore to fail because no backup was taken after the bulkPostIncr bulk load
+      TableName restoredTable = TableName.valueOf("restoredTable");
+      String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 },
+        new TableName[] { restoredTable }, restoreTs, null);
+      int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+      assertNotEquals("Restore should fail since there is one bulkload without 
any backup", 0, ret);
+    } finally {
+      conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    }
   }
 
   private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index a1ce9c97a68..e9a0b50abcf 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -67,8 +67,8 @@ public class TestPointInTimeRestore extends TestBackupBase {
     // Simulate a backup taken 20 days ago
     EnvironmentEdgeManager
       .injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
-    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
-                                                                   // table1
+    // Insert initial data into table1
+    PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
 
     // Perform a full backup for table1 with continuous backup enabled
     String[] args =
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index 0ad1f5ba619..b173848cd09 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -65,6 +65,7 @@ message BackupImage {
   optional uint64 complete_ts = 6;
   repeated BackupImage ancestors = 7;
   repeated TableServerTimestamp tst_map = 8;
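+  // WAL-commit timestamp recorded by an incremental backup; PITR uses it to
+  // detect bulk loads that are not yet covered by any backup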
+  optional uint64 incr_committed_wal_ts = 9;
 
 }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 4d6f57e22ed..cd9bc32e3e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -1197,6 +1197,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
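+    // Bulk-loaded HFiles bypass the WAL, so WAL-based continuous backup cannot
+    // capture them automatically; remind operators to take a fresh table backup.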
+    if (ret == 0) {
+      System.out.println("Bulk load completed successfully.");
+      System.out.println("IMPORTANT: Please take a backup of the table 
immediately if this table "
+        + "is part of continuous backup");
+    }
     System.exit(ret);
   }
 
