http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
new file mode 100644
index 0000000..8a01a65
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -0,0 +1,755 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
+import org.apache.hadoop.hbase.backup.HBackupFileSystem;
+import org.apache.hadoop.hbase.backup.RestoreRequest;
+import org.apache.hadoop.hbase.backup.RestoreTask;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest;
+import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+
+/**
+ * A collection for methods used by multiple classes to restore HBase tables.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RestoreServerUtil {
+
+  public static final Log LOG = LogFactory.getLog(RestoreServerUtil.class);
+
+  private final String[] ignoreDirs = { "recovered.edits" };
+
+  private final long TABLE_AVAILABILITY_WAIT_TIME = 180000;
+
+  protected Configuration conf = null;
+
+  protected Path backupRootPath;
+
+  protected String backupId;
+
+  protected FileSystem fs;
+  private final Path restoreTmpPath;
+
+  // store table name and snapshot dir mapping
+  private final HashMap<TableName, Path> snapshotMap = new HashMap<>();
+
+  public RestoreServerUtil(Configuration conf, final Path backupRootPath, final String backupId)
+      throws IOException {
+    this.conf = conf;
+    this.backupRootPath = backupRootPath;
+    this.backupId = backupId;
+    this.fs = backupRootPath.getFileSystem(conf);
+    this.restoreTmpPath = new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore");
+  }
+
+  /**
+   * Return value represents a path like:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn"
+   * @param tableName table name
+   * @return path to table archive
+   * @throws IOException exception
+   */
+  Path getTableArchivePath(TableName tableName)
+      throws IOException {
+
+    Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath,
+      backupId), HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR);
+    Path archivePath = new Path(dataDir, tableName.getNamespaceAsString());
+    Path tableArchivePath =
+        new Path(archivePath, tableName.getQualifierAsString());
+    if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) {
+      LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exist");
+      tableArchivePath = null; // empty table has no archive
+    }
+    return tableArchivePath;
+  }
+
+  /**
+   * Gets region list
+   * @param tableName table name
+   * @return RegionList region list
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableArchivePath = this.getTableArchivePath(tableName);
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // here child refer to each region(Name)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  /**
+   * Gets region list
+   * @param tableName table name
+   * @param backupId backup id
+   * @return RegionList region list
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(TableName tableName, String backupId) throws FileNotFoundException,
+  IOException {
+    Path tableArchivePath =
+        new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+            backupId, tableName));
+
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // here child refer to each region(Name)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  static void modifyTableSync(Connection conn, HTableDescriptor desc) throws IOException {
+
+    try (Admin admin = conn.getAdmin();) {
+      admin.modifyTable(desc.getTableName(), desc);
+      int attempt = 0;
+      int maxAttempts = 600;
+      while (!admin.isTableAvailable(desc.getTableName())) {
+        Thread.sleep(100);
+        attempt++;
+        if (attempt > maxAttempts) {
+          throw new IOException("Timeout expired " + (maxAttempts * 100) + "ms");
+        }
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * During an incremental backup operation, call WalPlayer to replay WAL in the backup image.
+   * Currently tableNames and newTableNames only contain a single table; this will be expanded
+   * to multiple tables in the future.
+   * @param conn HBase connection
+   * @param tableBackupPath backup path
+   * @param logDirs : incremental backup folders, which contain WAL files
+   * @param tableNames : source tableNames (table names that were backed up)
+   * @param newTableNames : target tableNames (table names to be restored to)
+   * @param incrBackupId incremental backup Id
+   * @throws IOException exception
+   */
+  public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
+      TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
+
+    try (Admin admin = conn.getAdmin();) {
+    if (tableNames.length != newTableNames.length) {
+      throw new IOException("Number of source tables and target tables does not match!");
+    }
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+    // for incremental backup image, expect the table already created either by user or previous
+    // full backup. Here, check that all new tables exist
+    for (TableName tableName : newTableNames) {
+      if (!admin.tableExists(tableName)) {
+        throw new IOException("HBase table " + tableName
+            + " does not exist. Create the table first, e.g. by restoring a full backup.");
+      }
+    }
+    // adjust table schema
+    for (int i = 0; i < tableNames.length; i++) {
+      TableName tableName = tableNames[i];
+      HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+      LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+      TableName newTableName = newTableNames[i];
+      HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+      List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+      List<HColumnDescriptor> existingFamilies =
+          Arrays.asList(newTableDescriptor.getColumnFamilies());
+      boolean schemaChangeNeeded = false;
+      for (HColumnDescriptor family : families) {
+        if (!existingFamilies.contains(family)) {
+          newTableDescriptor.addFamily(family);
+          schemaChangeNeeded = true;
+        }
+      }
+      for (HColumnDescriptor family : existingFamilies) {
+        if (!families.contains(family)) {
+          newTableDescriptor.removeFamily(family.getName());
+          schemaChangeNeeded = true;
+        }
+      }
+      if (schemaChangeNeeded) {
+        RestoreServerUtil.modifyTableSync(conn, newTableDescriptor);
+        LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + 
newTableDescriptor);
+      }
+    }
+    RestoreTask restoreService =
+        BackupRestoreServerFactory.getRestoreTask(conf);
+
+    restoreService.run(logDirs, tableNames, newTableNames, false);
+    }
+  }
+
+  public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName newTableName, boolean truncateIfExists, String lastIncrBackupId)
+          throws IOException {
+    restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists,
+        lastIncrBackupId);
+  }
+
+  /**
+   * Return value represents a path like:
+   * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot"
+   * @param backupRootPath backup root path
+   * @param tableName table name
+   * @param backupId backup Id
+   * @return path for snapshot
+   */
+  static Path getTableSnapshotPath(Path backupRootPath, TableName tableName,
+      String backupId) {
+    return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
+      HConstants.SNAPSHOT_DIR_NAME);
+  }
+
+  /**
+   * Return value represents a path like:
+   * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn".
+   * This path contains .snapshotinfo and .tabledesc (0.96 and 0.98), or .snapshotinfo and
+   * .data.manifest (trunk).
+   * @param tableName table name
+   * @return path to table info
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  Path getTableInfoPath(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+    Path tableInfoPath = null;
+
+    // can't build the path directly as the timestamp values are different
+    FileStatus[] snapshots = fs.listStatus(tableSnapShotPath);
+    for (FileStatus snapshot : snapshots) {
+      tableInfoPath = snapshot.getPath();
+      // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
+      if (tableInfoPath.getName().endsWith("data.manifest")) {
+        break;
+      }
+    }
+    return tableInfoPath;
+  }
+
+  /**
+   * @param tableName is the table backed up
+   * @return {@link HTableDescriptor} saved in backup image of the table
+   */
+  HTableDescriptor getTableDesc(TableName tableName)
+      throws FileNotFoundException, IOException {
+    Path tableInfoPath = this.getTableInfoPath(tableName);
+    SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
+    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
+    HTableDescriptor tableDescriptor = manifest.getTableDescriptor();
+    if (!tableDescriptor.getTableName().equals(tableName)) {
+      LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: "
+          + tableInfoPath.toString());
+      LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString());
+      throw new FileNotFoundException("couldn't find Table Desc for table: " + tableName +
+        " under tableInfoPath: " + tableInfoPath.toString());
+    }
+    return tableDescriptor;
+  }
+
+  /**
+   * Duplicate the backup image if it's on local cluster
+   * @see HStore#bulkLoadHFile(String, long)
+   * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum)
+   * @param tableArchivePath archive path
+   * @return the new tableArchivePath
+   * @throws IOException exception
+   */
+  Path checkLocalAndBackup(Path tableArchivePath) throws IOException {
+    // Move the file if it's on local cluster
+    boolean isCopyNeeded = false;
+
+    FileSystem srcFs = tableArchivePath.getFileSystem(conf);
+    FileSystem desFs = FileSystem.get(conf);
+    if (tableArchivePath.getName().startsWith("/")) {
+      isCopyNeeded = true;
+    } else {
+      // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path,
+      // long)
+      if (srcFs.getUri().equals(desFs.getUri())) {
+        LOG.debug("cluster holds the backup image: " + srcFs.getUri() + "; local cluster node: "
+            + desFs.getUri());
+        isCopyNeeded = true;
+      }
+    }
+    if (isCopyNeeded) {
+      LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore");
+      if (desFs.exists(restoreTmpPath)) {
+        try {
+          desFs.delete(restoreTmpPath, true);
+        } catch (IOException e) {
+          LOG.debug("Failed to delete path: " + restoreTmpPath
+            + ", need to check whether restore target DFS cluster is healthy");
+        }
+      }
+      FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf);
+      LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath);
+      tableArchivePath = restoreTmpPath;
+    }
+    return tableArchivePath;
+  }
+
+  private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
+      String lastIncrBackupId) throws IOException {
+    if (lastIncrBackupId != null) {
+      String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+          lastIncrBackupId, tableName);
+      // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
+      return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
+          new Path(target));
+    }
+    return null;
+  }
+
+  private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName,
+      Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+    if (newTableName == null) {
+      newTableName = tableName;
+    }
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
+
+    // get table descriptor first
+    HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
+    if (tableDescriptor != null) {
+      LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId);
+    }
+    }
+
+    if (tableDescriptor == null) {
+      Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+      if (fileSys.exists(tableSnapshotPath)) {
+        // snapshot path exist means the backup path is in HDFS
+        // check whether snapshot dir already recorded for target table
+        if (snapshotMap.get(tableName) != null) {
+          SnapshotDescription desc =
+              SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+          SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
+          tableDescriptor = manifest.getTableDescriptor();
+        } else {
+          tableDescriptor = getTableDesc(tableName);
+          snapshotMap.put(tableName, getTableInfoPath(tableName));
+        }
+        if (tableDescriptor == null) {
+          LOG.debug("Found no table descriptor in the snapshot dir, previous 
schema would be lost");
+        }
+      } else {
+        throw new IOException("Table snapshot directory: " + tableSnapshotPath
+            + " does not exist.");
+      }
+    }
+
+    Path tableArchivePath = getTableArchivePath(tableName);
+    if (tableArchivePath == null) {
+      if (tableDescriptor != null) {
+        // find table descriptor but no archive dir means the table is empty, create table and exit
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("find table descriptor but no archive dir for table " + tableName
+              + ", will only create table");
+        }
+        tableDescriptor.setName(newTableName);
+        checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, null,
+            tableDescriptor, truncateIfExists);
+        return;
+      } else {
+        throw new IllegalStateException("Cannot restore hbase table because tableArchivePath"
+            + " is null.");
+      }
+    }
+
+    if (tableDescriptor == null) {
+      tableDescriptor = new HTableDescriptor(newTableName);
+    } else {
+      tableDescriptor.setName(newTableName);
+    }
+
+    // record all region dirs:
+    // load all files in dir
+    try {
+      ArrayList<Path> regionPathList = getRegionList(tableName);
+
+      // should only try to create the table with all region information, so we could pre-split
+      // the regions in fine grain
+      checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList,
+          tableDescriptor, truncateIfExists);
+      if (tableArchivePath != null) {
+        // start real restore through bulkload
+        // if the backup target is on local cluster, special action needed
+        Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+        if (tempTableArchivePath.equals(tableArchivePath)) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+          }
+        } else {
+          regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+          }
+        }
+
+        LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+        for (Path regionPath : regionPathList) {
+          String regionName = regionPath.toString();
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Restoring HFiles from directory " + regionName);
+          }
+          String[] args = { regionName, newTableName.getNameAsString()};
+          loader.run(args);
+        }
+      }
+      // we do not restore recovered.edits
+    } catch (Exception e) {
+      throw new IllegalStateException("Cannot restore hbase table", e);
+    }
+  }
+
+  /**
+   * Gets region list
+   * @param tableArchivePath table archive path
+   * @return RegionList region list
+   * @throws FileNotFoundException exception
+   * @throws IOException exception
+   */
+  ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException,
+  IOException {
+    ArrayList<Path> regionDirList = new ArrayList<Path>();
+    FileStatus[] children = fs.listStatus(tableArchivePath);
+    for (FileStatus childStatus : children) {
+      // here child refer to each region(Name)
+      Path child = childStatus.getPath();
+      regionDirList.add(child);
+    }
+    return regionDirList;
+  }
+
+  /**
+   * Counts the number of files in all subdirectories of an HBase table, i.e. HFiles.
+   * @param regionPath Path to an HBase table directory
+   * @return the number of files in all directories
+   * @throws IOException exception
+   */
+  int getNumberOfFilesInDir(Path regionPath) throws IOException {
+    int result = 0;
+
+    if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + regionPath.toString() + "' is not a directory.");
+    }
+
+    FileStatus[] tableDirContent = fs.listStatus(regionPath);
+    for (FileStatus subDirStatus : tableDirContent) {
+      FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath());
+      for (FileStatus colFamilyStatus : colFamilies) {
+        FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath());
+        result += colFamilyContent.length;
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Counts the number of files in all subdirectories of HBase tables, i.e. HFiles, and finds the
+   * maximum number of files in one HBase table.
+   * @param tableArchivePath archive path
+   * @return the maximum number of files found in 1 HBase table
+   * @throws IOException exception
+   */
+  int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException {
+    int result = 1;
+    ArrayList<Path> regionPathList = getRegionList(tableArchivePath);
+    // tableArchivePath = this.getTableArchivePath(tableName);
+
+    if (regionPathList == null || regionPathList.size() == 0) {
+      throw new IllegalStateException("Cannot restore hbase table because directory '"
+          + tableArchivePath + "' is not a directory.");
+    }
+
+    for (Path regionPath : regionPathList) {
+      result = Math.max(result, getNumberOfFilesInDir(regionPath));
+    }
+    return result;
+  }
+
+  /**
+   * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full
+   * backup.
+   * @return the {@link LoadIncrementalHFiles} instance
+   * @throws IOException exception
+   */
+  private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables)
+      throws IOException {
+    // set configuration for restore:
+    // LoadIncrementalHFile needs more time
+    // <name>hbase.rpc.timeout</name> <value>600000</value>
+    // calculates
+    Integer milliSecInMin = 60000;
+    Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0);
+    Integer numberOfFilesInDir =
+        multipleTables ? getMaxNumberOfFilesInSubDir(tableArchivePath) :
+            getNumberOfFilesInDir(tableArchivePath);
+    Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file
+    Integer resultMillis = Math.max(calculatedMillis, previousMillis);
+    if (resultMillis > previousMillis) {
+      LOG.info("Setting configuration for restore with LoadIncrementalHFile: "
+          + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin
+          + " minutes, to handle the number of files in backup " + tableArchivePath);
+      this.conf.setInt("hbase.rpc.timeout", resultMillis);
+    }
+
+    // By default, it is 32 and loader will fail if # of files in any region exceeds this
+    // limit. Bad for snapshot restore.
+    this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+    LoadIncrementalHFiles loader = null;
+    try {
+      loader = new LoadIncrementalHFiles(this.conf);
+    } catch (Exception e1) {
+      throw new IOException(e1);
+    }
+    return loader;
+  }
+
+  /**
+   * Calculate region boundaries and add all the column families to the table descriptor
+   * @param regionDirList region dir list
+   * @return a set of keys to store the boundaries
+   */
+  byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList)
+      throws FileNotFoundException, IOException {
+    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    // Build a set of keys to store the boundaries
+    byte[][] keys = null;
+    // calculate region boundaries and add all the column families to the table descriptor
+    for (Path regionDir : regionDirList) {
+      LOG.debug("Parsing region dir: " + regionDir);
+      Path hfofDir = regionDir;
+
+      if (!fs.exists(hfofDir)) {
+        LOG.warn("HFileOutputFormat dir " + hfofDir + " not found");
+      }
+
+      FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
+      if (familyDirStatuses == null) {
+        throw new IOException("No families found in " + hfofDir);
+      }
+
+      for (FileStatus stat : familyDirStatuses) {
+        if (!stat.isDirectory()) {
+          LOG.warn("Skipping non-directory " + stat.getPath());
+          continue;
+        }
+        boolean isIgnore = false;
+        String pathName = stat.getPath().getName();
+        for (String ignore : ignoreDirs) {
+          if (pathName.contains(ignore)) {
+            LOG.warn("Skipping non-family directory" + pathName);
+            isIgnore = true;
+            break;
+          }
+        }
+        if (isIgnore) {
+          continue;
+        }
+        Path familyDir = stat.getPath();
+        LOG.debug("Parsing family dir [" + familyDir.toString() + " in region 
[" + regionDir + "]");
+        // Skip _logs, etc
+        if (familyDir.getName().startsWith("_") || 
familyDir.getName().startsWith(".")) {
+          continue;
+        }
+
+        // start to parse hfile inside one family dir
+        Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
+        for (Path hfile : hfiles) {
+          if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".")
+              || StoreFileInfo.isReference(hfile.getName())
+              || HFileLink.isHFileLink(hfile.getName())) {
+            continue;
+          }
+          HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), conf);
+          final byte[] first, last;
+          try {
+            reader.loadFileInfo();
+            first = reader.getFirstRowKey();
+            last = reader.getLastRowKey();
+            LOG.debug("Trying to figure out region boundaries hfile=" + hfile 
+ " first="
+                + Bytes.toStringBinary(first) + " last=" + 
Bytes.toStringBinary(last));
+
+            // To eventually infer start key-end key boundaries
+            Integer value = map.containsKey(first) ? (Integer) map.get(first) 
: 0;
+            map.put(first, value + 1);
+            value = map.containsKey(last) ? (Integer) map.get(last) : 0;
+            map.put(last, value - 1);
+          } finally {
+            reader.close();
+          }
+        }
+      }
+    }
+    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    return keys;
+  }
+
+  /**
+   * Prepare the table for bulkload, most codes copied from
+   * {@link LoadIncrementalHFiles#createTable(String, String)}
+   * @param conn HBase connection
+   * @param tableBackupPath path
+   * @param tableName table name
+   * @param targetTableName target table name
+   * @param regionDirList region directory list
+   * @param htd table descriptor
+   * @throws IOException exception
+   */
+  private void checkAndCreateTable(Connection conn, Path tableBackupPath, TableName tableName,
+      TableName targetTableName, ArrayList<Path> regionDirList,
+      HTableDescriptor htd, boolean truncateIfExists)
+          throws IOException {
+    try (Admin admin = conn.getAdmin();){
+      boolean createNew = false;
+      if (admin.tableExists(targetTableName)) {
+        if(truncateIfExists) {
+          LOG.info("Truncating exising target table '" + targetTableName +
+            "', preserving region splits");
+          admin.disableTable(targetTableName);
+          admin.truncateTable(targetTableName, true);
+        } else{
+          LOG.info("Using exising target table '" + targetTableName + "'");
+        }
+      } else {
+        createNew = true;
+      }
+      if (createNew){
+        LOG.info("Creating target table '" + targetTableName + "'");
+        byte[][] keys = null;
+        if (regionDirList == null || regionDirList.size() == 0) {
+          admin.createTable(htd, null);
+        } else {
+          keys = generateBoundaryKeys(regionDirList);
+          // create table using table descriptor and region boundaries
+          admin.createTable(htd, keys);
+        }
+        long startTime = EnvironmentEdgeManager.currentTime();
+        while (!admin.isTableAvailable(targetTableName, keys)) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+          if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) {
+            throw new IOException("Time out "+TABLE_AVAILABILITY_WAIT_TIME+
+              "ms expired, table " + targetTableName + " is still not available");
+          }
+        }
+      }
+    }
+  }
+
+  public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap,
+      Configuration conf) throws IOException {
+    boolean isValid = true;
+
+    for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) {
+      TableName table = manifestEntry.getKey();
+      TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>();
+
+      ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table);
+      if (depList != null && !depList.isEmpty()) {
+        imageSet.addAll(depList);
+      }
+
+      LOG.info("Dependent image(s) from old to new:");
+      for (BackupImage image : imageSet) {
+        String imageDir =
+            HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table);
+        if (!BackupClientUtil.checkPathExist(imageDir, conf)) {
+          LOG.error("ERROR: backup image does not exist: " + imageDir);
+          isValid = false;
+          break;
+        }
+        LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available");
+      }
+    }
+    return isValid;
+  }
+
+  /**
+   * Create restore request.
+   *
+   */
+  public static RestoreRequest createRestoreRequest(
+      String backupRootDir,
+      String backupId, boolean check, TableName[] fromTables,
+      TableName[] toTables, boolean isOverwrite) {
+    RestoreRequest request = new RestoreRequest();
+    request.setBackupRootDir(backupRootDir).setBackupId(backupId).setCheck(check)
+    .setFromTables(fromTables).setToTables(toTables).setOverwrite(isOverwrite);
+    return request;
+  }
+}
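
A minimal driver sketch (not part of this patch) showing how the new utility could be used for a full restore; the backup root, backup id and table names below are placeholders, and lastIncrBackupId is passed as null on the assumption that the schema should come from the snapshot inside the full-backup image:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.backup.HBackupFileSystem;
  import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class RestoreDriverSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Path backupRootPath = new Path("hdfs://backup/root");   // placeholder backup root
      String backupId = "backup_1396650096738";               // placeholder backup id
      TableName source = TableName.valueOf("t1_dn");          // table that was backed up
      TableName target = TableName.valueOf("t1_dn_restored"); // table to restore into
      try (Connection conn = ConnectionFactory.createConnection(conf)) {
        RestoreServerUtil restore = new RestoreServerUtil(conf, backupRootPath, backupId);
        Path tableBackupPath =
            HBackupFileSystem.getTableBackupPath(source, backupRootPath, backupId);
        // Recreate the target table from the full backup image, truncating it if it exists.
        restore.fullRestoreTable(conn, tableBackupPath, source, target, true, null);
      }
    }
  }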

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..0e3755b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
@@ -51,8 +56,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
    * Method to retrieve coordination for split log worker
    */
   public abstract  SplitLogWorkerCoordination getSplitLogWorkerCoordination();
+
   /**
    * Method to retrieve coordination for split log manager
    */
   public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+   */
+  public abstract ProcedureCoordinatorRpcs
+    getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+
+  /**
+   * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
+   */
+  public abstract ProcedureMemberRpcs
+    getProcedureMemberRpcs(String procType) throws KeeperException;
+
 }
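
A short sketch of how a caller might use the two new accessors, assuming the configured CoordinatedStateManager extends BaseCoordinatedStateManager and using "backup" as a hypothetical procedure type:

  import java.io.IOException;

  import org.apache.hadoop.hbase.Server;
  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
  import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
  import org.apache.zookeeper.KeeperException;

  public class ProcedureRpcsSketch {
    // Coordinator-side RPCs, keyed by procedure type and the coordinating node's name.
    static ProcedureCoordinatorRpcs coordinatorRpcs(Server server) throws IOException {
      BaseCoordinatedStateManager mgr =
          (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
      return mgr.getProcedureCoordinatorRpcs("backup", server.getServerName().toString());
    }

    // Member-side RPCs for the same procedure type.
    static ProcedureMemberRpcs memberRpcs(Server server) throws KeeperException {
      BaseCoordinatedStateManager mgr =
          (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
      return mgr.getProcedureMemberRpcs("backup");
    }
  }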

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..d5e4085 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,10 +17,17 @@
  */
 package org.apache.hadoop.hbase.coordination;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import java.io.IOException;
+
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
@@ -49,9 +56,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
   @Override
   public SplitLogWorkerCoordination getSplitLogWorkerCoordination() {
     return splitLogWorkerCoordination;
-    }
+  }
+
   @Override
   public SplitLogManagerCoordination getSplitLogManagerCoordination() {
     return splitLogManagerCoordination;
   }
+
+  @Override
+  public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
+      throws IOException {
+    return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+  }
+
+  @Override
+  public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException {
+    return new ZKProcedureMemberRpcs(watcher, procType);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
index 3fe5a90..d51ab12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java
@@ -85,15 +85,19 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
       }
     }
 
+    @Override
     public void close(Reporter reporter) throws IOException {
       if (this.m_mutator != null) {
         this.m_mutator.close();
+        this.m_mutator = null;
       }
       if (conn != null) {
         this.conn.close();
+        this.conn = null;
       }
     }
 
+    @Override
     public void write(ImmutableBytesWritable key, Put value) throws IOException {
       m_mutator.mutate(new Put(value));
     }
@@ -101,7 +105,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
 
   /**
    * Creates a new record writer.
-   * 
+   *
    * Be aware that the baseline javadoc gives the impression that there is a single
    * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
    * RecordWriter per call of this method. You must close the returned RecordWriter when done.
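
For context, a minimal sketch of wiring this mapred output format into a JobConf; the table name is a placeholder:

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapred.TableOutputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class MapredOutputSketch {
    public static JobConf configure() {
      JobConf job = new JobConf(HBaseConfiguration.create());
      job.setOutputFormat(TableOutputFormat.class);
      // Sink table for the Puts written by the RecordWriter above.
      job.set(TableOutputFormat.OUTPUT_TABLE, "my_table");
      return job;
    }
  }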

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
new file mode 100644
index 0000000..a00d390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple MR input format for HFiles.
+ * This code was borrowed from Apache Crunch project.
+ * Updated to the recent version of HBase.
+ */
+public class HFileInputFormat2 extends FileInputFormat<NullWritable, Cell> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class);
+
+  /**
+   * File filter that removes all "hidden" files. This might be something worth removing from
+   * a more general purpose utility; it accounts for the presence of metadata files created
+   * in the way we're doing exports.
+   */
+  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Record reader for HFiles.
+   */
+  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {
+
+    private Reader in;
+    protected Configuration conf;
+    private HFileScanner scanner;
+
+    /**
+     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
+     */
+    private Cell value = null;
+    private long count;
+    private boolean seeked = false;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit fileSplit = (FileSplit) split;
+      conf = context.getConfiguration();
+      Path path = fileSplit.getPath();
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Initialize HFileRecordReader for {}", path);
+      this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf);
+
+      // The file info must be loaded before the scanner can be used.
+      // This seems like a bug in HBase, but it's easily worked around.
+      this.in.loadFileInfo();
+      this.scanner = in.getScanner(false, false);
+
+    }
+
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      boolean hasNext;
+      if (!seeked) {
+        LOG.info("Seeking to start");
+        hasNext = scanner.seekTo();
+        seeked = true;
+      } else {
+        hasNext = scanner.next();
+      }
+      if (!hasNext) {
+        return false;
+      }
+      value = scanner.getCell();
+      count++;
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public Cell getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
+      // the start row, but better than nothing anyway.
+      return 1.0f * count / in.getEntries();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (in != null) {
+        in.close();
+        in = null;
+      }
+    }
+  }
+
+  @Override
+  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+
+    // Explode out directories that match the original FileInputFormat filters
+    // since HFiles are written to directories where the
+    // directory name is the column name
+    for (FileStatus status : super.listStatus(job)) {
+      if (status.isDirectory()) {
+        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+          result.add(match);
+        }
+      } else {
+        result.add(status);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    return new HFileRecordReader();
+  }
+
+  @Override
+  protected boolean isSplitable(JobContext context, Path filename) {
+    // This file isn't splittable.
+    return false;
+  }
+}
\ No newline at end of file
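
A sketch of a job that reads HFiles with this input format; the input directory is a placeholder and the default identity mapper is assumed:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

  public class HFileScanSketch {
    public static Job createJob(String hfileDir) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = Job.getInstance(conf, "hfile-scan-sketch");
      job.setJarByClass(HFileScanSketch.class);
      job.setInputFormatClass(HFileInputFormat2.class);
      FileInputFormat.addInputPath(job, new Path(hfileDir));
      // Keys are NullWritable and values are the Cells handed out by HFileRecordReader.
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(Cell.class);
      job.setNumReduceTasks(0);
      job.setOutputFormatClass(NullOutputFormat.class);
      return job;
    }
  }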

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 574ae18..064d649 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -114,7 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
-  public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
+  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
   public final static String ALWAYS_COPY_FILES = "always.copy.files";
 
   // We use a '.' prefix which is ignored when walking directory trees
@@ -162,7 +162,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
         + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
-        + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+        + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }
 
@@ -1228,7 +1228,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
       try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
-        boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+        boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
         boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
         if (dirPath != null) {
           doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
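
The renamed key is what RestoreServerUtil#createLoader sets programmatically; a hedged sketch of equivalent stand-alone use, with a placeholder HFile directory and table name:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

  public class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Same knobs the restore path sets before bulk-loading restored HFiles.
      conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
      conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.run(new String[] { "/tmp/restore/region_dir", "my_table" });
    }
  }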

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 02fcbba..da950f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -27,22 +27,26 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
@@ -169,7 +173,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
           temp = reader.next(currentEntry);
           i++;
         } catch (EOFException x) {
-          LOG.info("Corrupted entry detected. Ignoring the rest of the file."
+          LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
               + " (This is normal when a RegionServer crashed.)");
           return false;
         }
@@ -231,29 +235,39 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
   List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir"));
+
+    Path[] inputPaths = getInputPaths(conf);
 
     long startTime = conf.getLong(startKey, Long.MIN_VALUE);
     long endTime = conf.getLong(endKey, Long.MAX_VALUE);
 
-    FileSystem fs = inputDir.getFileSystem(conf);
-    List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime);
-
-    List<InputSplit> splits = new ArrayList<InputSplit>(files.size());
-    for (FileStatus file : files) {
+    List<FileStatus> allFiles = new ArrayList<FileStatus>();
+    for(Path inputPath: inputPaths){
+      FileSystem fs = inputPath.getFileSystem(conf);
+      List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+      allFiles.addAll(files);
+    }
+    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
+    for (FileStatus file : allFiles) {
       splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
     }
     return splits;
   }
 
+  private Path[] getInputPaths(Configuration conf) {
+    String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir");
+    return StringUtils.stringToPath(inpDirs.split(","));
+  }
+
   private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
       throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
     LOG.debug("Scanning " + dir.toString() + " for WAL files");
 
-    FileStatus[] files = fs.listStatus(dir);
-    if (files == null) return Collections.emptyList();
-    for (FileStatus file : files) {
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
+    if (!iter.hasNext()) return Collections.emptyList();
+    while (iter.hasNext()) {
+      LocatedFileStatus file = iter.next();
       if (file.isDirectory()) {
         // recurse into sub directories
         result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
@@ -264,7 +278,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
           try {
             long fileStartTime = Long.parseLong(name.substring(idx+1));
             if (fileStartTime <= endTime) {
-              LOG.info("Found: " + name);
+              LOG.info("Found: " + file);
               result.add(file);
             }
           } catch (NumberFormatException x) {
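
Because getInputPaths() now splits mapreduce.input.fileinputformat.inputdir on commas, a caller such as WALPlayer (next diff) can hand several WAL directories to a single job; a small sketch, with placeholder directories:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

  public class MultiDirWalJobSketch {
    public static Job createJob() throws Exception {
      Configuration conf = HBaseConfiguration.create();
      Job job = Job.getInstance(conf, "wal-scan-sketch");
      job.setInputFormatClass(WALInputFormat.class);
      // The comma-separated list lands in mapreduce.input.fileinputformat.inputdir,
      // which the patched getSplits() expands into one split list across both directories.
      FileInputFormat.addInputPaths(job, "/backup/WALs/incr1,/backup/WALs/incr2");
      return job;
    }
  }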

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 452714b..edcb57f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -70,9 +70,9 @@ import org.apache.hadoop.util.ToolRunner;
 public class WALPlayer extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(WALPlayer.class);
   final static String NAME = "WALPlayer";
-  final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
-  final static String TABLES_KEY = "wal.input.tables";
-  final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+  public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
+  public final static String TABLES_KEY = "wal.input.tables";
+  public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -86,6 +86,9 @@ public class WALPlayer extends Configured implements Tool {
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  public WALPlayer(){
+  }
+
   protected WALPlayer(final Configuration c) {
     super(c);
   }
@@ -95,7 +98,7 @@ public class WALPlayer extends Configured implements Tool {
    * This one can be used together with {@link KeyValueSortReducer}
    */
   static class WALKeyValueMapper
-  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
     private byte[] table;
 
     @Override
@@ -107,7 +110,9 @@ public class WALPlayer extends Configured implements Tool {
         if (Bytes.equals(table, key.getTablename().getName())) {
           for (Cell cell : value.getCells()) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            if (WALEdit.isMetaEditFamily(kv)) continue;
+            if (WALEdit.isMetaEditFamily(kv)) {
+              continue;
+            }
             context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
           }
         }
@@ -133,7 +138,7 @@ public class WALPlayer extends Configured implements Tool {
    * a running HBase instance.
    */
   protected static class WALMapper
-  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
     private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>();
 
     @Override
@@ -150,7 +155,9 @@ public class WALPlayer extends Configured implements Tool {
           Cell lastCell = null;
           for (Cell cell : value.getCells()) {
             // filtering WAL meta entries
-            if (WALEdit.isMetaEditFamily(cell)) continue;
+            if (WALEdit.isMetaEditFamily(cell)) {
+              continue;
+            }
 
             // Allow a subclass filter out this cell.
             if (filter(context, cell)) {
@@ -161,8 +168,12 @@ public class WALPlayer extends Configured implements Tool {
               if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
                   || !CellUtil.matchingRow(lastCell, cell)) {
                 // row or type changed, write out aggregate KVs.
-                if (put != null) context.write(tableOut, put);
-                if (del != null) context.write(tableOut, del);
+                if (put != null) {
+                  context.write(tableOut, put);
+                }
+                if (del != null) {
+                  context.write(tableOut, del);
+                }
                 if (CellUtil.isDelete(cell)) {
                   del = new Delete(CellUtil.cloneRow(cell));
                 } else {
@@ -178,29 +189,34 @@ public class WALPlayer extends Configured implements Tool {
             lastCell = cell;
           }
           // write residual KVs
-          if (put != null) context.write(tableOut, put);
-          if (del != null) context.write(tableOut, del);
+          if (put != null) {
+            context.write(tableOut, put);
+          }
+          if (del != null) {
+            context.write(tableOut, del);
+          }
         }
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
     }
 
-    /**
-     * @param cell
-     * @return Return true if we are to emit this cell.
-     */
     protected boolean filter(Context context, final Cell cell) {
       return true;
     }
 
     @Override
+    protected void
+        cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+            throws IOException, InterruptedException {
+      super.cleanup(context);
+    }
+
+    @Override
     public void setup(Context context) throws IOException {
       String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
       String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tablesToUse == null && tableMap == null) {
-        // Then user wants all tables.
-      } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+      if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
        // this can only happen when WALMapper is used directly by a class other than WALPlayer
        throw new IOException("No tables or incorrect table mapping specified.");
       }
@@ -216,7 +232,9 @@ public class WALPlayer extends Configured implements Tool {
 
   void setupTime(Configuration conf, String option) throws IOException {
     String val = conf.get(option);
-    if (null == val) return;
+    if (null == val) {
+      return;
+    }
     long ms;
     try {
       // first try to parse in user friendly form
@@ -246,7 +264,7 @@ public class WALPlayer extends Configured implements Tool {
     Configuration conf = getConf();
     setupTime(conf, HLogInputFormat.START_TIME_KEY);
     setupTime(conf, HLogInputFormat.END_TIME_KEY);
-    Path inputDir = new Path(args[0]);
+    String inputDirs = args[0];
     String[] tables = args[1].split(",");
     String[] tableMap;
     if (args.length > 2) {
@@ -260,13 +278,18 @@ public class WALPlayer extends Configured implements Tool {
     }
     conf.setStrings(TABLES_KEY, tables);
     conf.setStrings(TABLE_MAP_KEY, tableMap);
-    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir));
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
     job.setJarByClass(WALPlayer.class);
-    FileInputFormat.setInputPaths(job, inputDir);
+
+    FileInputFormat.addInputPaths(job, inputDirs);
+
     job.setInputFormatClass(WALInputFormat.class);
     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+
       // the bulk HFile case
       if (tables.length != 1) {
         throw new IOException("Exactly one table must be specified for the 
bulk export option");
@@ -302,7 +325,9 @@ public class WALPlayer extends Configured implements Tool {
     return job;
   }
 
-  /*
+
+  /**
+   * Print usage
    * @param errorMsg Error message.  Can be null.
    */
   private void usage(final String errorMsg) {
@@ -312,7 +337,8 @@ public class WALPlayer extends Configured implements Tool {
     System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> 
[<tableMappings>]");
     System.err.println("Read all WAL entries for <tables>.");
     System.err.println("If no tables (\"\") are specific, all tables are 
imported.");
-    System.err.println("(Careful, even -ROOT- and hbase:meta entries will be 
imported in that case.)");
+    System.err.println("(Careful, even -ROOT- and hbase:meta entries will be 
imported"+
+      " in that case.)");
     System.err.println("Otherwise <tables> is a comma separated list of 
tables.\n");
     System.err.println("The WAL entries can be mapped to new set of tables via 
<tableMapping>.");
     System.err.println("<tableMapping> is a command separated list of 
targettables.");
@@ -325,10 +351,10 @@ public class WALPlayer extends Configured implements Tool {
    System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
    System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the wal player");
+      + "=jobName - use the specified mapreduce job name for the wal player");
     System.err.println("For performance also consider the following options:\n"
-        + "  -Dmapreduce.map.speculative=false\n"
-        + "  -Dmapreduce.reduce.speculative=false");
+      + "  -Dmapreduce.map.speculative=false\n"
+      + "  -Dmapreduce.reduce.speculative=false");
   }
 
   /**
@@ -349,6 +375,7 @@ public class WALPlayer extends Configured implements Tool {
       System.exit(-1);
     }
     Job job = createSubmittableJob(args);
-    return job.waitForCompletion(true) ? 0 : 1;
+    int result = job.waitForCompletion(true) ? 0 : 1;
+    return result;
   }
 }
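
For reference, here is a minimal sketch of how the reworked WALPlayer entry point might be driven after this patch. It relies only on what the hunks above show (the new public no-arg constructor and the comma-separated WAL input directories); the directory, table, and mapping names are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class WALPlayerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args[0] may now name several WAL directories, comma separated,
    // because the job adds them via FileInputFormat.addInputPaths(job, inputDirs).
    String walDirs = "/backup/wals/batch1,/backup/wals/batch2"; // hypothetical paths
    String tables = "t1";                                       // hypothetical table
    String tableMapping = "t1_restored";                        // hypothetical mapping
    // Uses the new public no-arg constructor; ToolRunner injects the configuration.
    int exit = ToolRunner.run(conf, new WALPlayer(),
        new String[] { walDirs, tables, tableMapping });
    System.exit(exit);
  }
}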

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0136ff5..1442433 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.RegionStateListener;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
@@ -74,6 +73,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.UnknownRegionException;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.MasterSwitchType;
 import org.apache.hadoop.hbase.client.Result;
@@ -413,6 +413,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
 
     Replication.decorateMasterConfiguration(this.conf);
+    BackupManager.decorateMasterConfiguration(this.conf);
 
     // Hack! Maps DFSClient => Master for logs.  HDFS made this
     // config param for task trackers, but we can piggyback off of it.
@@ -899,7 +900,7 @@ public class HMaster extends HRegionServer implements MasterServices {
 
   void initQuotaManager() throws IOException {
     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
-    this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
+    this.assignmentManager.setRegionStateListener(quotaManager);
     quotaManager.start();
     this.quotaManager = quotaManager;
   }
@@ -2644,6 +2645,8 @@ public class HMaster extends HRegionServer implements MasterServices {
     return procInfoList;
   }
 
+
+
   /**
    * Returns the list of table descriptors that match the specified request
    * @param namespace the namespace to query, or null if querying for all

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
index 4632d23..9d75e2e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java
@@ -52,7 +52,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
    * @throws KeeperException if an unexpected zk error occurs
    */
   public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher,
-      String procedureClass, String coordName) throws KeeperException {
+      String procedureClass, String coordName) throws IOException {
     this.watcher = watcher;
     this.procedureType = procedureClass;
     this.coordName = coordName;
@@ -179,6 +179,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
   * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about.
    * @return true if succeed, false if encountered initialization errors.
    */
+  @Override
   final public boolean start(final ProcedureCoordinator coordinator) {
     if (this.coordinator != null) {
       throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 24d8170..6ac88be 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -47,7 +47,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.MalformedObjectNameException;
@@ -79,6 +78,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.ZNodeClearer;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
@@ -194,13 +194,13 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
 
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-
 /**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -374,7 +374,7 @@ public class HRegionServer extends HasThread implements
 
   // WAL roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  final LogRoller walRoller;
+  public final LogRoller walRoller;
 
   // flag set after we're done setting up server threads
   final AtomicBoolean online = new AtomicBoolean(false);
@@ -522,7 +522,7 @@ public class HRegionServer extends HasThread implements
     FSUtils.setupShortCircuitRead(this.conf);
     // Disable usage of meta replicas in the regionserver
     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
-
+    BackupManager.decorateRSConfiguration(conf);
     // Config'ed params
     this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -633,7 +633,7 @@ public class HRegionServer extends HasThread implements
     int cleanerInterval =
         conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 
1000);
     this.compactedFileDischarger =
-        new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
+        new CompactedHFilesDischarger(cleanerInterval, this, this);
     choreService.scheduleChore(compactedFileDischarger);
   }
 
@@ -846,7 +846,7 @@ public class HRegionServer extends HasThread implements
       rspmHost.loadProcedures(conf);
       rspmHost.initialize(this);
     } catch (KeeperException e) {
-      this.abort("Failed to reach zk cluster when creating procedure 
handler.", e);
+      this.abort("Failed to reach coordination cluster when creating procedure 
handler.", e);
     }
     // register watcher for recovering regions
    this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index b4f0a29..8243fb5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.LifecycleAware;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -66,6 +57,15 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.LifecycleAware;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
 /**
  * The default implementation of FSWAL.
  */
@@ -780,6 +780,15 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
   }
 
+  /**
+   * To support old API compatibility
+   * @return current file number (timestamp)
+   */
+  @Override
+  public long getFilenum() {
+    return filenum.get();
+  }
+
   @Override
   public void sync(long txid) throws IOException {
     if (this.highestSyncedTxid.get() >= txid) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java
new file mode 100644
index 0000000..6f252fc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ProcedureUtil {
+  private static final Log LOG = LogFactory.getLog(ProcedureUtil.class);
+
+  public static ProcedureDescription buildProcedure(String signature, String instance,
+      Map<String, String> props) {
+    ProcedureDescription.Builder builder = ProcedureDescription.newBuilder();
+    builder.setSignature(signature).setInstance(instance);
+    for (Entry<String, String> entry : props.entrySet()) {
+      NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey())
+          .setValue(entry.getValue()).build();
+      builder.addConfiguration(pair);
+    }
+    ProcedureDescription desc = builder.build();
+    return desc;
+  }
+
+  public static long execProcedure(MasterProcedureManager mpm, String signature, String instance,
+      Map<String, String> props) throws IOException {
+    if (mpm == null) {
+      throw new IOException("The procedure is not registered: " + signature);
+    }
+    ProcedureDescription desc = buildProcedure(signature, instance, props);
+    mpm.execProcedure(desc);
+
+    // send back the max amount of time the client should wait for the procedure
+    // to complete
+    long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
+    return waitTime;
+  }
+
+  public static void waitForProcedure(MasterProcedureManager mpm, String signature, String instance,
+      Map<String, String> props, long max, int numRetries, long pause) throws IOException {
+    ProcedureDescription desc = buildProcedure(signature, instance, props);
+    long start = EnvironmentEdgeManager.currentTime();
+    long maxPauseTime = max / numRetries;
+    int tries = 0;
+    LOG.debug("Waiting a max of " + max + " ms for procedure '" +
+        signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)");
+    boolean done = false;
+    while (tries == 0
+        || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) {
+      try {
+        // sleep a backoff <= pauseTime amount
+        long sleep = getPauseTime(tries++, pause);
+        sleep = sleep > maxPauseTime ? maxPauseTime : sleep;
+        LOG.debug("(#" + tries + ") Sleeping: " + sleep +
+          "ms while waiting for procedure completion.");
+        Thread.sleep(sleep);
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e);
+      }
+      LOG.debug("Getting current status of procedure from master...");
+      done = mpm.isProcedureDone(desc);
+    }
+    if (!done) {
+      throw new IOException("Procedure '" + signature + " : " + instance
+          + "' wasn't completed in expectedTime:" + max + " ms");
+    }
+  }
+
+  private static long getPauseTime(int tries, long pause) {
+    int triesCount = tries;
+    if (triesCount >= HConstants.RETRY_BACKOFF.length) {
+      triesCount = HConstants.RETRY_BACKOFF.length - 1;
+    }
+    return pause * HConstants.RETRY_BACKOFF[triesCount];
+  }
+}
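
For orientation, the following sketch shows how the new ProcedureUtil helpers above might be called from master-side code. The procedure signature, instance name, and property key are hypothetical, and the MasterProcedureManager is assumed to be supplied by the caller (for example, looked up from the master's procedure manager host).

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.util.ProcedureUtil;

public class ProcedureUtilSketch {
  // Kick off a globally coordinated procedure and block until it reports done.
  static void runAndWait(MasterProcedureManager mpm) throws IOException {
    Map<String, String> props = new HashMap<>();
    props.put("targetRootDir", "hdfs://backup/root"); // hypothetical property
    // execProcedure() builds the ProcedureDescription, submits it, and
    // returns the suggested maximum wait time for the client.
    long maxWait = ProcedureUtil.execProcedure(mpm, "backup-proc", "backup-1", props);
    // waitForProcedure() polls isProcedureDone() with bounded backoff.
    ProcedureUtil.waitForProcedure(mpm, "backup-proc", "backup-1", props,
        maxWait, 10 /* retries */, 100 /* pause ms */);
  }
}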

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 76a6415..0ae37cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -229,6 +229,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   @VisibleForTesting
   public static long extractFileNumFromWAL(final WAL wal) {
     final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName();
+    return extractFileNumFromWAL(walName);
+  }
+
+  @VisibleForTesting
+  public static long extractFileNumFromWAL(final Path walName) {
     if (walName == null) {
       throw new IllegalArgumentException("The WAL path couldn't be null");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index c74c399..d8c1c5b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -49,7 +49,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
-import edu.umd.cs.findbugs.annotations.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
@@ -61,6 +60,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.Waiter.Predicate;
+import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
@@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -137,6 +138,8 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
+
 /**
  * Facility for testing HBase. Replacement for
  * old HBaseTestCase and HBaseClusterTestCase functionality.
@@ -2133,6 +2136,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     }
   }
 
+  public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows)
+      throws IOException {
+    Random r = new Random();
+    byte[] row = new byte[rowSize];
+    for (int i = 0; i < totalRows; i++) {
+      r.nextBytes(row);
+      Put put = new Put(row);
+      put.addColumn(f, new byte[]{0}, new byte[]{0});
+      t.put(put);
+    }
+  }
+
  public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow,
       int replicaId)
       throws IOException {
@@ -3276,6 +3291,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   }
 
   /**
+   * Wait until all system tables' regions get assigned
+   * @throws IOException
+   */
+  public void waitUntilAllSystemRegionsAssigned() throws IOException {
+    waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
+    waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME);
+    if (BackupManager.isBackupEnabled(conf)) {
+      waitUntilAllRegionsAssigned(TableName.BACKUP_TABLE_NAME);
+    }
+  }
+
+  /**
    * Wait until all regions for a table in hbase:meta have a non-empty
    * info:server, or until timeout.  This means all regions have been deployed,
    * master has been informed and updated hbase:meta with the regions deployed
@@ -3740,12 +3767,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
    */
   public static int createPreSplitLoadTestTable(Configuration conf,
      HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException {
+
+    return createPreSplitLoadTestTable(conf, desc, hcds,
+      new RegionSplitter.HexStringSplit(), numRegionsPerServer);
+  }
+
+  /**
+   * Creates a pre-split table for load testing. If the table already exists,
+   * logs a warning and continues.
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
+      HTableDescriptor desc, HColumnDescriptor[] hcds,
+      SplitAlgorithm splitter, int numRegionsPerServer) throws IOException {
     for (HColumnDescriptor hcd : hcds) {
       if (!desc.hasFamily(hcd.getName())) {
         desc.addFamily(hcd);
       }
     }
-
     int totalNumberOfRegions = 0;
     Connection unmanagedConnection = ConnectionFactory.createConnection(conf);
     Admin admin = unmanagedConnection.getAdmin();
@@ -3764,7 +3803,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
           "pre-splitting table into " + totalNumberOfRegions + " regions " +
           "(regions per server: " + numRegionsPerServer + ")");
 
-      byte[][] splits = new RegionSplitter.HexStringSplit().split(
+      byte[][] splits = splitter.split(
           totalNumberOfRegions);
 
       admin.createTable(desc, splits);
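
As a rough illustration, the new HBaseTestingUtility helpers shown above (the SplitAlgorithm-aware createPreSplitLoadTestTable overload and loadRandomRows) could be exercised in a test along these lines; the table name, column family, and sizes are made up for the sketch.

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class LoadTestSketch {
  static void populate(HBaseTestingUtility util) throws Exception {
    byte[] family = Bytes.toBytes("f");                                           // hypothetical family
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("loadtest"));  // hypothetical table
    // The new overload lets the caller choose the split algorithm instead of
    // the previously hard-coded HexStringSplit.
    HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc,
        new HColumnDescriptor[] { new HColumnDescriptor(family) },
        new RegionSplitter.UniformSplit(), 4);
    try (Table table = util.getConnection().getTable(desc.getTableName())) {
      // loadRandomRows() writes rows with random keys of the given byte size.
      util.loadRandomRows(table, family, 32, 1000);
    }
  }
}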

http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
index 2dca6b1..465020b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -63,6 +65,7 @@ public class TestNamespace {
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
+    TEST_UTIL.getConfiguration().setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
     TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE);
     admin = TEST_UTIL.getHBaseAdmin();
     cluster = TEST_UTIL.getHBaseCluster();
@@ -110,13 +113,16 @@ public class TestNamespace {
     //verify existence of system tables
     Set<TableName> systemTables = Sets.newHashSet(
         TableName.META_TABLE_NAME,
-        TableName.NAMESPACE_TABLE_NAME);
+        TableName.NAMESPACE_TABLE_NAME,
+        TableName.BACKUP_TABLE_NAME);
    HTableDescriptor[] descs =
        admin.listTableDescriptorsByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE.getName());
-    assertEquals(systemTables.size(), descs.length);
+    assertTrue(descs.length >= systemTables.size());
+    Set<TableName> tables = new HashSet<>();
     for (HTableDescriptor desc : descs) {
-      assertTrue(systemTables.contains(desc.getTableName()));
+      tables.add(desc.getTableName());
     }
+    assertTrue(tables.containsAll(systemTables));
     //verify system tables aren't listed
     assertEquals(0, admin.listTables().length);
 
