Repository: hbase
Updated Branches:
  refs/heads/master 8ab7b20f4 -> 91075276e


HBASE-19441: Implement retry logic around starting exclusive backup operation

Signed-off-by: tedyu <yuzhih...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/91075276
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/91075276
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/91075276

Branch: refs/heads/master
Commit: 91075276e7638f64e3e213358edd37198f540e1b
Parents: 8ab7b20
Author: Vladimir Rodionov <vrodio...@hortonworks.com>
Authored: Tue Mar 20 16:00:16 2018 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Wed Mar 21 09:34:45 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/impl/BackupManager.java |  86 +++++++----
 .../hbase/backup/impl/BackupSystemTable.java    | 145 +++++++++----------
 .../impl/ExclusiveOperationException.java       |  33 +++++
 .../hadoop/hbase/backup/TestBackupManager.java  | 137 ++++++++++++++++++
 4 files changed, 296 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
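In short: BackupManager#startBackupSession() no longer fails as soon as another
backup, delete or merge holds the exclusive lock. It now polls once per second,
warning at most once a minute, until the lock is acquired or the timeout set by
"hbase.backup.exclusive.op.timeout.seconds" (default 3600 seconds) expires. A
minimal sketch of tuning that timeout; the 300-second value is illustrative and
not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.impl.BackupManager;

    Configuration conf = HBaseConfiguration.create();
    // Wait at most 5 minutes for a competing exclusive operation to release
    // the backup lock before giving up (the default is one hour).
    conf.setInt(BackupManager.BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY, 300);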


http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index f09d6d0..8bebc91 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.backup.impl;
 
 import java.io.Closeable;
@@ -47,18 +47,22 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 /**
- * Handles backup requests, creates backup info records in backup system table to
- * keep track of backup sessions, dispatches backup request.
+ * Handles backup requests, creates backup info records in backup system table to keep track of
+ * backup sessions, dispatches backup request.
  */
 @InterfaceAudience.Private
 public class BackupManager implements Closeable {
+  // in seconds
+  public final static String BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY =
+      "hbase.backup.exclusive.op.timeout.seconds";
+  // In seconds
+  private final static int DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT = 3600;
   private static final Logger LOG = LoggerFactory.getLogger(BackupManager.class);
 
   protected Configuration conf = null;
@@ -112,8 +116,8 @@ public class BackupManager implements Closeable {
     if (classes == null) {
       conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, masterProcedureClass);
     } else if (!classes.contains(masterProcedureClass)) {
-      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY, classes + ","
-              + masterProcedureClass);
+      conf.set(ProcedureManagerHost.MASTER_PROCEDURE_CONF_KEY,
+        classes + "," + masterProcedureClass);
     }
 
     if (LOG.isDebugEnabled()) {
@@ -138,16 +142,16 @@ public class BackupManager implements Closeable {
     if (classes == null) {
       conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, regionProcedureClass);
     } else if (!classes.contains(regionProcedureClass)) {
-      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY, classes + ","
-          + regionProcedureClass);
+      conf.set(ProcedureManagerHost.REGIONSERVER_PROCEDURE_CONF_KEY,
+        classes + "," + regionProcedureClass);
     }
     String coproc = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
     String regionObserverClass = BackupObserver.class.getName();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
-        regionObserverClass);
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      (coproc == null ? "" : coproc + ",") + regionObserverClass);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: " + regionProcedureClass +
-        ". Added region observer: " + regionObserverClass);
+      LOG.debug("Added region procedure manager: " + regionProcedureClass
+        + ". Added region observer: " + regionObserverClass);
     }
   }
 
@@ -223,9 +227,8 @@ public class BackupManager implements Closeable {
     }
 
     // there are one or more tables in the table list
-    backupInfo =
-        new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
-            targetRootDir);
+    backupInfo = new BackupInfo(backupId, type, tableList.toArray(new TableName[tableList.size()]),
+      targetRootDir);
     backupInfo.setBandwidth(bandwidth);
     backupInfo.setWorkers(workers);
     return backupInfo;
@@ -254,7 +257,7 @@ public class BackupManager implements Closeable {
     String ongoingBackupId = this.getOngoingBackupId();
     if (ongoingBackupId != null) {
       LOG.info("There is a ongoing backup " + ongoingBackupId
-          + ". Can not launch new backup until no ongoing backup remains.");
+        + ". Can not launch new backup until no ongoing backup remains.");
       throw new BackupException("There is ongoing backup.");
     }
   }
@@ -269,7 +272,7 @@ public class BackupManager implements Closeable {
    * @return The ancestors for the current backup
    * @throws IOException exception
    */
-  public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException  {
+  public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
     LOG.debug("Getting the direct ancestors of the current backup " + 
backupInfo.getBackupId());
 
     ArrayList<BackupImage> ancestors = new ArrayList<>();
@@ -286,10 +289,9 @@ public class BackupManager implements Closeable {
 
       BackupImage.Builder builder = BackupImage.newBuilder();
 
-      BackupImage image =
-          builder.withBackupId(backup.getBackupId()).withType(backup.getType())
-              .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
-              .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
+      BackupImage image = builder.withBackupId(backup.getBackupId()).withType(backup.getType())
+          .withRootDir(backup.getBackupRootDir()).withTableList(backup.getTableNames())
+          .withStartTime(backup.getStartTs()).withCompleteTime(backup.getCompleteTs()).build();
 
       // add the full backup image as an ancestor until the last incremental backup
       if (backup.getType().equals(BackupType.FULL)) {
@@ -319,9 +321,9 @@ public class BackupManager implements Closeable {
           BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
           ancestors.add(lastIncrImage);
 
-          LOG.debug("Last dependent incremental backup image: " + "{BackupID="
-                  + lastIncrImage.getBackupId() + "," + "BackupDir=" + 
lastIncrImage.getRootDir()
-                  + "}");
+          LOG.debug(
+            "Last dependent incremental backup image: " + "{BackupID=" + 
lastIncrImage.getBackupId()
+            + "," + "BackupDir=" + lastIncrImage.getRootDir() + "}");
         }
       }
     }
@@ -369,7 +371,36 @@ public class BackupManager implements Closeable {
    * @throws IOException if active session already exists
    */
   public void startBackupSession() throws IOException {
-    systemTable.startBackupExclusiveOperation();
+    long startTime = System.currentTimeMillis();
+    long timeout = conf.getInt(BACKUP_EXCLUSIVE_OPERATION_TIMEOUT_SECONDS_KEY,
+      DEFAULT_BACKUP_EXCLUSIVE_OPERATION_TIMEOUT) * 1000L;
+    long lastWarningOutputTime = 0;
+    while (System.currentTimeMillis() - startTime < timeout) {
+      try {
+        systemTable.startBackupExclusiveOperation();
+        return;
+      } catch (IOException e) {
+        if (e instanceof ExclusiveOperationException) {
+          // sleep, then repeat
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e1) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+          }
+          if (lastWarningOutputTime == 0
+              || (System.currentTimeMillis() - lastWarningOutputTime) > 60000) {
+            lastWarningOutputTime = System.currentTimeMillis();
+            LOG.warn("Waiting to acquire backup exclusive lock for "
+                + (lastWarningOutputTime - startTime) / 1000 + "s");
+          }
+        } else {
+          throw e;
+        }
+      }
+    }
+    throw new IOException(
+      "Failed to acquire backup system table exclusive lock after " + timeout 
/ 1000 + "s");
   }
 
   /**
@@ -410,7 +441,7 @@ public class BackupManager implements Closeable {
   }
 
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-      readBulkloadRows(List<TableName> tableList) throws IOException {
+    readBulkloadRows(List<TableName> tableList) throws IOException {
     return systemTable.readBulkloadRows(tableList);
   }
 
@@ -483,7 +514,6 @@ public class BackupManager implements Closeable {
 
   /**
    * Get WAL files iterator.
-   *
    * @return WAL files iterator from backup system table
    * @throws IOException if getting the WAL files iterator fails
    */
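
With the retry loop above, startBackupSession() is safe to call while another
exclusive operation is in flight: it blocks, retrying, instead of failing at
once. A minimal usage sketch, assuming the caller already holds a Connection
(error handling elided):

    BackupManager manager = new BackupManager(connection, connection.getConfiguration());
    try {
      manager.startBackupSession();   // retries until lock acquired or timeout
      // ... perform the exclusive operation: create, delete or merge ...
    } finally {
      manager.finishBackupSession();  // release the exclusive lock
      manager.close();
    }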

http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
index 4a860d9..5e174eb 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -1,4 +1,5 @@
 /**
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -64,6 +65,8 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -71,26 +74,25 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-
 /**
  * This class provides API to access backup system table<br>
- *
  * Backup system table schema:<br>
- * <p><ul>
+ * <p>
+ * <ul>
  * <li>1. Backup sessions rowkey= "session:"+backupId; value =serialized BackupInfo</li>
  * <li>2. Backup start code rowkey = "startcode:"+backupRoot; value = startcode</li>
  * <li>3. Incremental backup set rowkey="incrbackupset:"+backupRoot; value=[list of tables]</li>
- * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name;
- * value = map[RS-> last WAL timestamp]</li>
+ * <li>4. Table-RS-timestamp map rowkey="trslm:"+backupRoot+table_name; value = map[RS-> last WAL
+ * timestamp]</li>
  * <li>5. RS - WAL ts map rowkey="rslogts:"+backupRoot +server; value = last WAL timestamp</li>
- * <li>6. WALs recorded rowkey="wals:"+WAL unique file name;
- * value = backupId and full WAL file name</li>
- * </ul></p>
+ * <li>6. WALs recorded rowkey="wals:"+WAL unique file name; value = backupId and full WAL file
+ * name</li>
+ * </ul>
+ * </p>
  */
 @InterfaceAudience.Private
 public final class BackupSystemTable implements Closeable {
+
   private static final Logger LOG = LoggerFactory.getLogger(BackupSystemTable.class);
 
   static class WALItem {
@@ -128,10 +130,9 @@ public final class BackupSystemTable implements Closeable {
   private TableName tableName;
 
   /**
-   * Backup System table name for bulk loaded files.
-   * We keep all bulk loaded file references in a separate table
-   * because we have to isolate general backup operations: create, merge etc
-   * from activity of RegionObserver, which controls process of a bulk loading
+   * Backup System table name for bulk loaded files. We keep all bulk loaded file references in a
+   * separate table because we have to isolate general backup operations: create, merge etc from
+   * activity of RegionObserver, which controls process of a bulk loading
    * {@link org.apache.hadoop.hbase.backup.BackupObserver}
    */
   private TableName bulkLoadTableName;
@@ -198,13 +199,11 @@ public final class BackupSystemTable implements Closeable {
       verifyNamespaceExists(admin);
       Configuration conf = connection.getConfiguration();
       if (!admin.tableExists(tableName)) {
-        TableDescriptor backupHTD =
-            BackupSystemTable.getSystemTableDescriptor(conf);
+        TableDescriptor backupHTD = BackupSystemTable.getSystemTableDescriptor(conf);
         admin.createTable(backupHTD);
       }
       if (!admin.tableExists(bulkLoadTableName)) {
-        TableDescriptor blHTD =
-            BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
+        TableDescriptor blHTD = BackupSystemTable.getSystemTableForBulkLoadedDataDescriptor(conf);
         admin.createTable(blHTD);
       }
       waitForSystemTable(admin, tableName);
@@ -237,11 +236,11 @@ public final class BackupSystemTable implements Closeable {
       } catch (InterruptedException e) {
       }
       if (EnvironmentEdgeManager.currentTime() - startTime > TIMEOUT) {
-        throw new IOException("Failed to create backup system table "+
-      tableName +" after " + TIMEOUT + "ms");
+        throw new IOException(
+          "Failed to create backup system table " + tableName + " after " + 
TIMEOUT + "ms");
       }
     }
-    LOG.debug("Backup table "+tableName+" exists and available");
+    LOG.debug("Backup table " + tableName + " exists and available");
   }
 
   @Override
@@ -257,7 +256,7 @@ public final class BackupSystemTable implements Closeable {
   public void updateBackupInfo(BackupInfo info) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("update backup status in backup system table for: " + 
info.getBackupId()
-          + " set status=" + info.getState());
+        + " set status=" + info.getState());
     }
     try (Table table = connection.getTable(tableName)) {
       Put put = createPutForBackupInfo(info);
@@ -344,7 +343,6 @@ public final class BackupSystemTable implements Closeable {
     }
   }
 
-
   /**
    * Deletes backup status from backup system table table
    * @param backupId backup id
@@ -370,7 +368,7 @@ public final class BackupSystemTable implements Closeable {
       Map<byte[], List<Path>> finalPaths) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("write bulk load descriptor to backup " + tabName + " with " + 
finalPaths.size()
-          + " entries");
+        + " entries");
     }
     try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region, finalPaths);
@@ -389,8 +387,8 @@ public final class BackupSystemTable implements Closeable {
   public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region, final byte[] family,
       final List<Pair<Path, Path>> pairs) throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("write bulk load descriptor to backup " + tabName + " with " + 
pairs.size()
-          + " entries");
+      LOG.debug(
+        "write bulk load descriptor to backup " + tabName + " with " + 
pairs.size() + " entries");
     }
     try (Table table = connection.getTable(bulkLoadTableName)) {
       List<Put> puts =
@@ -425,7 +423,8 @@ public final class BackupSystemTable implements Closeable {
    * whether the hfile was recorded by preCommitStoreFile hook (true)
    */
   public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
-      readBulkloadRows(List<TableName> tableList) throws IOException {
+    readBulkloadRows(List<TableName> tableList) throws IOException {
+
     Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
     List<byte[]> rows = new ArrayList<>();
     for (TableName tTable : tableList) {
@@ -504,9 +503,8 @@ public final class BackupSystemTable implements Closeable {
           byte[] fam = entry.getKey();
           List<Path> paths = entry.getValue();
           for (Path p : paths) {
-            Put put =
-                BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId, ts,
-                  cnt++);
+            Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(), backupId,
+              ts, cnt++);
             puts.add(put);
           }
         }
@@ -580,10 +578,9 @@ public final class BackupSystemTable implements Closeable {
   }
 
   /**
-   * Exclusive operations are:
-   * create, delete, merge
+   * Exclusive operations are: create, delete, merge
    * @throws IOException if a table operation fails or an active backup exclusive operation is
-   *                     already underway
+   *           already underway
    */
   public void startBackupExclusiveOperation() throws IOException {
     LOG.debug("Start new backup exclusive operation");
@@ -596,7 +593,7 @@ public final class BackupSystemTable implements Closeable {
         // Row exists, try to put if value == ACTIVE_SESSION_NO
         if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
             .ifEquals(ACTIVE_SESSION_NO).thenPut(put)) {
-          throw new IOException("There is an active backup exclusive operation");
+          throw new ExclusiveOperationException();
         }
       }
     }
@@ -696,8 +693,7 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get first n backup history records
-   * @param n number of records, if n== -1 - max number
-   *        is ignored
+   * @param n number of records, if n== -1 - max number is ignored
    * @return list of records
    * @throws IOException if getting the backup history fails
    */
@@ -711,8 +707,7 @@ public final class BackupSystemTable implements Closeable {
 
   /**
    * Get backup history records filtered by list of filters.
-   * @param n max number of records, if n == -1 , then max number
-   *        is ignored
+   * @param n max number of records, if n == -1 , then max number is ignored
    * @param filters list of filters
    * @return backup records
    * @throws IOException if getting the backup history fails
@@ -917,8 +912,8 @@ public final class BackupSystemTable implements Closeable {
       Map<String, Long> map) {
     BackupProtos.TableServerTimestamp.Builder tstBuilder =
         BackupProtos.TableServerTimestamp.newBuilder();
-    tstBuilder.setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil
-        .toProtoTableName(table));
+    tstBuilder
+        .setTableName(org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.toProtoTableName(table));
 
     for (Entry<String, Long> entry : map.entrySet()) {
       BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
@@ -934,8 +929,9 @@ public final class BackupSystemTable implements Closeable {
     return tstBuilder.build();
   }
 
-  private HashMap<String, Long> fromTableServerTimestampProto(
-      BackupProtos.TableServerTimestamp proto) {
+  private HashMap<String, Long>
+    fromTableServerTimestampProto(BackupProtos.TableServerTimestamp proto) {
+
     HashMap<String, Long> map = new HashMap<>();
     List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
     for (BackupProtos.ServerTimestamp st : list) {
@@ -982,7 +978,7 @@ public final class BackupSystemTable implements Closeable {
       throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Add incremental backup table set to backup system table. 
ROOT=" + backupRoot
-          + " tables [" + StringUtils.join(tables, " ") + "]");
+        + " tables [" + StringUtils.join(tables, " ") + "]");
     }
     if (LOG.isDebugEnabled()) {
       tables.forEach(table -> LOG.debug(Objects.toString(table)));
@@ -1106,12 +1102,12 @@ public final class BackupSystemTable implements Closeable {
   /**
    * Check if WAL file is eligible for deletion using multi-get
    * @param files names of a file to check
-   * @return map of results
-   *         (key: FileStatus object. value: true if the file is deletable, false otherwise)
+   * @return map of results (key: FileStatus object. value: true if the file is deletable, false
+   *         otherwise)
    * @throws IOException exception
    */
   public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files)
-    throws IOException {
+      throws IOException {
     final int BUF_SIZE = 100;
 
     Map<FileStatus, Boolean> ret = new HashMap<>();
@@ -1223,8 +1219,8 @@ public final class BackupSystemTable implements Closeable {
 
       res.advance();
       String[] tables = cellValueToBackupSet(res.current());
-      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item)).
-        collect(Collectors.toList());
+      return Arrays.asList(tables).stream().map(item -> TableName.valueOf(item))
+          .collect(Collectors.toList());
     } finally {
       if (table != null) {
         table.close();
@@ -1266,8 +1262,8 @@ public final class BackupSystemTable implements Closeable {
    */
   public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
     if (LOG.isTraceEnabled()) {
-      LOG.trace(" Backup set remove from : " + name + " tables [" + 
StringUtils.join(toRemove, " ")
-          + "]");
+      LOG.trace(
+        " Backup set remove from : " + name + " tables [" + 
StringUtils.join(toRemove, " ") + "]");
     }
     String[] disjoint;
     String[] tables;
@@ -1336,23 +1332,21 @@ public final class BackupSystemTable implements Closeable {
     colBuilder.setMaxVersions(1);
     Configuration config = HBaseConfiguration.create();
     int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
-          BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
     colBuilder.setTimeToLive(ttl);
 
     ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
     builder.setColumnFamily(colSessionsDesc);
 
-    colBuilder =
-        ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
     colBuilder.setTimeToLive(ttl);
     builder.setColumnFamily(colBuilder.build());
     return builder.build();
   }
 
   public static TableName getTableName(Configuration conf) {
-    String name =
-        conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
-          BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
+    String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
+      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT);
     return TableName.valueOf(name);
   }
 
@@ -1377,12 +1371,11 @@ public final class BackupSystemTable implements Closeable {
     colBuilder.setMaxVersions(1);
     Configuration config = HBaseConfiguration.create();
     int ttl = config.getInt(BackupRestoreConstants.BACKUP_SYSTEM_TTL_KEY,
-          BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+      BackupRestoreConstants.BACKUP_SYSTEM_TTL_DEFAULT);
     colBuilder.setTimeToLive(ttl);
     ColumnFamilyDescriptor colSessionsDesc = colBuilder.build();
     builder.setColumnFamily(colSessionsDesc);
-    colBuilder =
-        ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
+    colBuilder = ColumnFamilyDescriptorBuilder.newBuilder(META_FAMILY);
     colBuilder.setTimeToLive(ttl);
     builder.setColumnFamily(colBuilder.build());
     return builder.build();
@@ -1390,9 +1383,10 @@ public final class BackupSystemTable implements Closeable {
 
   public static TableName getTableNameForBulkLoadedData(Configuration conf) {
     String name = conf.get(BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_KEY,
-          BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
+      BackupRestoreConstants.BACKUP_SYSTEM_TABLE_NAME_DEFAULT) + "_bulk";
     return TableName.valueOf(name);
   }
+
   /**
    * Creates Put operation for a given backup info object
    * @param context backup info
@@ -1622,16 +1616,15 @@ public final class BackupSystemTable implements Closeable {
         String file = path.toString();
         int lastSlash = file.lastIndexOf("/");
         String filename = file.substring(lastSlash + 1);
-        Put put =
-            new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
-              Bytes.toString(region), BLK_LD_DELIM, filename));
+        Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+          Bytes.toString(region), BLK_LD_DELIM, filename));
         put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
         put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
         put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
         put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
         puts.add(put);
-        LOG.debug("writing done bulk path " + file + " for " + table + " "
-                + Bytes.toString(region));
+        LOG.debug(
+          "writing done bulk path " + file + " for " + table + " " + 
Bytes.toString(region));
       }
     }
     return puts;
@@ -1658,8 +1651,8 @@ public final class BackupSystemTable implements Closeable {
         // Snapshot does not exists, i.e completeBackup failed after
         // deleting backup system table snapshot
         // In this case we log WARN and proceed
-        LOG.warn("Could not restore backup system table. Snapshot " + 
snapshotName
-            + " does not exists.");
+        LOG.warn(
+          "Could not restore backup system table. Snapshot " + snapshotName + 
" does not exists.");
       }
     }
   }
@@ -1695,17 +1688,16 @@ public final class BackupSystemTable implements Closeable {
   /*
    * Creates Put's for bulk load resulting from running LoadIncrementalHFiles
    */
-  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
-      final byte[] family, final List<Pair<Path, Path>> pairs) {
+  static List<Put> createPutForPreparedBulkload(TableName table, byte[] region, final byte[] family,
+      final List<Pair<Path, Path>> pairs) {
     List<Put> puts = new ArrayList<>(pairs.size());
     for (Pair<Path, Path> pair : pairs) {
       Path path = pair.getSecond();
       String file = path.toString();
       int lastSlash = file.lastIndexOf("/");
       String filename = file.substring(lastSlash + 1);
-      Put put =
-          new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM, Bytes.toString(region),
-            BLK_LD_DELIM, filename));
+      Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
+        Bytes.toString(region), BLK_LD_DELIM, filename));
       put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
       put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
       put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, file.getBytes());
@@ -1899,9 +1891,8 @@ public final class BackupSystemTable implements Closeable {
    */
   static Scan createScanForBulkLoadedFiles(String backupId) {
     Scan scan = new Scan();
-    byte[] startRow =
-        backupId == null ? BULK_LOAD_PREFIX_BYTES : rowkey(BULK_LOAD_PREFIX, backupId
-            + BLK_LD_DELIM);
+    byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES
+        : rowkey(BULK_LOAD_PREFIX, backupId + BLK_LD_DELIM);
     byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
     stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
     scan.setStartRow(startRow);
@@ -1927,7 +1918,7 @@ public final class BackupSystemTable implements Closeable {
    * @return put list
    */
   private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
-          String backupRoot) {
+      String backupRoot) {
     List<Put> puts = new ArrayList<>(files.size());
     for (String file : files) {
       Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
@@ -1935,7 +1926,7 @@ public final class BackupSystemTable implements Closeable {
         Bytes.toBytes(backupId));
       put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), 
Bytes.toBytes(file));
       put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
-              Bytes.toBytes(backupRoot));
+        Bytes.toBytes(backupRoot));
       puts.add(put);
     }
     return puts;
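
The exclusive lock that startBackupExclusiveOperation() takes is a single
atomic compare-and-swap on a well-known row of the backup system table, as the
checkAndMutate call in the hunk above shows. The pattern, sketched here with
createPutForStartBackupSession() standing in for the private helper that
writes ACTIVE_SESSION_YES (the remaining names are real BackupSystemTable
constants):

    Put put = createPutForStartBackupSession();
    // First writer wins; a concurrent caller sees ACTIVE_SESSION_YES and gets
    // the retryable marker exception instead of a generic IOException.
    if (!table.checkAndMutate(ACTIVE_SESSION_ROW, SESSIONS_FAMILY).qualifier(ACTIVE_SESSION_COL)
        .ifEquals(ACTIVE_SESSION_NO).thenPut(put)) {
      throw new ExclusiveOperationException();
    }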

http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java
new file mode 100644
index 0000000..af7fd8b
--- /dev/null
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/ExclusiveOperationException.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+@SuppressWarnings("serial")
+public class ExclusiveOperationException extends IOException {
+
+  public ExclusiveOperationException() {
+    super();
+  }
+
+}
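
The empty body is deliberate: this is a marker type that lets BackupManager
tell the retryable "lock is held" failure apart from any other IOException
without parsing message strings. A caller handling it directly might look like
this sketch:

    try {
      systemTable.startBackupExclusiveOperation();
    } catch (ExclusiveOperationException e) {
      // another create, delete or merge holds the lock; safe to retry later
    }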

http://git-wip-us.apache.org/repos/asf/hbase/blob/91075276/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java
new file mode 100644
index 0000000..3e42294
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupManager.java
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+@Category(MediumTests.class)
+public class TestBackupManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBackupManager.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBackupManager.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  protected static Configuration conf = UTIL.getConfiguration();
+  protected static MiniHBaseCluster cluster;
+  protected static Connection conn;
+  protected BackupManager backupManager;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
+    BackupManager.decorateMasterConfiguration(conf);
+    BackupManager.decorateRegionServerConfiguration(conf);
+    cluster = UTIL.startMiniCluster();
+    conn = UTIL.getConnection();
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void before() throws IOException {
+    backupManager = new BackupManager(conn, conn.getConfiguration());
+  }
+
+  @After
+  public void after() {
+    backupManager.close();
+  }
+
+  AtomicLongArray startTimes = new AtomicLongArray(2);
+  AtomicLongArray stopTimes = new AtomicLongArray(2);
+
+  @Test
+  public void testStartBackupExclusiveOperation() {
+
+    long sleepTime = 2000;
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          backupManager.startBackupSession();
+          boolean result = startTimes.compareAndSet(0, 0, System.currentTimeMillis());
+          if (!result) {
+            result = startTimes.compareAndSet(1, 0, System.currentTimeMillis());
+            if (!result) {
+              throw new IOException("PANIC! Unreachable code");
+            }
+          }
+          Thread.sleep(sleepTime);
+          result = stopTimes.compareAndSet(0, 0, System.currentTimeMillis());
+          if (!result) {
+            result = stopTimes.compareAndSet(1, 0, System.currentTimeMillis());
+            if (!result) {
+              throw new IOException("PANIC! Unreachable code");
+            }
+          }
+          backupManager.finishBackupSession();
+        } catch (IOException | InterruptedException e) {
+          fail("Unexpected exception: " + e.getMessage());
+        }
+      }
+    };
+
+    Thread[] workers = new Thread[2];
+    for (int i = 0; i < workers.length; i++) {
+      workers[i] = new Thread(r);
+      workers[i].start();
+    }
+
+    for (int i = 0; i < workers.length; i++) {
+      Uninterruptibles.joinUninterruptibly(workers[i]);
+    }
+    LOG.info("Diff start time=" + (startTimes.get(1) - startTimes.get(0)) + 
"ms");
+    LOG.info("Diff finish time=" + (stopTimes.get(1) - stopTimes.get(0)) + 
"ms");
+    assertTrue(startTimes.get(1) - startTimes.get(0) >= sleepTime);
+    assertTrue(stopTimes.get(1) - stopTimes.get(0) >= sleepTime);
+
+  }
+
+}
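
The test starts two workers that each open a backup session and hold it for
sleepTime (2 seconds). Since the second worker cannot acquire the lock until
the first releases it, the recorded start times (and stop times) must differ
by at least sleepTime, which is exactly what the two assertions verify. To run
only this test (a standard Maven Surefire invocation from the repository root):

    mvn test -pl hbase-backup -Dtest=TestBackupManager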
