kgeisz commented on code in PR #7417: URL: https://github.com/apache/hbase/pull/7417#discussion_r2476125740
########## hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java: ########## @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import 
org.apache.hadoop.hbase.backup.util.BackupUtils; +import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; +import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; +import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; +import org.apache.hadoop.hbase.chaos.policies.Policy; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; + +/** + * An abstract base class that is used to run backup, restore, and delete integration tests. This + * class performs both full backups and incremental backups. Both continuous backup and + * non-continuous backup test cases are supported. The number of incremental backups performed + * depends on the number of iterations defined by the user. The class performs the backup/restore in + * a separate thread, where one thread is created per table. The number of tables is user-defined, + * along with other various configurations. 
+ */ +public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBase { + protected static final Logger LOG = + LoggerFactory.getLogger(IntegrationTestBackupRestoreBase.class); + protected static final String NUMBER_OF_TABLES_KEY = "num_tables"; + protected static final String COLUMN_NAME = "f"; + protected static final String REGION_COUNT_KEY = "regions_per_rs"; + protected static final String REGIONSERVER_COUNT_KEY = "region_servers"; + protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration"; + protected static final String NUM_ITERATIONS_KEY = "num_iterations"; + protected static final int DEFAULT_REGION_COUNT = 10; + protected static final int DEFAULT_REGIONSERVER_COUNT = 5; + protected static final int DEFAULT_NUMBER_OF_TABLES = 1; + protected static final int DEFAULT_NUM_ITERATIONS = 10; + protected static final int DEFAULT_ROWS_IN_ITERATION = 10000; + protected static final String SLEEP_TIME_KEY = "sleeptime"; + // short default interval because tests don't run very long. + protected static final long SLEEP_TIME_DEFAULT = 50000L; + + protected static int rowsInIteration; + protected static int regionsCountPerServer; + protected static int regionServerCount; + + protected static int numIterations; + protected static int numTables; + protected static TableName[] tableNames; + protected long sleepTime; + protected static Object lock = new Object(); + + protected FileSystem fs; + protected String backupRootDir = "backupRootDir"; + + /* + * This class is used to run the backup and restore thread(s). Throwing an exception in this + * thread will not cause the test to fail, so the purpose of this class is to both kick off the + * backup and restore, as well as record any exceptions that occur so they can be thrown in the + * main thread. 
+ */ + protected class BackupAndRestoreThread implements Runnable { + private final TableName table; + private final boolean isContinuousBackupEnabled; + private Throwable throwable; + + public BackupAndRestoreThread(TableName table, boolean isContinuousBackupEnabled) { + this.table = table; + this.isContinuousBackupEnabled = isContinuousBackupEnabled; + this.throwable = null; + } + + public Throwable getThrowable() { + return this.throwable; + } + + @Override + public void run() { + try { + LOG.info("Running backup and restore test for {} in thread {}", this.table, + Thread.currentThread()); + runTestSingle(this.table, isContinuousBackupEnabled); + } catch (Throwable t) { + LOG.error( + "An error occurred in thread {} when performing a backup and restore with table {}: ", + Thread.currentThread().getName(), this.table.getNameAsString(), t); + this.throwable = t; + } + } + } + + @After + public void tearDown() throws IOException { + LOG.info("Cleaning up after test."); + if (util.isDistributedCluster()) { + deleteTablesIfAny(); + LOG.info("Cleaning up after test. Deleted tables"); + cleanUpBackupDir(); + } + LOG.info("Restoring cluster."); + util.restoreCluster(); + LOG.info("Cluster restored."); + } + + @Override + public void setUpMonkey() throws Exception { + Policy p = + new PeriodicRandomActionPolicy(sleepTime, new RestartRandomRsExceptMetaAction(sleepTime)); + this.monkey = new PolicyBasedChaosMonkey(util, p); + startMonkey(); + } + + private void deleteTablesIfAny() throws IOException { + for (TableName table : tableNames) { + util.deleteTableIfAny(table); + } + } + + protected void createTables(String tableBaseName) throws Exception { + tableNames = new TableName[numTables]; + for (int i = 0; i < numTables; i++) { + tableNames[i] = TableName.valueOf(tableBaseName + ".table." 
+ i); + } + for (TableName table : tableNames) { + createTable(table); + } + } + + /** + * Creates a directory specified by backupWALDir and sets this directory to + * CONF_CONTINUOUS_BACKUP_WAL_DIR in the configuration. + */ + protected void createAndSetBackupWalDir() throws IOException { + Path root = util.getDataTestDirOnTestFS(); + Path backupWalDir = new Path(root, "backupWALDir"); + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(backupWalDir); + conf.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); + LOG.info( + "The continuous backup WAL directory has been created and set in the configuration to: {}", + backupWalDir); + } + + private void cleanUpBackupDir() throws IOException { + FileSystem fs = FileSystem.get(util.getConfiguration()); + fs.delete(new Path(backupRootDir), true); + } + + /** + * This is the main driver method used by tests that extend this abstract base class. This method + * starts one backup and restore thread per table. + * @param isContinuousBackupEnabled Boolean flag used to specify if the backups should have + * continuous backup enabled. 
+ */ + protected void runTestMulti(boolean isContinuousBackupEnabled) { + Thread[] workers = new Thread[numTables]; + BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables]; + for (int i = 0; i < numTables; i++) { + final TableName table = tableNames[i]; + BackupAndRestoreThread backupAndRestoreThread = + new BackupAndRestoreThread(table, isContinuousBackupEnabled); + backupAndRestoreThreads[i] = backupAndRestoreThread; + workers[i] = new Thread(backupAndRestoreThread); + workers[i].start(); + } + // Wait for all workers to finish and check for errors + Throwable error = null; + Throwable threadThrowable; + for (int i = 0; i < numTables; i++) { + Uninterruptibles.joinUninterruptibly(workers[i]); + threadThrowable = backupAndRestoreThreads[i].getThrowable(); + if (threadThrowable == null) { + continue; + } + if (error == null) { + error = threadThrowable; + } else { + error.addSuppressed(threadThrowable); + } + } + // Throw any found errors after all threads have completed + if (error != null) { + throw new AssertionError("An error occurred in a backup and restore thread", error); + } + LOG.info("IT backup & restore finished"); + } + + /** + * This method is what performs the actual backup, restore, merge, and delete operations. This + * method is run in a separate thread. It first performs a full backup. After, it iteratively + * performs a series of incremental backups and restores. Later, it deletes the backups. + * @param table The table the backups are performed on + * @param isContinuousBackupEnabled Boolean flag used to indicate if the backups should have + * continuous backup enabled. + */ + private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) + throws IOException, InterruptedException { + String enabledOrDisabled = isContinuousBackupEnabled ? 
"enabled" : "disabled"; + List<String> backupIds = new ArrayList<>(); + + try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { + loadData(table, rowsInIteration); + + // First create a full backup for the table + LOG.info("Creating full backup image for {} with continuous backup {}", table, + enabledOrDisabled); + List<TableName> tables = Lists.newArrayList(table); + BackupRequest.Builder builder = new BackupRequest.Builder(); + BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) + .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) + .build(); + + String fullBackupId = backup(request, client, backupIds); + LOG.info("Created full backup with ID: {}", fullBackupId); + + verifySnapshotExists(table, fullBackupId); + + // Run full backup verifications specific to continuous backup + if (isContinuousBackupEnabled) { + BackupTestUtil.verifyReplicationPeerSubscription(util, table); + Path backupWALs = verifyWALsDirectoryExists(); + Path walPartitionDir = verifyWALPartitionDirExists(backupWALs); + verifyBackupWALFiles(walPartitionDir); + } + + // Now continue with incremental backups + String incrementalBackupId; + for (int count = 1; count <= numIterations; count++) { + LOG.info("{} - Starting incremental backup iteration {} of {} for {}", + Thread.currentThread().getName(), count, numIterations, table); + loadData(table, rowsInIteration); + + // Do incremental backup + LOG.info("Creating incremental backup number {} with continuous backup {} for {}", count, + enabledOrDisabled, table); + builder = new BackupRequest.Builder(); + request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) + .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) + .build(); + incrementalBackupId = backup(request, client, backupIds); + LOG.info("Created incremental backup with ID: {}", incrementalBackupId); + + // Restore table 
using backup taken "two backups ago" + // On the first iteration, this backup will be the full backup + String previousBackupId = backupIds.get(backupIds.size() - 2); + if (previousBackupId.equals(fullBackupId)) { + LOG.info("Restoring {} using original full backup with ID: {}", table, previousBackupId); + } else { + LOG.info("Restoring {} using second most recent incremental backup with ID: {}", table, + previousBackupId); + } + restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, + (long) rowsInIteration * count); + + // Restore table using the most recently created incremental backup + LOG.info("Restoring {} using most recent incremental backup with ID: {}", table, + incrementalBackupId); + restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, + (long) rowsInIteration * (count + 1)); + LOG.info("{} - Finished incremental backup iteration {} of {} for {}", + Thread.currentThread().getName(), count, numIterations, table); + } + + // Now merge all incremental and restore + String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds); + merge(incrementalBackupIds, client); + verifyBackupExistenceAfterMerge(backupIds); + removeNonexistentBackups(backupIds); + // Restore the last incremental backup + incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; + // restore incremental backup for table, with overwrite + TableName[] tablesToRestoreFrom = new TableName[] { table }; + restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, null, true), + client); + Table hTable = conn.getTable(table); + Assert.assertEquals(rowsInIteration * (numIterations + 1), + HBaseTestingUtil.countRows(hTable)); + hTable.close(); + + // Create another incremental backup to show it can be deleted on its own + backup(request, client, backupIds); + deleteMostRecentIncrementalBackup(backupIds, client); + // The full backup and the second most recent incremental backup should still exist + assertEquals(2, 
backupIds.size()); + verifyAllBackupsExist(backupIds); + // Delete the full backup, which should also automatically delete any incremental backups that + // depend on it + LOG.info("Deleting full backup: {}. This will also delete any remaining incremental backups", + fullBackupId); + delete(new String[] { fullBackupId }, client); + verifyNoBackupsExist(backupIds); Review Comment: Yes, that's correct. The original test wasn't performing any deletes, so I thought they would be a good addition. We need to cover deletes for the continuous backup test case anyway, and since deletes work for both continuous and non-continuous backups, I added them here. In addition, deleting a full backup automatically deletes the incremental backups that depend on it. I wanted to demonstrate that behavior, as well as show that an incremental backup can be deleted on its own. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
