http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java deleted file mode 100644 index ae21b33..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java +++ /dev/null @@ -1,496 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -/** - * The main class which interprets the given arguments and trigger restore operation. - */ - -@InterfaceAudience.Public -@InterfaceStability.Evolving -public final class RestoreClient { - - private static final Log LOG = LogFactory.getLog(RestoreClient.class); - - private static Options opt; - private static Configuration conf; - private static Set<BackupImage> lastRestoreImagesSet; - - // delimiter in tablename list in restore command - private static final String DELIMITER_IN_COMMAND = ","; - - private static final String OPTION_OVERWRITE = "overwrite"; - private static final String OPTION_CHECK = "check"; - private static final String OPTION_AUTOMATIC = "automatic"; - - private static final String USAGE = - "Usage: hbase restore <backup_root_path> <backup_id> <tables> [tableMapping] \n" - + " [-overwrite] [-check] [-automatic]\n" - + " backup_root_path The parent location where the backup images are stored\n" - + " backup_id The id identifying the backup image\n" - + " table(s) Table(s) from the backup image to be restored.\n" - + " Tables are separated by comma.\n" - + " Options:\n" - + " tableMapping A comma separated list of target tables.\n" - + " If specified, each table in <tables> must have a mapping.\n" - + " -overwrite With this option, restore overwrites to the existing table " - + "if there's any in\n" - + " restore target. The existing table must be online before restore.\n" - + " -check With this option, restore sequence and dependencies are checked\n" - + " and verified without executing the restore\n" - + " -automatic With this option, all the dependencies are automatically restored\n" - + " together with this backup image following the correct order.\n" - + " The restore dependencies can be checked by using \"-check\" " - + "option,\n" - + " or using \"hbase backup describe\" command. Without this option, " - + "only\n" + " this backup image is restored\n"; - - private RestoreClient(){ - throw new AssertionError("Instantiating utility class..."); - } - - protected static void init() throws IOException { - // define supported options - opt = new Options(); - opt.addOption(OPTION_OVERWRITE, false, - "Overwrite the data if any of the restore target tables exists"); - opt.addOption(OPTION_CHECK, false, "Check restore sequence and dependencies"); - opt.addOption(OPTION_AUTOMATIC, false, "Restore all dependencies"); - opt.addOption("debug", false, "Enable debug logging"); - - conf = getConf(); - - // disable irrelevant loggers to avoid it mess up command output - disableUselessLoggers(); - } - - public static void main(String[] args) throws IOException { - init(); - parseAndRun(args); - } - - private static void parseAndRun(String[] args) { - CommandLine cmd = null; - try { - cmd = new PosixParser().parse(opt, args); - } catch (ParseException e) { - LOG.error("Could not parse command", e); - System.exit(-1); - } - - // enable debug logging - Logger backupClientLogger = Logger.getLogger("org.apache.hadoop.hbase.backup"); - if (cmd.hasOption("debug")) { - backupClientLogger.setLevel(Level.DEBUG); - } - - // whether to overwrite to existing table if any, false by default - boolean isOverwrite = cmd.hasOption(OPTION_OVERWRITE); - if (isOverwrite) { - LOG.debug("Found -overwrite option in restore command, " - + "will overwrite to existing table if any in the restore target"); - } - - // whether to only check the dependencies, false by default - boolean check = cmd.hasOption(OPTION_CHECK); - if (check) { - LOG.debug("Found -check option in restore command, " - + "will check and verify the dependencies"); - } - - // whether to restore all dependencies, false by default - boolean autoRestore = cmd.hasOption(OPTION_AUTOMATIC); - if (autoRestore) { - LOG.debug("Found -automatic option in restore command, " - + "will automatically retore all the dependencies"); - } - - // parse main restore command options - String[] remainArgs = cmd.getArgs(); - if (remainArgs.length < 3) { - System.out.println("ERROR: missing arguments"); - System.out.println(USAGE); - System.exit(-1); - } - - String backupRootDir = remainArgs[0]; - String backupId = remainArgs[1]; - String tables = remainArgs[2]; - - String tableMapping = (remainArgs.length > 3) ? remainArgs[3] : null; - - String[] sTableArray = (tables != null) ? tables.split(DELIMITER_IN_COMMAND) : null; - String[] tTableArray = (tableMapping != null) ? tableMapping.split(DELIMITER_IN_COMMAND) : null; - - if (tableMapping != null && tTableArray != null && (sTableArray.length != tTableArray.length)) { - System.err.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping); - System.out.println(USAGE); - System.exit(-1); - } - - try { - HBackupFileSystem hBackupFS = new HBackupFileSystem(conf, new Path(backupRootDir), backupId); - restore_stage1(hBackupFS, backupRootDir, backupId, check, autoRestore, sTableArray, - tTableArray, isOverwrite); - } catch (IOException e) { - System.err.println("ERROR: " + e.getMessage()); - System.exit(-1); - } - } - - /** - * Restore operation. Stage 1: validate backupManifest, and check target tables - * @param hBackupFS to access the backup image - * @param backupRootDir The root dir for backup image - * @param backupId The backup id for image to be restored - * @param check True if only do dependency check - * @param autoRestore True if automatically restore following the dependency - * @param sTableArray The array of tables to be restored - * @param tTableArray The array of mapping tables to restore to - * @param isOverwrite True then do restore overwrite if target table exists, otherwise fail the - * request if target table exists - * @return True if only do dependency check - * @throws IOException if any failure during restore - */ - public static boolean restore_stage1(HBackupFileSystem hBackupFS, String backupRootDir, - String backupId, boolean check, boolean autoRestore, String[] sTableArray, - String[] tTableArray, boolean isOverwrite) throws IOException { - - HashMap<String, BackupManifest> backupManifestMap = new HashMap<String, BackupManifest>(); - // check and load backup image manifest for the tables - hBackupFS.checkImageManifestExist(backupManifestMap, sTableArray); - - try { - // Check and validate the backup image and its dependencies - if (check || autoRestore) { - if (validate(backupManifestMap)) { - LOG.info("Checking backup images: ok"); - } else { - String errMsg = "Some dependencies are missing for restore"; - LOG.error(errMsg); - throw new IOException(errMsg); - } - } - - // return true if only for check - if (check) { - return true; - } - - if (tTableArray == null) { - tTableArray = sTableArray; - } - - // check the target tables - checkTargetTables(tTableArray, isOverwrite); - - // start restore process - Set<BackupImage> restoreImageSet = - restore_stage2(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore); - - LOG.info("Restore for " + Arrays.asList(sTableArray) + " are successful!"); - lastRestoreImagesSet = restoreImageSet; - - } catch (IOException e) { - LOG.error("ERROR: restore failed with error: " + e.getMessage()); - throw e; - } - - // not only for check, return false - return false; - } - - /** - * Get last restore image set. The value is globally set for the latest finished restore. - * @return the last restore image set - */ - public static Set<BackupImage> getLastRestoreImagesSet() { - return lastRestoreImagesSet; - } - - private static boolean validate(HashMap<String, BackupManifest> backupManifestMap) - throws IOException { - boolean isValid = true; - - for (Entry<String, BackupManifest> manifestEntry : backupManifestMap.entrySet()) { - - String table = manifestEntry.getKey(); - TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>(); - - ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table); - if (depList != null && !depList.isEmpty()) { - imageSet.addAll(depList); - } - - // todo merge - LOG.debug("merge will be implemented in future jira"); - // BackupUtil.clearMergedImages(table, imageSet, conf); - - LOG.info("Dependent image(s) from old to new:"); - for (BackupImage image : imageSet) { - String imageDir = - HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table); - if (!HBackupFileSystem.checkPathExist(imageDir, getConf())) { - LOG.error("ERROR: backup image does not exist: " + imageDir); - isValid = false; - break; - } - // TODO More validation? - LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available"); - } - } - return isValid; - } - - /** - * Validate target Tables - * @param tTableArray: target tables - * @param isOverwrite overwrite existing table - * @throws IOException exception - */ - private static void checkTargetTables(String[] tTableArray, boolean isOverwrite) - throws IOException { - ArrayList<String> existTableList = new ArrayList<String>(); - ArrayList<String> disabledTableList = new ArrayList<String>(); - - // check if the tables already exist - HBaseAdmin admin = null; - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(conf); - admin = (HBaseAdmin) conn.getAdmin(); - for (String tableName : tTableArray) { - if (admin.tableExists(TableName.valueOf(tableName))) { - existTableList.add(tableName); - if (admin.isTableDisabled(TableName.valueOf(tableName))) { - disabledTableList.add(tableName); - } - } else { - LOG.info("HBase table " + tableName - + " does not exist. It will be create during backup process"); - } - } - } finally { - if (admin != null) { - admin.close(); - } - if (conn != null) { - conn.close(); - } - } - - if (existTableList.size() > 0) { - if (!isOverwrite) { - LOG.error("Existing table found in the restore target, please add \"-overwrite\" " - + "option in the command if you mean to restore to these existing tables"); - LOG.info("Existing table list in restore target: " + existTableList); - throw new IOException("Existing table found in target while no \"-overwrite\" " - + "option found"); - } else { - if (disabledTableList.size() > 0) { - LOG.error("Found offline table in the restore target, " - + "please enable them before restore with \"-overwrite\" option"); - LOG.info("Offline table list in restore target: " + disabledTableList); - throw new IOException( - "Found offline table in the target when restore with \"-overwrite\" option"); - } - } - } - - } - - /** - * Restore operation. Stage 2: resolved Backup Image dependency - * @param hBackupFS to access the backup image - * @param backupManifestMap : tableName, Manifest - * @param sTableArray The array of tables to be restored - * @param tTableArray The array of mapping tables to restore to - * @param autoRestore : yes, restore all the backup images on the dependency list - * @return set of BackupImages restored - * @throws IOException exception - */ - private static Set<BackupImage> restore_stage2(HBackupFileSystem hBackupFS, - HashMap<String, BackupManifest> backupManifestMap, String[] sTableArray, - String[] tTableArray, boolean autoRestore) throws IOException { - TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); - - for (int i = 0; i < sTableArray.length; i++) { - restoreImageSet.clear(); - String table = sTableArray[i]; - BackupManifest manifest = backupManifestMap.get(table); - if (autoRestore) { - // Get the image list of this backup for restore in time order from old - // to new. - TreeSet<BackupImage> restoreList = - new TreeSet<BackupImage>(manifest.getDependentListByTable(table)); - LOG.debug("need to clear merged Image. to be implemented in future jira"); - - for (BackupImage image : restoreList) { - restoreImage(image, table, tTableArray[i]); - } - restoreImageSet.addAll(restoreList); - } else { - BackupImage image = manifest.getBackupImage(); - List<BackupImage> depList = manifest.getDependentListByTable(table); - // The dependency list always contains self. - if (depList != null && depList.size() > 1) { - LOG.warn("Backup image " + image.getBackupId() + " depends on other images.\n" - + "this operation will only restore the delta contained within backupImage " - + image.getBackupId()); - } - restoreImage(image, table, tTableArray[i]); - restoreImageSet.add(image); - } - - if (autoRestore) { - if (restoreImageSet != null && !restoreImageSet.isEmpty()) { - LOG.info("Restore includes the following image(s):"); - for (BackupImage image : restoreImageSet) { - LOG.info(" Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); - } - } - } - - } - return restoreImageSet; - } - - /** - * Restore operation handle each backupImage - * @param image: backupImage - * @param sTable: table to be restored - * @param tTable: table to be restored to - * @throws IOException exception - */ - private static void restoreImage(BackupImage image, String sTable, String tTable) - throws IOException { - - Configuration conf = getConf(); - - String rootDir = image.getRootDir(); - LOG.debug("Image root dir " + rootDir); - String backupId = image.getBackupId(); - - HBackupFileSystem hFS = new HBackupFileSystem(conf, new Path(rootDir), backupId); - RestoreUtil restoreTool = new RestoreUtil(conf, hFS); - BackupManifest manifest = hFS.getManifest(sTable); - - Path tableBackupPath = hFS.getTableBackupPath(sTable); - - // todo: convert feature will be provided in a future jira - boolean converted = false; - - if (manifest.getType().equals(BackupRestoreConstants.BACKUP_TYPE_FULL) || converted) { - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from " - + (converted ? "converted" : "full") + " backup image " + tableBackupPath.toString()); - restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable, converted); - } else { // incremental Backup - String logBackupDir = - HBackupFileSystem.getLogBackupDir(image.getRootDir(), image.getBackupId()); - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from incremental backup image " - + logBackupDir); - restoreTool.incrementalRestoreTable(logBackupDir, new String[] { sTable }, - new String[] { tTable }); - } - - LOG.info(sTable + " has been successfully restored to " + tTable); - } - - /** - * Set the configuration from a given one. - * @param newConf A new given configuration - */ - public synchronized static void setConf(Configuration newConf) { - conf = newConf; - } - - /** - * Get and merge Hadoop and HBase configuration. - * @throws IOException exception - */ - protected static Configuration getConf() { - if (conf == null) { - synchronized (RestoreClient.class) { - conf = new Configuration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create()); - } - } - return conf; - } - - private static void disableUselessLoggers() { - // disable zookeeper log to avoid it mess up command output - Logger zkLogger = Logger.getLogger("org.apache.zookeeper"); - LOG.debug("Zookeeper log level before set: " + zkLogger.getLevel()); - zkLogger.setLevel(Level.OFF); - LOG.debug("Zookeeper log level after set: " + zkLogger.getLevel()); - - // disable hbase zookeeper tool log to avoid it mess up command output - Logger hbaseZkLogger = Logger.getLogger("org.apache.hadoop.hbase.zookeeper"); - LOG.debug("HBase zookeeper log level before set: " + hbaseZkLogger.getLevel()); - hbaseZkLogger.setLevel(Level.OFF); - LOG.debug("HBase Zookeeper log level after set: " + hbaseZkLogger.getLevel()); - - // disable hbase client log to avoid it mess up command output - Logger hbaseClientLogger = Logger.getLogger("org.apache.hadoop.hbase.client"); - LOG.debug("HBase client log level before set: " + hbaseClientLogger.getLevel()); - hbaseClientLogger.setLevel(Level.OFF); - LOG.debug("HBase client log level after set: " + hbaseClientLogger.getLevel()); - - // disable other related log to avoid mess up command output - Logger otherLogger = Logger.getLogger("org.apache.hadoop.hbase.io.hfile"); - otherLogger.setLevel(Level.OFF); - otherLogger = Logger.getLogger("org.apache.hadoop.hbase.util"); - otherLogger.setLevel(Level.OFF); - otherLogger = Logger.getLogger("org.apache.hadoop.hbase.mapreduce"); - otherLogger.setLevel(Level.OFF); - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java deleted file mode 100644 index bdb7988..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java +++ /dev/null @@ -1,503 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup; - -import java.io.EOFException; -import java.io.IOException; -import java.text.ParseException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.NavigableSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; - -/** - * A collection for methods used by multiple classes to restore HBase tables. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class RestoreUtil { - - public static final Log LOG = LogFactory.getLog(RestoreUtil.class); - - protected Configuration conf = null; - - protected HBackupFileSystem hBackupFS = null; - - // store table name and snapshot dir mapping - private final HashMap<String, Path> snapshotMap = new HashMap<String, Path>(); - - public RestoreUtil(Configuration conf, HBackupFileSystem hBackupFS) throws IOException { - this.conf = conf; - this.hBackupFS = hBackupFS; - } - - /** - * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently - * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in - * the future - * @param logDir : incremental backup folders, which contains WAL - * @param tableNames : source tableNames(table names were backuped) - * @param newTableNames : target tableNames(table names to be restored to) - * @throws IOException exception - */ - public void incrementalRestoreTable(String logDir, String[] tableNames, String[] newTableNames) - throws IOException { - - if (tableNames.length != newTableNames.length) { - throw new IOException("Number of source tables adn taget Tables does not match!"); - } - - // for incremental backup image, expect the table already created either by user or previous - // full backup. Here, check that all new tables exists - HBaseAdmin admin = null; - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(conf); - admin = (HBaseAdmin) conn.getAdmin(); - for (String tableName : newTableNames) { - if (!admin.tableExists(TableName.valueOf(tableName))) { - admin.close(); - throw new IOException("HBase table " + tableName - + " does not exist. Create the table first, e.g. by restoring a full backup."); - } - } - IncrementalRestoreService restoreService = - BackupRestoreServiceFactory.getIncrementalRestoreService(conf); - - restoreService.run(logDir, tableNames, newTableNames); - } finally { - if (admin != null) { - admin.close(); - } - if(conn != null){ - conn.close(); - } - } - } - - public void fullRestoreTable(Path tableBackupPath, String tableName, String newTableName, - boolean converted) throws IOException { - - restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted); - } - - private void restoreTableAndCreate(String tableName, String newTableName, Path tableBackupPath, - boolean converted) throws IOException { - if (newTableName == null || newTableName.equals("")) { - newTableName = tableName; - } - - FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); - - // get table descriptor first - HTableDescriptor tableDescriptor = null; - - Path tableSnapshotPath = hBackupFS.getTableSnapshotPath(tableName); - - if (fileSys.exists(tableSnapshotPath)) { - // snapshot path exist means the backup path is in HDFS - // check whether snapshot dir already recorded for target table - if (snapshotMap.get(tableName) != null) { - SnapshotDescription desc = - SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); - SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); - tableDescriptor = manifest.getTableDescriptor(); - LOG.debug("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString() - + " while tableName = " + tableName); - // HBase 96.0 and 98.0 - // tableDescriptor = - // FSTableDescriptors.getTableDescriptorFromFs(fileSys, snapshotMap.get(tableName)); - } else { - tableDescriptor = hBackupFS.getTableDesc(tableName); - LOG.debug("tableSnapshotPath=" + tableSnapshotPath.toString()); - snapshotMap.put(tableName, hBackupFS.getTableInfoPath(tableName)); - } - if (tableDescriptor == null) { - LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost"); - } - } else if (converted) { - // first check if this is a converted backup image - LOG.error("convert will be supported in a future jira"); - } - - Path tableArchivePath = hBackupFS.getTableArchivePath(tableName); - if (tableArchivePath == null) { - if (tableDescriptor != null) { - // find table descriptor but no archive dir means the table is empty, create table and exit - LOG.debug("find table descriptor but no archive dir for table " + tableName - + ", will only create table"); - tableDescriptor.setName(Bytes.toBytes(newTableName)); - checkAndCreateTable(tableBackupPath, tableName, newTableName, null, tableDescriptor); - return; - } else { - throw new IllegalStateException("Cannot restore hbase table because directory '" - + " tableArchivePath is null."); - } - } - - if (tableDescriptor == null) { - tableDescriptor = new HTableDescriptor(newTableName); - } else { - tableDescriptor.setName(Bytes.toBytes(newTableName)); - } - - if (!converted) { - // record all region dirs: - // load all files in dir - try { - ArrayList<Path> regionPathList = hBackupFS.getRegionList(tableName); - - // should only try to create the table with all region informations, so we could pre-split - // the regions in fine grain - checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList, - tableDescriptor); - if (tableArchivePath != null) { - // start real restore through bulkload - // if the backup target is on local cluster, special action needed - Path tempTableArchivePath = hBackupFS.checkLocalAndBackup(tableArchivePath); - if (tempTableArchivePath.equals(tableArchivePath)) { - LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); - } else { - regionPathList = hBackupFS.getRegionList(tempTableArchivePath); // point to the tempDir - LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); - } - - LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); - for (Path regionPath : regionPathList) { - String regionName = regionPath.toString(); - LOG.debug("Restoring HFiles from directory " + regionName); - String[] args = { regionName, newTableName }; - loader.run(args); - } - } - // restore the recovered.edits if exists - replayRecoveredEditsIfAny(tableBackupPath, tableName, tableDescriptor); - } catch (Exception e) { - throw new IllegalStateException("Cannot restore hbase table", e); - } - } else { - LOG.debug("convert will be supported in a future jira"); - } - } - - /** - * Replay recovered edits from backup. - */ - private void replayRecoveredEditsIfAny(Path tableBackupPath, String tableName, - HTableDescriptor newTableHtd) throws IOException { - - LOG.debug("Trying to replay the recovered.edits if exist to the target table " - + newTableHtd.getNameAsString() + " from the backup of table " + tableName + "."); - - FileSystem fs = tableBackupPath.getFileSystem(this.conf); - ArrayList<Path> regionDirs = hBackupFS.getRegionList(tableName); - - if (regionDirs == null || regionDirs.size() == 0) { - LOG.warn("No recovered.edits to be replayed for empty backup of table " + tableName + "."); - return; - } - - Connection conn = null; - try { - - conn = ConnectionFactory.createConnection(conf); - - for (Path regionDir : regionDirs) { - // OLD: NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regionDir); - NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir); - - if (files == null || files.isEmpty()) { - LOG.warn("No recovered.edits found for the region " + regionDir.getName() + "."); - return; - } - - for (Path edits : files) { - if (edits == null || !fs.exists(edits)) { - LOG.warn("Null or non-existent edits file: " + edits); - continue; - } - - HTable table = null; - try { - table = (HTable) conn.getTable(newTableHtd.getTableName()); - replayRecoveredEdits(table, fs, edits); - table.flushCommits(); - table.close(); - } catch (IOException e) { - boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); - if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); - LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS - + "=true so continuing. Renamed " + edits + " as " + p, e); - } else { - throw e; - } - } finally { - if (table != null) { - table.close(); - } - } - } // for each edit file under a region - } // for each region - - } finally { - if (conn != null) { - conn.close(); - } - } - } - - /** - * Restore process for an edit entry. - * @param htable The target table of restore - * @param key HLog key - * @param val KVs - * @throws IOException exception - */ - private void restoreEdit(HTable htable, WALKey key, WALEdit val) throws IOException { - Put put = null; - Delete del = null; - Cell lastKV = null; - for (Cell kv : val.getCells()) { - // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit - if (WALEdit.isMetaEditFamily(CellUtil.cloneFamily(kv))) { - continue; - } - - // A WALEdit may contain multiple operations (HBASE-3584) and/or - // multiple rows (HBASE-5229). - // Aggregate as much as possible into a single Put/Delete - // operation before apply the action to the table. - if (lastKV == null || lastKV.getTypeByte() != kv.getTypeByte() - || !CellUtil.matchingRow(lastKV, kv)) { - // row or type changed, write out aggregate KVs. - if (put != null) { - applyAction(htable, put); - } - if (del != null) { - applyAction(htable, del); - } - - if (CellUtil.isDelete(kv)) { - del = new Delete(CellUtil.cloneRow(kv)); - } else { - put = new Put(CellUtil.cloneRow(kv)); - } - } - if (CellUtil.isDelete(kv)) { - del.addDeleteMarker(kv); - } else { - put.add(kv); - } - lastKV = kv; - } - // write residual KVs - if (put != null) { - applyAction(htable, put); - } - if (del != null) { - applyAction(htable, del); - } - } - - /** - * Apply an action (Put/Delete) to table. - * @param table table - * @param action action - * @throws IOException exception - */ - private void applyAction(HTable table, Mutation action) throws IOException { - // The actions are not immutable, so we defensively copy them - if (action instanceof Put) { - Put put = new Put((Put) action); - // put.setWriteToWAL(false); - // why do not we do WAL? - put.setDurability(Durability.SKIP_WAL); - table.put(put); - } else if (action instanceof Delete) { - Delete delete = new Delete((Delete) action); - table.delete(delete); - } else { - throw new IllegalArgumentException("action must be either Delete or Put"); - } - } - - /** - * Replay the given edits. - * @param htable The target table of restore - * @param fs File system - * @param edits Recovered.edits to be replayed - * @throws IOException exception - */ - private void replayRecoveredEdits(HTable htable, FileSystem fs, Path edits) throws IOException { - LOG.debug("Replaying edits from " + edits + "; path=" + edits); - - WAL.Reader reader = null; - try { - reader = WALFactory.createReader(fs, edits, this.conf); - long editsCount = 0; - WAL.Entry entry; - - try { - while ((entry = reader.next()) != null) { - restoreEdit(htable, entry.getKey(), entry.getEdit()); - editsCount++; - } - LOG.debug(editsCount + " edits from " + edits + " have been replayed."); - - } catch (EOFException eof) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); - String msg = - "Encountered EOF. Most likely due to Master failure during " - + "log spliting, so we have this data in another edit. " - + "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, eof); - } catch (IOException ioe) { - // If the IOE resulted from bad file format, - // then this problem is idempotent and retrying won't help - if (ioe.getCause() instanceof ParseException) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); - String msg = - "File corruption encountered! " + "Continuing, but renaming " + edits + " as " + p; - LOG.warn(msg, ioe); - } else { - // other IO errors may be transient (bad network connection, - // checksum exception on one datanode, etc). throw & retry - throw ioe; - } - } - } finally { - if (reader != null) { - reader.close(); - } - } - } - - /** - * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full - * backup. - * @return the {@link LoadIncrementalHFiles} instance - * @throws IOException exception - */ - private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables) - throws IOException { - // set configuration for restore: - // LoadIncrementalHFile needs more time - // <name>hbase.rpc.timeout</name> <value>600000</value> - // calculates - Integer milliSecInMin = 60000; - Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0); - Integer numberOfFilesInDir = - multipleTables ? hBackupFS.getMaxNumberOfFilesInSubDir(tableArchivePath) : hBackupFS - .getNumberOfFilesInDir(tableArchivePath); - Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file - Integer resultMillis = Math.max(calculatedMillis, previousMillis); - if (resultMillis > previousMillis) { - LOG.info("Setting configuration for restore with LoadIncrementalHFile: " - + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin - + " minutes, to handle the number of files in backup " + tableArchivePath); - this.conf.setInt("hbase.rpc.timeout", resultMillis); - } - - LoadIncrementalHFiles loader = null; - try { - loader = new LoadIncrementalHFiles(this.conf); - } catch (Exception e1) { - throw new IOException(e1); - } - return loader; - } - - /** - * Prepare the table for bulkload, most codes copied from - * {@link LoadIncrementalHFiles#createTable(String, String)} - * @param tableBackupPath path - * @param tableName table name - * @param targetTableName target table name - * @param regionDirList region directory list - * @param htd table descriptor - * @throws IOException exception - */ - private void checkAndCreateTable(Path tableBackupPath, String tableName, String targetTableName, - ArrayList<Path> regionDirList, HTableDescriptor htd) throws IOException { - HBaseAdmin hbadmin = null; - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(conf); - hbadmin = (HBaseAdmin) conn.getAdmin(); - if (hbadmin.tableExists(TableName.valueOf(targetTableName))) { - LOG.info("Using exising target table '" + targetTableName + "'"); - } else { - LOG.info("Creating target table '" + targetTableName + "'"); - - // if no region dir given, create the table and return - if (regionDirList == null || regionDirList.size() == 0) { - - hbadmin.createTable(htd); - return; - } - - byte[][] keys = hBackupFS.generateBoundaryKeys(regionDirList); - - // create table using table decriptor and region boundaries - hbadmin.createTable(htd, keys); - } - } catch (Exception e) { - throw new IOException(e); - } finally { - if (hbadmin != null) { - hbadmin.close(); - } - if(conn != null){ - conn.close(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java deleted file mode 100644 index a3b5db5..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java +++ /dev/null @@ -1,292 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.math.BigDecimal; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.backup.BackupCopyService; -import org.apache.hadoop.hbase.backup.BackupHandler; -import org.apache.hadoop.hbase.backup.BackupUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.snapshot.ExportSnapshot; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.tools.DistCp; -import org.apache.hadoop.tools.DistCpConstants; -import org.apache.hadoop.tools.DistCpOptions; -/** - * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot, - * which bases on extending ExportSnapshot's function with copy progress reporting to ZooKeeper - * implementation. The other is copying for incremental log files, which bases on extending - * DistCp's function with copy progress reporting to ZooKeeper implementation. - * - * For now this is only a wrapper. The other features such as progress and increment backup will be - * implemented in future jira - */ - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MapReduceBackupCopyService implements BackupCopyService { - private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class); - - private Configuration conf; - // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024; - - // Accumulated progress within the whole backup process for the copy operation - private float progressDone = 0.1f; - private long bytesCopied = 0; - private static float INIT_PROGRESS = 0.1f; - - // The percentage of the current copy task within the whole task if multiple time copies are - // needed. The default value is 100%, which means only 1 copy task for the whole. - private float subTaskPercntgInWholeTask = 1f; - - public MapReduceBackupCopyService() { - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - /** - * Get the current copy task percentage within the whole task if multiple copies are needed. - * @return the current copy task percentage - */ - public float getSubTaskPercntgInWholeTask() { - return subTaskPercntgInWholeTask; - } - - /** - * Set the current copy task percentage within the whole task if multiple copies are needed. Must - * be called before calling - * {@link #copy(BackupHandler, Configuration, Type, String[])} - * @param subTaskPercntgInWholeTask The percentage of the copy subtask - */ - public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) { - this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask; - } - - class SnapshotCopy extends ExportSnapshot { - private BackupHandler backupHandler; - private String table; - - public SnapshotCopy(BackupHandler backupHandler, String table) { - super(); - this.backupHandler = backupHandler; - this.table = table; - } - - public BackupHandler getBackupHandler() { - return this.backupHandler; - } - - public String getTable() { - return this.table; - } - } - - // Extends DistCp for progress updating to hbase:backup - // during backup. Using DistCpV2 (MAPREDUCE-2765). - // Simply extend it and override execute() method to get the - // Job reference for progress updating. - // Only the argument "src1, [src2, [...]] dst" is supported, - // no more DistCp options. - class BackupDistCp extends DistCp { - - private BackupHandler backupHandler; - - public BackupDistCp(Configuration conf, DistCpOptions options, BackupHandler backupHandler) - throws Exception { - super(conf, options); - this.backupHandler = backupHandler; - } - - @Override - public Job execute() throws Exception { - - // reflection preparation for private methods and fields - Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class; - Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); - Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); - Method methodCreateInputFileListing = - classDistCp.getDeclaredMethod("createInputFileListing", Job.class); - Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); - - Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); - Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); - Field fieldJobFS = classDistCp.getDeclaredField("jobFS"); - Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); - - methodCreateMetaFolderPath.setAccessible(true); - methodCreateJob.setAccessible(true); - methodCreateInputFileListing.setAccessible(true); - methodCleanup.setAccessible(true); - - fieldInputOptions.setAccessible(true); - fieldMetaFolder.setAccessible(true); - fieldJobFS.setAccessible(true); - fieldSubmitted.setAccessible(true); - - // execute() logic starts here - assert fieldInputOptions.get(this) != null; - assert getConf() != null; - - Job job = null; - try { - synchronized (this) { - // Don't cleanup while we are setting up. - fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); - fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf())); - - job = (Job) methodCreateJob.invoke(this); - } - methodCreateInputFileListing.invoke(this, job); - - // Get the total length of the source files - List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); - long totalSrcLgth = 0; - for (Path aSrc : srcs) { - totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc); - } - - // submit the copy job - job.submit(); - fieldSubmitted.set(this, true); - - // after submit the MR job, set its handler in backup handler for cancel process - // this.backupHandler.copyJob = job; - - // Update the copy progress to ZK every 0.5s if progress value changed - int progressReportFreq = - this.getConf().getInt("hbase.backup.progressreport.frequency", 500); - float lastProgress = progressDone; - while (!job.isComplete()) { - float newProgress = - progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); - - if (newProgress > lastProgress) { - - BigDecimal progressData = - new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); - String newProgressStr = progressData + "%"; - LOG.info("Progress: " + newProgressStr); - this.backupHandler.updateProgress(newProgressStr, bytesCopied); - LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr - + ".\""); - lastProgress = newProgress; - } - Thread.sleep(progressReportFreq); - } - - // update the progress data after copy job complete - float newProgress = - progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); - BigDecimal progressData = - new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); - - String newProgressStr = progressData + "%"; - LOG.info("Progress: " + newProgressStr); - - // accumulate the overall backup progress - progressDone = newProgress; - bytesCopied += totalSrcLgth; - - this.backupHandler.updateProgress(newProgressStr, bytesCopied); - LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr - + " - " + bytesCopied + " bytes copied.\""); - - } finally { - if (!fieldSubmitted.getBoolean(this)) { - methodCleanup.invoke(this); - } - } - - String jobID = job.getJobID().toString(); - job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); - - LOG.debug("DistCp job-id: " + jobID); - return job; - } - - } - - /** - * Do backup copy based on different types. - * @param handler The backup handler reference - * @param conf The hadoop configuration - * @param copyType The backup copy type - * @param options Options for customized ExportSnapshot or DistCp - * @throws Exception exception - */ - public int copy(BackupHandler handler, Configuration conf, BackupCopyService.Type copyType, - String[] options) throws IOException { - - int res = 0; - - try { - if (copyType == Type.FULL) { - SnapshotCopy snapshotCp = - new SnapshotCopy(handler, handler.getBackupContext().getTableBySnapshot(options[1])); - LOG.debug("Doing SNAPSHOT_COPY"); - // Make a new instance of conf to be used by the snapshot copy class. - snapshotCp.setConf(new Configuration(conf)); - res = snapshotCp.run(options); - } else if (copyType == Type.INCREMENTAL) { - LOG.debug("Doing COPY_TYPE_DISTCP"); - setSubTaskPercntgInWholeTask(1f); - - BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, handler); - // Handle a special case where the source file is a single file. - // In this case, distcp will not create the target dir. It just take the - // target as a file name and copy source file to the target (as a file name). - // We need to create the target dir before run distcp. - LOG.debug("DistCp options: " + Arrays.toString(options)); - if (options.length == 2) { - Path dest = new Path(options[1]); - FileSystem destfs = dest.getFileSystem(conf); - if (!destfs.exists(dest)) { - destfs.mkdirs(dest); - } - } - - res = distcp.run(options); - } - return res; - - } catch (Exception e) { - throw new IOException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java deleted file mode 100644 index deefbf7..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.mapreduce; - -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.IncrementalRestoreService; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.mapreduce.WALPlayer; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MapReduceRestoreService implements IncrementalRestoreService { - public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class); - - private WALPlayer player; - - public MapReduceRestoreService() { - this.player = new WALPlayer(); - } - - @Override - public void run(String logDir, String[] tableNames, String[] newTableNames) throws IOException { - String tableStr = HBackupFileSystem.join(tableNames); - String newTableStr = HBackupFileSystem.join(newTableNames); - - // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each - // log file - - String[] playerArgs = { logDir, tableStr, newTableStr }; - LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables " - + HBackupFileSystem.join(tableNames) + " to tables " - + HBackupFileSystem.join(newTableNames)); - try { - player.run(playerArgs); - } catch (Exception e) { - throw new IOException("cannot restore from backup directory " + logDir - + " (check Hadoop and HBase logs) " + e); - } - } - - @Override - public Configuration getConf() { - return player.getConf(); - } - - @Override - public void setConf(Configuration conf) { - this.player.setConf(conf); - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java deleted file mode 100644 index 4712548..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.backup.master; - -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; - -import java.io.IOException; -import java.util.ArrayList; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.backup.BackupSystemTable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; - - - - -/** - * Implementation of a log cleaner that checks if a log is still scheduled for - * incremental backup before deleting it when its TTL is over. - */ -@InterfaceStability.Evolving -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class BackupLogCleaner extends BaseLogCleanerDelegate { - private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class); - - private boolean stopped = false; - - public BackupLogCleaner() { - } - - @Override - public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { - // all members of this class are null if backup is disabled, - // so we cannot filter the files - if (this.getConf() == null) { - return files; - } - - try { - final BackupSystemTable table = BackupSystemTable.getTable(getConf()); - // If we do not have recorded backup sessions - if (table.hasBackupSessions() == false) { - return files; - } - return Iterables.filter(files, new Predicate<FileStatus>() { - @Override - public boolean apply(FileStatus file) { - try { - String wal = file.getPath().toString(); - boolean logInSystemTable = table.checkWALFile(wal); - if (LOG.isDebugEnabled()) { - if (logInSystemTable) { - LOG.debug("Found log file in hbase:backup, deleting: " + wal); - } else { - LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal); - } - } - return logInSystemTable; - } catch (IOException e) { - LOG.error(e); - return false;// keep file for a while, HBase failed - } - } - }); - } catch (IOException e) { - LOG.error("Failed to get hbase:backup table, therefore will keep all files", e); - // nothing to delete - return new ArrayList<FileStatus>(); - } - - } - - @Override - public void setConf(Configuration config) { - // If backup is disabled, keep all members null - if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) { - LOG.warn("Backup is disabled - allowing all wals to be deleted"); - return; - } - super.setConf(config); - } - - @Override - public void stop(String why) { - if (this.stopped) { - return; - } - this.stopped = true; - LOG.info("Stopping BackupLogCleaner"); - } - - @Override - public boolean isStopped() { - return this.stopped; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java deleted file mode 100644 index f96682f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.master; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; -import org.apache.hadoop.hbase.errorhandling.ForeignException; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.MetricsMaster; -import org.apache.hadoop.hbase.procedure.MasterProcedureManager; -import org.apache.hadoop.hbase.procedure.Procedure; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; -import org.apache.zookeeper.KeeperException; - -public class LogRollMasterProcedureManager extends MasterProcedureManager { - - public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; - public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; - private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); - - private MasterServices master; - private ProcedureCoordinator coordinator; - private boolean done; - - @Override - public void stop(String why) { - LOG.info("stop: " + why); - } - - @Override - public boolean isStopped() { - return false; - } - - @Override - public void initialize(MasterServices master, MetricsMaster metricsMaster) - throws KeeperException, IOException, UnsupportedOperationException { - this.master = master; - this.done = false; - - // setup the default procedure coordinator - String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); - BaseCoordinatedStateManager coordManager = - (BaseCoordinatedStateManager) CoordinatedStateManagerFactory - .getCoordinatedStateManager(master.getConfiguration()); - coordManager.initialize(master); - - ProcedureCoordinatorRpcs comms = - coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); - - this.coordinator = new ProcedureCoordinator(comms, tpool); - } - - @Override - public String getProcedureSignature() { - return ROLLLOG_PROCEDURE_SIGNATURE; - } - - @Override - public void execProcedure(ProcedureDescription desc) throws IOException { - this.done = false; - // start the process on the RS - ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(desc.getInstance()); - List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); - List<String> servers = new ArrayList<String>(); - for (ServerName sn : serverNames) { - servers.add(sn.toString()); - } - Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers); - if (proc == null) { - String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; - LOG.error(msg); - throw new IOException(msg); - } - - try { - // wait for the procedure to complete. A timer thread is kicked off that should cancel this - // if it takes too long. - proc.waitForCompleted(); - LOG.info("Done waiting - exec procedure for " + desc.getInstance()); - LOG.info("Distributed roll log procedure is successful!"); - this.done = true; - } catch (InterruptedException e) { - ForeignException ee = - new ForeignException("Interrupted while waiting for roll log procdure to finish", e); - monitor.receive(ee); - Thread.currentThread().interrupt(); - } catch (ForeignException e) { - ForeignException ee = - new ForeignException("Exception while waiting for roll log procdure to finish", e); - monitor.receive(ee); - } - monitor.rethrowException(); - } - - @Override - public boolean isProcedureDone(ProcedureDescription desc) throws IOException { - return done; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java deleted file mode 100644 index 618748e..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - -import java.util.HashMap; -import java.util.concurrent.Callable; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.backup.BackupSystemTable; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.errorhandling.ForeignException; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.procedure.ProcedureMember; -import org.apache.hadoop.hbase.procedure.Subprocedure; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.regionserver.wal.FSHLog; - - -/** - * This backup subprocedure implementation forces a log roll on the RS. - */ -public class LogRollBackupSubprocedure extends Subprocedure { - private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); - - private final RegionServerServices rss; - private final LogRollBackupSubprocedurePool taskManager; - private FSHLog hlog; - - public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, - ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, - LogRollBackupSubprocedurePool taskManager) { - - super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, - wakeFrequency, timeout); - LOG.info("Constructing a LogRollBackupSubprocedure."); - this.rss = rss; - this.taskManager = taskManager; - } - - /** - * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified - * with no use of subprocedurepool. - */ - class RSRollLogTask implements Callable<Void> { - RSRollLogTask() { - } - - @Override - public Void call() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("++ DRPC started: " + rss.getServerName()); - } - hlog = (FSHLog) rss.getWAL(null); - long filenum = hlog.getFilenum(); - - LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum); - hlog.rollWriter(true); - LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum()); - // write the log number to hbase:backup. - BackupSystemTable table = BackupSystemTable.getTable(rss.getConfiguration()); - // sanity check, good for testing - HashMap<String, String> serverTimestampMap = table.readRegionServerLastLogRollResult(); - String host = rss.getServerName().getHostname(); - String sts = serverTimestampMap.get(host); - if (sts != null && Long.parseLong(sts) > filenum) { - LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum); - return null; - } - table.writeRegionServerLastLogRollResult(host, Long.toString(filenum)); - // TODO: potential leak of HBase connection - // BackupSystemTable.close(); - return null; - } - - } - - private void rolllog() throws ForeignException { - - monitor.rethrowException(); - - taskManager.submitTask(new RSRollLogTask()); - monitor.rethrowException(); - - // wait for everything to complete. - taskManager.waitForOutstandingTasks(); - monitor.rethrowException(); - - } - - @Override - public void acquireBarrier() throws ForeignException { - // do nothing, executing in inside barrier step. - } - - /** - * do a log roll. - * @return some bytes - */ - @Override - public byte[] insideBarrier() throws ForeignException { - rolllog(); - // FIXME - return null; - } - - /** - * Cancel threads if they haven't finished. - */ - @Override - public void cleanup(Exception e) { - taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); - } - - /** - * Hooray! - */ - public void releaseBarrier() { - // NO OP - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java deleted file mode 100644 index 1ca638c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.DaemonThreadFactory; -import org.apache.hadoop.hbase.errorhandling.ForeignException; - -/** - * Handle running each of the individual tasks for completing a backup procedure - * on a regionserver. - */ -public class LogRollBackupSubprocedurePool implements Closeable, Abortable { - private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); - - /** Maximum number of concurrent snapshot region tasks that can run concurrently */ - private static final String CONCURENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; - private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; - - private final ExecutorCompletionService<Void> taskPool; - private final ThreadPoolExecutor executor; - private volatile boolean aborted; - private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); - private final String name; - - public LogRollBackupSubprocedurePool(String name, Configuration conf) { - // configure the executor service - long keepAlive = - conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, - LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); - int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); - this.name = name; - executor = - new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name - + ")-backup-pool")); - taskPool = new ExecutorCompletionService<Void>(executor); - } - - /** - * Submit a task to the pool. - */ - public void submitTask(final Callable<Void> task) { - Future<Void> f = this.taskPool.submit(task); - futures.add(f); - } - - /** - * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} - * @return <tt>true</tt> on success, <tt>false</tt> otherwise - * @throws ForeignException exception - */ - public boolean waitForOutstandingTasks() throws ForeignException { - LOG.debug("Waiting for backup procedure to finish."); - - try { - for (Future<Void> f : futures) { - f.get(); - } - return true; - } catch (InterruptedException e) { - if (aborted) { - throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", - e); - } - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - if (e.getCause() instanceof ForeignException) { - throw (ForeignException) e.getCause(); - } - throw new ForeignException(name, e.getCause()); - } finally { - // close off remaining tasks - for (Future<Void> f : futures) { - if (!f.isDone()) { - f.cancel(true); - } - } - } - return false; - } - - /** - * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly - * finish - */ - @Override - public void close() { - executor.shutdown(); - } - - @Override - public void abort(String why, Throwable e) { - if (this.aborted) { - return; - } - - this.aborted = true; - LOG.warn("Aborting because: " + why, e); - this.executor.shutdownNow(); - } - - @Override - public boolean isAborted() { - return this.aborted; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java deleted file mode 100644 index aca190c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup.regionserver; - - -import java.io.IOException; -import java.util.concurrent.ThreadPoolExecutor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; -import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; -import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; -import org.apache.hadoop.hbase.procedure.ProcedureMember; -import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; -import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; -import org.apache.hadoop.hbase.procedure.Subprocedure; -import org.apache.hadoop.hbase.procedure.SubprocedureFactory; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; - -/** - * This manager class handles the work dealing with backup for a {@link HRegionServer}. - * <p> - * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is - * responsible by this region server. If any failures occur with the subprocedure, the manager's - * procedure member notifies the procedure coordinator to abort all others. - * <p> - * On startup, requires {@link #start()} to be called. - * <p> - * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be - * called - */ -public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { - - private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); - - /** Conf key for number of request threads to start backup on regionservers */ - public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; - /** # of threads for backup work on the rs. */ - public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; - - public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; - public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; - - /** Conf key for millis between checks to see if backup work completed or if there are errors */ - public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; - /** Default amount of time to check for errors while regions finish backup work */ - private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; - - private RegionServerServices rss; - private ProcedureMemberRpcs memberRpcs; - private ProcedureMember member; - - /** - * Create a default backup procedure manager - */ - public LogRollRegionServerProcedureManager() { - } - - /** - * Start accepting backup procedure requests. - */ - @Override - public void start() { - this.memberRpcs.start(rss.getServerName().toString(), member); - LOG.info("Started region server backup manager."); - } - - /** - * Close <tt>this</tt> and all running backup procedure tasks - * @param force forcefully stop all running tasks - * @throws IOException exception - */ - @Override - public void stop(boolean force) throws IOException { - String mode = force ? "abruptly" : "gracefully"; - LOG.info("Stopping RegionServerBackupManager " + mode + "."); - - try { - this.member.close(); - } finally { - this.memberRpcs.close(); - } - } - - /** - * If in a running state, creates the specified subprocedure for handling a backup procedure. - * @return Subprocedure to submit to the ProcedureMemeber. - */ - public Subprocedure buildSubprocedure() { - - // don't run a backup if the parent is stop(ping) - if (rss.isStopping() || rss.isStopped()) { - throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() - + ", because stopping/stopped!"); - } - - LOG.info("Attempting to run a roll log procedure for backup."); - ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); - Configuration conf = rss.getConfiguration(); - long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); - long wakeMillis = - conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); - - LogRollBackupSubprocedurePool taskManager = - new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); - return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, - taskManager); - - } - - /** - * Build the actual backup procedure runner that will do all the 'hard' work - */ - public class BackupSubprocedureBuilder implements SubprocedureFactory { - - @Override - public Subprocedure buildSubprocedure(String name, byte[] data) { - return LogRollRegionServerProcedureManager.this.buildSubprocedure(); - } - } - - @Override - public void initialize(RegionServerServices rss) throws IOException { - this.rss = rss; - BaseCoordinatedStateManager coordManager = - (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss - .getConfiguration()); - coordManager.initialize(rss); - this.memberRpcs = - coordManager - .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); - - // read in the backup handler configuration properties - Configuration conf = rss.getConfiguration(); - long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); - int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT); - // create the actual cohort member - ThreadPoolExecutor pool = - ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); - this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); - } - - @Override - public String getProcedureSignature() { - return "backup-proc"; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index 3342743..ae36f08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -17,11 +17,7 @@ */ package org.apache.hadoop.hbase.coordination; -import java.io.IOException; - import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Server; @@ -55,21 +51,8 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for split log worker */ public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination(); - /** * Method to retrieve coordination for split log manager */ public abstract SplitLogManagerCoordination getSplitLogManagerCoordination(); - /** - * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs} - */ - public abstract ProcedureCoordinatorRpcs - getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException; - - /** - * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpc} - */ - public abstract ProcedureMemberRpcs - getProcedureMemberRpcs(String procType) throws IOException; - } http://git-wip-us.apache.org/repos/asf/hbase/blob/449fb812/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 7cf4aab..3e89be7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -17,15 +17,9 @@ */ package org.apache.hadoop.hbase.coordination; -import java.io.IOException; - import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; -import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; -import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** @@ -55,21 +49,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { return splitLogWorkerCoordination; - } - + } @Override public SplitLogManagerCoordination getSplitLogManagerCoordination() { return splitLogManagerCoordination; } - - @Override - public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) - throws IOException { - return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode); - } - - @Override - public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException { - return new ZKProcedureMemberRpcs(watcher, procType); - } }