Repository: hbase Updated Branches: refs/heads/master 8ab7b20f4 -> 91075276e
HBASE-19441: Implement retry logic around starting exclusive backup operation Signed-off-by: tedyu <yuzhih...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/91075276 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/91075276 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/91075276 Branch: refs/heads/master Commit: 91075276e7638f64e3e213358edd37198f540e1b Parents: 8ab7b20 Author: Vladimir Rodionov <vrodio...@hortonworks.com> Authored: Tue Mar 20 16:00:16 2018 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Wed Mar 21 09:34:45 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/backup/impl/BackupManager.java | 86 +++++++---- .../hbase/backup/impl/BackupSystemTable.java | 145 +++++++++---------- .../impl/ExclusiveOperationException.java | 33 +++++ .../hadoop/hbase/backup/TestBackupManager.java | 137 ++++++++++++++++++ 4 files changed, 296 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java index f09d6d0..8bebc91 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.backup.impl; import java.io.Closeable; @@ -47,18 +47,22 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.procedure.ProcedureManagerHost; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** - * Handles backup requests, creates backup info records in backup system table to - * keep track of backup sessions, dispatches backup request. + * Handles backup requests, creates backup info records in backup system table to keep track of + * backup sessions, dispatches backup request. */ @InterfaceAudience.Private public class BackupManager implements Closeable { + // in seconds + public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY = + "hbase.backup.exclusive.op.timeout.seconds"; + // In seconds + private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600; private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class); protected Configuration conf = null; @@ -112,8 +116,8 @@ public class BackupManager implements Closeable { if (classes == null) { conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass); } else if (!classes.contains(masterProcedureClass)) { - conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, classes + "," - + masterProcedureClass); + conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, + classes + "," + masterProcedureClass); } if (LOG.isDebugEnabled()) { @@ -138,16 +142,16 @@ public class BackupManager implements Closeable { if (classes == null) { conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass); } else if (!classes.contains(regionProcedureClass)) { - conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + "," - + regionProcedureClass); + conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, + classes + "," + regionProcedureClass); } String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY); String regionObserverClass = BackupObserver.class.getName(); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") + - regionObserverClass); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + (coproc == null ? "" : coproc + ",") + regionObserverClass); if (LOG.isDebugEnabled()) { - LOG.debug("Added region procedure manager: " + regionProcedureClass + - ". Added region observer: " + regionObserverClass); + LOG.debug("Added region procedure manager: " + regionProcedureClass + + ". Added region observer: " + regionObserverClass); } } @@ -223,9 +227,8 @@ public class BackupManager implements Closeable { } // there are one or more tables in the table list - backupInfo = - new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]), - targetRootDir); + backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]), + targetRootDir); backupInfo.setBandwidth(bandwidth); backupInfo.setWorkers(workers); return backupInfo; @@ -254,7 +257,7 @@ public class BackupManager implements Closeable { String ongoingBackupId = this.getOngoingBackupId(); if (ongoingBackupId != null) { LOG.info("There is a ongoing backup " + ongoingBackupId - + ". Can not launch new backup until no ongoing backup remains."); + + ". Can not launch new backup until no ongoing backup remains."); throw new BackupException("There is ongoing backup."); } } @@ -269,7 +272,7 @@ public class BackupManager implements Closeable { * @return The ancestors for the current backup * @throws IOException exception */ - public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException { + public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException { LOG.debug("Getting the direct ancestors of the current backup " + backupInfo.getBackupId()); ArrayList<BackupImage> ancestors = new ArrayList<>(); @@ -286,10 +289,9 @@ public class BackupManager implements Closeable { BackupImage.Builder builder = BackupImage.newBuilder(); - BackupImage image = - builder.withBackupId(backup.getBackupId()).withType(backup.getType()) - .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames()) - .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build(); + BackupImage image = builder.withBackupId(backup.getBackupId()).withType(backup.getType()) + .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames()) + .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build(); // add the full backup image as an ancestor until the last incremental backup if (backup.getType().equals(BackupType.FULL)) { @@ -319,9 +321,9 @@ public class BackupManager implements Closeable { BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage(); ancestors.add(lastIncrImage); - LOG.debug("Last dependent incremental backup image: " + "{BackupID=" - + lastIncrImage.getBackupId() + "," + "BackupDir=" + lastIncrImage.getRootDir() - + "}"); + LOG.debug( + "Last dependent incremental backup image: " + "{BackupID=" + lastIncrImage.getBackupId() + + "," + "BackupDir=" + lastIncrImage.getRootDir() + "}"); } } } @@ -369,7 +371,36 @@ public class BackupManager implements Closeable { * @throws IOException if active session already exists */ public void startBackupSession() throws IOException { - systemTable.startBackupExclusiveOperation(); + long startTime = System.currentTimeMillis(); + long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, + DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L; + long lastWarningOutputTime = 0; + while (System.currentTimeMillis() - startTime < timeout) { + try { + systemTable.startBackupExclusiveOperation(); + return; + } catch (IOException e) { + if (e instanceof ExclusiveOperationException) { + // sleep, then repeat + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + // Restore the interrupted status + Thread.currentThread().interrupt(); + } + if (lastWarningOutputTime == 0 + || (System.currentTimeMillis() - lastWarningOutputTime) > 60000) { + lastWarningOutputTime = System.currentTimeMillis(); + LOG.warn("Waiting to acquire backup exclusive lock for " + + (lastWarningOutputTime - startTime) / 1000 + "s"); + } + } else { + throw e; + } + } + } + throw new IOException( + "Failed to acquire backup system table exclusive lock after " + timeout / 1000 + "s"); } /** @@ -410,7 +441,7 @@ public class BackupManager implements Closeable { } public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> - readBulkloadRows(List<TableName> tableList) throws IOException { + readBulkloadRows(List<TableName> tableList) throws IOException { return systemTable.readBulkloadRows(tableList); } @@ -483,7 +514,6 @@ public class BackupManager implements Closeable { /** * Get WAL files iterator. - * * @return WAL files iterator from backup system table * @throws IOException if getting the WAL files iterator fails */ http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java index 4a860d9..5e174eb 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -64,6 +65,8 @@ import org.apache.hadoop.hbase.client.SnapshotDescription; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -71,26 +74,25 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; - /** * This class provides API to access backup system table<br> - * * Backup system table schema:<br> - * <p><ul> + * <p> + * <ul> * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li> * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li> * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li> - * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; - * value = map[RS-> last WAL timestamp]</li> + * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL + * timestamp]</li> * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li> - * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; - * value = backupId and full WAL file name</li> - * </ul></p> + * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file + * name</li> + * </ul> + * </p> */ @InterfaceAudience.Private public final class BackupSystemTable implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class); static class WALItem { @@ -128,10 +130,9 @@ public final class BackupSystemTable implements Closeable { private TableName tableName; /** - * Backup System table name for bulk loaded files. - * We keep all bulk loaded file references in a separate table - * because we have to isolate general backup operations: create, merge etc - * from activity of RegionObserver, which controls process of a bulk loading + * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a + * separate table because we have to isolate general backup operations: create, merge etc from + * activity of RegionObserver, which controls process of a bulk loading * {@link org.apache.hadoop.hbase.backup.BackupObserver} */ private TableName bulkLoadTableName; @@ -198,13 +199,11 @@ public final class BackupSystemTable implements Closeable { verifyNamespaceExists(admin); Configuration conf = connection.getConfiguration(); if (!admin.tableExists(tableName)) { - TableDescriptor backupHTD = - BackupSystemTable.getSystemTableDescriptor(conf); + TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf); admin.createTable(backupHTD); } if (!admin.tableExists(bulkLoadTableName)) { - TableDescriptor blHTD = - BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); + TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf); admin.createTable(blHTD); } waitForSystemTable(admin, tableName); @@ -237,11 +236,11 @@ public final class BackupSystemTable implements Closeable { } catch (InterruptedException e) { } if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) { - throw new IOException("Failed to create backup system table "+ - tableName +" after " + TIMEOUT + "ms"); + throw new IOException( + "Failed to create backup system table " + tableName + " after " + TIMEOUT + "ms"); } } - LOG.debug("Backup table "+tableName+" exists and available"); + LOG.debug("Backup table " + tableName + " exists and available"); } @Override @@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable { public void updateBackupInfo(BackupInfo info) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("update backup status in backup system table for: " + info.getBackupId() - + " set status=" + info.getState()); + + " set status=" + info.getState()); } try (Table table = connection.getTable(tableName)) { Put put = createPutForBackupInfo(info); @@ -344,7 +343,6 @@ public final class BackupSystemTable implements Closeable { } } - /** * Deletes backup status from backup system table table * @param backupId backup id @@ -370,7 +368,7 @@ public final class BackupSystemTable implements Closeable { Map<byte[], List<Path>> finalPaths) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("write bulk load descriptor to backup " + tabName + " with " + finalPaths.size() - + " entries"); + + " entries"); } try (Table table = connection.getTable(bulkLoadTableName)) { List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths); @@ -389,8 +387,8 @@ public final class BackupSystemTable implements Closeable { public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("write bulk load descriptor to backup " + tabName + " with " + pairs.size() - + " entries"); + LOG.debug( + "write bulk load descriptor to backup " + tabName + " with " + pairs.size() + " entries"); } try (Table table = connection.getTable(bulkLoadTableName)) { List<Put> puts = @@ -425,7 +423,8 @@ public final class BackupSystemTable implements Closeable { * whether the hfile was recorded by preCommitStoreFile hook (true) */ public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> - readBulkloadRows(List<TableName> tableList) throws IOException { + readBulkloadRows(List<TableName> tableList) throws IOException { + Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>(); List<byte[]> rows = new ArrayList<>(); for (TableName tTable : tableList) { @@ -504,9 +503,8 @@ public final class BackupSystemTable implements Closeable { byte[] fam = entry.getKey(); List<Path> paths = entry.getValue(); for (Path p : paths) { - Put put = - BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts, - cnt++); + Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, + ts, cnt++); puts.add(put); } } @@ -580,10 +578,9 @@ public final class BackupSystemTable implements Closeable { } /** - * Exclusive operations are: - * create, delete, merge + * Exclusive operations are: create, delete, merge * @throws IOException if a table operation fails or an active backup exclusive operation is - * already underway + * already underway */ public void startBackupExclusiveOperation() throws IOException { LOG.debug("Start new backup exclusive operation"); @@ -596,7 +593,7 @@ public final class BackupSystemTable implements Closeable { // Row exists, try to put if value == ACTIVE_SESSION_NO if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL) .ifEquals(ACTIVE_SESSION_NO).thenPut(put)) { - throw new IOException("There is an active backup exclusive operation"); + throw new ExclusiveOperationException(); } } } @@ -696,8 +693,7 @@ public final class BackupSystemTable implements Closeable { /** * Get first n backup history records - * @param n number of records, if n== -1 - max number - * is ignored + * @param n number of records, if n== -1 - max number is ignored * @return list of records * @throws IOException if getting the backup history fails */ @@ -711,8 +707,7 @@ public final class BackupSystemTable implements Closeable { /** * Get backup history records filtered by list of filters. - * @param n max number of records, if n == -1 , then max number - * is ignored + * @param n max number of records, if n == -1 , then max number is ignored * @param filters list of filters * @return backup records * @throws IOException if getting the backup history fails @@ -917,8 +912,8 @@ public final class BackupSystemTable implements Closeable { Map<String, Long> map) { BackupProtos.TableServerTimestamp.Builder tstBuilder = BackupProtos.TableServerTimestamp.newBuilder(); - tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil - .toProtoTableName(table)); + tstBuilder + .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table)); for (Entry<String, Long> entry : map.entrySet()) { BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder(); @@ -934,8 +929,9 @@ public final class BackupSystemTable implements Closeable { return tstBuilder.build(); } - private HashMap<String, Long> fromTableServerTimestampProto( - BackupProtos.TableServerTimestamp proto) { + private HashMap<String, Long> + fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) { + HashMap<String, Long> map = new HashMap<>(); List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList(); for (BackupProtos.ServerTimestamp st : list) { @@ -982,7 +978,7 @@ public final class BackupSystemTable implements Closeable { throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Add incremental backup table set to backup system table. ROOT=" + backupRoot - + " tables [" + StringUtils.join(tables, " ") + "]"); + + " tables [" + StringUtils.join(tables, " ") + "]"); } if (LOG.isDebugEnabled()) { tables.forEach(table -> LOG.debug(Objects.toString(table))); @@ -1106,12 +1102,12 @@ public final class BackupSystemTable implements Closeable { /** * Check if WAL file is eligible for deletion using multi-get * @param files names of a file to check - * @return map of results - * (key: FileStatus object. value: true if the file is deletable, false otherwise) + * @return map of results (key: FileStatus object. value: true if the file is deletable, false + * otherwise) * @throws IOException exception */ public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files) - throws IOException { + throws IOException { final int BUF_SIZE = 100; Map<FileStatus, Boolean> ret = new HashMap<>(); @@ -1223,8 +1219,8 @@ public final class BackupSystemTable implements Closeable { res.advance(); String[] tables = cellValueToBackupSet(res.current()); - return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)). - collect(Collectors.toList()); + return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)) + .collect(Collectors.toList()); } finally { if (table != null) { table.close(); @@ -1266,8 +1262,8 @@ public final class BackupSystemTable implements Closeable { */ public void removeFromBackupSet(String name, String[] toRemove) throws IOException { if (LOG.isTraceEnabled()) { - LOG.trace(" Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") - + "]"); + LOG.trace( + " Backup set remove from : " + name + " tables [" + StringUtils.join(toRemove, " ") + "]"); } String[] disjoint; String[] tables; @@ -1336,23 +1332,21 @@ public final class BackupSystemTable implements Closeable { colBuilder.setMaxVersions(1); Configuration config = HBaseConfiguration.create(); int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); colBuilder.setTimeToLive(ttl); ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); builder.setColumnFamily(colSessionsDesc); - colBuilder = - ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); colBuilder.setTimeToLive(ttl); builder.setColumnFamily(colBuilder.build()); return builder.build(); } public static TableName getTableName(Configuration conf) { - String name = - conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); + String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT); return TableName.valueOf(name); } @@ -1377,12 +1371,11 @@ public final class BackupSystemTable implements Closeable { colBuilder.setMaxVersions(1); Configuration config = HBaseConfiguration.create(); int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); + BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT); colBuilder.setTimeToLive(ttl); ColumnFamilyDescriptor colSessionsDesc = colBuilder.build(); builder.setColumnFamily(colSessionsDesc); - colBuilder = - ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); + colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY); colBuilder.setTimeToLive(ttl); builder.setColumnFamily(colBuilder.build()); return builder.build(); @@ -1390,9 +1383,10 @@ public final class BackupSystemTable implements Closeable { public static TableName getTableNameForBulkLoadedData(Configuration conf) { String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY, - BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; + BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk"; return TableName.valueOf(name); } + /** * Creates Put operation for a given backup info object * @param context backup info @@ -1622,16 +1616,15 @@ public final class BackupSystemTable implements Closeable { String file = path.toString(); int lastSlash = file.lastIndexOf("/"); String filename = file.substring(lastSlash + 1); - Put put = - new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, - Bytes.toString(region), BLK_LD_DELIM, filename)); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey()); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT); puts.add(put); - LOG.debug("writing done bulk path " + file + " for " + table + " " - + Bytes.toString(region)); + LOG.debug( + "writing done bulk path " + file + " for " + table + " " + Bytes.toString(region)); } } return puts; @@ -1658,8 +1651,8 @@ public final class BackupSystemTable implements Closeable { // Snapshot does not exists, i.e completeBackup failed after // deleting backup system table snapshot // In this case we log WARN and proceed - LOG.warn("Could not restore backup system table. Snapshot " + snapshotName - + " does not exists."); + LOG.warn( + "Could not restore backup system table. Snapshot " + snapshotName + " does not exists."); } } } @@ -1695,17 +1688,16 @@ public final class BackupSystemTable implements Closeable { /* * Creates Put's for bulk load resulting from running LoadIncrementalHFiles */ - static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, - final byte[] family, final List<Pair<Path, Path>> pairs) { + static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family, + final List<Pair<Path, Path>> pairs) { List<Put> puts = new ArrayList<>(pairs.size()); for (Pair<Path, Path> pair : pairs) { Path path = pair.getSecond(); String file = path.toString(); int lastSlash = file.lastIndexOf("/"); String filename = file.substring(lastSlash + 1); - Put put = - new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region), - BLK_LD_DELIM, filename)); + Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, + Bytes.toString(region), BLK_LD_DELIM, filename)); put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName()); put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family); put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes()); @@ -1899,9 +1891,8 @@ public final class BackupSystemTable implements Closeable { */ static Scan createScanForBulkLoadedFiles(String backupId) { Scan scan = new Scan(); - byte[] startRow = - backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId - + BLK_LD_DELIM); + byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES + : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM); byte[] stopRow = Arrays.copyOf(startRow, startRow.length); stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1); scan.setStartRow(startRow); @@ -1927,7 +1918,7 @@ public final class BackupSystemTable implements Closeable { * @return put list */ private List<Put> createPutsForAddWALFiles(List<String> files, String backupId, - String backupRoot) { + String backupRoot) { List<Put> puts = new ArrayList<>(files.size()); for (String file : files) { Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file))); @@ -1935,7 +1926,7 @@ public final class BackupSystemTable implements Closeable { Bytes.toBytes(backupId)); put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file)); put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"), - Bytes.toBytes(backupRoot)); + Bytes.toBytes(backupRoot)); puts.add(put); } return puts; http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java new file mode 100644 index 0000000..af7fd8b --- /dev/null +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java @@ -0,0 +1,33 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.impl; + +import java.io.IOException; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +@SuppressWarnings("serial") +public class ExclusiveOperationException extends IOException { + + public ExclusiveOperationException() { + super(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java ---------------------------------------------------------------------- diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java new file mode 100644 index 0000000..3e42294 --- /dev/null +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java @@ -0,0 +1,137 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLongArray; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; + +@Category(MediumTests.class) +public class TestBackupManager { + + private static final Logger LOG = LoggerFactory.getLogger(TestBackupManager.class); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBackupManager.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected static Configuration conf = UTIL.getConfiguration(); + protected static MiniHBaseCluster cluster; + protected static Connection conn; + protected BackupManager backupManager; + + @BeforeClass + public static void setUp() throws Exception { + conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); + BackupManager.decorateMasterConfiguration(conf); + BackupManager.decorateRegionServerConfiguration(conf); + cluster = UTIL.startMiniCluster(); + conn = UTIL.getConnection(); + } + + @AfterClass + public static void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void before() throws IOException { + backupManager = new BackupManager(conn, conn.getConfiguration()); + } + + @After + public void after() { + backupManager.close(); + } + + AtomicLongArray startTimes = new AtomicLongArray(2); + AtomicLongArray stopTimes = new AtomicLongArray(2); + + @Test + public void testStartBackupExclusiveOperation() { + + long sleepTime = 2000; + Runnable r = new Runnable() { + @Override + public void run() { + try { + backupManager.startBackupSession(); + boolean result = startTimes.compareAndSet(0, 0, System.currentTimeMillis()); + if (!result) { + result = startTimes.compareAndSet(1, 0, System.currentTimeMillis()); + if (!result) { + throw new IOException("PANIC! Unreachable code"); + } + } + Thread.sleep(sleepTime); + result = stopTimes.compareAndSet(0, 0, System.currentTimeMillis()); + if (!result) { + result = stopTimes.compareAndSet(1, 0, System.currentTimeMillis()); + if (!result) { + throw new IOException("PANIC! Unreachable code"); + } + } + backupManager.finishBackupSession(); + } catch (IOException | InterruptedException e) { + fail("Unexpected exception: " + e.getMessage()); + } + } + }; + + Thread[] workers = new Thread[2]; + for (int i = 0; i < workers.length; i++) { + workers[i] = new Thread(r); + workers[i].start(); + } + + for (int i = 0; i < workers.length; i++) { + Uninterruptibles.joinUninterruptibly(workers[i]); + } + LOG.info("Diff start time=" + (startTimes.get(1) - startTimes.get(0)) + "ms"); + LOG.info("Diff finish time=" + (stopTimes.get(1) - stopTimes.get(0)) + "ms"); + assertTrue(startTimes.get(1) - startTimes.get(0) >= sleepTime); + assertTrue(stopTimes.get(1) - stopTimes.get(0) >= sleepTime); + + } + +}