This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-28957
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-28957 by this push:
new 15dc75ff303 HBASE-29459 Capture bulkload files only till IncrCommittedWalTs during Incremental Backup (#7166)
15dc75ff303 is described below
commit 15dc75ff3032df4ba5377f50535ce1f9775c83f9
Author: asolomon <[email protected]>
AuthorDate: Tue Jul 22 22:13:49 2025 +0530
HBASE-29459 Capture bulkload files only till IncrCommittedWalTs during Incremental Backup (#7166)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Reviewed by: Kevin Geiszler <[email protected]>
---
.../hadoop/hbase/backup/impl/BackupManager.java | 5 ++
.../hbase/backup/impl/BackupSystemTable.java | 19 +++-
.../apache/hadoop/hbase/backup/impl/BulkLoad.java | 15 +++-
.../backup/impl/IncrementalTableBackupClient.java | 8 +-
.../TestIncrementalBackupWithContinuous.java | 100 +++++++++------------
5 files changed, 83 insertions(+), 64 deletions(-)
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 91563a29b5e..0bc25072517 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -356,6 +356,11 @@ public class BackupManager implements Closeable {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
}
+ public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+ throws IOException {
+ return systemTable.readBulkloadRows(tableList, endTimestamp);
+ }
+
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}
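For orientation, a minimal usage sketch of the new overload against the API shown above; the `backupManager` and `backupInfo` instances are assumed from the calling context and are not part of this hunk:

    // Hypothetical caller: bound the bulk-load bookkeeping read by a
    // WAL-commit timestamp instead of reading every tracked entry.
    List<TableName> tables = List.of(TableName.valueOf("demo_table"));
    long cutoffTs = backupInfo.getIncrCommittedWalTs(); // upper bound for this session
    List<BulkLoad> bounded = backupManager.readBulkloadRows(tables, cutoffTs);
    // The single-argument form stays equivalent to an unbounded read:
    List<BulkLoad> all = backupManager.readBulkloadRows(tables);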
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index f5be8893047..2afefa4a550 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -445,6 +445,16 @@ public final class BackupSystemTable implements Closeable {
* @param tableList list of table names
*/
public List<BulkLoad> readBulkloadRows(List<TableName> tableList) throws IOException {
+ return readBulkloadRows(tableList, Long.MAX_VALUE);
+ }
+
+ /**
+ * Reads the rows from backup table recording bulk loaded hfiles
+ * @param tableList list of table names
+ * @param endTimestamp upper bound timestamp for bulkload entries retrieval
+ */
+ public List<BulkLoad> readBulkloadRows(List<TableName> tableList, long endTimestamp)
+ throws IOException {
List<BulkLoad> result = new ArrayList<>();
for (TableName table : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(table);
@@ -457,8 +467,10 @@ public final class BackupSystemTable implements Closeable {
String path = null;
String region = null;
byte[] row = null;
+ long timestamp = 0L;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
+ timestamp = cell.getTimestamp();
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (
@@ -473,8 +485,11 @@ public final class BackupSystemTable implements Closeable {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
- result.add(new BulkLoad(table, region, fam, path, row));
- LOG.debug("found orig " + path + " for " + fam + " of table " +
region);
+ LOG.debug("found orig {} for {} of table {} with timestamp {}",
path, fam, region,
+ timestamp);
+ if (timestamp <= endTimestamp) {
+ result.add(new BulkLoad(table, region, fam, path, row, timestamp));
+ }
}
}
}
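The cutoff added above keeps a row only when its cell timestamp is at or below `endTimestamp`. The same predicate, restated as a self-contained sketch over already-materialized records (an illustrative helper, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    // Retain bulk-load records written at or before the cutoff, mirroring
    // the `timestamp <= endTimestamp` check in readBulkloadRows above.
    static List<BulkLoad> filterByTimestamp(List<BulkLoad> loads, long endTimestamp) {
      List<BulkLoad> kept = new ArrayList<>();
      for (BulkLoad load : loads) {
        if (load.getTimestamp() <= endTimestamp) {
          kept.add(load);
        }
      }
      return kept;
    }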
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
index 0f1e79c976b..1befe7c469c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BulkLoad.java
@@ -34,14 +34,16 @@ public class BulkLoad {
private final String columnFamily;
private final String hfilePath;
private final byte[] rowKey;
+ private final long timestamp;
public BulkLoad(TableName tableName, String region, String columnFamily, String hfilePath,
- byte[] rowKey) {
+ byte[] rowKey, long timestamp) {
this.tableName = tableName;
this.region = region;
this.columnFamily = columnFamily;
this.hfilePath = hfilePath;
this.rowKey = rowKey;
+ this.timestamp = timestamp;
}
public TableName getTableName() {
@@ -64,6 +66,10 @@ public class BulkLoad {
return rowKey;
}
+ public long getTimestamp() {
+ return timestamp;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -75,19 +81,20 @@ public class BulkLoad {
BulkLoad that = (BulkLoad) o;
return new EqualsBuilder().append(tableName, that.tableName).append(region, that.region)
  .append(columnFamily, that.columnFamily).append(hfilePath, that.hfilePath)
- .append(rowKey, that.rowKey).isEquals();
+ .append(rowKey, that.rowKey).append(timestamp, that.timestamp).isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(tableName).append(region).append(columnFamily)
- .append(hfilePath).append(rowKey).toHashCode();
+ .append(hfilePath).append(rowKey).append(timestamp).toHashCode();
}
@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE)
.append("tableName", tableName).append("region",
region).append("columnFamily", columnFamily)
- .append("hfilePath", hfilePath).append("rowKey", rowKey).toString();
+ .append("hfilePath", hfilePath).append("rowKey",
rowKey).append("timestamp", timestamp)
+ .toString();
}
}
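A short sketch of the widened value semantics: with `timestamp` folded into `equals`/`hashCode`, two records that differ only in timestamp no longer compare equal. The table name and values below are made up for illustration:

    // Hypothetical comparison of two otherwise-identical records.
    byte[] row = Bytes.toBytes("row1");
    BulkLoad a = new BulkLoad(TableName.valueOf("t"), "region1", "f", "/hfile/path", row, 100L);
    BulkLoad b = new BulkLoad(TableName.valueOf("t"), "region1", "f", "/hfile/path", row, 200L);
    assert !a.equals(b);             // timestamps differ
    assert a.getTimestamp() == 100L;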
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index de97694f22f..2621ae3c8eb 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -136,7 +136,13 @@ public class IncrementalTableBackupClient extends TableBackupClient {
  protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
List<String> activeFiles = new ArrayList<>();
List<String> archiveFiles = new ArrayList<>();
- List<BulkLoad> bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+ List<BulkLoad> bulkLoads;
+ if (backupInfo.isContinuousBackupEnabled()) {
+ bulkLoads =
+   backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
+ } else {
+ bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
+ }
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
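Read as a whole, the new branch means: when continuous backup is enabled, only bulk loads committed at or before the incremental cutoff (`IncrCommittedWalTs`) are captured; otherwise the unbounded read is kept. A condensed restatement, assuming the fields of this class (a sketch, not a drop-in replacement):

    // Equivalent selection expressed as a conditional expression.
    List<BulkLoad> bulkLoads = backupInfo.isContinuousBackupEnabled()
      ? backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs())
      : backupManager.readBulkloadRows(tablesToBackup);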
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
index 79d1df645b9..170cc866568 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithContinuous.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.backup.impl.BulkLoad;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -70,7 +69,6 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
LoggerFactory.getLogger(TestIncrementalBackupWithContinuous.class);
private byte[] ROW = Bytes.toBytes("row1");
- private final byte[] FAMILY = Bytes.toBytes("family");
private final byte[] COLUMN = Bytes.toBytes("col");
private static final int ROWS_IN_BULK_LOAD = 100;
@@ -80,7 +78,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
TableName tableName = TableName.valueOf("table_" + methodName);
- Table t1 = TEST_UTIL.createTable(tableName, FAMILY);
+ Table t1 = TEST_UTIL.createTable(tableName, famName);
try (BackupSystemTable table = new BackupSystemTable(TEST_UTIL.getConnection())) {
int before = table.getBackupHistory().size();
@@ -105,10 +103,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
  assertEquals("Backup should contain the expected tables", Sets.newHashSet(tableName),
    new HashSet<>(manifest.getTableList()));
- Put p = new Put(ROW);
- p.addColumn(FAMILY, COLUMN, COLUMN);
- t1.put(p);
- Thread.sleep(5000);
+ loadTable(t1);
+ Thread.sleep(10000);
// Run incremental backup
LOG.info("Run incremental backup now");
@@ -135,68 +131,57 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
  client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, incrementalBackupid, false,
    tables, tables, true));
- verifyTable(t1);
+ assertEquals(NB_ROWS_IN_BATCH, TEST_UTIL.countRows(tableName));
+ } finally {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
}
}
@Test
- public void testContinuousBackupWithIncrementalBackupAndBulkloadSuccess() throws Exception {
+ public void testIncrementalBackupCopyingBulkloadTillIncrCommittedWalTs() throws Exception {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
String methodName = Thread.currentThread().getStackTrace()[1].getMethodName();
+ TableName tableName1 = TableName.valueOf("table_" + methodName);
+ TEST_UTIL.createTable(tableName1, famName);
try (BackupSystemTable systemTable = new BackupSystemTable(TEST_UTIL.getConnection())) {
- // The test starts with some data, and no bulk loaded rows.
- int expectedRowCount = NB_ROWS_IN_BATCH;
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- assertTrue(systemTable.readBulkloadRows(List.of(table1)).isEmpty());
- // Bulk loads aren't tracked if the table isn't backed up yet
- performBulkLoad("bulk1", methodName);
- expectedRowCount += ROWS_IN_BULK_LOAD;
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
+ // The test starts with no data, and no bulk loaded rows.
+ int expectedRowCount = 0;
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+ assertTrue(systemTable.readBulkloadRows(List.of(tableName1)).isEmpty());
- // Create a backup, bulk loads are now being tracked
- String backup1 = backupTables(BackupType.FULL, List.of(table1), BACKUP_ROOT_DIR, true);
+ // Create continuous backup, bulk loads are now being tracked
+ String backup1 = backupTables(BackupType.FULL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup1));
- loadTable(TEST_UTIL.getConnection().getTable(table1));
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- performBulkLoad("bulk2", methodName);
+ loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+ expectedRowCount = expectedRowCount + NB_ROWS_IN_BATCH;
+ performBulkLoad("bulkPreIncr", methodName, tableName1);
expectedRowCount += ROWS_IN_BULK_LOAD;
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- assertEquals(1, systemTable.readBulkloadRows(List.of(table1)).size());
-
- // Creating an incremental backup clears the bulk loads
- performBulkLoad("bulk4", methodName);
- performBulkLoad("bulk5", methodName);
- performBulkLoad("bulk6", methodName);
- expectedRowCount += 3 * ROWS_IN_BULK_LOAD;
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- assertEquals(4, systemTable.readBulkloadRows(List.of(table1)).size());
- String backup2 = backupTables(BackupType.INCREMENTAL, List.of(table1), BACKUP_ROOT_DIR, true);
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+ assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+ loadTable(TEST_UTIL.getConnection().getTable(tableName1));
+ Thread.sleep(10000);
+
+ performBulkLoad("bulkPostIncr", methodName, tableName1);
+ assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+ // Incremental backup
+ String backup2 =
+   backupTables(BackupType.INCREMENTAL, List.of(tableName1), BACKUP_ROOT_DIR, true);
assertTrue(checkSucceeded(backup2));
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- assertEquals(0, systemTable.readBulkloadRows(List.of(table1)).size());
- int rowCountAfterBackup2 = expectedRowCount;
- // Doing another bulk load, to check that this data will disappear after a restore operation
- performBulkLoad("bulk7", methodName);
- expectedRowCount += ROWS_IN_BULK_LOAD;
- assertEquals(expectedRowCount, TEST_UTIL.countRows(table1));
- List<BulkLoad> bulkloadsTemp = systemTable.readBulkloadRows(List.of(table1));
- assertEquals(1, bulkloadsTemp.size());
- BulkLoad bulk7 = bulkloadsTemp.get(0);
-
- // Doing a restore. Overwriting the table implies clearing the bulk loads,
- // but the loading of restored data involves loading bulk data, we expect 2 bulk loads
- // associated with backup 3 (loading of full backup, loading of incremental backup).
- BackupAdmin client = getBackupAdmin();
- client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false,
- new TableName[] { table1 }, new TableName[] { table1 }, true));
- assertEquals(rowCountAfterBackup2, TEST_UTIL.countRows(table1));
- List<BulkLoad> bulkLoads = systemTable.readBulkloadRows(List.of(table1));
- assertEquals(3, bulkLoads.size());
+ // bulkPostIncr Bulkload entry should not be deleted post incremental backup
+ assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());
+
+ TEST_UTIL.truncateTable(tableName1);
+ // Restore incremental backup
+ TableName[] tables = new TableName[] { tableName1 };
+ BackupAdminImpl client = new BackupAdminImpl(TEST_UTIL.getConnection());
+ client.restore(
+   BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backup2, false, tables, tables, true));
+ assertEquals(expectedRowCount, TEST_UTIL.countRows(tableName1));
+ } finally {
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
}
}
@@ -208,7 +193,8 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN));
}
- private void performBulkLoad(String keyPrefix, String testDir) throws IOException {
+ private void performBulkLoad(String keyPrefix, String testDir, TableName tableName)
+ throws IOException {
FileSystem fs = TEST_UTIL.getTestFileSystem();
Path baseDirectory = TEST_UTIL.getDataTestDirOnTestFS(testDir);
Path hfilePath =
@@ -220,7 +206,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
listFiles(fs, baseDirectory, baseDirectory);
Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
- BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(table1, baseDirectory);
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, baseDirectory);
assertFalse(result.isEmpty());
}
@@ -246,7 +232,7 @@ public class TestIncrementalBackupWithContinuous extends TestContinuousBackup {
protected static void loadTable(Table table) throws Exception {
Put p; // 100 + 1 row to t1_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
- p = new Put(Bytes.toBytes("row" + i));
+ p = new Put(Bytes.toBytes("rowLoad" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
table.put(p);
}
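Taken together, the test encodes the fix's contract: a bulk load performed after the incremental backup's committed-WAL cutoff must survive in the system table rather than being swept by that backup. A condensed restatement of the assertion flow, using `systemTable` and `tableName1` as in the test above:

    // Two tracked entries exist when the incremental backup starts:
    // bulkPreIncr (before the committed-WAL cutoff) and bulkPostIncr (after it).
    assertEquals(2, systemTable.readBulkloadRows(List.of(tableName1)).size());
    // The incremental backup consumes only entries up to IncrCommittedWalTs,
    // so the post-cutoff entry must remain afterwards.
    assertEquals(1, systemTable.readBulkloadRows(List.of(tableName1)).size());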