HBASE-13253 LoadIncrementalHFiles unify hfiles discovery
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a46666d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a46666d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a46666d7

Branch: refs/heads/0.98
Commit: a46666d761a8a4d8a7577410eea514f52caf0f30
Parents: b428f10
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Tue Mar 17 19:38:39 2015 +0000
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Tue Mar 17 20:37:00 2015 +0000

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 202 ++++++++++---------
 .../mapreduce/TestLoadIncrementalHFiles.java    |  25 ++-
 2 files changed, 124 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a46666d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 7657571..e3bc9ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -152,6 +153,75 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       + "\n");
   }

+  private static interface BulkHFileVisitor<TFamily> {
+    TFamily bulkFamily(final byte[] familyName)
+      throws IOException;
+    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
+      throws IOException;
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles.
+   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+   */
+  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+    final BulkHFileVisitor<TFamily> visitor) throws IOException {
+    if (!fs.exists(bulkDir)) {
+      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + bulkDir);
+    }
+
+    for (FileStatus familyStat : familyDirStatuses) {
+      if (!familyStat.isDir()) {
+        LOG.warn("Skipping non-directory " + familyStat.getPath());
+        continue;
+      }
+      Path familyDir = familyStat.getPath();
+      byte[] familyName = familyDir.getName().getBytes();
+      TFamily family = visitor.bulkFamily(familyName);
+
+      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+      for (FileStatus hfileStatus : hfileStatuses) {
+        if (!fs.isFile(hfileStatus.getPath())) {
+          LOG.warn("Skipping non-file " + hfileStatus);
+          continue;
+        }
+
+        Path hfile = hfileStatus.getPath();
+        // Skip "_", reference, HFileLink
+        String fileName = hfile.getName();
+        if (fileName.startsWith("_")) {
+          continue;
+        }
+        if (StoreFileInfo.isReference(fileName)) {
+          LOG.warn("Skipping reference " + fileName);
+          continue;
+        }
+        if (HFileLink.isHFileLink(fileName)) {
+          LOG.warn("Skipping HFileLink " + fileName);
+          continue;
+        }
+
+        // Validate HFile Format
+        try {
+          if (!HFile.isHFileFormat(fs, hfile)) {
+            LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+            continue;
+          }
+        } catch (FileNotFoundException e) {
+          LOG.warn("the file " + hfile + " was removed");
+          continue;
+        }
+
+        visitor.bulkHFile(family, hfileStatus);
+      }
+    }
+  }
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used
    * in this class in order to support the case where a region has
@@ -178,50 +248,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
+  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
   throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDir()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
-      }
-      Path familyDir = stat.getPath();
-      if (familyDir.getName().equals("_logs")) {
-        // Family name of "_logs" is not supported.
-        // This is due to the presence of history directory under _logs directory
-        // when hadoop-1 is used
-        continue;
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
+      @Override
+      public byte[] bulkFamily(final byte[] familyName) {
+        return familyName;
       }
-      byte[] family = familyDir.getName().getBytes();
-      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
-      for (FileStatus hfileStatus : hfileStatuses) {
-        if (!fs.isFile(hfileStatus.getPath())) {
-          LOG.warn("Skipping non-file " + hfileStatus);
-          continue;
+      @Override
+      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+        long length = hfile.getLen();
+        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+            HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
+              length + " bytes can be problematic as it may lead to oversplitting.");
         }
-        long length = hfileStatus.getLen();
-        Path hfile = hfileStatus.getPath();
-        if (hfile.getName().startsWith("_")) continue;
-        if(length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-            HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfofDir.toString() + " with size: " +
-              length + " bytes can be problematic as it may lead to oversplitting.");
-        }
-        ret.add(new LoadQueueItem(family, hfile));
+        ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }
+    });
   }

   /**
@@ -273,20 +318,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       LoadQueueItem lqi = queueIter.next();
       String familyNameInHFile = Bytes.toString(lqi.family);
       if (!familyNames.contains(familyNameInHFile)) {
-        boolean isValid = false;
-        try {
-          isValid = HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath);
-          if (!isValid) {
-            LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
-          }
-        } catch (FileNotFoundException e) {
-          LOG.warn("the file " + lqi + " was removed");
-        }
-        if (isValid) {
-          unmatchedFamilies.add(familyNameInHFile);
-        } else {
-          queueIter.remove();
-        }
+        unmatchedFamilies.add(familyNameInHFile);
       }
     }
     if (unmatchedFamilies.size() > 0) {
@@ -838,50 +870,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * More modifications necessary if we want to avoid doing it.
    */
   private void createTable(TableName tableName, String dirPath) throws Exception {
-    Path hfofDir = new Path(dirPath);
-    FileSystem fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd;
+    final Path hfofDir = new Path(dirPath);
+    final FileSystem fs = hfofDir.getFileSystem(getConf());

     // Add column families
     // Build a set of keys
-    byte[][] keys;
-    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDir()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
-      }
-      Path familyDir = stat.getPath();
-      if (familyDir.getName().equals("_logs")) {
-        // Family name of "_logs" is not supported.
-        // This is due to the presence of history directory under _logs directory
-        // when hadoop-1 is used
-        continue;
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
+      @Override
+      public HColumnDescriptor bulkFamily(final byte[] familyName) {
+        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
+        htd.addFamily(hcd);
+        return hcd;
       }
-      byte[] family = familyDir.getName().getBytes();
-
-      hcd = new HColumnDescriptor(family);
-      htd.addFamily(hcd);
-
-      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
-      for (Path hfile : hfiles) {
-        if (hfile.getName().startsWith("_")) continue;
+      @Override
+      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
+          throws IOException {
+        Path hfile = hfileStatus.getPath();
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()), getConf());
-        final byte[] first, last;
         try {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
@@ -889,8 +897,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               " for family " + hcd.toString());
           }
           reader.loadFileInfo();
-          first = reader.getFirstRowKey();
-          last = reader.getLastRowKey();
+          byte[] first = reader.getFirstRowKey();
+          byte[] last = reader.getLastRowKey();

           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
             " first=" + Bytes.toStringBinary(first) +
@@ -902,13 +910,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);

-      } finally {
+        } finally {
           reader.close();
         }
       }
-    }
+    });

-    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);

     LOG.info("Table "+ tableName +" is available!!");
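
----------------------------------------------------------------------

The heart of the change is that both directory walks above now funnel through visitBulkHFiles(): it scans the bulk-load directory (one sub-directory per column family, hfiles inside each), applies every skip rule in one place (non-directories, non-files, "_"-prefixed names, references, HFileLinks, files that fail the HFile format check), and hands each surviving file to a BulkHFileVisitor callback. The sketch below mirrors that shape outside HBase, using java.nio.file instead of Hadoop's FileSystem so it compiles stand-alone; BulkDirWalker, BulkFileVisitor and visitBulkFiles are hypothetical names for illustration, not part of the patch or of HBase.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class BulkDirWalker {

  // Mirrors BulkHFileVisitor<TFamily>: bulkFamily() fires once per family
  // directory, bulkHFile() once per accepted file inside that family.
  interface BulkFileVisitor<TFamily> {
    TFamily bulkFamily(String familyName) throws IOException;
    void bulkHFile(TFamily family, Path file) throws IOException;
  }

  // Single traversal with the skip rules centralized, as in visitBulkHFiles().
  static <TFamily> void visitBulkFiles(Path bulkDir, BulkFileVisitor<TFamily> visitor)
      throws IOException {
    try (DirectoryStream<Path> families = Files.newDirectoryStream(bulkDir)) {
      for (Path familyDir : families) {
        if (!Files.isDirectory(familyDir)) {
          continue; // skip non-directories at the family level
        }
        TFamily family = visitor.bulkFamily(familyDir.getFileName().toString());
        try (DirectoryStream<Path> files = Files.newDirectoryStream(familyDir)) {
          for (Path file : files) {
            if (!Files.isRegularFile(file)) {
              continue; // skip non-files
            }
            if (file.getFileName().toString().startsWith("_")) {
              continue; // skip "_"-prefixed names
            }
            visitor.bulkHFile(family, file); // hand the surviving file to the caller
          }
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Caller analogous to discoverLoadQueue(); expects a directory path argument.
    visitBulkFiles(Paths.get(args[0]), new BulkFileVisitor<String>() {
      @Override
      public String bulkFamily(String familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(String family, Path file) {
        System.out.println("family=" + family + " file=" + file);
      }
    });
  }
}

The generic TFamily parameter is what lets the two real callers differ: discoverLoadQueue() instantiates it with byte[] because the family name is all bulkHFile() needs to build a LoadQueueItem, while createTable() returns an HColumnDescriptor from bulkFamily() so each bulkHFile() call can align that family's compression with what it reads from the hfile.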

http://git-wip-us.apache.org/repos/asf/hbase/blob/a46666d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 00fe6d3..28bf85d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -306,20 +306,30 @@ public class TestLoadIncrementalHFiles {
     }
   }

+  @Test(timeout = 60000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  @Test(timeout = 60000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
   /**
    * Write a random data file and a non-file in a dir with a valid family name
    * but not part of the table families. we should we able to bulkload without
    * getting the unmatched family exception. HBASE-13037/HBASE-13227
    */
-  @Test(timeout = 60000)
-  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tableName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));

     HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
         QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

     final String NON_FAMILY_FOLDER = "_logs";
     Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
@@ -329,10 +339,13 @@

     HTable table = null;
     try {
-      final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
-      table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+      if (preCreateTable) {
+        table = util.createTable(TableName.valueOf(tableName), FAMILY);
+      } else {
+        table = new HTable(util.getConfiguration(), TableName.valueOf(tableName));
+      }

-      final String[] args = {dir.toString(), TABLE_NAME};
+      final String[] args = {dir.toString(), tableName};
       new LoadIncrementalHFiles(util.getConfiguration()).run(args);
       assertEquals(500, util.countRows(table));
     } finally {
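
----------------------------------------------------------------------

The test refactor follows the same motive as the main change: the old testNonHfileFolderWithUnmatchedFamilyName body becomes a shared helper, and the new preCreateTable=false variant exercises the path where LoadIncrementalHFiles must create the table itself, i.e. the rewritten createTable() above. The extra 16 KB random file with the valid-looking name "012356789" checks that the HFile format validation quietly drops non-hfiles instead of failing the load. A hypothetical JUnit 4 test against the BulkDirWalker sketch from the previous note shows the same skip behavior in miniature; again, every name here is illustrative, not from the patch.

import static org.junit.Assert.assertEquals;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;

// Junk entries (a "_logs" directory with a non-file inside it, a "_"-prefixed
// file next to a valid one) must never reach the bulkHFile() callback.
public class BulkDirWalkerTest {

  @Test(timeout = 60000)
  public void testJunkEntriesAreSkipped() throws Exception {
    Path bulkDir = Files.createTempDirectory("bulk");
    Path familyDir = Files.createDirectory(bulkDir.resolve("cf"));
    Files.write(familyDir.resolve("hfile_0"), new byte[] { 1, 2, 3 }); // accepted
    Files.write(familyDir.resolve("_tmp"), new byte[] { 4 });          // skipped: "_" prefix
    Path nonFamily = Files.createDirectory(bulkDir.resolve("_logs"));
    Files.createDirectory(nonFamily.resolve("history"));               // skipped: non-file

    final List<Path> seen = new ArrayList<>();
    BulkDirWalker.visitBulkFiles(bulkDir, new BulkDirWalker.BulkFileVisitor<String>() {
      @Override
      public String bulkFamily(String familyName) {
        return familyName;
      }
      @Override
      public void bulkHFile(String family, Path file) {
        seen.add(file);
      }
    });

    assertEquals(1, seen.size());
    assertEquals("hfile_0", seen.get(0).getFileName().toString());
  }
}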