This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new b2f2c2feec8 HBASE-29310 Handle Bulk Load Operations in Continuous Backup (#7150)
b2f2c2feec8 is described below
commit b2f2c2feec81be1a41f4e71f6de377159a69913d
Author: asolomon <[email protected]>
AuthorDate: Wed Jul 23 21:49:35 2025 +0530
HBASE-29310 Handle Bulk Load Operations in Continuous Backup (#7150)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Reviewed by: Kevin Geiszler <[email protected]>
---
.../backup/impl/AbstractPitrRestoreHandler.java | 28 ++++++++++
.../hbase/backup/impl/BackupImageAdapter.java | 11 ++++
.../hbase/backup/impl/BackupInfoAdapter.java | 11 ++++
.../hadoop/hbase/backup/impl/BackupManifest.java | 22 +++++++-
.../hbase/backup/impl/BackupSystemTable.java | 4 +-
.../hbase/backup/impl/PitrBackupMetadata.java | 7 +++
.../TestIncrementalBackupWithContinuous.java | 60 ++++++++++++++++++----
.../hbase/backup/TestPointInTimeRestore.java | 4 +-
.../src/main/protobuf/Backup.proto | 1 +
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 5 ++
10 files changed, 137 insertions(+), 16 deletions(-)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
index b2edce6b0fd..8072277bf68 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.PointInTimeRestoreRequest;
import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
@@ -248,6 +249,8 @@ public abstract class AbstractPitrRestoreHandler {
try {
if (backupAdmin.validateRequest(restoreRequest)) {
+ // check if any bulk load entry exists after this backup's time and before endTime
+ checkBulkLoadAfterBackup(conn, sTableName, backup, endTime);
return backup;
}
} catch (IOException e) {
@@ -259,6 +262,31 @@ public abstract class AbstractPitrRestoreHandler {
return null;
}
+ /**
+ * Checks whether any bulk load operation occurred for the specified table after the last
+ * successful backup and before the restore time.
+ * @param conn Active HBase connection
+ * @param sTableName Table for which to check bulk load history
+ * @param backup Last successful backup before the target recovery time
+ * @param endTime Target recovery time
+ * @throws IOException if a bulk load entry is found between the backup time and endTime
+ */
+ private void checkBulkLoadAfterBackup(Connection conn, TableName sTableName,
+ PitrBackupMetadata backup, long endTime) throws IOException {
+ try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+ List<BulkLoad> bulkLoads = backupSystemTable.readBulkloadRows(List.of(sTableName));
+ for (BulkLoad load : bulkLoads) {
+ long lastBackupTs = (backup.getType() == BackupType.FULL)
+ ? backup.getStartTs()
+ : backup.getIncrCommittedWalTs();
+ if (lastBackupTs < load.getTimestamp() && load.getTimestamp() < endTime) {
+ throw new IOException("Bulk load operation detected after last successful backup for "
+ + "table: " + sTableName);
+ }
+ }
+ }
+ }
+
/**
* Determines if the given backup is valid for PITR.
* <p>
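[Editor's note] The guard added above reduces to a timestamp-window test: a restore is rejected when any tracked bulk load landed after the covering backup's effective timestamp but before the requested recovery time. A minimal standalone sketch of that predicate, with hypothetical names (not part of this commit):

    // For a FULL backup the effective timestamp is its start time; for an
    // INCREMENTAL backup it is the committed WAL timestamp recorded with the image.
    static boolean bulkLoadInvalidatesRestore(boolean isFullBackup, long startTs,
        long incrCommittedWalTs, long bulkLoadTs, long endTime) {
      long lastBackupTs = isFullBackup ? startTs : incrCommittedWalTs;
      return lastBackupTs < bulkLoadTs && bulkLoadTs < endTime;
    }

    // Example: incremental backup committed WAL up to t=200, bulk load at t=250,
    // requested recovery time t=300 -> the restore is rejected:
    // bulkLoadInvalidatesRestore(false, 180, 200, 250, 300) == true
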
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
index 8b785a0f050..b6d8d4901a2 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupImageAdapter.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.impl;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,4 +58,14 @@ public class BackupImageAdapter implements PitrBackupMetadata {
public String getRootDir() {
return image.getRootDir();
}
+
+ @Override
+ public BackupType getType() {
+ return image.getType();
+ }
+
+ @Override
+ public long getIncrCommittedWalTs() {
+ return image.getIncrCommittedWalTs();
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
index 967fae551cb..34d812121e0 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupInfoAdapter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -57,4 +58,14 @@ public class BackupInfoAdapter implements PitrBackupMetadata {
public String getRootDir() {
return info.getBackupRootDir();
}
+
+ @Override
+ public BackupType getType() {
+ return info.getType();
+ }
+
+ @Override
+ public long getIncrCommittedWalTs() {
+ return info.getIncrCommittedWalTs();
+ }
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
index 59ae3857f2e..f35755d2451 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -101,6 +101,11 @@ public class BackupManifest {
return this;
}
+ Builder withIncrCommittedWalTs(long incrCommittedWalTs) {
+ image.setIncrCommittedWalTs(incrCommittedWalTs);
+ return this;
+ }
+
BackupImage build() {
return image;
}
@@ -115,6 +120,7 @@ public class BackupManifest {
private long completeTs;
private ArrayList<BackupImage> ancestors;
private Map<TableName, Map<String, Long>> incrTimeRanges;
+ private long incrCommittedWalTs;
static Builder newBuilder() {
return new Builder();
@@ -125,13 +131,14 @@ public class BackupManifest {
}
private BackupImage(String backupId, BackupType type, String rootDir, List<TableName> tableList,
- long startTs, long completeTs) {
+ long startTs, long completeTs, long incrCommittedWalTs) {
this.backupId = backupId;
this.type = type;
this.rootDir = rootDir;
this.tableList = tableList;
this.startTs = startTs;
this.completeTs = completeTs;
+ this.incrCommittedWalTs = incrCommittedWalTs;
}
static BackupImage fromProto(BackupProtos.BackupImage im) {
@@ -139,6 +146,7 @@ public class BackupManifest {
String rootDir = im.getBackupRootDir();
long startTs = im.getStartTs();
long completeTs = im.getCompleteTs();
+ long incrCommittedWalTs = im.getIncrCommittedWalTs();
List<HBaseProtos.TableName> tableListList = im.getTableListList();
List<TableName> tableList = new ArrayList<>();
for (HBaseProtos.TableName tn : tableListList) {
@@ -151,7 +159,8 @@ public class BackupManifest {
? BackupType.FULL
: BackupType.INCREMENTAL;
- BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+ BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs,
+ incrCommittedWalTs);
for (BackupProtos.BackupImage img : ancestorList) {
image.addAncestor(fromProto(img));
}
@@ -170,6 +179,7 @@ public class BackupManifest {
builder.setBackupId(backupId);
builder.setCompleteTs(completeTs);
builder.setStartTs(startTs);
+ builder.setIncrCommittedWalTs(incrCommittedWalTs);
if (type == BackupType.FULL) {
builder.setBackupType(BackupProtos.BackupType.FULL);
} else {
@@ -287,6 +297,14 @@ public class BackupManifest {
return completeTs;
}
+ public long getIncrCommittedWalTs() {
+ return incrCommittedWalTs;
+ }
+
+ public void setIncrCommittedWalTs(long incrCommittedWalTs) {
+ this.incrCommittedWalTs = incrCommittedWalTs;
+ }
+
private void setCompleteTs(long completeTs) {
this.completeTs = completeTs;
}
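[Editor's note] The new field rides the existing manifest round-trip: the builder stamps it, toProto() serializes it, and fromProto() restores it. A rough sketch of carrying the value through a BackupImage, assuming the existing package-private with* builder methods (illustrative only; these types are internal to org.apache.hadoop.hbase.backup.impl):

    // Hypothetical values for illustration.
    long startTs = 1000L, completeTs = 2000L, committedWalTs = 1950L;
    BackupManifest.BackupImage image = BackupManifest.BackupImage.newBuilder()
      .withBackupId("backup_12345")
      .withType(BackupType.INCREMENTAL)
      .withRootDir("hdfs://backup-root") // placeholder root dir
      .withTableList(List.of(TableName.valueOf("t1")))
      .withStartTime(startTs)
      .withCompleteTime(completeTs)
      .withIncrCommittedWalTs(committedWalTs) // new in this commit
      .build();
    // The value survives serialization:
    // fromProto(image.toProto()).getIncrCommittedWalTs() == 1950L
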
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 2afefa4a550..31ada8b040b 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -485,8 +485,8 @@ public final class BackupSystemTable implements Closeable {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
- LOG.debug("found orig {} for {} of table {} with timestamp {}",
path, fam, region,
- timestamp);
+ LOG.debug("Found orig path {} for family {} of table {} and region
{} with timestamp {}",
+ path, fam, table, region, timestamp);
if (timestamp <= endTimestamp) {
result.add(new BulkLoad(table, region, fam, path, row, timestamp));
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
index dc135ce79c0..3d143b33657 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/PitrBackupMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.backup.impl;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.yetus.audience.InterfaceAudience;
@@ -47,4 +48,10 @@ public interface PitrBackupMetadata {
/** Returns Root directory where the backup is stored */
String getRootDir();
+
+ /** Returns backup type */
+ BackupType getType();
+
+ /** Returns the incremental backup's committed WAL timestamp */
+ long getIncrCommittedWalTs();
}
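[Editor's note] With these two methods on the interface, the restore handler can compute the effective backup timestamp without knowing whether the metadata came from a BackupInfo or a manifest BackupImage. The dispatch it performs inline, extracted as a sketch (helper name is illustrative, not part of this commit):

    // Effective "last backup" timestamp used by the bulk-load window check.
    static long effectiveBackupTs(PitrBackupMetadata backup) {
      return backup.getType() == BackupType.FULL
        ? backup.getStartTs()
        : backup.getIncrCommittedWalTs();
    }
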
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 170cc866568..0978ff3ebef 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarker
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -33,16 +34,13 @@ import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
@@ -68,8 +66,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
private static final Logger LOG =
LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
- private byte[] ROW = Bytes.toBytes("row1");
- private final byte[] COLUMN = Bytes.toBytes("col");
private static final int ROWS_IN_BULK_LOAD = 100;
@Test
@@ -186,11 +182,55 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
}
}
- private void verifyTable(Table t1) throws IOException {
- Get g = new Get(ROW);
- Result r = t1.get(g);
- assertEquals(1, r.size());
- assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
+ @Test
+ public void testPitrFailureDueToMissingBackupPostBulkload() throws Exception {
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+ String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName1 = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName1, famName);
+ try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
+
+ // The test starts with no data and no bulk loaded rows.
+ int expectedRowCount = 0;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+ assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
+
+ // Create continuous backup; bulk loads are now being tracked
+ String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup1));
+
+ loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+ expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+ performBulkLoad("bulkPreIncr", methodName, tableName1);
+ expectedRowCount += ROWS_IN_BULK_LOAD;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+ assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+ loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+ Thread.sleep(5000);
+
+ // Incremental backup clears the tracked bulk load entries
+ String backup2 =
+ backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
+ assertTrue(checkSucceeded(backup2));
+ assertEquals(0, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+ performBulkLoad("bulkPostIncr", methodName, tableName1);
+ assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+ loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+ Thread.sleep(10000);
+ long restoreTs = BackupUtils.getReplicationCheckpoint(TEST_UTIL.getConnection());
+
+ // Expect restore failure: no backup covers the bulkPostIncr bulk load
+ TableName restoredTable = TableName.valueOf("restoredTable");
+ String[] args = PITRTestUtil.buildPITRArgs(new TableName[] { tableName1 },
+ new TableName[] { restoredTable }, restoreTs, null);
+ int ret = ToolRunner.run(conf1, new PointInTimeRestoreDriver(), args);
+ assertNotEquals("Restore should fail since there is one bulkload without any backup", 0, ret);
+ } finally {
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+ }
}
private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
index a1ce9c97a68..e9a0b50abcf 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestPointInTimeRestore.java
@@ -67,8 +67,8 @@ public class TestPointInTimeRestore extends TestBackupBase {
// Simulate a backup taken 20 days ago
EnvironmentEdgeManager
.injectEdge(() -> System.currentTimeMillis() - 20 * ONE_DAY_IN_MILLISECONDS);
- PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000); // Insert initial data into
- // table1
+ // Insert initial data into table1
+ PITRTestUtil.loadRandomData(TEST_UTIL, table1, famName, 1000);
// Perform a full backup for table1 with continuous backup enabled
String[] args =
diff --git a/hbase-protocol-shaded/src/main/protobuf/Backup.proto b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
index 0ad1f5ba619..b173848cd09 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Backup.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Backup.proto
@@ -65,6 +65,7 @@ message BackupImage {
optional uint64 complete_ts = 6;
repeated BackupImage ancestors = 7;
repeated TableServerTimestamp tst_map = 8;
+ optional uint64 incr_committed_wal_ts = 9;
}
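[Editor's note] Since the new field is a proto2 optional, manifests written before this change still parse: an absent incr_committed_wal_ts reads back as the scalar default 0. A hedged sketch of the generated accessors (standard protobuf-java codegen; the shaded class name is assumed):

    // BackupProtos.BackupImage is the shaded class generated for this message.
    BackupProtos.BackupImage im = BackupProtos.BackupImage.parseFrom(bytes);
    // For pre-upgrade manifests hasIncrCommittedWalTs() is false and the
    // getter falls back to the default value 0.
    long ts = im.hasIncrCommittedWalTs() ? im.getIncrCommittedWalTs() : 0L;
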
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 98e6631e305..7c6e9c01025 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -1195,6 +1195,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
+ if (ret == 0) {
+ System.out.println("Bulk load completed successfully.");
+ System.out.println("IMPORTANT: Please take a backup of the table
immediately if this table "
+ + "is part of continuous backup");
+ }
System.exit(ret);
}
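
[Editor's note] The reminder fires on any successful run of the tool, including programmatic invocations. A minimal sketch (staging directory and table name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder HFile staging dir and table name; adjust for your cluster.
        int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf),
          new String[] { "hdfs:///staging/hfiles", "my_table" });
        // Exit status 0 now also prints the continuous-backup reminder above.
        System.exit(ret);
      }
    }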