http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java new file mode 100644 index 0000000..8a01a65 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java @@ -0,0 +1,755 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.backup.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.backup.RestoreTask; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; + +/** + * A 
collection for methods used by multiple classes to restore HBase tables. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RestoreServerUtil { + + public static final Log LOG = LogFactory.getLog(RestoreServerUtil.class); + + private final String[] ignoreDirs = { "recovered.edits" }; + + private final long TABLE_AVAILABILITY_WAIT_TIME = 180000; + + protected Configuration conf = null; + + protected Path backupRootPath; + + protected String backupId; + + protected FileSystem fs; + private final Path restoreTmpPath; + + // store table name and snapshot dir mapping + private final HashMap<TableName, Path> snapshotMap = new HashMap<>(); + + public RestoreServerUtil(Configuration conf, final Path backupRootPath, final String backupId) + throws IOException { + this.conf = conf; + this.backupRootPath = backupRootPath; + this.backupId = backupId; + this.fs = backupRootPath.getFileSystem(conf); + this.restoreTmpPath = new Path(conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY), "restore"); + } + + /** + * return value represent path for: + * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/archive/data/default/t1_dn" + * @param tabelName table name + * @return path to table archive + * @throws IOException exception + */ + Path getTableArchivePath(TableName tableName) + throws IOException { + + Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, + backupId), HConstants.HFILE_ARCHIVE_DIRECTORY); + Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR); + Path archivePath = new Path(dataDir, tableName.getNamespaceAsString()); + Path tableArchivePath = + new Path(archivePath, tableName.getQualifierAsString()); + if (!fs.exists(tableArchivePath) || !fs.getFileStatus(tableArchivePath).isDirectory()) { + LOG.debug("Folder tableArchivePath: " + tableArchivePath.toString() + " does not exists"); + tableArchivePath = null; // empty table has no archive + } + return tableArchivePath; + } + + /** + * Gets region list + * @param tableName table name + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList<Path> getRegionList(TableName tableName) + throws FileNotFoundException, IOException { + Path tableArchivePath = this.getTableArchivePath(tableName); + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + /** + * Gets region list + * @param tableName table name + * @param backupId backup id + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList<Path> getRegionList(TableName tableName, String backupId) throws FileNotFoundException, + IOException { + Path tableArchivePath = + new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(), + backupId, tableName)); + + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + static void modifyTableSync(Connection conn, HTableDescriptor desc) throws IOException { + + try (Admin admin = 
conn.getAdmin();) { + admin.modifyTable(desc.getTableName(), desc); + int attempt = 0; + int maxAttempts = 600; + while (!admin.isTableAvailable(desc.getTableName())) { + Thread.sleep(100); + attempt++; + if (attempt++ > maxAttempts) { + throw new IOException("Timeout expired " + (maxAttempts * 100) + "ms"); + } + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently + * tableNames and newTablesNames only contain single table, will be expanded to multiple tables + * in the future + * @param conn HBase connection + * @param tableBackupPath backup path + * @param logDirs : incremental backup folders, which contains WAL + * @param tableNames : source tableNames(table names were backuped) + * @param newTableNames : target tableNames(table names to be restored to) + * @param incrBackupId incremental backup Id + * @throws IOException exception + */ + public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs, + TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException { + + try (Admin admin = conn.getAdmin();) { + if (tableNames.length != newTableNames.length) { + throw new IOException("Number of source tables and target tables does not match!"); + } + FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + + // for incremental backup image, expect the table already created either by user or previous + // full backup. Here, check that all new tables exists + for (TableName tableName : newTableNames) { + if (!admin.tableExists(tableName)) { + throw new IOException("HBase table " + tableName + + " does not exist. Create the table first, e.g. by restoring a full backup."); + } + } + // adjust table schema + for (int i = 0; i < tableNames.length; i++) { + TableName tableName = tableNames[i]; + HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId); + LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId); + + TableName newTableName = newTableNames[i]; + HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName); + List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies()); + List<HColumnDescriptor> existingFamilies = + Arrays.asList(newTableDescriptor.getColumnFamilies()); + boolean schemaChangeNeeded = false; + for (HColumnDescriptor family : families) { + if (!existingFamilies.contains(family)) { + newTableDescriptor.addFamily(family); + schemaChangeNeeded = true; + } + } + for (HColumnDescriptor family : existingFamilies) { + if (!families.contains(family)) { + newTableDescriptor.removeFamily(family.getName()); + schemaChangeNeeded = true; + } + } + if (schemaChangeNeeded) { + RestoreServerUtil.modifyTableSync(conn, newTableDescriptor); + LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor); + } + } + RestoreTask restoreService = + BackupRestoreServerFactory.getRestoreTask(conf); + + restoreService.run(logDirs, tableNames, newTableNames, false); + } + } + + public void fullRestoreTable(Connection conn, Path tableBackupPath, TableName tableName, + TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) + throws IOException { + restoreTableAndCreate(conn, tableName, newTableName, tableBackupPath, truncateIfExists, + lastIncrBackupId); + } + + /** + * return value represent path for: + * ".../user/biadmin/backup1/default/t1_dn/backup_1396650096738/.hbase-snapshot" 
+ * @param backupRootPath backup root path + * @param tableName table name + * @param backupId backup Id + * @return path for snapshot + */ + static Path getTableSnapshotPath(Path backupRootPath, TableName tableName, + String backupId) { + return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId), + HConstants.SNAPSHOT_DIR_NAME); + } + + /** + * return value represent path for: + * "..../default/t1_dn/backup_1396650096738/.hbase-snapshot/snapshot_1396650097621_default_t1_dn" + * this path contains .snapshotinfo, .tabledesc (0.96 and 0.98) this path contains .snapshotinfo, + * .data.manifest (trunk) + * @param tableName table name + * @return path to table info + * @throws FileNotFoundException exception + * @throws IOException exception + */ + Path getTableInfoPath(TableName tableName) + throws FileNotFoundException, IOException { + Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); + Path tableInfoPath = null; + + // can't build the path directly as the timestamp values are different + FileStatus[] snapshots = fs.listStatus(tableSnapShotPath); + for (FileStatus snapshot : snapshots) { + tableInfoPath = snapshot.getPath(); + // SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest"; + if (tableInfoPath.getName().endsWith("data.manifest")) { + break; + } + } + return tableInfoPath; + } + + /** + * @param tableName is the table backed up + * @return {@link HTableDescriptor} saved in backup image of the table + */ + HTableDescriptor getTableDesc(TableName tableName) + throws FileNotFoundException, IOException { + Path tableInfoPath = this.getTableInfoPath(tableName); + SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath); + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc); + HTableDescriptor tableDescriptor = manifest.getTableDescriptor(); + if (!tableDescriptor.getTableName().equals(tableName)) { + LOG.error("couldn't find Table Desc for table: " + tableName + " under tableInfoPath: " + + tableInfoPath.toString()); + LOG.error("tableDescriptor.getNameAsString() = " + tableDescriptor.getNameAsString()); + throw new FileNotFoundException("couldn't find Table Desc for table: " + tableName + + " under tableInfoPath: " + tableInfoPath.toString()); + } + return tableDescriptor; + } + + /** + * Duplicate the backup image if it's on local cluster + * @see HStore#bulkLoadHFile(String, long) + * @see HRegionFileSystem#bulkLoadStoreFile(String familyName, Path srcPath, long seqNum) + * @param tableArchivePath archive path + * @return the new tableArchivePath + * @throws IOException exception + */ + Path checkLocalAndBackup(Path tableArchivePath) throws IOException { + // Move the file if it's on local cluster + boolean isCopyNeeded = false; + + FileSystem srcFs = tableArchivePath.getFileSystem(conf); + FileSystem desFs = FileSystem.get(conf); + if (tableArchivePath.getName().startsWith("/")) { + isCopyNeeded = true; + } else { + // This should match what is done in @see HRegionFileSystem#bulkLoadStoreFile(String, Path, + // long) + if (srcFs.getUri().equals(desFs.getUri())) { + LOG.debug("cluster hold the backup image: " + srcFs.getUri() + "; local cluster node: " + + desFs.getUri()); + isCopyNeeded = true; + } + } + if (isCopyNeeded) { + LOG.debug("File " + tableArchivePath + " on local cluster, back it up before restore"); + if (desFs.exists(restoreTmpPath)) { + try { + desFs.delete(restoreTmpPath, true); + } catch (IOException e) { + LOG.debug("Failed to delete path: " + 
restoreTmpPath + + ", need to check whether restore target DFS cluster is healthy"); + } + } + FileUtil.copy(srcFs, tableArchivePath, desFs, restoreTmpPath, false, conf); + LOG.debug("Copied to temporary path on local cluster: " + restoreTmpPath); + tableArchivePath = restoreTmpPath; + } + return tableArchivePath; + } + + private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName, + String lastIncrBackupId) throws IOException { + if (lastIncrBackupId != null) { + String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(), + lastIncrBackupId, tableName); + // Path target = new Path(info.getBackupStatus(tableName).getTargetDir()); + return FSTableDescriptors.getTableDescriptorFromFs(fileSys, + new Path(target)); + } + return null; + } + + private void restoreTableAndCreate(Connection conn, TableName tableName, TableName newTableName, + Path tableBackupPath, boolean truncateIfExists, String lastIncrBackupId) throws IOException { + if (newTableName == null) { + newTableName = tableName; + } + FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); + + // get table descriptor first + HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId); + if (tableDescriptor != null) { + LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId); + } + + if (tableDescriptor == null) { + Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId); + if (fileSys.exists(tableSnapshotPath)) { + // snapshot path exist means the backup path is in HDFS + // check whether snapshot dir already recorded for target table + if (snapshotMap.get(tableName) != null) { + SnapshotDescription desc = + SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); + SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); + tableDescriptor = manifest.getTableDescriptor(); + } else { + tableDescriptor = getTableDesc(tableName); + snapshotMap.put(tableName, getTableInfoPath(tableName)); + } + if (tableDescriptor == null) { + LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost"); + } + } else { + throw new IOException("Table snapshot directory: " + tableSnapshotPath + + " does not exist."); + } + } + + Path tableArchivePath = getTableArchivePath(tableName); + if (tableArchivePath == null) { + if (tableDescriptor != null) { + // find table descriptor but no archive dir means the table is empty, create table and exit + if(LOG.isDebugEnabled()) { + LOG.debug("find table descriptor but no archive dir for table " + tableName + + ", will only create table"); + } + tableDescriptor.setName(newTableName); + checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, null, + tableDescriptor, truncateIfExists); + return; + } else { + throw new IllegalStateException("Cannot restore hbase table because directory '" + + " tableArchivePath is null."); + } + } + + if (tableDescriptor == null) { + tableDescriptor = new HTableDescriptor(newTableName); + } else { + tableDescriptor.setName(newTableName); + } + + // record all region dirs: + // load all files in dir + try { + ArrayList<Path> regionPathList = getRegionList(tableName); + + // should only try to create the table with all region informations, so we could pre-split + // the regions in fine grain + checkAndCreateTable(conn, tableBackupPath, tableName, newTableName, regionPathList, + tableDescriptor, truncateIfExists); + if (tableArchivePath != null) { + // start real 
restore through bulkload + // if the backup target is on local cluster, special action needed + Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); + if (tempTableArchivePath.equals(tableArchivePath)) { + if(LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); + } + } else { + regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir + if(LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); + } + } + + LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); + for (Path regionPath : regionPathList) { + String regionName = regionPath.toString(); + if(LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + regionName); + } + String[] args = { regionName, newTableName.getNameAsString()}; + loader.run(args); + } + } + // we do not recovered edits + } catch (Exception e) { + throw new IllegalStateException("Cannot restore hbase table", e); + } + } + + /** + * Gets region list + * @param tableArchivePath table archive path + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList<Path> getRegionList(Path tableArchivePath) throws FileNotFoundException, + IOException { + ArrayList<Path> regionDirList = new ArrayList<Path>(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + + /** + * Counts the number of files in all subdirectories of an HBase table, i.e. HFiles. + * @param regionPath Path to an HBase table directory + * @return the number of files all directories + * @throws IOException exception + */ + int getNumberOfFilesInDir(Path regionPath) throws IOException { + int result = 0; + + if (!fs.exists(regionPath) || !fs.getFileStatus(regionPath).isDirectory()) { + throw new IllegalStateException("Cannot restore hbase table because directory '" + + regionPath.toString() + "' is not a directory."); + } + + FileStatus[] tableDirContent = fs.listStatus(regionPath); + for (FileStatus subDirStatus : tableDirContent) { + FileStatus[] colFamilies = fs.listStatus(subDirStatus.getPath()); + for (FileStatus colFamilyStatus : colFamilies) { + FileStatus[] colFamilyContent = fs.listStatus(colFamilyStatus.getPath()); + result += colFamilyContent.length; + } + } + return result; + } + + /** + * Counts the number of files in all subdirectories of an HBase tables, i.e. HFiles. And finds the + * maximum number of files in one HBase table. + * @param tableArchivePath archive path + * @return the maximum number of files found in 1 HBase table + * @throws IOException exception + */ + int getMaxNumberOfFilesInSubDir(Path tableArchivePath) throws IOException { + int result = 1; + ArrayList<Path> regionPathList = getRegionList(tableArchivePath); + // tableArchivePath = this.getTableArchivePath(tableName); + + if (regionPathList == null || regionPathList.size() == 0) { + throw new IllegalStateException("Cannot restore hbase table because directory '" + + tableArchivePath + "' is not a directory."); + } + + for (Path regionPath : regionPathList) { + result = Math.max(result, getNumberOfFilesInDir(regionPath)); + } + return result; + } + + /** + * Create a {@link LoadIncrementalHFiles} instance to be used to restore the HFiles of a full + * backup. 
+ * @return the {@link LoadIncrementalHFiles} instance + * @throws IOException exception + */ + private LoadIncrementalHFiles createLoader(Path tableArchivePath, boolean multipleTables) + throws IOException { + // set configuration for restore: + // LoadIncrementalHFile needs more time + // <name>hbase.rpc.timeout</name> <value>600000</value> + // calculates + Integer milliSecInMin = 60000; + Integer previousMillis = this.conf.getInt("hbase.rpc.timeout", 0); + Integer numberOfFilesInDir = + multipleTables ? getMaxNumberOfFilesInSubDir(tableArchivePath) : + getNumberOfFilesInDir(tableArchivePath); + Integer calculatedMillis = numberOfFilesInDir * milliSecInMin; // 1 minute per file + Integer resultMillis = Math.max(calculatedMillis, previousMillis); + if (resultMillis > previousMillis) { + LOG.info("Setting configuration for restore with LoadIncrementalHFile: " + + "hbase.rpc.timeout to " + calculatedMillis / milliSecInMin + + " minutes, to handle the number of files in backup " + tableArchivePath); + this.conf.setInt("hbase.rpc.timeout", resultMillis); + } + + // By default, it is 32 and loader will fail if # of files in any region exceed this + // limit. Bad for snapshot restore. + this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); + LoadIncrementalHFiles loader = null; + try { + loader = new LoadIncrementalHFiles(this.conf); + } catch (Exception e1) { + throw new IOException(e1); + } + return loader; + } + + /** + * Calculate region boundaries and add all the column families to the table descriptor + * @param regionDirList region dir list + * @return a set of keys to store the boundaries + */ + byte[][] generateBoundaryKeys(ArrayList<Path> regionDirList) + throws FileNotFoundException, IOException { + TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); + // Build a set of keys to store the boundaries + byte[][] keys = null; + // calculate region boundaries and add all the column families to the table descriptor + for (Path regionDir : regionDirList) { + LOG.debug("Parsing region dir: " + regionDir); + Path hfofDir = regionDir; + + if (!fs.exists(hfofDir)) { + LOG.warn("HFileOutputFormat dir " + hfofDir + " not found"); + } + + FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); + if (familyDirStatuses == null) { + throw new IOException("No families found in " + hfofDir); + } + + for (FileStatus stat : familyDirStatuses) { + if (!stat.isDirectory()) { + LOG.warn("Skipping non-directory " + stat.getPath()); + continue; + } + boolean isIgnore = false; + String pathName = stat.getPath().getName(); + for (String ignore : ignoreDirs) { + if (pathName.contains(ignore)) { + LOG.warn("Skipping non-family directory" + pathName); + isIgnore = true; + break; + } + } + if (isIgnore) { + continue; + } + Path familyDir = stat.getPath(); + LOG.debug("Parsing family dir [" + familyDir.toString() + " in region [" + regionDir + "]"); + // Skip _logs, etc + if (familyDir.getName().startsWith("_") || familyDir.getName().startsWith(".")) { + continue; + } + + // start to parse hfile inside one family dir + Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); + for (Path hfile : hfiles) { + if (hfile.getName().startsWith("_") || hfile.getName().startsWith(".") + || StoreFileInfo.isReference(hfile.getName()) + || HFileLink.isHFileLink(hfile.getName())) { + continue; + } + HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(conf), 
conf); + final byte[] first, last; + try { + reader.loadFileInfo(); + first = reader.getFirstRowKey(); + last = reader.getLastRowKey(); + LOG.debug("Trying to figure out region boundaries hfile=" + hfile + " first=" + + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); + + // To eventually infer start key-end key boundaries + Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0; + map.put(first, value + 1); + value = map.containsKey(last) ? (Integer) map.get(last) : 0; + map.put(last, value - 1); + } finally { + reader.close(); + } + } + } + } + keys = LoadIncrementalHFiles.inferBoundaries(map); + return keys; + } + + /** + * Prepare the table for bulkload, most codes copied from + * {@link LoadIncrementalHFiles#createTable(String, String)} + * @param svc MasterServices + * @param tableBackupPath path + * @param tableName table name + * @param targetTableName target table name + * @param regionDirList region directory list + * @param htd table descriptor + * @throws IOException exception + */ + private void checkAndCreateTable(Connection conn, Path tableBackupPath, TableName tableName, + TableName targetTableName, ArrayList<Path> regionDirList, + HTableDescriptor htd, boolean truncateIfExists) + throws IOException { + try (Admin admin = conn.getAdmin();){ + boolean createNew = false; + if (admin.tableExists(targetTableName)) { + if(truncateIfExists) { + LOG.info("Truncating exising target table '" + targetTableName + + "', preserving region splits"); + admin.disableTable(targetTableName); + admin.truncateTable(targetTableName, true); + } else{ + LOG.info("Using exising target table '" + targetTableName + "'"); + } + } else { + createNew = true; + } + if (createNew){ + LOG.info("Creating target table '" + targetTableName + "'"); + byte[][] keys = null; + if (regionDirList == null || regionDirList.size() == 0) { + admin.createTable(htd, null); + } else { + keys = generateBoundaryKeys(regionDirList); + // create table using table descriptor and region boundaries + admin.createTable(htd, keys); + } + long startTime = EnvironmentEdgeManager.currentTime(); + while (!admin.isTableAvailable(targetTableName, keys)) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + if (EnvironmentEdgeManager.currentTime() - startTime > TABLE_AVAILABILITY_WAIT_TIME) { + throw new IOException("Time out "+TABLE_AVAILABILITY_WAIT_TIME+ + "ms expired, table " + targetTableName + " is still not available"); + } + } + } + } + } + + public static boolean validate(HashMap<TableName, BackupManifest> backupManifestMap, + Configuration conf) throws IOException { + boolean isValid = true; + + for (Entry<TableName, BackupManifest> manifestEntry : backupManifestMap.entrySet()) { + TableName table = manifestEntry.getKey(); + TreeSet<BackupImage> imageSet = new TreeSet<BackupImage>(); + + ArrayList<BackupImage> depList = manifestEntry.getValue().getDependentListByTable(table); + if (depList != null && !depList.isEmpty()) { + imageSet.addAll(depList); + } + + LOG.info("Dependent image(s) from old to new:"); + for (BackupImage image : imageSet) { + String imageDir = + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), table); + if (!BackupClientUtil.checkPathExist(imageDir, conf)) { + LOG.error("ERROR: backup image does not exist: " + imageDir); + isValid = false; + break; + } + LOG.info("Backup image: " + image.getBackupId() + " for '" + table + "' is available"); + } + } + return isValid; + } + + /** + * 
Creates a restore request. + * @param backupRootDir backup root directory + * @param backupId backup image id + * @param check if true, only validate the backup images, do not execute the restore + * @param fromTables source table names (the tables that were backed up) + * @param toTables target table names (the tables to restore to) + * @param isOverwrite whether existing target tables may be overwritten + * @return the populated restore request + */ + public static RestoreRequest createRestoreRequest( + String backupRootDir, + String backupId, boolean check, TableName[] fromTables, + TableName[] toTables, boolean isOverwrite) { + RestoreRequest request = new RestoreRequest(); + request.setBackupRootDir(backupRootDir).setBackupId(backupId).setCheck(check) + .setFromTables(fromTables).setToTables(toTables).setOverwrite(isOverwrite); + return request; + } +}
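For readers skimming this new utility, the following is a minimal, illustrative sketch of how the createRestoreRequest() helper added above could be driven. The backup root directory, backup id, and table names are hypothetical, and the interpretation of the check flag (validate the backup images only, along the lines of validate() above, rather than execute the restore) is an assumption, not something this patch documents. The request would then be handed to whatever restore entry point the backup client exposes in this branch.

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.backup.RestoreRequest;
    import org.apache.hadoop.hbase.backup.util.RestoreServerUtil;

    public class RestoreRequestExample {
      public static void main(String[] args) {
        // Hypothetical source and target tables; a restore may map one onto the other.
        TableName[] fromTables = new TableName[] { TableName.valueOf("t1_dn") };
        TableName[] toTables = new TableName[] { TableName.valueOf("t1_dn_restored") };

        // Build the request with the fluent setters exercised by createRestoreRequest().
        RestoreRequest request = RestoreServerUtil.createRestoreRequest(
            "hdfs://namenode:8020/user/biadmin/backup1", // backupRootDir (hypothetical path)
            "backup_1396650096738",                      // backupId, reusing the id from the javadoc examples
            false,                                       // check flag, assumed to mean validate-only
            fromTables,
            toTables,
            true);                                       // overwrite existing target tables

        // The populated request would then be submitted to the restore client/admin API (not shown here).
        System.out.println("Restore request built: " + request);
      }
    }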
http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java index ae36f08..0e3755b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java @@ -17,9 +17,14 @@ */ package org.apache.hadoop.hbase.coordination; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.io.IOException; + import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.zookeeper.KeeperException; /** * Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations. @@ -51,8 +56,21 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan * Method to retrieve coordination for split log worker */ public abstract SplitLogWorkerCoordination getSplitLogWorkerCoordination(); + /** * Method to retrieve coordination for split log manager */ public abstract SplitLogManagerCoordination getSplitLogManagerCoordination(); + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs} + */ + public abstract ProcedureCoordinatorRpcs + getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException; + + /** + * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs} + */ + public abstract ProcedureMemberRpcs + getProcedureMemberRpcs(String procType) throws KeeperException; + } http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java index 3e89be7..d5e4085 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java @@ -17,10 +17,17 @@ */ package org.apache.hadoop.hbase.coordination; -import org.apache.hadoop.hbase.classification.InterfaceAudience; +import java.io.IOException; + import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs; +import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; /** * ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}. 
@@ -49,9 +56,21 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager { @Override public SplitLogWorkerCoordination getSplitLogWorkerCoordination() { return splitLogWorkerCoordination; - } + } + @Override public SplitLogManagerCoordination getSplitLogManagerCoordination() { return splitLogManagerCoordination; } + + @Override + public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) + throws IOException { + return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode); + } + + @Override + public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException { + return new ZKProcedureMemberRpcs(watcher, procType); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 3fe5a90..d51ab12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -85,15 +85,19 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, } } + @Override public void close(Reporter reporter) throws IOException { if (this.m_mutator != null) { this.m_mutator.close(); + this.m_mutator = null; } if (conn != null) { this.conn.close(); + this.conn = null; } } + @Override public void write(ImmutableBytesWritable key, Put value) throws IOException { m_mutator.mutate(new Put(value)); } @@ -101,7 +105,7 @@ public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, /** * Creates a new record writer. - * + * * Be aware that the baseline javadoc gives the impression that there is a single * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new * RecordWriter per call of this method. You must close the returned RecordWriter when done. http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java new file mode 100644 index 0000000..a00d390 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple MR input format for HFiles. + * This code was borrowed from Apache Crunch project. + * Updated to the recent version of HBase. + */ +public class HFileInputFormat2 extends FileInputFormat<NullWritable, Cell> { + + private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class); + + /** + * File filter that removes all "hidden" files. This might be something worth removing from + * a more general purpose utility; it accounts for the presence of metadata files created + * in the way we're doing exports. + */ + static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + /** + * Record reader for HFiles. + */ + private static class HFileRecordReader extends RecordReader<NullWritable, Cell> { + + private Reader in; + protected Configuration conf; + private HFileScanner scanner; + + /** + * A private cache of the key value so it doesn't need to be loaded twice from the scanner. + */ + private Cell value = null; + private long count; + private boolean seeked = false; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + conf = context.getConfiguration(); + Path path = fileSplit.getPath(); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Initialize HFileRecordReader for {}", path); + this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf); + + // The file info must be loaded before the scanner can be used. + // This seems like a bug in HBase, but it's easily worked around. 
+ this.in.loadFileInfo(); + this.scanner = in.getScanner(false, false); + + } + + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean hasNext; + if (!seeked) { + LOG.info("Seeking to start"); + hasNext = scanner.seekTo(); + seeked = true; + } else { + hasNext = scanner.next(); + } + if (!hasNext) { + return false; + } + value = scanner.getCell(); + count++; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public Cell getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to + // the start row, but better than nothing anyway. + return 1.0f * count / in.getEntries(); + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + in = null; + } + } + } + + @Override + protected List<FileStatus> listStatus(JobContext job) throws IOException { + List<FileStatus> result = new ArrayList<FileStatus>(); + + // Explode out directories that match the original FileInputFormat filters + // since HFiles are written to directories where the + // directory name is the column name + for (FileStatus status : super.listStatus(job)) { + if (status.isDirectory()) { + FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); + for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { + result.add(match); + } + } else { + result.add(status); + } + } + return result; + } + + @Override + public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new HFileRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + // This file isn't splittable. + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index 574ae18..064d649 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -114,7 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; public final static String CREATE_TABLE_CONF_KEY = "create.table"; - public final static String SILENCE_CONF_KEY = "ignore.unmatched.families"; + public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; public final static String ALWAYS_COPY_FILES = "always.copy.files"; // We use a '.' 
prefix which is ignored when walking directory trees @@ -162,7 +162,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D" - + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" + + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" + "\n"); } @@ -1228,7 +1228,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try (Table table = connection.getTable(tableName); RegionLocator locator = connection.getRegionLocator(tableName)) { - boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, "")); + boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); if (dirPath != null) { doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java index 02fcbba..da950f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java @@ -27,22 +27,26 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.StringUtils; /** * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. @@ -169,7 +173,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { temp = reader.next(currentEntry); i++; } catch (EOFException x) { - LOG.info("Corrupted entry detected. Ignoring the rest of the file." + LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 
+ " (This is normal when a RegionServer crashed.)"); return false; } @@ -231,29 +235,39 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); - Path inputDir = new Path(conf.get("mapreduce.input.fileinputformat.inputdir")); + + Path[] inputPaths = getInputPaths(conf); long startTime = conf.getLong(startKey, Long.MIN_VALUE); long endTime = conf.getLong(endKey, Long.MAX_VALUE); - FileSystem fs = inputDir.getFileSystem(conf); - List<FileStatus> files = getFiles(fs, inputDir, startTime, endTime); - - List<InputSplit> splits = new ArrayList<InputSplit>(files.size()); - for (FileStatus file : files) { + List<FileStatus> allFiles = new ArrayList<FileStatus>(); + for(Path inputPath: inputPaths){ + FileSystem fs = inputPath.getFileSystem(conf); + List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); + allFiles.addAll(files); + } + List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); + for (FileStatus file : allFiles) { splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); } return splits; } + private Path[] getInputPaths(Configuration conf) { + String inpDirs = conf.get("mapreduce.input.fileinputformat.inputdir"); + return StringUtils.stringToPath(inpDirs.split(",")); + } + private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) throws IOException { List<FileStatus> result = new ArrayList<FileStatus>(); LOG.debug("Scanning " + dir.toString() + " for WAL files"); - FileStatus[] files = fs.listStatus(dir); - if (files == null) return Collections.emptyList(); - for (FileStatus file : files) { + RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); + if (!iter.hasNext()) return Collections.emptyList(); + while (iter.hasNext()) { + LocatedFileStatus file = iter.next(); if (file.isDirectory()) { // recurse into sub directories result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); @@ -264,7 +278,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> { try { long fileStartTime = Long.parseLong(name.substring(idx+1)); if (fileStartTime <= endTime) { - LOG.info("Found: " + name); + LOG.info("Found: " + file); result.add(file); } } catch (NumberFormatException x) { http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java index 452714b..edcb57f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java @@ -70,9 +70,9 @@ import org.apache.hadoop.util.ToolRunner; public class WALPlayer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(WALPlayer.class); final static String NAME = "WALPlayer"; - final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; - final static String TABLES_KEY = "wal.input.tables"; - final static String TABLE_MAP_KEY = "wal.input.tablesmap"; + public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output"; + public final static String TABLES_KEY = "wal.input.tables"; + public final 
static String TABLE_MAP_KEY = "wal.input.tablesmap"; // This relies on Hadoop Configuration to handle warning about deprecated configs and // to set the correct non-deprecated configs when an old one shows up. @@ -86,6 +86,9 @@ public class WALPlayer extends Configured implements Tool { private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + public WALPlayer(){ + } + protected WALPlayer(final Configuration c) { super(c); } @@ -95,7 +98,7 @@ public class WALPlayer extends Configured implements Tool { * This one can be used together with {@link KeyValueSortReducer} */ static class WALKeyValueMapper - extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { + extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> { private byte[] table; @Override @@ -107,7 +110,9 @@ public class WALPlayer extends Configured implements Tool { if (Bytes.equals(table, key.getTablename().getName())) { for (Cell cell : value.getCells()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - if (WALEdit.isMetaEditFamily(kv)) continue; + if (WALEdit.isMetaEditFamily(kv)) { + continue; + } context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv); } } @@ -133,7 +138,7 @@ public class WALPlayer extends Configured implements Tool { * a running HBase instance. */ protected static class WALMapper - extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> { + extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> { private Map<TableName, TableName> tables = new TreeMap<TableName, TableName>(); @Override @@ -150,7 +155,9 @@ public class WALPlayer extends Configured implements Tool { Cell lastCell = null; for (Cell cell : value.getCells()) { // filtering WAL meta entries - if (WALEdit.isMetaEditFamily(cell)) continue; + if (WALEdit.isMetaEditFamily(cell)) { + continue; + } // Allow a subclass filter out this cell. if (filter(context, cell)) { @@ -161,8 +168,12 @@ public class WALPlayer extends Configured implements Tool { if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte() || !CellUtil.matchingRow(lastCell, cell)) { // row or type changed, write out aggregate KVs. - if (put != null) context.write(tableOut, put); - if (del != null) context.write(tableOut, del); + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } if (CellUtil.isDelete(cell)) { del = new Delete(CellUtil.cloneRow(cell)); } else { @@ -178,29 +189,34 @@ public class WALPlayer extends Configured implements Tool { lastCell = cell; } // write residual KVs - if (put != null) context.write(tableOut, put); - if (del != null) context.write(tableOut, del); + if (put != null) { + context.write(tableOut, put); + } + if (del != null) { + context.write(tableOut, del); + } } } catch (InterruptedException e) { e.printStackTrace(); } } - /** - * @param cell - * @return Return true if we are to emit this cell. - */ protected boolean filter(Context context, final Cell cell) { return true; } @Override + protected void + cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context) + throws IOException, InterruptedException { + super.cleanup(context); + } + + @Override public void setup(Context context) throws IOException { String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY); String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY); - if (tablesToUse == null && tableMap == null) { - // Then user wants all tables. 
- } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { + if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) { // this can only happen when WALMapper is used directly by a class other than WALPlayer throw new IOException("No tables or incorrect table mapping specified."); } @@ -216,7 +232,9 @@ public class WALPlayer extends Configured implements Tool { void setupTime(Configuration conf, String option) throws IOException { String val = conf.get(option); - if (null == val) return; + if (null == val) { + return; + } long ms; try { // first try to parse in user friendly form @@ -246,7 +264,7 @@ public class WALPlayer extends Configured implements Tool { Configuration conf = getConf(); setupTime(conf, HLogInputFormat.START_TIME_KEY); setupTime(conf, HLogInputFormat.END_TIME_KEY); - Path inputDir = new Path(args[0]); + String inputDirs = args[0]; String[] tables = args[1].split(","); String[] tableMap; if (args.length > 2) { @@ -260,13 +278,18 @@ public class WALPlayer extends Configured implements Tool { } conf.setStrings(TABLES_KEY, tables); conf.setStrings(TABLE_MAP_KEY, tableMap); - Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + inputDir)); + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis())); job.setJarByClass(WALPlayer.class); - FileInputFormat.setInputPaths(job, inputDir); + + FileInputFormat.addInputPaths(job, inputDirs); + job.setInputFormatClass(WALInputFormat.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + // the bulk HFile case if (tables.length != 1) { throw new IOException("Exactly one table must be specified for the bulk export option"); @@ -302,7 +325,9 @@ public class WALPlayer extends Configured implements Tool { return job; } - /* + + /** + * Print usage * @param errorMsg Error message. Can be null. 
*/ private void usage(final String errorMsg) { @@ -312,7 +337,8 @@ public class WALPlayer extends Configured implements Tool { System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]"); System.err.println("Read all WAL entries for <tables>."); System.err.println("If no tables (\"\") are specific, all tables are imported."); - System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)"); + System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported"+ + " in that case.)"); System.err.println("Otherwise <tables> is a comma separated list of tables.\n"); System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>."); System.err.println("<tableMapping> is a command separated list of targettables."); @@ -325,10 +351,10 @@ public class WALPlayer extends Configured implements Tool { System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]"); System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]"); System.err.println(" -D " + JOB_NAME_CONF_KEY - + "=jobName - use the specified mapreduce job name for the wal player"); + + "=jobName - use the specified mapreduce job name for the wal player"); System.err.println("For performance also consider the following options:\n" - + " -Dmapreduce.map.speculative=false\n" - + " -Dmapreduce.reduce.speculative=false"); + + " -Dmapreduce.map.speculative=false\n" + + " -Dmapreduce.reduce.speculative=false"); } /** @@ -349,6 +375,7 @@ public class WALPlayer extends Configured implements Tool { System.exit(-1); } Job job = createSubmittableJob(args); - return job.waitForCompletion(true) ? 0 : 1; + int result =job.waitForCompletion(true) ? 0 : 1; + return result; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0136ff5..1442433 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.PleaseHoldException; import org.apache.hadoop.hbase.ProcedureInfo; -import org.apache.hadoop.hbase.RegionStateListener; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; @@ -74,6 +73,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.Result; @@ -413,6 +413,7 @@ public class HMaster extends HRegionServer implements MasterServices { this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); Replication.decorateMasterConfiguration(this.conf); + BackupManager.decorateMasterConfiguration(this.conf); // Hack! Maps DFSClient => Master for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. 
@@ -899,7 +900,7 @@ public class HMaster extends HRegionServer implements MasterServices { void initQuotaManager() throws IOException { MasterQuotaManager quotaManager = new MasterQuotaManager(this); - this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager); + this.assignmentManager.setRegionStateListener(quotaManager); quotaManager.start(); this.quotaManager = quotaManager; } @@ -2644,6 +2645,8 @@ public class HMaster extends HRegionServer implements MasterServices { return procInfoList; } + + /** * Returns the list of table descriptors that match the specified request * @param namespace the namespace to query, or null if querying for all http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java index 4632d23..9d75e2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java @@ -52,7 +52,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { * @throws KeeperException if an unexpected zk error occurs */ public ZKProcedureCoordinatorRpcs(ZooKeeperWatcher watcher, - String procedureClass, String coordName) throws KeeperException { + String procedureClass, String coordName) throws IOException { this.watcher = watcher; this.procedureType = procedureClass; this.coordName = coordName; @@ -179,6 +179,7 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { * Start monitoring znodes in ZK - subclass hook to start monitoring znodes they are about. * @return true if succeed, false if encountered initialization errors. 
*/ + @Override final public boolean start(final ProcedureCoordinator coordinator) { if (this.coordinator != null) { throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 24d8170..6ac88be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -47,7 +47,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.management.MalformedObjectNameException; @@ -79,6 +78,7 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; +import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; @@ -194,13 +194,13 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; +import sun.misc.Signal; +import sun.misc.SignalHandler; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import sun.misc.Signal; -import sun.misc.SignalHandler; - /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. @@ -374,7 +374,7 @@ public class HRegionServer extends HasThread implements // WAL roller. 
log is protected rather than private to avoid // eclipse warning when accessed by inner classes - final LogRoller walRoller; + public final LogRoller walRoller; // flag set after we're done setting up server threads final AtomicBoolean online = new AtomicBoolean(false); @@ -522,7 +522,7 @@ public class HRegionServer extends HasThread implements FSUtils.setupShortCircuitRead(this.conf); // Disable usage of meta replicas in the regionserver this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); - + BackupManager.decorateRSConfiguration(conf); // Config'ed params this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -633,7 +633,7 @@ public class HRegionServer extends HasThread implements int cleanerInterval = conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000); this.compactedFileDischarger = - new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this); + new CompactedHFilesDischarger(cleanerInterval, this, this); choreService.scheduleChore(compactedFileDischarger); } @@ -846,7 +846,7 @@ public class HRegionServer extends HasThread implements rspmHost.loadProcedures(conf); rspmHost.initialize(this); } catch (KeeperException e) { - this.abort("Failed to reach zk cluster when creating procedure handler.", e); + this.abort("Failed to reach coordination cluster when creating procedure handler.", e); } // register watcher for recovering regions this.recoveringRegionWatcher = new RecoveringRegionWatcher(this.zooKeeper, this); http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b4f0a29..8243fb5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,15 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import com.google.common.annotations.VisibleForTesting; -import com.lmax.disruptor.BlockingWaitStrategy; -import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.LifecycleAware; -import com.lmax.disruptor.TimeoutException; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; - import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; @@ -66,6 +57,15 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import com.google.common.annotations.VisibleForTesting; +import com.lmax.disruptor.BlockingWaitStrategy; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.LifecycleAware; +import com.lmax.disruptor.TimeoutException; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + /** * The default implementation of FSWAL. 
*/ @@ -780,6 +780,15 @@ public class FSHLog extends AbstractFSWAL<Writer> { } } + /** + * To support old API compatibility + * @return current file number (timestamp) + */ + @Override + public long getFilenum() { + return filenum.get(); + } + @Override public void sync(long txid) throws IOException { if (this.highestSyncedTxid.get() >= txid) { http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java new file mode 100644 index 0000000..6f252fc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ProcedureUtil.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.procedure.MasterProcedureManager; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ProcedureUtil { + private static final Log LOG = LogFactory.getLog(ProcedureUtil.class); + + public static ProcedureDescription buildProcedure(String signature, String instance, + Map<String, String> props) { + ProcedureDescription.Builder builder = ProcedureDescription.newBuilder(); + builder.setSignature(signature).setInstance(instance); + for (Entry<String, String> entry : props.entrySet()) { + NameStringPair pair = NameStringPair.newBuilder().setName(entry.getKey()) + .setValue(entry.getValue()).build(); + builder.addConfiguration(pair); + } + ProcedureDescription desc = builder.build(); + return desc; + } + + public static long execProcedure(MasterProcedureManager mpm, String signature, String instance, + Map<String, String> props) throws IOException { + if (mpm == null) { + throw new IOException("The procedure is not registered: " + signature); + } + ProcedureDescription desc = buildProcedure(signature, instance, props); + mpm.execProcedure(desc); + + // send back the max amount of time the client should wait for the 
procedure + // to complete + long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME; + return waitTime; + } + + public static void waitForProcedure(MasterProcedureManager mpm, String signature, String instance, + Map<String, String> props, long max, int numRetries, long pause) throws IOException { + ProcedureDescription desc = buildProcedure(signature, instance, props); + long start = EnvironmentEdgeManager.currentTime(); + long maxPauseTime = max / numRetries; + int tries = 0; + LOG.debug("Waiting a max of " + max + " ms for procedure '" + + signature + " : " + instance + "'' to complete. (max " + maxPauseTime + " ms per retry)"); + boolean done = false; + while (tries == 0 + || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) { + try { + // sleep a backoff <= pauseTime amount + long sleep = getPauseTime(tries++, pause); + sleep = sleep > maxPauseTime ? maxPauseTime : sleep; + LOG.debug("(#" + tries + ") Sleeping: " + sleep + + "ms while waiting for procedure completion."); + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e); + } + LOG.debug("Getting current status of procedure from master..."); + done = mpm.isProcedureDone(desc); + } + if (!done) { + throw new IOException("Procedure '" + signature + " : " + instance + + "' wasn't completed in expectedTime:" + max + " ms"); + } + } + + private static long getPauseTime(int tries, long pause) { + int triesCount = tries; + if (triesCount >= HConstants.RETRY_BACKOFF.length) { + triesCount = HConstants.RETRY_BACKOFF.length - 1; + } + return pause * HConstants.RETRY_BACKOFF[triesCount]; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 76a6415..0ae37cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -229,6 +229,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen @VisibleForTesting public static long extractFileNumFromWAL(final WAL wal) { final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName(); + return extractFileNumFromWAL(walName); + } + + @VisibleForTesting + public static long extractFileNumFromWAL(final Path walName) { if (walName == null) { throw new IllegalArgumentException("The WAL path couldn't be null"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index c74c399..d8c1c5b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -49,7 +49,6 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import edu.umd.cs.findbugs.annotations.Nullable; import org.apache.commons.io.FileUtils; import 
org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; @@ -61,6 +60,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.Predicate; +import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Admin; @@ -117,6 +117,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.util.RegionSplitter.SplitAlgorithm; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; @@ -137,6 +138,8 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * Facility for testing HBase. Replacement for * old HBaseTestCase and HBaseClusterTestCase functionality. @@ -2133,6 +2136,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } } + public void loadRandomRows(final Table t, final byte[] f, int rowSize, int totalRows) + throws IOException { + Random r = new Random(); + byte[] row = new byte[rowSize]; + for (int i = 0; i < totalRows; i++) { + r.nextBytes(row); + Put put = new Put(row); + put.addColumn(f, new byte[]{0}, new byte[]{0}); + t.put(put); + } + } + public void verifyNumericRows(Table table, final byte[] f, int startRow, int endRow, int replicaId) throws IOException { @@ -3276,6 +3291,18 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } /** + * Wait until all system tables' regions get assigned + * @throws IOException + */ + public void waitUntilAllSystemRegionsAssigned() throws IOException { + waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME); + waitUntilAllRegionsAssigned(TableName.NAMESPACE_TABLE_NAME); + if (BackupManager.isBackupEnabled(conf)) { + waitUntilAllRegionsAssigned(TableName.BACKUP_TABLE_NAME); + } + } + + /** * Wait until all regions for a table in hbase:meta have a non-empty * info:server, or until timeout. This means all regions have been deployed, * master has been informed and updated hbase:meta with the regions deployed @@ -3740,12 +3767,24 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public static int createPreSplitLoadTestTable(Configuration conf, HTableDescriptor desc, HColumnDescriptor[] hcds, int numRegionsPerServer) throws IOException { + + return createPreSplitLoadTestTable(conf, desc, hcds, + new RegionSplitter.HexStringSplit(), numRegionsPerServer); + } + + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues.
+ * @return the number of regions the table was split into + */ + public static int createPreSplitLoadTestTable(Configuration conf, + HTableDescriptor desc, HColumnDescriptor[] hcds, + SplitAlgorithm splitter, int numRegionsPerServer) throws IOException { for (HColumnDescriptor hcd : hcds) { if (!desc.hasFamily(hcd.getName())) { desc.addFamily(hcd); } } - int totalNumberOfRegions = 0; Connection unmanagedConnection = ConnectionFactory.createConnection(conf); Admin admin = unmanagedConnection.getAdmin(); @@ -3764,7 +3803,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { "pre-splitting table into " + totalNumberOfRegions + " regions " + "(regions per server: " + numRegionsPerServer + ")"); - byte[][] splits = new RegionSplitter.HexStringSplit().split( + byte[][] splits = splitter.split( totalNumberOfRegions); admin.createTable(desc, splits); http://git-wip-us.apache.org/repos/asf/hbase/blob/2725fb25/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java index 2dca6b1..465020b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestNamespace.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; @@ -32,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -63,6 +65,7 @@ public class TestNamespace { @BeforeClass public static void setUp() throws Exception { TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE); admin = TEST_UTIL.getHBaseAdmin(); cluster = TEST_UTIL.getHBaseCluster(); @@ -110,13 +113,16 @@ public class TestNamespace { //verify existence of system tables Set<TableName> systemTables = Sets.newHashSet( TableName.META_TABLE_NAME, - TableName.NAMESPACE_TABLE_NAME); + TableName.NAMESPACE_TABLE_NAME, + TableName.BACKUP_TABLE_NAME); HTableDescriptor[] descs = admin.listTableDescriptorsByNamespace(NamespaceDescriptor.SYSTEM_NAMESPACE.getName()); - assertEquals(systemTables.size(), descs.length); + assertTrue(descs.length >= systemTables.size()); + Set<TableName> tables = new HashSet<>(); for (HTableDescriptor desc : descs) { - assertTrue(systemTables.contains(desc.getTableName())); + tables.add(desc.getTableName()); } + assertTrue(tables.containsAll(systemTables)); //verify system tables aren't listed assertEquals(0, admin.listTables().length);
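The TestNamespace changes above lean on the new test utilities this patch adds to HBaseTestingUtility: waitUntilAllSystemRegionsAssigned(), loadRandomRows(Table, byte[], int, int), and the createPreSplitLoadTestTable overload that takes a SplitAlgorithm. The following usage sketch is not part of the patch; the table name "load_test", family "f", UniformSplit choice, region count, and harness class are assumptions for illustration only.

// Hedged usage sketch of the new HBaseTestingUtility helpers (illustrative harness).
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;

public class PreSplitLoadSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster();
    // New helper: blocks until meta, namespace and, if backup is enabled,
    // the backup system table are assigned.
    util.waitUntilAllSystemRegionsAssigned();

    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("load_test"));
    HColumnDescriptor[] families = { new HColumnDescriptor("f") };
    // New overload: the caller supplies the SplitAlgorithm instead of the
    // previously hard-coded HexStringSplit.
    int regions = HBaseTestingUtility.createPreSplitLoadTestTable(
        util.getConfiguration(), desc, families, new RegionSplitter.UniformSplit(), 5);
    System.out.println("created " + regions + " regions");

    try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
         Table table = conn.getTable(desc.getTableName())) {
      // New helper: writes 1000 puts keyed by random 32-byte rows into family "f".
      util.loadRandomRows(table, Bytes.toBytes("f"), 32, 1000);
    }
    util.shutdownMiniCluster();
  }
}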