This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch HBASE-29164
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-29164 by this push:
new 75373629373 HBASE-29687: Extend IntegrationTestBackupRestore to handle
continuous backups (#7417)
75373629373 is described below
commit 753736293736f815f050674308e3cfe4a0c8c9f1
Author: Kevin Geiszler <[email protected]>
AuthorDate: Wed Nov 5 10:43:49 2025 -0800
HBASE-29687: Extend IntegrationTestBackupRestore to handle continuous
backups (#7417)
* Extend IntegrationTestBackupRestore into a base class with continuous
and non-continuous subclasses
Change-Id: I0c70c417b86c7732b58642a51c75897c35b16cb6
* Add more test cases to runTestSingle for testContinuousBackupRestore
Change-Id: Id043400bf85c7b696bb94bef7cb17ed9dad13334
* Add more test cases for full continuous backup; Change while loop to a
for loop
Change-Id: I5ba3276919e6bbdf343c134fa287c69f3854a8a2
* Add delete test case
Change-Id: I25fe484e9c227b7a31cb3768def3c12f66d617ac
* Start adding changes after looking at WIP PR in GitHub
Change-Id: Ie9aece8a3ec55739d618ebf2d2f173a41a116eb6
* Continue adding changes after looking at WIP PR in GitHub
Change-Id: Ie345e623089979f028b13aed13e5ec93e025eff8
* Run mvn spotless:apply
Change-Id: I98eb019dd93dfc8e21b6c730e0e2e60314102724
* Add documentation for runTestMulti and runTestSingle
Change-Id: I4de6fc485aa1ff6e0d8d837e081f8dde20bb3f67
* Update documentation
Change-Id: I911180a8f263f801a5c299d43d0215fe444f22d3
* Enhance delete test case
Change-Id: I78fe59f800cde7c89b11760a49d774c5173a862c
* Update method name to verifyBackupExistenceAfterMerge
Change-Id: Ia150d21f48bb160d9e8bcf922799dc18c0b7c77c
* Address review comments
Change-Id: I9d5b55e36b44367ac8ace08a5859c42b796fefd4
* Add wait for region servers in replication checkpoint to catch up with
latest Put timestamp
Change-Id: Ic438ca292bc01827d46725e006bfa0c21bc95f01
* Handle command line arg parsing and conf setup in base class
Change-Id: I9d52e774e84dc389d42aa63315529a2590c40cb8
* Fix spotless error
Change-Id: I27eec25091842376ee7a059a9688c6f5ab385ac7
* Fix checkstyle errors for IntegrationTestBackupRestore.java
Change-Id: I18ab629df4af4e93b42ec1b0d576fd411279c775
* Remove initializeConfFromCommandLine()
Change-Id: Ibc96fd712e384cc3ca5a2c4575e47e65e62c60fa
* Change info log message to debug
Change-Id: Ie8e94ce978836b1314525138726a13641360aae6
* Run mvn spotless:apply
Change-Id: Ibeea379a65e801b60ec5124938b7aa17087025f0
---
.../apache/hadoop/hbase/backup/BackupTestUtil.java | 21 +-
.../hadoop/hbase/backup/TestContinuousBackup.java | 20 +-
hbase-it/pom.xml | 7 +
.../hadoop/hbase/IntegrationTestBackupRestore.java | 468 -------------
.../hbase/backup/IntegrationTestBackupRestore.java | 86 +++
.../backup/IntegrationTestBackupRestoreBase.java | 756 +++++++++++++++++++++
.../IntegrationTestContinuousBackupRestore.java | 103 +++
7 files changed, 974 insertions(+), 487 deletions(-)
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
index 3665eeb7a76..4d9577431d3 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java
@@ -17,14 +17,20 @@
*/
package org.apache.hadoop.hbase.backup;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
@@ -46,10 +52,21 @@ public class BackupTestUtil {
}
}
- static void enableBackup(Configuration conf) {
- // Enable backup
+ public static void enableBackup(Configuration conf) {
conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
BackupManager.decorateMasterConfiguration(conf);
BackupManager.decorateRegionServerConfiguration(conf);
}
+
+ public static void verifyReplicationPeerSubscription(HBaseTestingUtil util,
TableName tableName)
+ throws IOException {
+ try (Admin admin = util.getAdmin()) {
+ ReplicationPeerDescription peerDesc =
admin.listReplicationPeers().stream()
+ .filter(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst()
+ .orElseThrow(() -> new AssertionError("Replication peer not found"));
+
+ assertTrue("Table should be subscribed to the replication peer",
+ peerDesc.getPeerConfig().getTableCFsMap().containsKey(tableName));
+ }
+ }
}
diff --git
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
index 2fdfa8b73f8..874ae88320e 100644
---
a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
+++
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.backup;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
-import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER;
import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -36,8 +35,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManifest;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
@@ -114,7 +111,7 @@ public class TestContinuousBackup extends TestBackupBase {
}
// Verify replication peer subscription
- verifyReplicationPeerSubscription(tableName);
+ BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName);
// Verify table is registered in Backup System Table
verifyTableInBackupSystemTable(tableName);
@@ -157,8 +154,8 @@ public class TestContinuousBackup extends TestBackupBase {
}
// Verify replication peer subscription for each table
- verifyReplicationPeerSubscription(tableName1);
- verifyReplicationPeerSubscription(tableName2);
+ BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName1);
+ BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName2);
// Verify tables are registered in Backup System Table
verifyTableInBackupSystemTable(tableName1);
@@ -248,17 +245,6 @@ public class TestContinuousBackup extends TestBackupBase {
}
}
- private void verifyReplicationPeerSubscription(TableName table) throws
IOException {
- try (Admin admin = TEST_UTIL.getAdmin()) {
- ReplicationPeerDescription peerDesc =
admin.listReplicationPeers().stream()
- .filter(peer ->
peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst()
- .orElseThrow(() -> new AssertionError("Replication peer not found"));
-
- assertTrue("Table should be subscribed to the replication peer",
- peerDesc.getPeerConfig().getTableCFsMap().containsKey(table));
- }
- }
-
String[] buildBackupArgs(String backupType, TableName[] tables, boolean
continuousEnabled) {
String tableNames =
Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(","));
diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml
index 5bc9af02782..a282fcc28d5 100644
--- a/hbase-it/pom.xml
+++ b/hbase-it/pom.xml
@@ -60,6 +60,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-backup</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-logging</artifactId>
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
deleted file mode 100644
index f910df67200..00000000000
---
a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase;
-
-import static
org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.backup.BackupAdmin;
-import org.apache.hadoop.hbase.backup.BackupInfo;
-import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
-import org.apache.hadoop.hbase.backup.BackupRequest;
-import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
-import org.apache.hadoop.hbase.backup.BackupType;
-import org.apache.hadoop.hbase.backup.RestoreRequest;
-import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
-import org.apache.hadoop.hbase.backup.impl.BackupManager;
-import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
-import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
-import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
-import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
-import org.apache.hadoop.hbase.chaos.policies.Policy;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-
-/**
- * An integration test to detect regressions in HBASE-7912. Create a table
with many regions, load
- * data, perform series backup/load operations, then restore and verify data
- * @see <a
href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
- * @see <a
href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
- */
-@Category(IntegrationTests.class)
-public class IntegrationTestBackupRestore extends IntegrationTestBase {
- private static final String CLASS_NAME =
IntegrationTestBackupRestore.class.getSimpleName();
- protected static final Logger LOG =
LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
- protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
- protected static final String COLUMN_NAME = "f";
- protected static final String REGION_COUNT_KEY = "regions_per_rs";
- protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
- protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
- protected static final String NUM_ITERATIONS_KEY = "num_iterations";
- protected static final int DEFAULT_REGION_COUNT = 10;
- protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
- protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
- protected static final int DEFAULT_NUM_ITERATIONS = 10;
- protected static final int DEFAULT_ROWS_IN_ITERATION = 10000;
- protected static final String SLEEP_TIME_KEY = "sleeptime";
- // short default interval because tests don't run very long.
- protected static final long SLEEP_TIME_DEFAULT = 50000L;
-
- protected static int rowsInIteration;
- protected static int regionsCountPerServer;
- protected static int regionServerCount;
-
- protected static int numIterations;
- protected static int numTables;
- protected static TableName[] tableNames;
- protected long sleepTime;
- protected static Object lock = new Object();
-
- private static String BACKUP_ROOT_DIR = "backupIT";
-
- /*
- * This class is used to run the backup and restore thread(s). Throwing an
exception in this
- * thread will not cause the test to fail, so the purpose of this class is
to both kick off the
- * backup and restore and record any exceptions that occur so they can be
thrown in the main
- * thread.
- */
- protected class BackupAndRestoreThread implements Runnable {
- private final TableName table;
- private Throwable throwable;
-
- public BackupAndRestoreThread(TableName table) {
- this.table = table;
- this.throwable = null;
- }
-
- public Throwable getThrowable() {
- return this.throwable;
- }
-
- @Override
- public void run() {
- try {
- runTestSingle(this.table);
- } catch (Throwable t) {
- LOG.error(
- "An error occurred in thread {} when performing a backup and restore
with table {}: ",
- Thread.currentThread().getName(), this.table.getNameAsString(), t);
- this.throwable = t;
- }
- }
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- util = new IntegrationTestingUtility();
- Configuration conf = util.getConfiguration();
- regionsCountPerServer = conf.getInt(REGION_COUNT_KEY,
DEFAULT_REGION_COUNT);
- regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY,
DEFAULT_REGIONSERVER_COUNT);
- rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY,
DEFAULT_ROWS_IN_ITERATION);
- numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
- numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
- sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
- enableBackup(conf);
- LOG.info("Initializing cluster with {} region servers.",
regionServerCount);
- util.initializeCluster(regionServerCount);
- LOG.info("Cluster initialized and ready");
- }
-
- @After
- public void tearDown() throws IOException {
- LOG.info("Cleaning up after test.");
- if (util.isDistributedCluster()) {
- deleteTablesIfAny();
- LOG.info("Cleaning up after test. Deleted tables");
- cleanUpBackupDir();
- }
- LOG.info("Restoring cluster.");
- util.restoreCluster();
- LOG.info("Cluster restored.");
- }
-
- @Override
- public void setUpMonkey() throws Exception {
- Policy p =
- new PeriodicRandomActionPolicy(sleepTime, new
RestartRandomRsExceptMetaAction(sleepTime));
- this.monkey = new PolicyBasedChaosMonkey(util, p);
- startMonkey();
- }
-
- private void deleteTablesIfAny() throws IOException {
- for (TableName table : tableNames) {
- util.deleteTableIfAny(table);
- }
- }
-
- private void createTables() throws Exception {
- tableNames = new TableName[numTables];
- for (int i = 0; i < numTables; i++) {
- tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
- }
- for (TableName table : tableNames) {
- createTable(table);
- }
- }
-
- private void enableBackup(Configuration conf) {
- // Enable backup
- conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
- BackupManager.decorateMasterConfiguration(conf);
- BackupManager.decorateRegionServerConfiguration(conf);
- }
-
- private void cleanUpBackupDir() throws IOException {
- FileSystem fs = FileSystem.get(util.getConfiguration());
- fs.delete(new Path(BACKUP_ROOT_DIR), true);
- }
-
- @Test
- public void testBackupRestore() throws Exception {
- BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR +
BACKUP_ROOT_DIR;
- createTables();
- runTestMulti();
- }
-
- private void runTestMulti() throws Exception {
- LOG.info("IT backup & restore started");
- Thread[] workers = new Thread[numTables];
- BackupAndRestoreThread[] backupAndRestoreThreads = new
BackupAndRestoreThread[numTables];
- for (int i = 0; i < numTables; i++) {
- final TableName table = tableNames[i];
- BackupAndRestoreThread backupAndRestoreThread = new
BackupAndRestoreThread(table);
- backupAndRestoreThreads[i] = backupAndRestoreThread;
- workers[i] = new Thread(backupAndRestoreThread);
- workers[i].start();
- }
- // Wait for all workers to finish and check for errors
- Throwable error = null;
- Throwable threadThrowable;
- for (int i = 0; i < numTables; i++) {
- Uninterruptibles.joinUninterruptibly(workers[i]);
- threadThrowable = backupAndRestoreThreads[i].getThrowable();
- if (threadThrowable == null) {
- continue;
- }
- if (error == null) {
- error = threadThrowable;
- } else {
- error.addSuppressed(threadThrowable);
- }
- }
- // Throw any found errors after all threads have completed
- if (error != null) {
- throw new AssertionError("An error occurred in a backup and restore
thread", error);
- }
- LOG.info("IT backup & restore finished");
- }
-
- private void createTable(TableName tableName) throws Exception {
- long startTime, endTime;
-
- TableDescriptorBuilder builder =
TableDescriptorBuilder.newBuilder(tableName);
-
- TableDescriptor desc = builder.build();
- ColumnFamilyDescriptorBuilder cbuilder =
-
ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
- ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] {
cbuilder.build() };
- LOG.info("Creating table {} with {} splits.", tableName,
- regionsCountPerServer * regionServerCount);
- startTime = EnvironmentEdgeManager.currentTime();
- createPreSplitLoadTestTable(util.getConfiguration(), desc, columns,
regionsCountPerServer);
- util.waitTableAvailable(tableName);
- endTime = EnvironmentEdgeManager.currentTime();
- LOG.info("Pre-split table created successfully in {}ms.", (endTime -
startTime));
- }
-
- private void loadData(TableName table, int numRows) throws IOException {
- Connection conn = util.getConnection();
- // #0- insert some data to a table
- Table t1 = conn.getTable(table);
- util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows);
- // flush table
- conn.getAdmin().flush(TableName.valueOf(table.getName()));
- }
-
- private String backup(BackupRequest request, BackupAdmin client) throws
IOException {
- return client.backupTables(request);
- }
-
- private void restore(RestoreRequest request, BackupAdmin client) throws
IOException {
- client.restore(request);
- }
-
- private void merge(String[] backupIds, BackupAdmin client) throws
IOException {
- client.mergeBackups(backupIds);
- }
-
- private void runTestSingle(TableName table) throws IOException {
-
- List<String> backupIds = new ArrayList<String>();
-
- try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin();
- BackupAdmin client = new BackupAdminImpl(conn);) {
-
- // #0- insert some data to table 'table'
- loadData(table, rowsInIteration);
-
- // #1 - create full backup for table first
- LOG.info("create full backup image for {}", table);
- List<TableName> tables = Lists.newArrayList(table);
- BackupRequest.Builder builder = new BackupRequest.Builder();
- BackupRequest request =
builder.withBackupType(BackupType.FULL).withTableList(tables)
- .withTargetRootDir(BACKUP_ROOT_DIR).build();
-
- String backupIdFull = backup(request, client);
- assertTrue(checkSucceeded(backupIdFull));
-
- backupIds.add(backupIdFull);
- // Now continue with incremental backups
- int count = 1;
- while (count++ < numIterations) {
-
- // Load data
- loadData(table, rowsInIteration);
- // Do incremental backup
- builder = new BackupRequest.Builder();
- request =
builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
- .withTargetRootDir(BACKUP_ROOT_DIR).build();
- String backupId = backup(request, client);
- assertTrue(checkSucceeded(backupId));
- backupIds.add(backupId);
-
- // Restore incremental backup for table, with overwrite for previous
backup
- String previousBackupId = backupIds.get(backupIds.size() - 2);
- restoreVerifyTable(conn, client, table, previousBackupId,
rowsInIteration * (count - 1));
- // Restore incremental backup for table, with overwrite for last backup
- restoreVerifyTable(conn, client, table, backupId, rowsInIteration *
count);
- }
- // Now merge all incremental and restore
- String[] incBackupIds = allIncremental(backupIds);
- merge(incBackupIds, client);
- // Restore last one
- String backupId = incBackupIds[incBackupIds.length - 1];
- // restore incremental backup for table, with overwrite
- TableName[] tablesRestoreIncMultiple = new TableName[] { table };
- restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
tablesRestoreIncMultiple, null,
- true), client);
- Table hTable = conn.getTable(table);
- Assert.assertEquals(util.countRows(hTable), rowsInIteration *
numIterations);
- hTable.close();
- LOG.info("{} loop {} finished.", Thread.currentThread().getName(),
(count - 1));
- }
- }
-
- private void restoreVerifyTable(Connection conn, BackupAdmin client,
TableName table,
- String backupId, long expectedRows) throws IOException {
- TableName[] tablesRestoreIncMultiple = new TableName[] { table };
- restore(
- createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
tablesRestoreIncMultiple, null, true),
- client);
- Table hTable = conn.getTable(table);
- Assert.assertEquals(expectedRows, util.countRows(hTable));
- hTable.close();
- }
-
- private String[] allIncremental(List<String> backupIds) {
- int size = backupIds.size();
- backupIds = backupIds.subList(1, size);
- String[] arr = new String[size - 1];
- backupIds.toArray(arr);
- return arr;
- }
-
- /** Returns status of backup */
- protected boolean checkSucceeded(String backupId) throws IOException {
- BackupInfo status = getBackupInfo(backupId);
- if (status == null) {
- return false;
- }
- return status.getState() == BackupState.COMPLETE;
- }
-
- private BackupInfo getBackupInfo(String backupId) throws IOException {
- try (BackupSystemTable table = new
BackupSystemTable(util.getConnection())) {
- return table.readBackupInfo(backupId);
- }
- }
-
- /**
- * Get restore request.
- * @param backupRootDir directory where backup is located
- * @param backupId backup ID
- * @param check check the backup
- * @param fromTables table names to restore from
- * @param toTables new table names to restore to
- * @param isOverwrite overwrite the table(s)
- * @return an instance of RestoreRequest
- */
- public RestoreRequest createRestoreRequest(String backupRootDir, String
backupId, boolean check,
- TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
- RestoreRequest.Builder builder = new RestoreRequest.Builder();
- return
builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check)
-
.withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build();
- }
-
- @Override
- public void setUpCluster() throws Exception {
- util = getTestingUtil(getConf());
- enableBackup(getConf());
- LOG.debug("Initializing/checking cluster has {} servers",
regionServerCount);
- util.initializeCluster(regionServerCount);
- LOG.debug("Done initializing/checking cluster");
- }
-
- /** Returns status of CLI execution */
- @Override
- public int runTestFromCommandLine() throws Exception {
- // Check if backup is enabled
- if (!BackupManager.isBackupEnabled(getConf())) {
- System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
- return -1;
- }
- System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
- testBackupRestore();
- return 0;
- }
-
- @Override
- public TableName getTablename() {
- // That is only valid when Monkey is CALM (no monkey)
- return null;
- }
-
- @Override
- protected Set<String> getColumnFamilies() {
- // That is only valid when Monkey is CALM (no monkey)
- return null;
- }
-
- @Override
- protected void addOptions() {
- addOptWithArg(REGIONSERVER_COUNT_KEY,
- "Total number of region servers. Default: '" +
DEFAULT_REGIONSERVER_COUNT + "'");
- addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " +
DEFAULT_REGION_COUNT);
- addOptWithArg(ROWS_PER_ITERATION_KEY,
- "Total number of data rows to be loaded during one iteration." + "
Default: "
- + DEFAULT_ROWS_IN_ITERATION);
- addOptWithArg(NUM_ITERATIONS_KEY,
- "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
- addOptWithArg(NUMBER_OF_TABLES_KEY,
- "Total number of tables in the test." + " Default: " +
DEFAULT_NUMBER_OF_TABLES);
- addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms "
- + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT);
- }
-
- @Override
- protected void processOptions(CommandLine cmd) {
- super.processOptions(cmd);
- regionsCountPerServer = Integer
- .parseInt(cmd.getOptionValue(REGION_COUNT_KEY,
Integer.toString(DEFAULT_REGION_COUNT)));
- regionServerCount = Integer.parseInt(
- cmd.getOptionValue(REGIONSERVER_COUNT_KEY,
Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
- rowsInIteration = Integer.parseInt(
- cmd.getOptionValue(ROWS_PER_ITERATION_KEY,
Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
- numIterations = Integer
- .parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY,
Integer.toString(DEFAULT_NUM_ITERATIONS)));
- numTables = Integer.parseInt(
- cmd.getOptionValue(NUMBER_OF_TABLES_KEY,
Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
- sleepTime =
- Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY,
Long.toString(SLEEP_TIME_DEFAULT)));
-
- LOG.info(MoreObjects.toStringHelper("Parsed Options")
- .add(REGION_COUNT_KEY,
regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount)
- .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY,
numIterations)
- .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY,
sleepTime).toString());
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- IntegrationTestingUtility.setUseDistributedCluster(conf);
- int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(),
args);
- System.exit(status);
- }
-}
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java
new file mode 100644
index 00000000000..99fbcb21a2b
--- /dev/null
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An integration test to detect regressions in HBASE-7912. Create a table
with many regions, load
+ * data, perform series backup/load operations, then restore and verify data
+ * @see <a
href="https://issues.apache.org/jira/browse/HBASE-7912">HBASE-7912</a>
+ * @see <a
href="https://issues.apache.org/jira/browse/HBASE-14123">HBASE-14123</a>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestBackupRestore extends
IntegrationTestBackupRestoreBase {
+ private static final String CLASS_NAME =
IntegrationTestBackupRestore.class.getSimpleName();
+ protected static final Logger LOG =
LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ util = new IntegrationTestingUtility();
+ conf = util.getConfiguration();
+ BackupTestUtil.enableBackup(conf);
+ LOG.info("Initializing cluster with {} region servers.",
regionServerCount);
+ util.initializeCluster(regionServerCount);
+ LOG.info("Cluster initialized and ready");
+
+ backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR +
DEFAULT_BACKUP_ROOT_DIR;
+ LOG.info("The backup root directory is: {}", backupRootDir);
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testBackupRestore() throws Exception {
+ LOG.info("Running backup and restore integration test with continuous
backup disabled");
+ createTables(CLASS_NAME);
+ runTestMulti(false);
+ }
+
+ /** Returns status of CLI execution */
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ // Check if backup is enabled
+ if (!BackupManager.isBackupEnabled(getConf())) {
+ System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+ return -1;
+ }
+ System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
+ testBackupRestore();
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(),
args);
+ System.exit(status);
+ }
+}
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java
new file mode 100644
index 00000000000..56349fe055a
--- /dev/null
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static
org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable;
+import static
org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.IntegrationTestBase;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.backup.util.BackupUtils;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+
+/**
+ * An abstract base class that is used to run backup, restore, and delete
integration tests. This
+ * class performs both full backups and incremental backups. Both continuous
backup and
+ * non-continuous backup test cases are supported. The number of incremental
backups performed
+ * depends on the number of iterations defined by the user. The class performs
the backup/restore in
+ * a separate thread, where one thread is created per table. The number of
tables is user-defined,
+ * along with other various configurations.
+ */
+public abstract class IntegrationTestBackupRestoreBase extends
IntegrationTestBase {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(IntegrationTestBackupRestoreBase.class);
+ protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
+ protected static final String COLUMN_NAME = "f";
+ protected static final String REGION_COUNT_KEY = "regions_per_rs";
+ protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
+ protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
+ protected static final String NUM_ITERATIONS_KEY = "num_iterations";
+ protected static final int DEFAULT_REGION_COUNT = 10;
+ protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
+ protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
+ protected static final int DEFAULT_NUM_ITERATIONS = 10;
+ protected static final int DEFAULT_ROWS_IN_ITERATION = 10000;
+ protected static final String SLEEP_TIME_KEY = "sleeptime";
+ // Default sleep time (ms) for the chaos monkey; kept short because tests don't run very long.
+ protected static final long SLEEP_TIME_DEFAULT = 50000L;
+ protected static String DEFAULT_BACKUP_ROOT_DIR = "backupIT";
+
+ protected static int rowsInIteration;
+ protected static int regionsCountPerServer;
+ protected static int regionServerCount;
+
+ protected static int numIterations;
+ protected static int numTables;
+ protected static TableName[] tableNames;
+ protected long sleepTime;
+ protected static Object lock = new Object();
+
+ protected FileSystem fs;
+ protected String backupRootDir;
+
+  /**
+   * Runnable that executes the backup and restore scenario for a single table. An exception
+   * thrown on a worker thread does not fail the test by itself, so this class catches any
+   * {@link Throwable} raised by the scenario, logs it, and keeps it so the main thread can
+   * retrieve it through {@link #getThrowable()} and rethrow it.
+   */
+  protected class BackupAndRestoreThread implements Runnable {
+    private final TableName table;
+    private final boolean isContinuousBackupEnabled;
+    // Failure captured by run(); stays null when the scenario completes without throwing
+    private Throwable throwable;
+
+    public BackupAndRestoreThread(TableName table, boolean isContinuousBackupEnabled) {
+      this.table = table;
+      this.isContinuousBackupEnabled = isContinuousBackupEnabled;
+    }
+
+    /** Returns the failure recorded by {@link #run()}, or null if none was recorded. */
+    public Throwable getThrowable() {
+      return throwable;
+    }
+
+    @Override
+    public void run() {
+      Thread current = Thread.currentThread();
+      try {
+        LOG.info("Running backup and restore test for {} in thread {}", table, current);
+        runTestSingle(table, isContinuousBackupEnabled);
+      } catch (Throwable failure) {
+        LOG.error(
+          "An error occurred in thread {} when performing a backup and restore with table {}: ",
+          current.getName(), table.getNameAsString(), failure);
+        throwable = failure;
+      }
+    }
+  }
+
+ @After
+ public void tearDown() throws IOException {
+ LOG.info("Cleaning up after test.");
+ if (util.isDistributedCluster()) {
+ deleteTablesIfAny();
+ LOG.info("Cleaning up after test. Deleted tables");
+ cleanUpBackupDir();
+ }
+ LOG.info("Restoring cluster.");
+ util.restoreCluster();
+ LOG.info("Cluster restored.");
+ }
+
+  /**
+   * Installs and starts a chaos monkey driven by a {@link PeriodicRandomActionPolicy} whose only
+   * action is a {@link RestartRandomRsExceptMetaAction}; both the policy and the action are
+   * constructed with {@code sleepTime}.
+   */
+  @Override
+  public void setUpMonkey() throws Exception {
+    RestartRandomRsExceptMetaAction restartAction = new RestartRandomRsExceptMetaAction(sleepTime);
+    Policy restartPolicy = new PeriodicRandomActionPolicy(sleepTime, restartAction);
+    this.monkey = new PolicyBasedChaosMonkey(util, restartPolicy);
+    startMonkey();
+  }
+
+  /**
+   * Passes every table recorded in {@code tableNames} to {@code util.deleteTableIfAny}. Does
+   * nothing when no tables were recorded (for example because the test failed before
+   * {@link #createTables(String)} ran), so that cleanup does not hide the original failure
+   * behind a NullPointerException.
+   */
+  private void deleteTablesIfAny() throws IOException {
+    if (tableNames == null) {
+      return;
+    }
+    for (TableName table : tableNames) {
+      // createTables() publishes the array before filling it, so entries may still be null
+      if (table != null) {
+        util.deleteTableIfAny(table);
+      }
+    }
+  }
+
+  /**
+   * Populates {@code tableNames} with {@code numTables} names of the form
+   * {@code <tableBaseName>.table.<index>} and then creates each of those tables.
+   * @param tableBaseName prefix used for every generated table name
+   */
+  protected void createTables(String tableBaseName) throws Exception {
+    TableName[] names = new TableName[numTables];
+    tableNames = names;
+    int index = 0;
+    while (index < numTables) {
+      names[index] = TableName.valueOf(tableBaseName + ".table." + index);
+      index++;
+    }
+    // All names are generated before the first table is created
+    for (int i = 0; i < names.length; i++) {
+      createTable(names[i]);
+    }
+  }
+
+  /**
+   * Creates a "backupWALDir" directory under the test data directory on the test file system and
+   * stores its path in the configuration under {@code CONF_CONTINUOUS_BACKUP_WAL_DIR}.
+   */
+  protected void createAndSetBackupWalDir() throws IOException {
+    Path backupWalDir = new Path(util.getDataTestDirOnTestFS(), "backupWALDir");
+    // Local handle only; the fs field of this class is not read or written here
+    FileSystem walFs = FileSystem.get(conf);
+    walFs.mkdirs(backupWalDir);
+    String backupWalDirString = backupWalDir.toString();
+    conf.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDirString);
+    LOG.info(
+      "The continuous backup WAL directory has been created and set in the configuration to: {}",
+      backupWalDir);
+  }
+
+  /**
+   * Recursively deletes the backup root directory, using the file system obtained from the
+   * testing utility's configuration.
+   */
+  private void cleanUpBackupDir() throws IOException {
+    FileSystem rootFs = FileSystem.get(util.getConfiguration());
+    Path backupRoot = new Path(backupRootDir);
+    rootFs.delete(backupRoot, true);
+  }
+
+  /**
+   * Main driver for tests extending this base class. Starts one backup and restore worker thread
+   * per table in {@code tableNames}, waits for all of them to finish, and then fails with an
+   * {@link AssertionError} if any worker recorded a failure. The first recorded failure becomes
+   * the cause and later ones are attached to it as suppressed exceptions.
+   * @param isContinuousBackupEnabled Boolean flag used to specify if the backups should have
+   *                                  continuous backup enabled.
+   */
+  protected void runTestMulti(boolean isContinuousBackupEnabled) {
+    BackupAndRestoreThread[] tasks = new BackupAndRestoreThread[numTables];
+    Thread[] threads = new Thread[numTables];
+    for (int i = 0; i < numTables; i++) {
+      tasks[i] = new BackupAndRestoreThread(tableNames[i], isContinuousBackupEnabled);
+      threads[i] = new Thread(tasks[i]);
+      threads[i].start();
+    }
+
+    // Join every worker before reporting, so no worker is still running when the test fails
+    Throwable firstFailure = null;
+    for (int i = 0; i < numTables; i++) {
+      Uninterruptibles.joinUninterruptibly(threads[i]);
+      Throwable failure = tasks[i].getThrowable();
+      if (failure != null) {
+        if (firstFailure != null) {
+          firstFailure.addSuppressed(failure);
+        } else {
+          firstFailure = failure;
+        }
+      }
+    }
+
+    if (firstFailure != null) {
+      throw new AssertionError("An error occurred in a backup and restore thread", firstFailure);
+    }
+    LOG.info("IT backup & restore finished");
+  }
+
+  /**
+   * Performs the actual backup, restore, merge, and delete operations for one table. This method
+   * is run in a separate thread. It first loads data and takes a full backup. It then iteratively
+   * loads more data, takes an incremental backup, and restores from both the previous backup and
+   * the new one, verifying the row count each time. Finally it merges the incremental backups,
+   * restores from the merged image, and deletes the backups.
+   * @param tableName                 The table the backups are performed on
+   * @param isContinuousBackupEnabled Boolean flag used to indicate if the backups should have
+   *                                  continuous backup enabled.
+   */
+  private void runTestSingle(TableName tableName, boolean isContinuousBackupEnabled)
+    throws IOException, InterruptedException {
+    String enabledOrDisabled = isContinuousBackupEnabled ? "enabled" : "disabled";
+    List<String> backupIds = new ArrayList<>();
+
+    // NOTE(review): this closes the Connection returned by util.getConnection() when the block
+    // exits; confirm that is safe if the utility hands the same connection to every worker.
+    try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn);
+      Table tableConn = conn.getTable(tableName)) {
+      loadData(tableName, rowsInIteration);
+
+      // First create a full backup for the table
+      LOG.info("Creating full backup image for {} with continuous backup {}", tableName,
+        enabledOrDisabled);
+      List<TableName> tables = Lists.newArrayList(tableName);
+      BackupRequest fullRequest =
+        buildBackupRequest(BackupType.FULL, tables, isContinuousBackupEnabled);
+
+      String fullBackupId = backup(fullRequest, client, backupIds);
+      LOG.info("Created full backup with ID: {}", fullBackupId);
+
+      verifySnapshotExists(tableName, fullBackupId);
+
+      // Run full backup verifications specific to continuous backup
+      if (isContinuousBackupEnabled) {
+        BackupTestUtil.verifyReplicationPeerSubscription(util, tableName);
+        Path backupWALs = verifyWALsDirectoryExists();
+        Path walPartitionDir = verifyWALPartitionDirExists(backupWALs);
+        verifyBackupWALFiles(walPartitionDir);
+      }
+
+      // Now continue with incremental backups
+      String incrementalBackupId;
+      for (int count = 1; count <= numIterations; count++) {
+        LOG.info("{} - Starting incremental backup iteration {} of {} for {}",
+          Thread.currentThread().getName(), count, numIterations, tableName);
+        loadData(tableName, rowsInIteration);
+        if (isContinuousBackupEnabled) {
+          long latestPutTimestamp = getLatestPutTimestamp(tableConn);
+          waitForCheckpointTimestampsToUpdate(conn, latestPutTimestamp, tableName);
+        }
+
+        // Do incremental backup
+        LOG.info("Creating incremental backup number {} with continuous backup {} for {}", count,
+          enabledOrDisabled, tableName);
+        BackupRequest incrementalRequest =
+          buildBackupRequest(BackupType.INCREMENTAL, tables, isContinuousBackupEnabled);
+        incrementalBackupId = backup(incrementalRequest, client, backupIds);
+        LOG.info("Created incremental backup with ID: {}", incrementalBackupId);
+
+        // Restore table using backup taken "two backups ago"
+        // On the first iteration, this backup will be the full backup
+        String previousBackupId = backupIds.get(backupIds.size() - 2);
+        if (previousBackupId.equals(fullBackupId)) {
+          LOG.info("Restoring {} using original full backup with ID: {}", tableName,
+            previousBackupId);
+        } else {
+          LOG.info("Restoring {} using second most recent incremental backup with ID: {}",
+            tableName, previousBackupId);
+        }
+        restoreTableAndVerifyRowCount(conn, client, tableName, previousBackupId,
+          (long) rowsInIteration * count);
+
+        // Restore table using the most recently created incremental backup
+        LOG.info("Restoring {} using most recent incremental backup with ID: {}", tableName,
+          incrementalBackupId);
+        restoreTableAndVerifyRowCount(conn, client, tableName, incrementalBackupId,
+          (long) rowsInIteration * (count + 1));
+        LOG.info("{} - Finished incremental backup iteration {} of {} for {}",
+          Thread.currentThread().getName(), count, numIterations, tableName);
+      }
+
+      // Now merge all incremental and restore
+      String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds);
+      merge(incrementalBackupIds, client);
+      verifyBackupExistenceAfterMerge(backupIds);
+      removeNonexistentBackups(backupIds);
+      // Restore the last incremental backup for the table, with overwrite. The shared helper is
+      // used so the expected row count is computed in long arithmetic, like the checks above.
+      incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1];
+      restoreTableAndVerifyRowCount(conn, client, tableName, incrementalBackupId,
+        (long) rowsInIteration * (numIterations + 1));
+
+      // Create another incremental backup to show it can be deleted on its own. A fresh request
+      // is built so this is always incremental, rather than reusing the last loop's request.
+      BackupRequest extraIncrementalRequest =
+        buildBackupRequest(BackupType.INCREMENTAL, tables, isContinuousBackupEnabled);
+      backup(extraIncrementalRequest, client, backupIds);
+      deleteMostRecentIncrementalBackup(backupIds, client);
+      // The full backup and the second most recent incremental backup should still exist
+      assertEquals(2, backupIds.size());
+      verifyAllBackupsExist(backupIds);
+      // Delete the full backup, which should also automatically delete any incremental backups
+      // that depend on it
+      LOG.info("Deleting full backup: {}. This will also delete any remaining incremental backups",
+        fullBackupId);
+      delete(new String[] { fullBackupId }, client);
+      verifyNoBackupsExist(backupIds);
+    }
+  }
+
+  /** Builds a backup request of the given type for the tables, targeting the backup root dir. */
+  private BackupRequest buildBackupRequest(BackupType type, List<TableName> tables,
+    boolean isContinuousBackupEnabled) {
+    return new BackupRequest.Builder().withBackupType(type).withTableList(tables)
+      .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled)
+      .build();
+  }
+
+  /**
+   * Creates a pre-split load test table with the single column family {@code COLUMN_NAME}, waits
+   * until the table is available, and logs how long that took.
+   * @param tableName name of the table to create
+   */
+  private void createTable(TableName tableName) throws Exception {
+    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName).build();
+    byte[] familyName = COLUMN_NAME.getBytes(Charset.defaultCharset());
+    ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(familyName).build();
+    ColumnFamilyDescriptor[] families = new ColumnFamilyDescriptor[] { family };
+
+    LOG.info("Creating table {} with {} splits.", tableName,
+      regionsCountPerServer * regionServerCount);
+    long startTime = EnvironmentEdgeManager.currentTime();
+    createPreSplitLoadTestTable(util.getConfiguration(), tableDescriptor, families,
+      regionsCountPerServer);
+    util.waitTableAvailable(tableName);
+    long elapsedMs = EnvironmentEdgeManager.currentTime() - startTime;
+    LOG.info("Pre-split table created successfully in {}ms.", elapsedMs);
+  }
+
+  /**
+   * Loads random rows into column family 'f' of the given table and then flushes the table.
+   * @param table   table to load and flush
+   * @param numRows row count handed to {@code util.loadRandomRows}
+   */
+  private void loadData(TableName table, int numRows) throws IOException {
+    Connection conn = util.getConnection();
+    // Close the per-call Table and Admin handles instead of leaking one of each per invocation.
+    // Admin is referenced by its qualified name because this file does not import it.
+    try (Table t1 = conn.getTable(table);
+      org.apache.hadoop.hbase.client.Admin admin = conn.getAdmin()) {
+      // #0- insert some data to a table
+      util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows);
+      // flush table
+      admin.flush(table);
+    }
+  }
+
+ private String backup(BackupRequest request, BackupAdmin client,
List<String> backupIds)
+ throws IOException {
+ String backupId = client.backupTables(request);
+ assertTrue(checkSucceeded(backupId));
+ verifyBackupExists(backupId);
+ backupIds.add(backupId);
+ return backupId;
+ }
+
+  /** Hands the restore request to the backup admin client. */
+  private void restore(RestoreRequest request, BackupAdmin client) throws IOException {
+    client.restore(request);
+  }
+
+  /** Asks the backup admin client to merge the backups with the given IDs. */
+  private void merge(String[] backupIds, BackupAdmin client) throws IOException {
+    client.mergeBackups(backupIds);
+  }
+
+  /** Asks the backup admin client to delete the backups with the given IDs. */
+  private void delete(String[] backupIds, BackupAdmin client) throws IOException {
+    client.deleteBackups(backupIds);
+  }
+
+  /**
+   * Verifies that a snapshot "data.manifest" file exists after a full backup has been performed
+   * for a table. The backup's directory is listed recursively and the test fails if no file whose
+   * name ends with "data.manifest" is found. The file's path will look like the following:
+   * .../backupRootDir/backup_1760572298945/default/TABLE_NAME/.hbase-snapshot/
+   * snapshot_1760572306407_default_TABLE_NAME/data.manifest
+   */
+  private void verifySnapshotExists(TableName tableName, String backupId) throws IOException {
+    Path backupDir = new Path(backupRootDir, backupId);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(backupDir, true);
+    boolean manifestFound = false;
+    while (files.hasNext()) {
+      Path candidate = files.next().getPath();
+      if (!candidate.getName().endsWith("data.manifest")) {
+        continue;
+      }
+      // Keep scanning after a match so every manifest that is found gets logged
+      manifestFound = true;
+      LOG.info("Found snapshot manifest for table '{}' at: {}", tableName, candidate);
+    }
+
+    if (!manifestFound) {
+      fail("Could not find snapshot data manifest for table '" + tableName + "'");
+    }
+  }
+
+ /** Verifies the .../backupWALDir/WALs directory exists and returns its Path
*/
+ private Path verifyWALsDirectoryExists() throws IOException {
+ String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
+ Path backupWALs = new Path(backupWALDir, "WALs");
+ assertTrue(
+ "There should be a WALs directory inside of the backup WAL directory at:
" + backupWALDir,
+ fs.exists(backupWALs));
+ return backupWALs;
+ }
+
+  /**
+   * Waits for a WAL partition directory to exist inside the backup WAL directory. The directory
+   * should be something like: .../backupWALDir/WALs/2025-10-17, where the date is derived from
+   * the time this method was entered. The file system is polled once per second; if the
+   * directory still does not exist after 30 seconds the test fails with an AssertionError. This
+   * verification is to be used for full backups with continuous backup enabled.
+   * @param backupWALs The directory that should contain the partition directory. i.e.
+   *                   .../backupWALDir/WALs
+   * @return The Path to the WAL partition directory
+   */
+  private Path verifyWALPartitionDirExists(Path backupWALs)
+    throws IOException, InterruptedException {
+    long startTimeMs = System.currentTimeMillis();
+    String currentDateUTC = BackupUtils.formatToDateString(startTimeMs);
+    Path walPartitionDir = new Path(backupWALs, currentDateUTC);
+    int waitTimeSec = 30;
+    long waitTimeMs = waitTimeSec * 1000L;
+    // Poll the file system directly instead of using a caught AssertionError as loop control;
+    // the assertion failure is raised only once the deadline has passed.
+    while (!fs.exists(walPartitionDir)) {
+      if ((System.currentTimeMillis() - startTimeMs) >= waitTimeMs) {
+        fail("A backup WALs subdirectory with today's date should exist: " + walPartitionDir);
+      }
+      LOG.info("Waiting up to {} seconds for WAL partition directory to exist: {}", waitTimeSec,
+        walPartitionDir);
+      Thread.sleep(1000);
+    }
+    return walPartitionDir;
+  }
+
+  /**
+   * Checks every entry in the WAL partition directory: its name must start with "wal_file" and
+   * the timestamp in the second dot-separated part of the name must fall on the date the
+   * directory is named after. An empty directory passes without any check being made. A WAL
+   * file's path will look something like the following:
+   * .../backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e
+   * @param walPartitionDir The date directory for a backup WAL i.e.
+   *                        .../backupWALDir/WALs/2025-10-17
+   */
+  private void verifyBackupWALFiles(Path walPartitionDir) throws IOException {
+    String partitionDate = walPartitionDir.getName();
+    for (FileStatus walFile : fs.listStatus(walPartitionDir)) {
+      String[] nameParts = walFile.getPath().getName().split("\\.");
+      String prefix = nameParts[0];
+      assertEquals("The WAL partition directory should only have files that start with 'wal_file'",
+        "wal_file", prefix);
+      String fileDate = BackupUtils.formatToDateString(Long.parseLong(nameParts[1]));
+      assertEquals("The timestamp in the WAL file's name should match the "
+        + "date for the WAL partition directory", partitionDate, fileDate);
+    }
+  }
+
+  /**
+   * Checks if all timestamps in a map with ServerName and timestamp key-value pairs have met or
+   * are past the provided timestamp threshold.
+   * @param timestamps Map with ServerName and timestamp key-value pairs
+   * @param threshold  Timestamp to check all timestamp values in the map against
+   * @param tableName  Table named in the log output
+   * @return True if all timestamp values in the map have met or are past the timestamp
+   *         threshold; False otherwise
+   */
+  private boolean areAllTimestampsPastThreshold(Map<ServerName, Long> timestamps, long threshold,
+    TableName tableName) {
+    LOG.info("Checking if each region server in the replication check point "
+      + "has caught up to the latest Put on {}", tableName.getNameAsString());
+    boolean allCaughtUp = true;
+    for (Map.Entry<ServerName, Long> entry : timestamps.entrySet()) {
+      ServerName host = entry.getKey();
+      long checkpoint = entry.getValue();
+      boolean caughtUp = checkpoint >= threshold;
+      LOG.debug("host={}, checkpoint timestamp={}, latest put timestamp={}, caught up={}", host,
+        checkpoint, threshold, caughtUp);
+      // Deliberately no early exit, so every host and timestamp gets logged
+      allCaughtUp &= caughtUp;
+    }
+    return allCaughtUp;
+  }
+
+  /**
+   * Waits for the replication checkpoint timestamp of each region server to meet or pass the
+   * timestamp of the latest Put operation on the backed-up table. The checkpoint timestamps are
+   * re-read every 10 seconds.
+   * @param conn               Connection used to read the backup system table
+   * @param latestPutTimestamp Timestamp of the latest Put operation on the backed-up table
+   * @param tableName          The backed-up table; used in log and error messages
+   * @throws RuntimeException if the checkpoints have not caught up after 15 minutes
+   */
+  private void waitForCheckpointTimestampsToUpdate(Connection conn, long latestPutTimestamp,
+    TableName tableName) throws IOException, InterruptedException {
+    // Closed via try-with-resources, matching how getBackupInfo() handles BackupSystemTable
+    try (BackupSystemTable backupSystemTable = new BackupSystemTable(conn)) {
+      Map<ServerName, Long> checkpointTimestamps =
+        backupSystemTable.getBackupCheckpointTimestamps();
+      int i = 0;
+      int sleepTimeSec = 10;
+      int waitThresholdMs = 15 * 60 * 1000;
+      long waitStartTime = System.currentTimeMillis();
+      while (!areAllTimestampsPastThreshold(checkpointTimestamps, latestPutTimestamp, tableName)) {
+        if ((System.currentTimeMillis() - waitStartTime) >= waitThresholdMs) {
+          throw new RuntimeException("Timed out waiting for the replication checkpoint "
+            + "timestamp of each region server to catch up with timestamp of the latest Put on "
+            + tableName.getNameAsString());
+        }
+        LOG.info(
+          "Waiting {} seconds for the replication checkpoint timestamp for each region server "
+            + "to catch up with the timestamp of the latest Put on {}",
+          sleepTimeSec, tableName.getNameAsString());
+        Thread.sleep(sleepTimeSec * 1000);
+        checkpointTimestamps = backupSystemTable.getBackupCheckpointTimestamps();
+        i++;
+      }
+      LOG.info("Done waiting. Total wait time: {} seconds", i * sleepTimeSec);
+    }
+  }
+
+  /**
+   * Scans the backed-up table and returns the largest cell timestamp (ms) found, which is taken
+   * to be the timestamp of the latest Put operation on the table.
+   * @param table The backed-up table to scan
+   * @return Largest cell timestamp in the table, or 0 if the scan returns no cells
+   */
+  private long getLatestPutTimestamp(Table table) throws IOException {
+    Scan scan = new Scan();
+    long timestamp = 0;
+    // The scanner is closed even if iterating over the results throws
+    try (ResultScanner resultScanner = table.getScanner(scan)) {
+      for (Result result : resultScanner) {
+        for (Cell cell : result.rawCells()) {
+          timestamp = Math.max(timestamp, cell.getTimestamp());
+        }
+      }
+    }
+    return timestamp;
+  }
+
+  /**
+   * Restores a table, with overwrite, using the provided backup ID and then asserts that the
+   * table holds the expected number of rows.
+   * @param conn         Connection used to open the restored table
+   * @param client       Backup admin client that performs the restore
+   * @param table        The table to restore and count
+   * @param backupId     ID of the backup to restore from
+   * @param expectedRows Row count the table must have after the restore
+   */
+  private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table,
+    String backupId, long expectedRows) throws IOException {
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
+    restore(
+      createRestoreRequest(backupRootDir, backupId, false, tablesRestoreIncMultiple, null, true),
+      client);
+    // try-with-resources closes the table even when the count or the assertion fails
+    try (Table hTable = conn.getTable(table)) {
+      Assert.assertEquals(expectedRows, HBaseTestingUtil.countRows(hTable));
+    }
+  }
+
+  /**
+   * Returns every backup ID except the first one as an array. This method assumes the first
+   * backup in the list is a full backup and all later entries are incremental backups.
+   */
+  private String[] getAllIncrementalBackupIds(List<String> backupIds) {
+    List<String> incrementalIds = backupIds.subList(1, backupIds.size());
+    return incrementalIds.toArray(new String[0]);
+  }
+
+  /**
+   * Returns true if backup info can be read for the given backup ID and its state is
+   * {@code COMPLETE}; false otherwise.
+   */
+  protected boolean checkSucceeded(String backupId) throws IOException {
+    BackupInfo info = getBackupInfo(backupId);
+    return info != null && info.getState() == BackupInfo.BackupState.COMPLETE;
+  }
+
+  /** Reads the backup info stored for the given backup ID from the backup system table. */
+  private BackupInfo getBackupInfo(String backupId) throws IOException {
+    try (BackupSystemTable systemTable = new BackupSystemTable(util.getConnection())) {
+      BackupInfo info = systemTable.readBackupInfo(backupId);
+      return info;
+    }
+  }
+
+  /**
+   * Builds a restore request from the given settings.
+   * @param backupRootDir root directory that holds the backup
+   * @param backupId      backup ID
+   * @param check         check the backup
+   * @param fromTables    table names to restore from
+   * @param toTables      new table names to restore to
+   * @param isOverwrite   overwrite the table(s)
+   * @return an instance of RestoreRequest
+   */
+  public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check,
+    TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
+    RestoreRequest.Builder requestBuilder = new RestoreRequest.Builder();
+    RestoreRequest restoreRequest = requestBuilder.withBackupRootDir(backupRootDir)
+      .withBackupId(backupId).withCheck(check).withFromTables(fromTables).withToTables(toTables)
+      .withOvewrite(isOverwrite).build();
+    return restoreRequest;
+  }
+
+  /**
+   * Walks the list of backups and verifies that the first entry (the full backup) and the last
+   * entry (the latest incremental backup) still exist, and that every other backup no longer
+   * exists. This method is meant to be run after all incremental backups have been merged.
+   */
+  private void verifyBackupExistenceAfterMerge(List<String> backupIds) throws IOException {
+    String fullBackupId = backupIds.get(0);
+    String latestIncrementalId = backupIds.get(backupIds.size() - 1);
+    for (String backupId : backupIds) {
+      boolean expectedToSurvive =
+        backupId.equals(fullBackupId) || backupId.equals(latestIncrementalId);
+      if (!expectedToSurvive) {
+        verifyBackupDoesNotExist(backupId);
+        continue;
+      }
+      verifyBackupExists(backupId);
+    }
+  }
+
+  /**
+   * Removes from {@code backupIds} every ID whose directory no longer exists under the backup
+   * root directory. All IDs are checked first; removal happens afterwards.
+   */
+  private void removeNonexistentBackups(List<String> backupIds) throws IOException {
+    List<String> missingIds = new ArrayList<>();
+    for (int i = 0; i < backupIds.size(); i++) {
+      String candidate = backupIds.get(i);
+      Path backupDir = new Path(backupRootDir, candidate);
+      if (fs.exists(backupDir)) {
+        continue;
+      }
+      missingIds.add(candidate);
+    }
+    for (String missingId : missingIds) {
+      LOG.info("Removing {} from list of backup IDs since it no longer exists", missingId);
+      backupIds.remove(missingId);
+    }
+  }
+
+  /**
+   * Deletes the backup whose ID is last in {@code backupIds} (the most recently taken
+   * incremental backup), removes that ID from the list, and verifies the backup existed before
+   * the delete and no longer exists after it.
+   */
+  private void deleteMostRecentIncrementalBackup(List<String> backupIds, BackupAdmin client)
+    throws IOException {
+    int lastIndex = backupIds.size() - 1;
+    String latestId = backupIds.get(lastIndex);
+    LOG.info("Deleting the most recently created incremental backup: {}", latestId);
+    verifyBackupExists(latestId);
+    String[] idsToDelete = new String[] { latestId };
+    delete(idsToDelete, client);
+    backupIds.remove(lastIndex);
+    verifyBackupDoesNotExist(latestId);
+  }
+
+  /**
+   * Asserts that each backup ID in the list has a directory under the backup root directory.
+   */
+  private void verifyAllBackupsExist(List<String> backupIds) throws IOException {
+    for (int i = 0; i < backupIds.size(); i++) {
+      String backupId = backupIds.get(i);
+      verifyBackupExists(backupId);
+    }
+  }
+
+  /**
+   * Asserts that none of the backup IDs in the list has a directory under the backup root
+   * directory.
+   */
+  private void verifyNoBackupsExist(List<String> backupIds) throws IOException {
+    for (int i = 0; i < backupIds.size(); i++) {
+      String backupId = backupIds.get(i);
+      verifyBackupDoesNotExist(backupId);
+    }
+  }
+
+  /** Asserts that a path named after the backup ID exists under the backup root directory. */
+  private void verifyBackupExists(String backupId) throws IOException {
+    String failureMessage = "Backup " + backupId + " should exist inside of " + backupRootDir;
+    Path backupDir = new Path(backupRootDir, backupId);
+    assertTrue(failureMessage, fs.exists(backupDir));
+  }
+
+  /** Asserts that no path named after the backup ID exists under the backup root directory. */
+  private void verifyBackupDoesNotExist(String backupId) throws IOException {
+    String failureMessage =
+      "Backup " + backupId + " should not exist inside of " + backupRootDir;
+    Path backupDir = new Path(backupRootDir, backupId);
+    assertFalse(failureMessage, fs.exists(backupDir));
+  }
+
+  /**
+   * Obtains the testing utility and configuration, enables backup in that configuration, and
+   * initializes the cluster with {@code regionServerCount} region servers.
+   */
+  @Override
+  public void setUpCluster() throws Exception {
+    // The utility is obtained first and the configuration is re-read afterwards, in that order
+    util = getTestingUtil(getConf());
+    conf = getConf();
+    BackupTestUtil.enableBackup(conf);
+    final int serverCount = regionServerCount;
+    LOG.debug("Initializing/checking cluster has {} servers", serverCount);
+    util.initializeCluster(serverCount);
+    LOG.debug("Done initializing/checking cluster");
+  }
+
+  /** Always returns null; per the original note, a table name only matters for the CALM monkey. */
+  @Override
+  public TableName getTablename() {
+    // That is only valid when Monkey is CALM (no monkey)
+    return null;
+  }
+
+  /** Always returns null; per the original note, families only matter for the CALM monkey. */
+  @Override
+  protected Set<String> getColumnFamilies() {
+    // That is only valid when Monkey is CALM (no monkey)
+    return null;
+  }
+
+  /**
+   * Registers the command line options understood by this test, each taking one argument:
+   * region server count, regions per region server, rows per iteration, number of iterations,
+   * number of tables, and the chaos monkey sleep time.
+   */
+  @Override
+  protected void addOptions() {
+    addOptWithArg(REGIONSERVER_COUNT_KEY,
+      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
+    // The parsed value is stored in regionsCountPerServer, so describe it as a per-server count
+    addOptWithArg(REGION_COUNT_KEY,
+      "Number of regions per region server. Default: " + DEFAULT_REGION_COUNT);
+    addOptWithArg(ROWS_PER_ITERATION_KEY,
+      "Total number of data rows to be loaded during one iteration." + " Default: "
+        + DEFAULT_ROWS_IN_ITERATION);
+    addOptWithArg(NUM_ITERATIONS_KEY,
+      "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
+    addOptWithArg(NUMBER_OF_TABLES_KEY,
+      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
+    addOptWithArg(SLEEP_TIME_KEY, "Sleep time of chaos monkey in ms "
+      + "to restart random region server. Default: " + SLEEP_TIME_DEFAULT);
+  }
+
+  /**
+   * Reads the test settings from the parsed command line, falling back to the class defaults for
+   * options that were not supplied, and logs the resulting values.
+   */
+  @Override
+  protected void processOptions(CommandLine cmd) {
+    super.processOptions(cmd);
+    regionsCountPerServer = parseIntOption(cmd, REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
+    regionServerCount = parseIntOption(cmd, REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
+    rowsInIteration = parseIntOption(cmd, ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
+    numIterations = parseIntOption(cmd, NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
+    numTables = parseIntOption(cmd, NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
+    String sleepTimeValue = cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT));
+    sleepTime = Long.parseLong(sleepTimeValue);
+
+    String parsedOptions = MoreObjects.toStringHelper("Parsed Options")
+      .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount)
+      .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations)
+      .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString();
+    LOG.info(parsedOptions);
+  }
+
+  /** Parses the named option as an int, using the given default when the option is absent. */
+  private static int parseIntOption(CommandLine cmd, String key, int defaultValue) {
+    return Integer.parseInt(cmd.getOptionValue(key, Integer.toString(defaultValue)));
+  }
+
+}
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java
new file mode 100644
index 00000000000..61ce3eea2b1
--- /dev/null
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY;
+import static
org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_MISSING_FILES;
+import static
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An integration test to detect regressions in HBASE-28957. Creates a table with many regions,
+ * loads data, performs a series of backup/load operations with continuous backup enabled, then
+ * restores and verifies the data.
+ * <p>
+ * The test can be run through JUnit ({@link #testContinuousBackupRestore()}) or from the command
+ * line through {@link #main(String[])}.
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-28957">HBASE-28957</a>
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestContinuousBackupRestore extends
IntegrationTestBackupRestoreBase {
+ private static final String CLASS_NAME =
+ IntegrationTestContinuousBackupRestore.class.getSimpleName();
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(IntegrationTestContinuousBackupRestore.class);
+ /**
+ * Builds the test configuration and brings up the cluster before each test. Enables backup on
+ * the configuration, sets the continuous backup replication endpoint's maximum WAL size and its
+ * staged WAL flush initial delay and interval, enables the replication marker, and sets the
+ * WALPlayer ignore-empty-files and ignore-missing-files flags. It then initializes the cluster
+ * with {@code regionServerCount} region servers, resolves the backup root directory, calls the
+ * inherited {@code createAndSetBackupWalDir()} helper and obtains the {@link FileSystem}.
+ * @throws Exception propagated from cluster initialization, directory setup or file system
+ * lookup
+ */
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ util = new IntegrationTestingUtility();
+ conf = util.getConfiguration();
+ BackupTestUtil.enableBackup(conf);
+ conf.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); // NOTE(review): presumably bytes; confirm unit
+ conf.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); // NOTE(review): time unit not shown here
+ conf.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); // NOTE(review): time unit not shown here
+ conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
+ conf.setBoolean(IGNORE_EMPTY_FILES, true);
+ conf.setBoolean(IGNORE_MISSING_FILES, true);
+ // Bring up the cluster using the inherited regionServerCount setting.
+ LOG.info("Initializing cluster with {} region server(s)",
regionServerCount);
+ util.initializeCluster(regionServerCount);
+ LOG.info("Cluster initialized and ready");
+ // Place the backup root under the test data directory on the test file system.
+ backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR +
DEFAULT_BACKUP_ROOT_DIR;
+ LOG.info("The backup root directory is: {}", backupRootDir);
+ createAndSetBackupWalDir();
+ fs = FileSystem.get(conf);
+ }
+ /**
+ * Creates the test tables, passing this class's simple name to {@code createTables}, and then
+ * runs {@code runTestMulti(true)}. NOTE(review): the {@code true} argument presumably selects
+ * continuous backup, as the log message suggests; confirm against the base class.
+ * @throws Exception propagated from table creation or from the backup/restore run
+ */
+ @Test
+ public void testContinuousBackupRestore() throws Exception {
+ LOG.info("Running backup and restore integration test with continuous
backup enabled");
+ createTables(CLASS_NAME);
+ runTestMulti(true);
+ }
+
+ /**
+ * Command-line entry point for the test. If backup is not enabled in the configuration, prints
+ * {@link BackupRestoreConstants#ENABLE_BACKUP} to stderr and returns -1 without running
+ * anything. Otherwise prints {@link BackupRestoreConstants#VERIFY_BACKUP} to stdout and runs
+ * {@link #testContinuousBackupRestore()}.
+ * @return status of CLI execution: 0 when the test completes without throwing, -1 when backup
+ * is not enabled
+ * @throws Exception propagated from {@link #testContinuousBackupRestore()}
+ */
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+ // Check if backup is enabled
+ if (!BackupManager.isBackupEnabled(getConf())) {
+ System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
+ return -1;
+ }
+ System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
+ testContinuousBackupRestore();
+ return 0;
+ }
+ /**
+ * Runs this test as a tool: creates an HBase configuration, passes it to
+ * {@code IntegrationTestingUtility.setUseDistributedCluster}, hands a new instance of this
+ * class to {@link ToolRunner} and exits the JVM with the returned status.
+ * @param args command-line arguments passed through to {@link ToolRunner}
+ * @throws Exception propagated from {@link ToolRunner}
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ int status = ToolRunner.run(conf, new
IntegrationTestContinuousBackupRestore(), args);
+ System.exit(status);
+ }
+}