Repository: hbase

Updated Branches:
  refs/heads/HBASE-7912 6d48260c8 -> 28737d05f
HBASE-15449 HBase Backup Phase 3: Support physical table layout change


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28737d05
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28737d05
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28737d05

Branch: refs/heads/HBASE-7912
Commit: 28737d05f4c336253dd161743accdad31f30efcc
Parents: 6d48260
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Sep 6 08:48:58 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Sep 6 08:48:58 2016 -0700

----------------------------------------------------------------------
 .../hbase/backup/impl/RestoreClientImpl.java    |  29 +-
 .../hbase/backup/util/BackupServerUtil.java     |   3 +-
 .../hbase/backup/util/RestoreServerUtil.java    | 274 ++++++++++++-------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  60 +++-
 .../hadoop/hbase/backup/TestBackupBase.java     |  14 +-
 .../hbase/backup/TestIncrementalBackup.java     |  54 +++-
 6 files changed, 302 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
index a04fc08..7f23ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreClientImpl.java
@@ -261,35 +261,40 @@ public final class RestoreClientImpl implements RestoreClient {
       // TODO: convert feature will be provided in a future JIRA
       boolean converted = false;
+      String lastIncrBackupId = null;
+      List<String> logDirList = null;
+      // Scan incremental backups
+      if (it.hasNext()) {
+        // obtain the backupId for most recent incremental
+        logDirList = new ArrayList<String>();
+        while (it.hasNext()) {
+          BackupImage im = it.next();
+          String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
+          logDirList.add(logBackupDir);
+          lastIncrBackupId = im.getBackupId();
+        }
+      }
       if (manifest.getType() == BackupType.FULL || converted) {
         LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from " + (converted ?
           "converted" : "full") + " backup image " + tableBackupPath.toString());
         restoreTool.fullRestoreTable(tableBackupPath, sTable, tTable,
-          converted, truncateIfExists);
-
+          converted, truncateIfExists, lastIncrBackupId);
       } else { // incremental Backup
         throw new IOException("Unexpected backup type " + image.getType());
       }

       // The rest one are incremental
-      if (it.hasNext()) {
-        List<String> logDirList = new ArrayList<String>();
-        while (it.hasNext()) {
-          BackupImage im = it.next();
-          String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId());
-          logDirList.add(logBackupDir);
-        }
+      if (logDirList != null) {
         String logDirs = StringUtils.join(logDirList, ",");
         LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + logDirs);
         String[] sarr = new String[logDirList.size()];
         logDirList.toArray(sarr);
         Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr);
-        restoreTool.incrementalRestoreTable(paths, new TableName[] { sTable },
-          new TableName[] { tTable });
+        restoreTool.incrementalRestoreTable(tableBackupPath, paths, new TableName[] { sTable },
+          new TableName[] { tTable }, lastIncrBackupId);
       }
       LOG.info(sTable + " has been successfully restored to " + tTable);
     }
-
   }


http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
index 265ef6c..37c8d65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/BackupServerUtil.java
@@ -172,7 +172,6 @@ public final class BackupServerUtil {
           LOG.warn("Table "+ table+" does not exists, skipping it.");
           continue;
         }
-        LOG.debug("Attempting to copy table info for:" + table);
         TableDescriptor orig = FSTableDescriptors.getTableDescriptorFromFs(fs, rootDir, table);

         // write a copy of descriptor to the target directory
@@ -181,6 +180,8 @@ public final class BackupServerUtil {
         FSTableDescriptors descriptors =
             new FSTableDescriptors(conf, targetFs, FSUtils.getRootDir(conf));
         descriptors.createTableDescriptorForTableDirectory(target, orig, false);
+        LOG.debug("Attempting to copy table info for:" + table + " target: " + target +
+            " descriptor: " + orig);
         LOG.debug("Finished copying tableinfo.");
         List<HRegionInfo> regions = null;
         regions = admin.getTableRegions(table);

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 6f83f25..620d622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.hbase.backup.util;

 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.TreeMap;
 import org.apache.commons.logging.Log;
@@ -31,9 +34,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
 import org.apache.hadoop.hbase.backup.HBackupFileSystem;
 import org.apache.hadoop.hbase.backup.IncrementalRestoreService;
@@ -55,6 +60,8 @@ import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Pair;

 /**
  * A collection for methods used by multiple classes to restore HBase tables.
@@ -133,21 +140,54 @@ public class RestoreServerUtil {
     return regionDirList;
   }

+  static void modifyTableSync(Admin admin, HTableDescriptor desc)
+      throws IOException {
+    admin.modifyTable(desc.getTableName(), desc);
+    Pair<Integer, Integer> status = new Pair<Integer, Integer>() {{
+      setFirst(0);
+      setSecond(0);
+    }};
+    int i = 0;
+    do {
+      status = admin.getAlterStatus(desc.getTableName());
+      if (status.getSecond() != 0) {
+        LOG.debug(status.getSecond() - status.getFirst() + "/" + status.getSecond() +
+            " regions updated.");
+        try {
+          Thread.sleep(1 * 1000l);
+        } catch (InterruptedException ie) {
+          InterruptedIOException iie = new InterruptedIOException();
+          iie.initCause(ie);
+          throw iie;
+        }
+      } else {
+        LOG.debug("All regions updated.");
+        break;
+      }
+    } while (status.getFirst() != 0 && i++ < 500);
+    if (status.getFirst() != 0) {
+      throw new IOException("Failed to update all regions even after 500 seconds.");
+    }
+  }
+
   /**
    * During incremental backup operation. Call WalPlayer to replay WAL in backup image Currently
    * tableNames and newTablesNames only contain single table, will be expanded to multiple tables in
    * the future
-   * @param logDir : incremental backup folders, which contains WAL
+   * @param tableBackupPath backup path
+   * @param logDirs : incremental backup folders, which contains WAL
    * @param tableNames : source tableNames(table names were backuped)
    * @param newTableNames : target tableNames(table names to be restored to)
+   * @param incrBackupId incremental backup Id
    * @throws IOException exception
    */
-  public void incrementalRestoreTable(Path[] logDirs,
-      TableName[] tableNames, TableName[] newTableNames) throws IOException {
+  public void incrementalRestoreTable(Path tableBackupPath, Path[] logDirs, TableName[] tableNames,
+      TableName[] newTableNames, String incrBackupId) throws IOException {

     if (tableNames.length != newTableNames.length) {
       throw new IOException("Number of source tables and target tables does not match!");
     }
+    FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);

     // for incremental backup image, expect the table already created either by user or previous
     // full backup. Here, check that all new tables exists
@@ -160,6 +200,35 @@ public class RestoreServerUtil {
             + " does not exist. Create the table first, e.g. by restoring a full backup.");
       }
     }
+    // adjust table schema
+    for (int i = 0; i < tableNames.length; i++) {
+      TableName tableName = tableNames[i];
+      HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, incrBackupId);
+      LOG.debug("Found descriptor " + tableDescriptor + " through " + incrBackupId);
+
+      TableName newTableName = newTableNames[i];
+      HTableDescriptor newTableDescriptor = admin.getTableDescriptor(newTableName);
+      List<HColumnDescriptor> families = Arrays.asList(tableDescriptor.getColumnFamilies());
+      List<HColumnDescriptor> existingFamilies =
+          Arrays.asList(newTableDescriptor.getColumnFamilies());
+      boolean schemaChangeNeeded = false;
+      for (HColumnDescriptor family : families) {
+        if (!existingFamilies.contains(family)) {
+          newTableDescriptor.addFamily(family);
+          schemaChangeNeeded = true;
+        }
+      }
+      for (HColumnDescriptor family : existingFamilies) {
+        if (!families.contains(family)) {
+          newTableDescriptor.removeFamily(family.getName());
+          schemaChangeNeeded = true;
+        }
+      }
+      if (schemaChangeNeeded) {
+        RestoreServerUtil.modifyTableSync(admin, newTableDescriptor);
+        LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor);
+      }
+    }

     IncrementalRestoreService restoreService =
         BackupRestoreServerFactory.getIncrementalRestoreService(conf);

@@ -168,8 +237,9 @@ public class RestoreServerUtil {
   }

   public void fullRestoreTable(Path tableBackupPath, TableName tableName, TableName newTableName,
-      boolean converted, boolean truncateIfExists) throws IOException {
-    restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists);
+      boolean converted, boolean truncateIfExists, String lastIncrBackupId) throws IOException {
+    restoreTableAndCreate(tableName, newTableName, tableBackupPath, converted, truncateIfExists,
+      lastIncrBackupId);
   }

   /**
@@ -275,104 +345,126 @@ public class RestoreServerUtil {
     return tableArchivePath;
   }

+  private HTableDescriptor getTableDescriptor(FileSystem fileSys, TableName tableName,
+      String lastIncrBackupId) throws IOException {
+    if (lastIncrBackupId != null) {
+      String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(),
+        lastIncrBackupId, tableName);
+      // Path target = new Path(info.getBackupStatus(tableName).getTargetDir());
+      return FSTableDescriptors.getTableDescriptorFromFs(fileSys,
+        new Path(target)).getHTableDescriptor();
+    }
+    return null;
+  }
+
   private void restoreTableAndCreate(TableName tableName, TableName newTableName,
-      Path tableBackupPath, boolean converted, boolean truncateIfExists) throws IOException {
+      Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId)
+          throws IOException {
     if (newTableName == null || newTableName.equals("")) {
       newTableName = tableName;
     }

     FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);

-    // get table descriptor first
-    HTableDescriptor tableDescriptor = null;
-
-    Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+    HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId);
+    if (tableDescriptor != null) {
+      LOG.debug("Retrieved descriptor: " + tableDescriptor + " thru " + lastIncrBackupId);
+    }

-    if (fileSys.exists(tableSnapshotPath)) {
-      // snapshot path exist means the backup path is in HDFS
-      // check whether snapshot dir already recorded for target table
-      if (snapshotMap.get(tableName) != null) {
-        SnapshotDescription desc =
-            SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
-        SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
-        tableDescriptor = manifest.getTableDescriptor();
-      } else {
-        tableDescriptor = getTableDesc(tableName);
-        snapshotMap.put(tableName, getTableInfoPath(tableName));
-      }
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        HBaseAdmin hbadmin = (HBaseAdmin) conn.getAdmin();) {
       if (tableDescriptor == null) {
-        LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
+        Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+        if (fileSys.exists(tableSnapshotPath)) {
+          // snapshot path exist means the backup path is in HDFS
+          // check whether snapshot dir already recorded for target table
+          if (snapshotMap.get(tableName) != null) {
+            SnapshotDescription desc =
+                SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+            SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+            tableDescriptor = manifest.getTableDescriptor();
+            LOG.debug("obtained descriptor from " + manifest);
+          } else {
+            tableDescriptor = getTableDesc(tableName);
+            snapshotMap.put(tableName, getTableInfoPath(tableName));
+            LOG.debug("obtained descriptor from snapshot for " + tableName);
+          }
+          if (tableDescriptor == null) {
+            LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
+          }
+        } else if (converted) {
+          // first check if this is a converted backup image
+          LOG.error("convert will be supported in a future jira");
+        }
       }
-    } else if (converted) {
-      // first check if this is a converted backup image
-      LOG.error("convert will be supported in a future jira");
-    }

-    Path tableArchivePath = getTableArchivePath(tableName);
-    if (tableArchivePath == null) {
-      if (tableDescriptor != null) {
-        // find table descriptor but no archive dir means the table is empty, create table and exit
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("find table descriptor but no archive dir for table " + tableName
-              + ", will only create table");
+      Path tableArchivePath = getTableArchivePath(tableName);
+      if (tableArchivePath == null) {
+        if (tableDescriptor != null) {
+          // find table descriptor but no archive dir => the table is empty, create table and exit
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("find table descriptor but no archive dir for table " + tableName
+                + ", will only create table");
+          }
+          tableDescriptor.setName(newTableName);
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, null,
+            tableDescriptor, truncateIfExists);
+          return;
+        } else {
+          throw new IllegalStateException("Cannot restore hbase table because directory '"
+              + " tableArchivePath is null.");
         }
-        tableDescriptor.setName(newTableName);
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, null,
-          tableDescriptor, truncateIfExists);
-        return;
-      } else {
-        throw new IllegalStateException("Cannot restore hbase table because directory '"
-            + " tableArchivePath is null.");
       }
-    }

-    if (tableDescriptor == null) {
-      tableDescriptor = new HTableDescriptor(newTableName);
-    } else {
-      tableDescriptor.setName(newTableName);
-    }
+      if (tableDescriptor == null) {
+        LOG.debug("New descriptor for " + newTableName);
+        tableDescriptor = new HTableDescriptor(newTableName);
+      } else {
+        tableDescriptor.setName(newTableName);
+      }

-    if (!converted) {
-      // record all region dirs:
-      // load all files in dir
-      try {
-        ArrayList<Path> regionPathList = getRegionList(tableName);
-
-        // should only try to create the table with all region informations, so we could pre-split
-        // the regions in fine grain
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
-          tableDescriptor, truncateIfExists);
-        if (tableArchivePath != null) {
-          // start real restore through bulkload
-          // if the backup target is on local cluster, special action needed
-          Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-          if (tempTableArchivePath.equals(tableArchivePath)) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-            }
-          } else {
-            regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+      if (!converted) {
+        // record all region dirs:
+        // load all files in dir
+        try {
+          ArrayList<Path> regionPathList = getRegionList(tableName);
+
+          // should only try to create the table with all region informations, so we could pre-split
+          // the regions in fine grain
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, regionPathList,
+            tableDescriptor, truncateIfExists);
+          if (tableArchivePath != null) {
+            // start real restore through bulkload
+            // if the backup target is on local cluster, special action needed
+            Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+            if (tempTableArchivePath.equals(tableArchivePath)) {
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+              }
+            } else {
+              regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+              }
             }
-          }
-          LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-          for (Path regionPath : regionPathList) {
-            String regionName = regionPath.toString();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Restoring HFiles from directory " + regionName);
+            LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+            for (Path regionPath : regionPathList) {
+              String regionName = regionPath.toString();
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Restoring HFiles from directory " + regionName);
+              }
+              String[] args = { regionName, newTableName.getNameAsString() };
+              loader.run(args);
             }
-            String[] args = { regionName, newTableName.getNameAsString()};
-            loader.run(args);
           }
+          // we do not recovered edits
+        } catch (Exception e) {
+          throw new IllegalStateException("Cannot restore hbase table", e);
         }
-        // we do not recovered edits
-      } catch (Exception e) {
-        throw new IllegalStateException("Cannot restore hbase table", e);
+      } else {
+        LOG.debug("convert will be supported in a future jira");
       }
-    } else {
-      LOG.debug("convert will be supported in a future jira");
     }
   }

@@ -472,6 +564,7 @@ public class RestoreServerUtil {
     // By default, it is 32 and loader will fail if # of files in any region exceed this
    // limit. Bad for snapshot restore.
     this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    this.conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
     LoadIncrementalHFiles loader = null;
     try {
       loader = new LoadIncrementalHFiles(this.conf);
@@ -571,15 +664,11 @@
    * @param htd table descriptor
    * @throws IOException exception
    */
-  private void checkAndCreateTable(Path tableBackupPath, TableName tableName,
+  private void checkAndCreateTable(HBaseAdmin hbadmin, Path tableBackupPath, TableName tableName,
       TableName targetTableName, ArrayList<Path> regionDirList, HTableDescriptor htd,
       boolean truncateIfExists) throws IOException {
-    HBaseAdmin hbadmin = null;
-    Connection conn = null;
     try {
-      conn = ConnectionFactory.createConnection(conf);
-      hbadmin = (HBaseAdmin) conn.getAdmin();
       boolean createNew = false;
       if (hbadmin.tableExists(targetTableName)) {
         if(truncateIfExists) {
@@ -592,8 +681,8 @@
         }
       } else {
         createNew = true;
-      }
-      if(createNew){
+      }
+      if (createNew){
         LOG.info("Creating target table '" + targetTableName + "'");
         // if no region directory given, create the table and return
         if (regionDirList == null || regionDirList.size() == 0) {
@@ -614,13 +703,6 @@
       }
     } catch (Exception e) {
       throw new IOException(e);
-    } finally {
-      if (hbadmin != null) {
-        hbadmin.close();
-      }
-      if(conn != null){
-        conn.close();
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 5d75d56..f7a5378 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -117,9 +117,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";

   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
+  private Set<String> unmatchedFamilies = new HashSet<String>();

   // Source filesystem
   private FileSystem fs;
@@ -157,7 +159,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void usage() {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+        + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+        + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }

@@ -305,7 +308,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throws TableNotFoundException, IOException {
     try (Admin admin = table.getConnection().getAdmin();
         RegionLocator rl = table.getRegionLocator()) {
-      doBulkLoad(hfofDir, admin, table, rl);
+      doBulkLoad(hfofDir, admin, table, rl, false);
     }
   }

@@ -315,11 +318,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
+   * @param admin the Admin
    * @param table the table to load into
+   * @param regionLocator region locator
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException {
+    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
+   * @param hfofDir the directory that was provided as the output path
+   *   of a job using HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param ignoreUnmatchedCF true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean ignoreUnmatchedCF) throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -342,7 +364,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           "option, consider removing the files and bulkload again without this option. " +
           "See HBASE-13985");
     }
-    prepareHFileQueue(hfofDir, table, queue, validateHFile);
+    prepareHFileQueue(hfofDir, table, queue, validateHFile, ignoreUnmatchedCF);

     int count = 0;
@@ -431,8 +453,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
       boolean validateHFile) throws IOException {
+    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param ignoreUnmatchedCF true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile, boolean ignoreUnmatchedCF) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue);
+    validateFamiliesInHFiles(table, queue, ignoreUnmatchedCF);
   }

   // Initialize a thread pool
@@ -448,14 +486,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   /**
    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
       throws IOException {
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
     List<String> familyNames = new ArrayList<String>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
-    List<String> unmatchedFamilies = new ArrayList<String>();
     Iterator<LoadQueueItem> queueIter = queue.iterator();
     while (queueIter.hasNext()) {
       LoadQueueItem lqi = queueIter.next();
@@ -470,7 +507,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
           + familyNames;
       LOG.error(msg);
-      throw new IOException(msg);
+      if (!silence) throw new IOException(msg);
     }
   }

@@ -774,7 +811,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     final List<Pair<byte[], String>> famPaths =
         new ArrayList<Pair<byte[], String>>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      }
     }

     final RegionServerCallable<Boolean> svrCallable =
@@ -1028,7 +1067,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {

   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2) {
       usage();
       return -1;
     }
@@ -1054,7 +1093,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {

     try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
-      doBulkLoad(hfofDir, admin, table, locator);
+      boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+      doBulkLoad(hfofDir, admin, table, locator, silence);
     }
   }


http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 0db96be..d617b52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -120,7 +120,7 @@ public class TestBackupBase {

   public static void waitForSystemTable() throws Exception
   {
-    try(Admin admin = TEST_UTIL.getAdmin();) {
+    try (Admin admin = TEST_UTIL.getAdmin();) {
       while (!admin.tableExists(BackupSystemTable.getTableName())
           || !admin.isTableAvailable(BackupSystemTable.getTableName())) {
         Thread.sleep(1000);
@@ -142,6 +142,18 @@ public class TestBackupBase {
     TEST_UTIL.shutdownMiniMapReduceCluster();
   }

+  HTable insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
+      throws IOException {
+    HTable t = (HTable) conn.getTable(table);
+    Put p1;
+    for (int i = 0; i < numRows; i++) {
+      p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-"+ i));
+      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
+      t.put(p1);
+    }
+    return t;
+  }
+
   protected String backupTables(BackupType type, List<TableName> tables, String path)
       throws IOException {
     Connection conn = null;
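
As context for the test changes that follow: the ignore.unmatched.families switch introduced above lets a bulk load proceed when some HFiles belong to a column family that no longer exists on the target table, which is exactly the situation an incremental restore hits after a family has been dropped. A minimal sketch of driving the new overload programmatically; the table name "t1_restore" and the HFile directory "/tmp/hfiles" are hypothetical placeholders, not part of this commit:

  // Sketch under the assumptions above; the 5-arg doBulkLoad overload and
  // IGNORE_UNMATCHED_CF_CONF_KEY are the additions from this commit.
  public static void bulkLoadIgnoringDroppedFamilies(Configuration conf) throws Exception {
    TableName tn = TableName.valueOf("t1_restore");          // hypothetical table
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tn);
        RegionLocator locator = conn.getRegionLocator(tn)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // true => HFiles whose family is missing from the table are recorded in
      // unmatchedFamilies and skipped, instead of failing validateFamiliesInHFiles
      loader.doBulkLoad(new Path("/tmp/hfiles"), admin, table, locator, true); // hypothetical dir
    }
  }

On the command line the same behavior comes from the new -D flag shown in usage():
  hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dignore.unmatched.families=yes /tmp/hfiles t1_restore
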
http://git-wip-us.apache.org/repos/asf/hbase/blob/28737d05/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 9a2ad89..ae37b4c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.backup;

 import static org.junit.Assert.assertTrue;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BackupAdmin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -66,8 +69,15 @@ public class TestIncrementalBackup extends TestBackupBase {
     LOG.info("create full backup image for all tables");
     List<TableName> tables = Lists.newArrayList(table1, table2);
-    HBaseAdmin admin = null;
+    final byte[] fam3Name = Bytes.toBytes("f3");
+    table1Desc.addFamily(new HColumnDescriptor(fam3Name));
+    HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
     Connection conn = ConnectionFactory.createConnection(conf1);
+    int NB_ROWS_FAM3 = 6;
+    insertIntoTable(conn, table1, fam3Name, 3, NB_ROWS_FAM3).close();
+
+    HBaseAdmin admin = null;
     admin = (HBaseAdmin) conn.getAdmin();

     BackupRequest request = new BackupRequest();
@@ -77,16 +87,11 @@ public class TestIncrementalBackup extends TestBackupBase {
     assertTrue(checkSucceeded(backupIdFull));

     // #2 - insert some data to table
+    HTable t1 = insertIntoTable(conn, table1, famName, 1, NB_ROWS_IN_BATCH);
     LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
-    HTable t1 = (HTable) conn.getTable(table1);
-    Put p1;
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      p1 = new Put(Bytes.toBytes("row-t1" + i));
-      p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-      t1.put(p1);
-    }

-    Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(
+        NB_ROWS_IN_BATCH * 2 + NB_ROWS_FAM3));
     t1.close();
     LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);

@@ -110,6 +115,24 @@ public class TestIncrementalBackup extends TestBackupBase {
     String backupIdIncMultiple = admin.getBackupAdmin().backupTables(request);
     assertTrue(checkSucceeded(backupIdIncMultiple));

+    // add column family f2 to table1
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    table1Desc.addFamily(new HColumnDescriptor(fam2Name));
+    // drop column family f3
+    table1Desc.removeFamily(fam3Name);
+    HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
+    int NB_ROWS_FAM2 = 7;
+    HTable t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+    t3.close();
+
+    // #3 - incremental backup for multiple tables
+    request = new BackupRequest();
+    request.setBackupType(BackupType.INCREMENTAL).setTableList(tables)
+        .setTargetRootDir(BACKUP_ROOT_DIR);
+    String backupIdIncMultiple2 = admin.getBackupAdmin().backupTables(request);
+    assertTrue(checkSucceeded(backupIdIncMultiple2));
+
     // #4 - restore full backup for all tables, without overwrite
     TableName[] tablesRestoreFull =
         new TableName[] { table1, table2 };

@@ -118,6 +141,7 @@ public class TestIncrementalBackup extends TestBackupBase {
         new TableName[] { table1_restore, table2_restore };

     BackupAdmin client = getBackupAdmin();
+    LOG.debug("Restoring full " + backupIdFull);
     client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false,
       tablesRestoreFull, tablesMapFull, false));

@@ -131,7 +155,8 @@ public class TestIncrementalBackup extends TestBackupBase {

     // #5.2 - checking row count of tables for full restore
     HTable hTable = (HTable) conn.getTable(table1_restore);
-    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH));
+    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH
+        + NB_ROWS_FAM3));
     hTable.close();

     hTable = (HTable) conn.getTable(table2_restore);
@@ -143,11 +168,16 @@ public class TestIncrementalBackup extends TestBackupBase {
         new TableName[] { table1, table2 };
     TableName[] tablesMapIncMultiple =
         new TableName[] { table1_restore, table2_restore };
-    client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
+    client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2, false,
       tablesRestoreIncMultiple, tablesMapIncMultiple, true));

     hTable = (HTable) conn.getTable(table1_restore);
-    Assert.assertThat(TEST_UTIL.countRows(hTable), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+    LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
+    Assert.assertThat(TEST_UTIL.countRows(hTable, famName),
+        CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
+    LOG.debug("f2 has " + TEST_UTIL.countRows(hTable, fam2Name) + " rows");
+    Assert.assertThat(TEST_UTIL.countRows(hTable, fam2Name), CoreMatchers.equalTo(NB_ROWS_FAM2));
     hTable.close();

     hTable = (HTable) conn.getTable(table2_restore);
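
Net effect of the RestoreServerUtil changes above: before WALs are replayed, incrementalRestoreTable reconciles the target table's families against the descriptor stored with the most recent incremental backup, so families added or dropped since the full backup are applied to the restored table, and modifyTableSync blocks on getAlterStatus until every region has picked up the new schema. The reconciliation step, condensed; backupDesc and liveDesc are hypothetical stand-ins for the descriptors read via getTableDescriptor(fileSys, tableName, incrBackupId) and admin.getTableDescriptor(newTableName):

  // Condensed restatement of the family-reconciliation loop added above.
  static void reconcileFamilies(Admin admin, HTableDescriptor backupDesc,
      HTableDescriptor liveDesc) throws IOException {
    List<HColumnDescriptor> backupFamilies = Arrays.asList(backupDesc.getColumnFamilies());
    List<HColumnDescriptor> liveFamilies = Arrays.asList(liveDesc.getColumnFamilies());
    boolean changed = false;
    // families created after the full backup image was taken
    for (HColumnDescriptor family : backupFamilies) {
      if (!liveFamilies.contains(family)) {
        liveDesc.addFamily(family);
        changed = true;
      }
    }
    // families dropped after the full backup image was taken
    for (HColumnDescriptor family : liveFamilies) {
      if (!backupFamilies.contains(family)) {
        liveDesc.removeFamily(family.getName());
        changed = true;
      }
    }
    if (changed) {
      // blocks until Admin.getAlterStatus reports all regions updated
      RestoreServerUtil.modifyTableSync(admin, liveDesc);
    }
  }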