http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java new file mode 100644 index 0000000..ae21b33 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreClient.java @@ -0,0 +1,496 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +/** + * The main class which interprets the given arguments and triggers the restore operation.
+ */ + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class RestoreClient { + + private static final Log LOG = LogFactory.getLog(RestoreClient.class); + + private static Options opt; + private static Configuration conf; + private static Set<BackupImage> lastRestoreImagesSet; + + // delimiter in tablename list in restore command + private static final String DELIMITER_IN_COMMAND = ","; + + private static final String OPTION_OVERWRITE = "overwrite"; + private static final String OPTION_CHECK = "check"; + private static final String OPTION_AUTOMATIC = "automatic"; + + private static final String USAGE = + "Usage: hbase restore <backup_root_path> <backup_id> <tables> [tableMapping] \n" + + " [-overwrite] [-check] [-automatic]\n" + + " backup_root_path The parent location where the backup images are stored\n" + + " backup_id The id identifying the backup image\n" + + " table(s) Table(s) from the backup image to be restored.\n" + + " Tables are separated by comma.\n" + + " Options:\n" + + " tableMapping A comma separated list of target tables.\n" + + " If specified, each table in <tables> must have a mapping.\n" + + " -overwrite With this option, restore overwrites data in the existing table " + + "if there's any in\n" + + " the restore target. The existing table must be online before restore.\n" + + " -check With this option, the restore sequence and dependencies are checked\n" + + " and verified without executing the restore\n" + + " -automatic With this option, all the dependencies are automatically restored\n" + + " together with this backup image following the correct order.\n" + + " The restore dependencies can be checked by using \"-check\" " + + "option,\n" + + " or using \"hbase backup describe\" command. Without this option, " + + "only\n" + " this backup image is restored\n"; + + private RestoreClient(){ + throw new AssertionError("Instantiating utility class..."); + } + + protected static void init() throws IOException { + // define supported options + opt = new Options(); + opt.addOption(OPTION_OVERWRITE, false, + "Overwrite the data if any of the restore target tables exists"); + opt.addOption(OPTION_CHECK, false, "Check restore sequence and dependencies"); + opt.addOption(OPTION_AUTOMATIC, false, "Restore all dependencies"); + opt.addOption("debug", false, "Enable debug logging"); + + conf = getConf(); + + // disable irrelevant loggers to avoid cluttering the command output + disableUselessLoggers(); + } + + public static void main(String[] args) throws IOException { + init(); + parseAndRun(args); + } + + private static void parseAndRun(String[] args) { + CommandLine cmd = null; + try { + cmd = new PosixParser().parse(opt, args); + } catch (ParseException e) { + LOG.error("Could not parse command", e); + System.exit(-1); + } + + // enable debug logging + Logger backupClientLogger = Logger.getLogger("org.apache.hadoop.hbase.backup"); + if (cmd.hasOption("debug")) { + backupClientLogger.setLevel(Level.DEBUG); + } + + // whether to overwrite the existing table if any, false by default + boolean isOverwrite = cmd.hasOption(OPTION_OVERWRITE); + if (isOverwrite) { + LOG.debug("Found -overwrite option in restore command, " + + "will overwrite the existing table if any in the restore target"); + } + + // whether to only check the dependencies, false by default + boolean check = cmd.hasOption(OPTION_CHECK); + if (check) { + LOG.debug("Found -check option in restore command, " + + "will check and verify the dependencies"); + } + + // whether to restore all dependencies, false by
default + boolean autoRestore = cmd.hasOption(OPTION_AUTOMATIC); + if (autoRestore) { + LOG.debug("Found -automatic option in restore command, " + + "will automatically restore all the dependencies"); + } + + // parse main restore command options + String[] remainArgs = cmd.getArgs(); + if (remainArgs.length < 3) { + System.out.println("ERROR: missing arguments"); + System.out.println(USAGE); + System.exit(-1); + } + + String backupRootDir = remainArgs[0]; + String backupId = remainArgs[1]; + String tables = remainArgs[2]; + + String tableMapping = (remainArgs.length > 3) ? remainArgs[3] : null; + + String[] sTableArray = (tables != null) ? tables.split(DELIMITER_IN_COMMAND) : null; + String[] tTableArray = (tableMapping != null) ? tableMapping.split(DELIMITER_IN_COMMAND) : null; + + if (tableMapping != null && tTableArray != null && (sTableArray.length != tTableArray.length)) { + System.err.println("ERROR: table mapping mismatch: " + tables + " : " + tableMapping); + System.out.println(USAGE); + System.exit(-1); + } + + try { + HBackupFileSystem hBackupFS = new HBackupFileSystem(conf, new Path(backupRootDir), backupId); + restore_stage1(hBackupFS, backupRootDir, backupId, check, autoRestore, sTableArray, + tTableArray, isOverwrite); + } catch (IOException e) { + System.err.println("ERROR: " + e.getMessage()); + System.exit(-1); + } + } + + /** + * Restore operation. Stage 1: validate the backup manifest and check the target tables + * @param hBackupFS to access the backup image + * @param backupRootDir The root dir for the backup image + * @param backupId The backup id for the image to be restored + * @param check True to only perform the dependency check + * @param autoRestore True to automatically restore all images in the dependency chain + * @param sTableArray The array of tables to be restored + * @param tTableArray The array of mapping tables to restore to + * @param isOverwrite True to overwrite the target table if it exists; otherwise the request + * fails when a target table exists + * @return True if only the dependency check was performed + * @throws IOException if any failure during restore + */ + public static boolean restore_stage1(HBackupFileSystem hBackupFS, String backupRootDir, + String backupId, boolean check, boolean autoRestore, String[] sTableArray, + String[] tTableArray, boolean isOverwrite) throws IOException { + + HashMap<String, BackupManifest> backupManifestMap = new HashMap<String, BackupManifest>(); + // check and load backup image manifest for the tables + hBackupFS.checkImageManifestExist(backupManifestMap, sTableArray); + + try { + // Check and validate the backup image and its dependencies + if (check || autoRestore) { + if (validate(backupManifestMap)) { + LOG.info("Checking backup images: ok"); + } else { + String errMsg = "Some dependencies are missing for restore"; + LOG.error(errMsg); + throw new IOException(errMsg); + } + } + + // return true if only for check + if (check) { + return true; + } + + if (tTableArray == null) { + tTableArray = sTableArray; + } + + // check the target tables + checkTargetTables(tTableArray, isOverwrite); + + // start restore process + Set<BackupImage> restoreImageSet = + restore_stage2(hBackupFS, backupManifestMap, sTableArray, tTableArray, autoRestore); + + LOG.info("Restore for " + Arrays.asList(sTableArray) + " is successful!"); + lastRestoreImagesSet = restoreImageSet; + + } catch (IOException e) { + LOG.error("ERROR: restore failed with error: " + e.getMessage()); + throw e; + } + + // not a check-only run, return false + return false; + } + + /** + * Get last
restore image set. The value is globally set for the latest finished restore. + * @return the last restore image set + */ + public static Set<BackupImage> getLastRestoreImagesSet() { + return lastRestoreImagesSet; + } + + private static boolean validate(HashMap<String, BackupManifest> backupManifestMap) + throws IOException { + boolean isValid = true; + + for (Entry<String, BackupManifest> manifestEntry : backupManifestMap.entrySet()) { + + String table = manifestEntry.getKey(); + TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>(); + + ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table); + if (depList != null && !depList.isEmpty()) { + imageSet.addAll(depList); + } + + // todo merge + LOG.debug("merge will be implemented in a future jira"); + // BackupUtil.clearMergedImages(table, imageSet, conf); + + LOG.info("Dependent image(s) from old to new:"); + for (BackupImage image : imageSet) { + String imageDir = + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table); + if (!HBackupFileSystem.checkPathExist(imageDir, getConf())) { + LOG.error("ERROR: backup image does not exist: " + imageDir); + isValid = false; + break; + } + // TODO More validation? + LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available"); + } + } + return isValid; + } + + /** + * Validate target tables + * @param tTableArray: target tables + * @param isOverwrite overwrite existing table + * @throws IOException exception + */ + private static void checkTargetTables(String[] tTableArray, boolean isOverwrite) + throws IOException { + ArrayList<String> existTableList = new ArrayList<String>(); + ArrayList<String> disabledTableList = new ArrayList<String>(); + + // check if the tables already exist + HBaseAdmin admin = null; + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(conf); + admin = (HBaseAdmin) conn.getAdmin(); + for (String tableName : tTableArray) { + if (admin.tableExists(TableName.valueOf(tableName))) { + existTableList.add(tableName); + if (admin.isTableDisabled(TableName.valueOf(tableName))) { + disabledTableList.add(tableName); + } + } else { + LOG.info("HBase table " + tableName + + " does not exist. It will be created during the restore process"); + } + } + } finally { + if (admin != null) { + admin.close(); + } + if (conn != null) { + conn.close(); + } + } + + if (existTableList.size() > 0) { + if (!isOverwrite) { + LOG.error("Existing table found in the restore target, please add the \"-overwrite\" " + + "option to the command if you mean to restore to these existing tables"); + LOG.info("Existing table list in restore target: " + existTableList); + throw new IOException("Existing table found in target while no \"-overwrite\" " + + "option found"); + } else { + if (disabledTableList.size() > 0) { + LOG.error("Found offline table(s) in the restore target, " + + "please enable them before restoring with the \"-overwrite\" option"); + LOG.info("Offline table list in restore target: " + disabledTableList); + throw new IOException( + "Found offline table in the target when restoring with the \"-overwrite\" option"); + } + } + } + + } + + /** + * Restore operation.
Stage 2: resolve backup image dependencies + * @param hBackupFS to access the backup image + * @param backupManifestMap : tableName, Manifest + * @param sTableArray The array of tables to be restored + * @param tTableArray The array of mapping tables to restore to + * @param autoRestore : if true, restore all the backup images on the dependency list + * @return set of BackupImages restored + * @throws IOException exception + */ + private static Set<BackupImage> restore_stage2(HBackupFileSystem hBackupFS, + HashMap<String, BackupManifest> backupManifestMap, String[] sTableArray, + String[] tTableArray, boolean autoRestore) throws IOException { + TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); + + for (int i = 0; i < sTableArray.length; i++) { + restoreImageSet.clear(); + String table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); + if (autoRestore) { + // Get the image list of this backup for restore in time order from old + // to new. + TreeSet<BackupImage> restoreList = + new TreeSet<BackupImage>(manifest.getDependentListByTable(table)); + LOG.debug("clearing merged images will be implemented in a future jira"); + + for (BackupImage image : restoreList) { + restoreImage(image, table, tTableArray[i]); + } + restoreImageSet.addAll(restoreList); + } else { + BackupImage image = manifest.getBackupImage(); + List<BackupImage> depList = manifest.getDependentListByTable(table); + // The dependency list always contains self. + if (depList != null && depList.size() > 1) { + LOG.warn("Backup image " + image.getBackupId() + " depends on other images.\n" + + "This operation will only restore the delta contained within backupImage " + + image.getBackupId()); + } + restoreImage(image, table, tTableArray[i]); + restoreImageSet.add(image); + } + + if (autoRestore) { + if (restoreImageSet != null && !restoreImageSet.isEmpty()) { + LOG.info("Restore includes the following image(s):"); + for (BackupImage image : restoreImageSet) { + LOG.info(" Backup: " + + image.getBackupId() + + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), + table)); + } + } + } + + } + return restoreImageSet; + } + + /** + * Restore operation that handles a single backup image + * @param image: backupImage + * @param sTable: table to be restored + * @param tTable: table to be restored to + * @throws IOException exception + */ + private static void restoreImage(BackupImage image, String sTable, String tTable) + throws IOException { + + Configuration conf = getConf(); + + String rootDir = image.getRootDir(); + LOG.debug("Image root dir " + rootDir); + String backupId = image.getBackupId(); + + HBackupFileSystem hFS = new HBackupFileSystem(conf, new Path(rootDir), backupId); + RestoreUtil restoreTool = new RestoreUtil(conf, hFS); + BackupManifest manifest = hFS.getManifest(sTable); + + Path tableBackupPath = hFS.getTableBackupPath(sTable); + + // todo: convert feature will be provided in a future jira + boolean converted = false; + + if (manifest.getType().equals(BackupRestoreConstants.BACKUP_TYPE_FULL) || converted) { + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from " + (converted ?
"converted" : "full") + " backup image " + tableBackupPath.toString()); + restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable, converted); + } else { // incremental Backup + String logBackupDir = + HBackupFileSystem.getLogBackupDir(image.getRootDir(), image.getBackupId()); + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from incremental backup image " + + logBackupDir); + restoreTool.incrementalRestoreTable(logBackupDir, new String[] { sTable }, + new String[] { tTable }); + } + + LOG.info(sTable + " has been successfully restored to " + tTable); + } + + /** + * Set the configuration from a given one. + * @param newConf A new given configuration + */ + public synchronized static void setConf(Configuration newConf) { + conf = newConf; + } + + /** + * Get and merge Hadoop and HBase configuration. + * @throws IOException exception + */ + protected static Configuration getConf() { + if (conf == null) { + synchronized (RestoreClient.class) { + conf = new Configuration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create()); + } + } + return conf; + } + + private static void disableUselessLoggers() { + // disable zookeeper log to avoid it mess up command output + Logger zkLogger = Logger.getLogger("org.apache.zookeeper"); + LOG.debug("Zookeeper log level before set: " + zkLogger.getLevel()); + zkLogger.setLevel(Level.OFF); + LOG.debug("Zookeeper log level after set: " + zkLogger.getLevel()); + + // disable hbase zookeeper tool log to avoid it mess up command output + Logger hbaseZkLogger = Logger.getLogger("org.apache.hadoop.hbase.zookeeper"); + LOG.debug("HBase zookeeper log level before set: " + hbaseZkLogger.getLevel()); + hbaseZkLogger.setLevel(Level.OFF); + LOG.debug("HBase Zookeeper log level after set: " + hbaseZkLogger.getLevel()); + + // disable hbase client log to avoid it mess up command output + Logger hbaseClientLogger = Logger.getLogger("org.apache.hadoop.hbase.client"); + LOG.debug("HBase client log level before set: " + hbaseClientLogger.getLevel()); + hbaseClientLogger.setLevel(Level.OFF); + LOG.debug("HBase client log level after set: " + hbaseClientLogger.getLevel()); + + // disable other related log to avoid mess up command output + Logger otherLogger = Logger.getLogger("org.apache.hadoop.hbase.io.hfile"); + otherLogger.setLevel(Level.OFF); + otherLogger = Logger.getLogger("org.apache.hadoop.hbase.util"); + otherLogger.setLevel(Level.OFF); + otherLogger = Logger.getLogger("org.apache.hadoop.hbase.mapreduce"); + otherLogger.setLevel(Level.OFF); + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java new file mode 100644 index 0000000..bdb7988 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreUtil.java @@ -0,0 +1,503 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup; + +import java.io.EOFException; +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.NavigableSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; + +/** + * A collection for methods used by multiple classes to restore HBase tables. 
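+ * <p>Supports full restores (table snapshot metadata plus bulk-loaded HFiles) and incremental + * restores (WAL replay).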
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RestoreUtil { + + public static final Log LOG = LogFactory.getLog(RestoreUtil.class); + + protected Configuration conf = null; + + protected HBackupFileSystem hBackupFS = null; + + // store table name and snapshot dir mapping + private final HashMap<String, Path> snapshotMap = new HashMap<String, Path>(); + + public RestoreUtil(Configuration conf, HBackupFileSystem hBackupFS) throws IOException { + this.conf = conf; + this.hBackupFS = hBackupFS; + } + + /** + * Used during an incremental restore operation. Calls WALPlayer to replay the WAL in the backup + * image. Currently tableNames and newTableNames only contain a single table; this will be + * expanded to multiple tables in the future + * @param logDir : incremental backup folders, which contain the WAL + * @param tableNames : source table names (the table names that were backed up) + * @param newTableNames : target table names (the table names to be restored to) + * @throws IOException exception + */ + public void incrementalRestoreTable(String logDir, String[] tableNames, String[] newTableNames) + throws IOException { + + if (tableNames.length != newTableNames.length) { + throw new IOException("Number of source tables and target tables does not match!"); + } + + // for an incremental backup image, expect the table to have been created either by the user or + // by a previous full backup. Here, check that all new tables exist + HBaseAdmin admin = null; + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(conf); + admin = (HBaseAdmin) conn.getAdmin(); + for (String tableName : newTableNames) { + if (!admin.tableExists(TableName.valueOf(tableName))) { + admin.close(); + throw new IOException("HBase table " + tableName + + " does not exist. Create the table first, e.g.
by restoring a full backup."); + } + } + IncrementalRestoreService restoreService = + BackupRestoreServiceFactory.getIncrementalRestoreService(conf); + + restoreService.run(logDir, tableNames, newTableNames); + } finally { + if (admin != null) { + admin.close(); + } + if(conn != null){ + conn.close(); + } + } + } + + public void fullRestoreTable(Path tableBackupPath, String tableName, String newTableName, + boolean converted) throws IOException { + + restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted); + } + + private void restoreTableAndCreate(String tableName, String newTableName, Path tableBackupPath, + boolean converted) throws IOException { + if (newTableName == null || newTableName.equals("")) { + newTableName = tableName; + } + + FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + + // get table descriptor first + HTableDescriptor tableDescriptor = null; + + Path tableSnapshotPath = hBackupFS.getTableSnapshotPath(tableName); + + if (fileSys.exists(tableSnapshotPath)) { + // if the snapshot path exists, the backup path is in HDFS + // check whether the snapshot dir is already recorded for the target table + if (snapshotMap.get(tableName) != null) { + SnapshotDescription desc = + SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); + SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); + tableDescriptor = manifest.getTableDescriptor(); + LOG.debug("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString() + + " while tableName = " + tableName); + // HBase 96.0 and 98.0 + // tableDescriptor = + // FSTableDescriptors.getTableDescriptorFromFs(fileSys, snapshotMap.get(tableName)); + } else { + tableDescriptor = hBackupFS.getTableDesc(tableName); + LOG.debug("tableSnapshotPath=" + tableSnapshotPath.toString()); + snapshotMap.put(tableName, hBackupFS.getTableInfoPath(tableName)); + } + if (tableDescriptor == null) { + LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost"); + } + } else if (converted) { + // first check if this is a converted backup image + LOG.error("convert will be supported in a future jira"); + } + + Path tableArchivePath = hBackupFS.getTableArchivePath(tableName); + if (tableArchivePath == null) { + if (tableDescriptor != null) { + // a table descriptor without an archive dir means the table is empty; create the table + // and exit + LOG.debug("found table descriptor but no archive dir for table " + tableName + + ", will only create table"); + tableDescriptor.setName(Bytes.toBytes(newTableName)); + checkAndCreateTable(tableBackupPath, tableName, newTableName, null, tableDescriptor); + return; + } else { + throw new IllegalStateException( + "Cannot restore hbase table because tableArchivePath is null."); + } + } + + if (tableDescriptor == null) { + tableDescriptor = new HTableDescriptor(newTableName); + } else { + tableDescriptor.setName(Bytes.toBytes(newTableName)); + } + + if (!converted) { + // record all region dirs: + // load all files in dir + try { + ArrayList<Path> regionPathList = hBackupFS.getRegionList(tableName); + + // only try to create the table with all region information, so we can pre-split + // the regions at a fine granularity + checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList, + tableDescriptor); + if (tableArchivePath != null) { + // start real restore through bulkload + // if the backup target is on the local cluster, special action is needed + Path tempTableArchivePath =
hBackupFS.checkLocalAndBackup(tableArchivePath); + if (tempTableArchivePath.equals(tableArchivePath)) { + LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); + } else { + regionPathList = hBackupFS.getRegionList(tempTableArchivePath); // point to the tempDir + LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); + } + + LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); + for (Path regionPath : regionPathList) { + String regionName = regionPath.toString(); + LOG.debug("Restoring HFiles from directory " + regionName); + String[] args = { regionName, newTableName }; + loader.run(args); + } + } + // restore the recovered.edits if exists + replayRecoveredEditsIfAny(tableBackupPath, tableName, tableDescriptor); + } catch (Exception e) { + throw new IllegalStateException("Cannot restore hbase table", e); + } + } else { + LOG.debug("convert will be supported in a future jira"); + } + } + + /** + * Replay recovered edits from backup. + */ + private void replayRecoveredEditsIfAny(Path tableBackupPath, String tableName, + HTableDescriptor newTableHtd) throws IOException { + + LOG.debug("Trying to replay the recovered.edits, if any exist, to the target table " + + newTableHtd.getNameAsString() + " from the backup of table " + tableName + "."); + + FileSystem fs = tableBackupPath.getFileSystem(this.conf); + ArrayList<Path> regionDirs = hBackupFS.getRegionList(tableName); + + if (regionDirs == null || regionDirs.size() == 0) { + LOG.warn("No recovered.edits to be replayed for empty backup of table " + tableName + "."); + return; + } + + Connection conn = null; + try { + + conn = ConnectionFactory.createConnection(conf); + + for (Path regionDir : regionDirs) { + // OLD: NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regionDir); + NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regionDir); + + if (files == null || files.isEmpty()) { + LOG.warn("No recovered.edits found for the region " + regionDir.getName() + "."); + continue; + } + + for (Path edits : files) { + if (edits == null || !fs.exists(edits)) { + LOG.warn("Null or non-existent edits file: " + edits); + continue; + } + + HTable table = null; + try { + table = (HTable) conn.getTable(newTableHtd.getTableName()); + replayRecoveredEdits(table, fs, edits); + table.flushCommits(); + table.close(); + } catch (IOException e) { + boolean skipErrors = conf.getBoolean("hbase.skip.errors", false); + if (skipErrors) { + Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + + "=true so continuing. Renamed " + edits + " as " + p, e); + } else { + throw e; + } + } finally { + if (table != null) { + table.close(); + } + } + } // for each edit file under a region + } // for each region + + } finally { + if (conn != null) { + conn.close(); + } + } + } + + /** + * Restore process for an edit entry. + * @param htable The target table of restore + * @param key WAL key + * @param val KVs + * @throws IOException exception + */ + private void restoreEdit(HTable htable, WALKey key, WALEdit val) throws IOException { + Put put = null; + Delete del = null; + Cell lastKV = null; + for (Cell kv : val.getCells()) { + // filtering HLog meta entries, see HLog.completeCacheFlushLogEdit + if (WALEdit.isMetaEditFamily(CellUtil.cloneFamily(kv))) { + continue; + } + + // A WALEdit may contain multiple operations (HBASE-3584) and/or + // multiple rows (HBASE-5229).
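+ // Consecutive cells that share the same row and type are batched into one mutation below.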
+ // Aggregate as much as possible into a single Put/Delete + // operation before applying the action to the table. + if (lastKV == null || lastKV.getTypeByte() != kv.getTypeByte() + || !CellUtil.matchingRow(lastKV, kv)) { + // row or type changed, write out aggregate KVs. + if (put != null) { + applyAction(htable, put); + } + if (del != null) { + applyAction(htable, del); + } + + if (CellUtil.isDelete(kv)) { + del = new Delete(CellUtil.cloneRow(kv)); + } else { + put = new Put(CellUtil.cloneRow(kv)); + } + } + if (CellUtil.isDelete(kv)) { + del.addDeleteMarker(kv); + } else { + put.add(kv); + } + lastKV = kv; + } + // write residual KVs + if (put != null) { + applyAction(htable, put); + } + if (del != null) { + applyAction(htable, del); + } + } + + /** + * Apply an action (Put/Delete) to the table. + * @param table table + * @param action action + * @throws IOException exception + */ + private void applyAction(HTable table, Mutation action) throws IOException { + // The actions are not immutable, so we defensively copy them + if (action instanceof Put) { + Put put = new Put((Put) action); + // restore writes skip the WAL for speed; the source data remains in the backup image + put.setDurability(Durability.SKIP_WAL); + table.put(put); + } else if (action instanceof Delete) { + Delete delete = new Delete((Delete) action); + table.delete(delete); + } else { + throw new IllegalArgumentException("action must be either Delete or Put"); + } + } + + /** + * Replay the given edits. + * @param htable The target table of restore + * @param fs File system + * @param edits Recovered.edits to be replayed + * @throws IOException exception + */ + private void replayRecoveredEdits(HTable htable, FileSystem fs, Path edits) throws IOException { + LOG.debug("Replaying edits from " + edits); + + WAL.Reader reader = null; + try { + reader = WALFactory.createReader(fs, edits, this.conf); + long editsCount = 0; + WAL.Entry entry; + + try { + while ((entry = reader.next()) != null) { + restoreEdit(htable, entry.getKey(), entry.getEdit()); + editsCount++; + } + LOG.debug(editsCount + " edits from " + edits + " have been replayed."); + + } catch (EOFException eof) { + Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + String msg = + "Encountered EOF. Most likely due to Master failure during " + + "log splitting, so we have this data in another edit. " + + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, eof); + } catch (IOException ioe) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (ioe.getCause() instanceof ParseException) { + Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + String msg = + "File corruption encountered! " + "Continuing, but renaming " + edits + " as " + p; + LOG.warn(msg, ioe); + } else { + // other IO errors may be transient (bad network connection, + // checksum exception on one datanode, etc). throw & retry + throw ioe; + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + } + + /** + * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full + * backup.
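+ * The hbase.rpc.timeout is raised up front, roughly one minute per HFile to load, so that a + * large bulk load does not time out.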
+ * @return the {@link LoadIncrementalHFiles} instance + * @throws IOException exception + */ + private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables) + throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // <name>hbase.rpc.timeout</name> <value>600000</value> + // calculate the timeout from the number of files to load + Integer milliSecInMin = 60000; + Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0); + Integer numberOfFilesInDir = + multipleTables ? hBackupFS.getMaxNumberOfFilesInSubDir(tableArchivePath) : hBackupFS + .getNumberOfFilesInDir(tableArchivePath); + Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file + Integer resultMillis = Math.max(calculatedMillis, previousMillis); + if (resultMillis > previousMillis) { + LOG.info("Setting configuration for restore with LoadIncrementalHFile: " + + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin + + " minutes, to handle the number of files in backup " + tableArchivePath); + this.conf.setInt("hbase.rpc.timeout", resultMillis); + } + + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(this.conf); + } catch (Exception e1) { + throw new IOException(e1); + } + return loader; + } + + /** + * Prepare the table for bulkload, most code copied from + * {@link LoadIncrementalHFiles#createTable(String, String)} + * @param tableBackupPath path + * @param tableName table name + * @param targetTableName target table name + * @param regionDirList region directory list + * @param htd table descriptor + * @throws IOException exception + */ + private void checkAndCreateTable(Path tableBackupPath, String tableName, String targetTableName, + ArrayList<Path> regionDirList, HTableDescriptor htd) throws IOException { + HBaseAdmin hbadmin = null; + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(conf); + hbadmin = (HBaseAdmin) conn.getAdmin(); + if (hbadmin.tableExists(TableName.valueOf(targetTableName))) { + LOG.info("Using existing target table '" + targetTableName + "'"); + } else { + LOG.info("Creating target table '" + targetTableName + "'"); + + // if no region dir is given, create the table and return + if (regionDirList == null || regionDirList.size() == 0) { + + hbadmin.createTable(htd); + return; + } + + byte[][] keys = hBackupFS.generateBoundaryKeys(regionDirList); + + // create the table using the table descriptor and region boundaries + hbadmin.createTable(htd, keys); + } + } catch (Exception e) { + throw new IOException(e); + } finally { + if (hbadmin != null) { + hbadmin.close(); + } + if(conn != null){ + conn.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java new file mode 100644 index 0000000..a3b5db5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.backup.BackupCopyService; +import org.apache.hadoop.hbase.backup.BackupHandler; +import org.apache.hadoop.hbase.backup.BackupUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.snapshot.ExportSnapshot; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpConstants; +import org.apache.hadoop.tools.DistCpOptions; +/** + * Copier for the backup operation. There are two types of copy: one copies from a snapshot and is + * based on extending ExportSnapshot with copy progress reporting to ZooKeeper; the other copies + * incremental log files and is based on extending DistCp with the same progress reporting. + * + * For now this is only a wrapper. Other features such as progress reporting and incremental backup + * will be implemented in a future jira + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MapReduceBackupCopyService implements BackupCopyService { + private static final Log LOG = LogFactory.getLog(MapReduceBackupCopyService.class); + + private Configuration conf; + // private static final long BYTES_PER_MAP = 2 * 256 * 1024 * 1024; + + // Accumulated progress within the whole backup process for the copy operation + private float progressDone = 0.1f; + private long bytesCopied = 0; + private static float INIT_PROGRESS = 0.1f; + + // The percentage of the current copy task within the whole task if multiple copies are + // needed. The default value is 100%, which means only 1 copy task for the whole. + private float subTaskPercntgInWholeTask = 1f; + + public MapReduceBackupCopyService() { + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Get the current copy task percentage within the whole task if multiple copies are needed. + * @return the current copy task percentage + */ + public float getSubTaskPercntgInWholeTask() { + return subTaskPercntgInWholeTask; + } + + /** + * Set the current copy task percentage within the whole task if multiple copies are needed.
Must + * be called before calling + * {@link #copy(BackupHandler, Configuration, Type, String[])} + * @param subTaskPercntgInWholeTask The percentage of the copy subtask + */ + public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) { + this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask; + } + + class SnapshotCopy extends ExportSnapshot { + private BackupHandler backupHandler; + private String table; + + public SnapshotCopy(BackupHandler backupHandler, String table) { + super(); + this.backupHandler = backupHandler; + this.table = table; + } + + public BackupHandler getBackupHandler() { + return this.backupHandler; + } + + public String getTable() { + return this.table; + } + } + + // Extends DistCp for progress updating to hbase:backup + // during backup. Using DistCpV2 (MAPREDUCE-2765). + // Simply extend it and override execute() method to get the + // Job reference for progress updating. + // Only the argument "src1, [src2, [...]] dst" is supported, + // no more DistCp options. + class BackupDistCp extends DistCp { + + private BackupHandler backupHandler; + + public BackupDistCp(Configuration conf, DistCpOptions options, BackupHandler backupHandler) + throws Exception { + super(conf, options); + this.backupHandler = backupHandler; + } + + @Override + public Job execute() throws Exception { + + // reflection preparation for private methods and fields + Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class; + Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath"); + Method methodCreateJob = classDistCp.getDeclaredMethod("createJob"); + Method methodCreateInputFileListing = + classDistCp.getDeclaredMethod("createInputFileListing", Job.class); + Method methodCleanup = classDistCp.getDeclaredMethod("cleanup"); + + Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions"); + Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder"); + Field fieldJobFS = classDistCp.getDeclaredField("jobFS"); + Field fieldSubmitted = classDistCp.getDeclaredField("submitted"); + + methodCreateMetaFolderPath.setAccessible(true); + methodCreateJob.setAccessible(true); + methodCreateInputFileListing.setAccessible(true); + methodCleanup.setAccessible(true); + + fieldInputOptions.setAccessible(true); + fieldMetaFolder.setAccessible(true); + fieldJobFS.setAccessible(true); + fieldSubmitted.setAccessible(true); + + // execute() logic starts here + assert fieldInputOptions.get(this) != null; + assert getConf() != null; + + Job job = null; + try { + synchronized (this) { + // Don't cleanup while we are setting up. 
+ fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this)); + fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(getConf())); + + job = (Job) methodCreateJob.invoke(this); + } + methodCreateInputFileListing.invoke(this, job); + + // Get the total length of the source files + List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths(); + long totalSrcLgth = 0; + for (Path aSrc : srcs) { + totalSrcLgth += BackupUtil.getFilesLength(aSrc.getFileSystem(getConf()), aSrc); + } + + // submit the copy job + job.submit(); + fieldSubmitted.set(this, true); + + // after submitting the MR job, set its handle in the backup handler for the cancel process + // this.backupHandler.copyJob = job; + + // Update the copy progress to hbase:backup every 0.5s if the progress value changed + int progressReportFreq = + this.getConf().getInt("hbase.backup.progressreport.frequency", 500); + float lastProgress = progressDone; + while (!job.isComplete()) { + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + + if (newProgress > lastProgress) { + + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr); + this.backupHandler.updateProgress(newProgressStr, bytesCopied); + LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr + + ".\""); + lastProgress = newProgress; + } + Thread.sleep(progressReportFreq); + } + + // update the progress data after the copy job completes + float newProgress = + progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS); + BigDecimal progressData = + new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP); + + String newProgressStr = progressData + "%"; + LOG.info("Progress: " + newProgressStr); + + // accumulate the overall backup progress + progressDone = newProgress; + bytesCopied += totalSrcLgth; + + this.backupHandler.updateProgress(newProgressStr, bytesCopied); + LOG.debug("Backup progress data updated to hbase:backup: \"Progress: " + newProgressStr + + " - " + bytesCopied + " bytes copied.\""); + + } finally { + if (!fieldSubmitted.getBoolean(this)) { + methodCleanup.invoke(this); + } + } + + String jobID = job.getJobID().toString(); + job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); + + LOG.debug("DistCp job-id: " + jobID); + return job; + } + + } + + /** + * Do backup copy based on different types. + * @param handler The backup handler reference + * @param conf The hadoop configuration + * @param copyType The backup copy type + * @param options Options for customized ExportSnapshot or DistCp + * @throws IOException exception + */ + public int copy(BackupHandler handler, Configuration conf, BackupCopyService.Type copyType, + String[] options) throws IOException { + + int res = 0; + + try { + if (copyType == Type.FULL) { + SnapshotCopy snapshotCp = + new SnapshotCopy(handler, handler.getBackupContext().getTableBySnapshot(options[1])); + LOG.debug("Doing SNAPSHOT_COPY"); + // Make a new instance of conf to be used by the snapshot copy class.
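+ // (Assumption: the copy is made because ExportSnapshot mutates the Configuration it runs + // with; a private copy keeps this service's conf untouched.)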
+ snapshotCp.setConf(new Configuration(conf)); + res = snapshotCp.run(options); + } else if (copyType == Type.INCREMENTAL) { + LOG.debug("Doing COPY_TYPE_DISTCP"); + setSubTaskPercntgInWholeTask(1f); + + BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, handler); + // Handle a special case where the source file is a single file. + // In this case, distcp will not create the target dir. It just takes the + // target as a file name and copies the source file to the target (as a file name). + // We need to create the target dir before running distcp. + LOG.debug("DistCp options: " + Arrays.toString(options)); + if (options.length == 2) { + Path dest = new Path(options[1]); + FileSystem destfs = dest.getFileSystem(conf); + if (!destfs.exists(dest)) { + destfs.mkdirs(dest); + } + } + + res = distcp.run(options); + } + return res; + + } catch (Exception e) { + throw new IOException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java new file mode 100644 index 0000000..deefbf7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.IncrementalRestoreService; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.mapreduce.WALPlayer; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MapReduceRestoreService implements IncrementalRestoreService { + public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class); + + private WALPlayer player; + + public MapReduceRestoreService() { + this.player = new WALPlayer(); + } + + @Override + public void run(String logDir, String[] tableNames, String[] newTableNames) throws IOException { + String tableStr = HBackupFileSystem.join(tableNames); + String newTableStr = HBackupFileSystem.join(newTableNames); + + // WALPlayer reads all files in an arbitrary directory structure and creates a Map task for + // each log file + + String[] playerArgs = { logDir, tableStr, newTableStr }; + LOG.info("Restore incremental backup from directory " + logDir + " from hbase tables " + + tableStr + " to tables " + newTableStr); + try { + player.run(playerArgs); + } catch (Exception e) { + throw new IOException("cannot restore from backup directory " + logDir + + " (check Hadoop and HBase logs)", e); + } + } + + @Override + public Configuration getConf() { + return player.getConf(); + } + + @Override + public void setConf(Configuration conf) { + this.player.setConf(conf); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java new file mode 100644 index 0000000..4712548 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/BackupLogCleaner.java @@ -0,0 +1,121 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.backup.master; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.backup.BackupSystemTable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; + + + + +/** + * Implementation of a log cleaner that checks if a log is still scheduled for + * incremental backup before deleting it when its TTL is over. + */ +@InterfaceStability.Evolving +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class BackupLogCleaner extends BaseLogCleanerDelegate { + private static final Log LOG = LogFactory.getLog(BackupLogCleaner.class); + + private boolean stopped = false; + + public BackupLogCleaner() { + } + + @Override + public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) { + // all members of this class are null if backup is disabled, + // so we cannot filter the files + if (this.getConf() == null) { + return files; + } + + try { + final BackupSystemTable table = BackupSystemTable.getTable(getConf()); + // If we do not have recorded backup sessions + if (table.hasBackupSessions() == false) { + return files; + } + return Iterables.filter(files, new Predicate<FileStatus>() { + @Override + public boolean apply(FileStatus file) { + try { + String wal = file.getPath().toString(); + boolean logInSystemTable = table.checkWALFile(wal); + if (LOG.isDebugEnabled()) { + if (logInSystemTable) { + LOG.debug("Found log file in hbase:backup, deleting: " + wal); + } else { + LOG.debug("Didn't find this log in hbase:backup, keeping: " + wal); + } + } + return logInSystemTable; + } catch (IOException e) { + LOG.error(e); + return false;// keep file for a while, HBase failed + } + } + }); + } catch (IOException e) { + LOG.error("Failed to get hbase:backup table, therefore will keep all files", e); + // nothing to delete + return new ArrayList<FileStatus>(); + } + + } + + @Override + public void setConf(Configuration config) { + // If backup is disabled, keep all members null + if (!config.getBoolean(HConstants.BACKUP_ENABLE_KEY, HConstants.BACKUP_ENABLE_DEFAULT)) { + LOG.warn("Backup is disabled - allowing all wals to be deleted"); + return; + } + super.setConf(config); + } + + @Override + public void stop(String why) { + if (this.stopped) { + return; + } + this.stopped = true; + LOG.info("Stopping BackupLogCleaner"); + } + + @Override + public boolean isStopped() { + return this.stopped; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java new file mode 100644 index 0000000..f96682f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ 
-0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.MetricsMaster; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.procedure.Procedure; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.zookeeper.KeeperException; + +public class LogRollMasterProcedureManager extends MasterProcedureManager { + + public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; + public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + + private MasterServices master; + private ProcedureCoordinator coordinator; + private boolean done; + + @Override + public void stop(String why) { + LOG.info("stop: " + why); + } + + @Override + public boolean isStopped() { + return false; + } + + @Override + public void initialize(MasterServices master, MetricsMaster metricsMaster) + throws KeeperException, IOException, UnsupportedOperationException { + this.master = master; + this.done = false; + + // setup the default procedure coordinator + String name = master.getServerName().toString(); + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory + .getCoordinatedStateManager(master.getConfiguration()); + coordManager.initialize(master); + + ProcedureCoordinatorRpcs comms = + coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + + this.coordinator = new ProcedureCoordinator(comms, tpool); + } + + @Override + public String getProcedureSignature() { + return ROLLLOG_PROCEDURE_SIGNATURE; + } + + @Override + public void execProcedure(ProcedureDescription desc) throws IOException { + this.done = false; + // start the process on the RS + ForeignExceptionDispatcher monitor = new 
ForeignExceptionDispatcher(desc.getInstance()); + List<ServerName> serverNames = master.getServerManager().getOnlineServersList(); + List<String> servers = new ArrayList<String>(); + for (ServerName sn : serverNames) { + servers.add(sn.toString()); + } + Procedure proc = coordinator.startProcedure(monitor, desc.getInstance(), new byte[0], servers); + if (proc == null) { + String msg = "Failed to submit distributed procedure for '" + desc.getInstance() + "'"; + LOG.error(msg); + throw new IOException(msg); + } + + try { + // wait for the procedure to complete. A timer thread is kicked off that should cancel this + // if it takes too long. + proc.waitForCompleted(); + LOG.info("Distributed roll log procedure for '" + desc.getInstance() + + "' completed successfully."); + this.done = true; + } catch (InterruptedException e) { + ForeignException ee = + new ForeignException("Interrupted while waiting for roll log procedure to finish", e); + monitor.receive(ee); + Thread.currentThread().interrupt(); + } catch (ForeignException e) { + ForeignException ee = + new ForeignException("Exception while waiting for roll log procedure to finish", e); + monitor.receive(ee); + } + monitor.rethrowException(); + } + + @Override + public boolean isProcedureDone(ProcedureDescription desc) throws IOException { + return done; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java new file mode 100644 index 0000000..618748e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.util.HashMap; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.backup.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.errorhandling.ForeignException; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; + +/** + * This backup subprocedure implementation forces a log roll on the RS. + */ +public class LogRollBackupSubprocedure extends Subprocedure { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedure.class); + + private final RegionServerServices rss; + private final LogRollBackupSubprocedurePool taskManager; + private FSHLog hlog; + + public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, + ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, + LogRollBackupSubprocedurePool taskManager) { + + super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, + wakeFrequency, timeout); + LOG.info("Constructing a LogRollBackupSubprocedure."); + this.rss = rss; + this.taskManager = taskManager; + } + + /** + * Callable task. TODO: a thread pool is not strictly needed to roll a single log; this + * could be simplified to avoid the subprocedure pool entirely. + */ + class RSRollLogTask implements Callable<Void> { + RSRollLogTask() { + } + + @Override + public Void call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("++ DRPC started: " + rss.getServerName()); + } + hlog = (FSHLog) rss.getWAL(null); + long filenum = hlog.getFilenum(); + + LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum); + hlog.rollWriter(true); + LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum()); + // write the log number to hbase:backup. + BackupSystemTable table = BackupSystemTable.getTable(rss.getConfiguration()); + // sanity check, good for testing + HashMap<String, String> serverTimestampMap = table.readRegionServerLastLogRollResult(); + String host = rss.getServerName().getHostname(); + String sts = serverTimestampMap.get(host); + if (sts != null && Long.parseLong(sts) > filenum) { + LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum); + return null; + } + table.writeRegionServerLastLogRollResult(host, Long.toString(filenum)); + // TODO: potential leak of HBase connection + // BackupSystemTable.close(); + return null; + } + } + + private void rollLog() throws ForeignException { + monitor.rethrowException(); + + taskManager.submitTask(new RSRollLogTask()); + monitor.rethrowException(); + + // wait for everything to complete. + taskManager.waitForOutstandingTasks(); + monitor.rethrowException(); + } + + @Override + public void acquireBarrier() throws ForeignException { + // do nothing; the work happens in the inside-barrier step. + } + + /** + * Do a log roll. + * @return an empty byte array; the roll produces no payload for the coordinator + */ + @Override + public byte[] insideBarrier() throws ForeignException { + rollLog(); + return new byte[0]; + } + + /** + * Cancel threads if they haven't finished.
+ */ + @Override + public void cleanup(Exception e) { + taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); + } + + /** + * Nothing to release; the barrier step holds no external resources. + */ + public void releaseBarrier() { + // NO OP + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java new file mode 100644 index 0000000..1ca638c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedurePool.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.DaemonThreadFactory; +import org.apache.hadoop.hbase.errorhandling.ForeignException; + +/** + * Handles running each of the individual tasks required to complete a backup procedure + * on a region server.
+ */ +public class LogRollBackupSubprocedurePool implements Closeable, Abortable { + private static final Log LOG = LogFactory.getLog(LogRollBackupSubprocedurePool.class); + + /** Maximum number of backup tasks that can run concurrently on a region server */ + private static final String CONCURRENT_BACKUP_TASKS_KEY = "hbase.backup.region.concurrentTasks"; + private static final int DEFAULT_CONCURRENT_BACKUP_TASKS = 3; + + private final ExecutorCompletionService<Void> taskPool; + private final ThreadPoolExecutor executor; + private volatile boolean aborted; + private final List<Future<Void>> futures = new ArrayList<Future<Void>>(); + private final String name; + + public LogRollBackupSubprocedurePool(String name, Configuration conf) { + // configure the executor service + long keepAlive = + conf.getLong(LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_KEY, + LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT); + int threads = conf.getInt(CONCURRENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS); + this.name = name; + // the timeout key is expressed in milliseconds, so size the keep-alive accordingly + executor = + new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs(" + name + + ")-backup-pool")); + taskPool = new ExecutorCompletionService<Void>(executor); + } + + /** + * Submit a task to the pool. + */ + public void submitTask(final Callable<Void> task) { + Future<Void> f = this.taskPool.submit(task); + futures.add(f); + } + + /** + * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} + * @return <tt>true</tt> when all tasks complete, <tt>false</tt> if the wait was interrupted + * @throws ForeignException if a task failed or the pool was aborted + */ + public boolean waitForOutstandingTasks() throws ForeignException { + LOG.debug("Waiting for backup procedure to finish."); + + try { + for (Future<Void> f : futures) { + f.get(); + } + return true; + } catch (InterruptedException e) { + if (aborted) { + throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", + e); + } + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + if (e.getCause() instanceof ForeignException) { + throw (ForeignException) e.getCause(); + } + throw new ForeignException(name, e.getCause()); + } finally { + // close off remaining tasks + for (Future<Void> f : futures) { + if (!f.isDone()) { + f.cancel(true); + } + } + } + return false; + } + + /** + * Attempt a clean shutdown - currently running tasks are allowed to finish + */ + @Override + public void close() { + executor.shutdown(); + } + + @Override + public void abort(String why, Throwable e) { + if (this.aborted) { + return; + } + + this.aborted = true; + LOG.warn("Aborting because: " + why, e); + this.executor.shutdownNow(); + } + + @Override + public boolean isAborted() { + return this.aborted; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java new file mode 100644 index 0000000..aca190c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollRegionServerProcedureManager.java @@ -0,0
+1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.regionserver; + +import java.io.IOException; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; +import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.procedure.ProcedureMember; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; +import org.apache.hadoop.hbase.procedure.Subprocedure; +import org.apache.hadoop.hbase.procedure.SubprocedureFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * This manager class handles the work dealing with backup for a {@link HRegionServer}. + * <p> + * This provides the mechanism necessary to kick off a backup-specific {@link Subprocedure} that + * this region server is responsible for. If any failures occur with the subprocedure, the + * manager's procedure member notifies the procedure coordinator to abort all others. + * <p> + * On startup, requires {@link #start()} to be called. + * <p> + * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be + * called + */ +public class LogRollRegionServerProcedureManager extends RegionServerProcedureManager { + + private static final Log LOG = LogFactory.getLog(LogRollRegionServerProcedureManager.class); + + /** Conf key for number of request threads to start backup on regionservers */ + public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads"; + /** Default number of threads for backup work on the region server.
*/ + public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10; + + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout"; + public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; + + /** Conf key for millis between checks to see if backup work completed or if there are errors */ + public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency"; + /** Default millis between checks for errors while regions finish backup work */ + private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500; + + private RegionServerServices rss; + private ProcedureMemberRpcs memberRpcs; + private ProcedureMember member; + + /** + * Create a default backup procedure manager + */ + public LogRollRegionServerProcedureManager() { + } + + /** + * Start accepting backup procedure requests. + */ + @Override + public void start() { + this.memberRpcs.start(rss.getServerName().toString(), member); + LOG.info("Started region server backup manager."); + } + + /** + * Close <tt>this</tt> and all running backup procedure tasks + * @param force forcefully stop all running tasks + * @throws IOException exception + */ + @Override + public void stop(boolean force) throws IOException { + String mode = force ? "abruptly" : "gracefully"; + LOG.info("Stopping RegionServerBackupManager " + mode + "."); + + try { + this.member.close(); + } finally { + this.memberRpcs.close(); + } + } + + /** + * If in a running state, creates the specified subprocedure for handling a backup procedure. + * @return Subprocedure to submit to the ProcedureMember. + */ + public Subprocedure buildSubprocedure() { + // don't run a backup if the region server is stopping or stopped + if (rss.isStopping() || rss.isStopped()) { + throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName() + + ", because the server is stopping or already stopped!"); + } + + LOG.info("Attempting to run a roll log procedure for backup."); + ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); + Configuration conf = rss.getConfiguration(); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + long wakeMillis = + conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT); + + LogRollBackupSubprocedurePool taskManager = + new LogRollBackupSubprocedurePool(rss.getServerName().toString(), conf); + return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis, + taskManager); + } + + /** + * Build the actual backup procedure runner that will do all the 'hard' work + */ + public class BackupSubprocedureBuilder implements SubprocedureFactory { + + @Override + public Subprocedure buildSubprocedure(String name, byte[] data) { + return LogRollRegionServerProcedureManager.this.buildSubprocedure(); + } + } + + @Override + public void initialize(RegionServerServices rss) throws IOException { + this.rss = rss; + BaseCoordinatedStateManager coordManager = + (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss + .getConfiguration()); + coordManager.initialize(rss); + this.memberRpcs = + coordManager + .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE); + + // read in the backup handler configuration properties + Configuration conf = rss.getConfiguration(); + long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
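+ // keepAlive is expressed in milliseconds (hbase.backup.timeout); opThreads caps the + // size of the cohort member's worker pool (hbase.backup.region.pool.threads)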
+ // create the actual cohort member + ThreadPoolExecutor pool = + ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); + this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder()); + } + + @Override + public String getProcedureSignature() { + return "backup-proc"; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index ae36f08..3342743 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hbase.coordination; +import java.io.IOException; + import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Server; @@ -51,8 +55,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for split log worker */ public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination(); + /** * Method to retrieve coordination for split log manager */ public abstract SplitLogManagerCoordination getSplitLogManagerCoordination(); + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs} + */ + public abstract ProcedureCoordinatorRpcs + getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException; + + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs} + */ + public abstract ProcedureMemberRpcs + getProcedureMemberRpcs(String procType) throws IOException; + } http://git-wip-us.apache.org/repos/asf/hbase/blob/de69f0df/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 3e89be7..7cf4aab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -17,9 +17,15 @@ */ package org.apache.hadoop.hbase.coordination; +import java.io.IOException; + import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; /** @@ -49,9 +55,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override
public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { return splitLogWorkerCoordination; - } + } + @Override public SplitLogManagerCoordination getSplitLogManagerCoordination() { return splitLogManagerCoordination; } + + @Override + public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) + throws IOException { + return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode); + } + + @Override + public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException { + return new ZKProcedureMemberRpcs(watcher, procType); + } }
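Taken together, these changes wire a new distributed log-roll procedure into the existing procedure framework. A minimal, hypothetical client-side sketch (not part of this patch) of driving it end to end: it assumes the two managers are registered with the master and region servers through the hbase.procedure.master.classes and hbase.procedure.regionserver.classes configuration keys, and uses the public Admin.execProcedure() API; the class name RollLogProcedureSketch is illustrative only.

    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class RollLogProcedureSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          // The master routes this request to LogRollMasterProcedureManager by its
          // "rolllog-proc" signature; each online region server then runs a
          // LogRollBackupSubprocedure that rolls its WAL and records the new log
          // number in hbase:backup.
          admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE,
            LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME,
            new HashMap<String, String>());
        }
      }
    }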