HBASE-13253 LoadIncrementalHFiles unify hfiles discovery
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aea69c13 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aea69c13 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aea69c13 Branch: refs/heads/branch-1.0 Commit: aea69c137d965ac9095a9a8d02e8dde107ecbfb8 Parents: c641db0 Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Tue Mar 17 19:38:39 2015 +0000 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Tue Mar 17 19:40:53 2015 +0000 ---------------------------------------------------------------------- .../hbase/mapreduce/LoadIncrementalHFiles.java | 196 ++++++++++--------- .../mapreduce/TestLoadIncrementalHFiles.java | 25 ++- 2 files changed, 120 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/aea69c13/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index b78017d..05ac012 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -55,7 +55,6 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -160,6 +159,75 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + "\n"); } + private static interface 
BulkHFileVisitor<TFamily> { + TFamily bulkFamily(final byte[] familyName) + throws IOException; + void bulkHFile(final TFamily family, final FileStatus hfileStatus) + throws IOException; + } + + /** + * Iterate over the bulkDir hfiles. + * Skip reference, HFileLink, files starting with "_" and non-valid hfiles. + */ + private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, + final BulkHFileVisitor<TFamily> visitor) throws IOException { + if (!fs.exists(bulkDir)) { + throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found"); + } + + FileStatus[] familyDirStatuses = fs.listStatus(bulkDir); + if (familyDirStatuses == null) { + throw new FileNotFoundException("No families found in " + bulkDir); + } + + for (FileStatus familyStat : familyDirStatuses) { + if (!familyStat.isDirectory()) { + LOG.warn("Skipping non-directory " + familyStat.getPath()); + continue; + } + Path familyDir = familyStat.getPath(); + byte[] familyName = familyDir.getName().getBytes(); + TFamily family = visitor.bulkFamily(familyName); + + FileStatus[] hfileStatuses = fs.listStatus(familyDir); + for (FileStatus hfileStatus : hfileStatuses) { + if (!fs.isFile(hfileStatus.getPath())) { + LOG.warn("Skipping non-file " + hfileStatus); + continue; + } + + Path hfile = hfileStatus.getPath(); + // Skip "_", reference, HFileLink + String fileName = hfile.getName(); + if (fileName.startsWith("_")) { + continue; + } + if (StoreFileInfo.isReference(fileName)) { + LOG.warn("Skipping reference " + fileName); + continue; + } + if (HFileLink.isHFileLink(fileName)) { + LOG.warn("Skipping HFileLink " + fileName); + continue; + } + + // Validate HFile Format + try { + if (!HFile.isHFileFormat(fs, hfile)) { + LOG.warn("the file " + hfile + " doesn't seems to be an hfile. 
skipping"); + continue; + } + } catch (FileNotFoundException e) { + LOG.warn("the file " + hfile + " was removed"); + continue; + } + + visitor.bulkHFile(family, hfileStatus); + } + } + } + /** * Represents an HFile waiting to be loaded. A queue is used * in this class in order to support the case where a region has @@ -186,54 +254,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * Walk the given directory for all HFiles, and return a Queue * containing all such files. */ - private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir) + private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir) throws IOException { fs = hfofDir.getFileSystem(getConf()); - - if (!fs.exists(hfofDir)) { - throw new FileNotFoundException("HFileOutputFormat dir " + - hfofDir + " not found"); - } - - FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); - if (familyDirStatuses == null) { - throw new FileNotFoundException("No families found in " + hfofDir); - } - - for (FileStatus stat : familyDirStatuses) { - if (!stat.isDirectory()) { - LOG.warn("Skipping non-directory " + stat.getPath()); - continue; + visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() { + @Override + public byte[] bulkFamily(final byte[] familyName) { + return familyName; } - Path familyDir = stat.getPath(); - byte[] family = familyDir.getName().getBytes(); - FileStatus[] hfileStatuses = fs.listStatus(familyDir); - for (FileStatus hfileStatus : hfileStatuses) { - if (!hfileStatus.isFile()) { - LOG.warn("Skipping non-file " + hfileStatus); - continue; - } - long length = hfileStatus.getLen(); - Path hfile = hfileStatus.getPath(); - // Skip "_", reference, HFileLink - String fileName = hfile.getName(); - if (fileName.startsWith("_")) continue; - if (StoreFileInfo.isReference(fileName)) { - LOG.warn("Skipping reference " + fileName); - continue; - } - if (HFileLink.isHFileLink(fileName)) { - LOG.warn("Skipping HFileLink " + fileName); - continue; 
- } - if(length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, + @Override + public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException { + long length = hfile.getLen(); + if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)) { - LOG.warn("Trying to bulk load hfile " + hfofDir.toString() + " with size: " + + LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length + " bytes can be problematic as it may lead to oversplitting."); } - ret.add(new LoadQueueItem(family, hfile)); + ret.add(new LoadQueueItem(family, hfile.getPath())); } - } + }); } /** @@ -285,20 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LoadQueueItem lqi = queueIter.next(); String familyNameInHFile = Bytes.toString(lqi.family); if (!familyNames.contains(familyNameInHFile)) { - boolean isValid = false; - try { - isValid = HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath); - if (!isValid) { - LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping"); - } - } catch (FileNotFoundException e) { - LOG.warn("the file " + lqi + " was removed"); - } - if (isValid) { - unmatchedFamilies.add(familyNameInHFile); - } else { - queueIter.remove(); - } + unmatchedFamilies.add(familyNameInHFile); } } if (unmatchedFamilies.size() > 0) { @@ -852,46 +878,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * More modifications necessary if we want to avoid doing it. 
*/ private void createTable(TableName tableName, String dirPath) throws Exception { - Path hfofDir = new Path(dirPath); - FileSystem fs = hfofDir.getFileSystem(getConf()); - - if (!fs.exists(hfofDir)) { - throw new FileNotFoundException("HFileOutputFormat dir " + - hfofDir + " not found"); - } - - FileStatus[] familyDirStatuses = fs.listStatus(hfofDir); - if (familyDirStatuses == null) { - throw new FileNotFoundException("No families found in " + hfofDir); - } - - HTableDescriptor htd = new HTableDescriptor(tableName); - HColumnDescriptor hcd; + final Path hfofDir = new Path(dirPath); + final FileSystem fs = hfofDir.getFileSystem(getConf()); // Add column families // Build a set of keys - byte[][] keys; - TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); - - for (FileStatus stat : familyDirStatuses) { - if (!stat.isDirectory()) { - LOG.warn("Skipping non-directory " + stat.getPath()); - continue; + final HTableDescriptor htd = new HTableDescriptor(tableName); + final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); + visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() { + @Override + public HColumnDescriptor bulkFamily(final byte[] familyName) { + HColumnDescriptor hcd = new HColumnDescriptor(familyName); + htd.addFamily(hcd); + return hcd; } - Path familyDir = stat.getPath(); - byte[] family = familyDir.getName().getBytes(); - - hcd = new HColumnDescriptor(family); - htd.addFamily(hcd); - - Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir)); - for (Path hfile : hfiles) { - String fileName = hfile.getName(); - if (fileName.startsWith("_") || StoreFileInfo.isReference(fileName) - || HFileLink.isHFileLink(fileName)) continue; + @Override + public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus) + throws IOException { + Path hfile = hfileStatus.getPath(); HFile.Reader reader = HFile.createReader(fs, hfile, new CacheConfig(getConf()), 
getConf()); - final byte[] first, last; try { if (hcd.getCompressionType() != reader.getFileContext().getCompression()) { hcd.setCompressionType(reader.getFileContext().getCompression()); @@ -899,8 +905,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { " for family " + hcd.toString()); } reader.loadFileInfo(); - first = reader.getFirstRowKey(); - last = reader.getLastRowKey(); + byte[] first = reader.getFirstRowKey(); + byte[] last = reader.getLastRowKey(); LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + Bytes.toStringBinary(first) + @@ -912,13 +918,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool { value = map.containsKey(last)? map.get(last):0; map.put(last, value-1); - } finally { + } finally { reader.close(); } } - } + }); - keys = LoadIncrementalHFiles.inferBoundaries(map); + byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map); this.hbAdmin.createTable(htd,keys); LOG.info("Table "+ tableName +" is available!!"); http://git-wip-us.apache.org/repos/asf/hbase/blob/aea69c13/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 39b6663..d5a9a86 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -307,20 +307,30 @@ public class TestLoadIncrementalHFiles { } } + @Test(timeout = 60000) + public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { + testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); + } + + @Test(timeout = 60000) + public void testNonHfileFolder() throws Exception { + 
testNonHfileFolder("testNonHfileFolder", false); + } + /** * Write a random data file and a non-file in a dir with a valid family name * but not part of the table families. we should be able to bulkload without * getting the unmatched family exception. HBASE-13037/HBASE-13227 */ - @Test(timeout = 60000) - public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { - Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName"); + private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { + Path dir = util.getDataTestDirOnTestFS(tableName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(FAMILY)); HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); + createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); final String NON_FAMILY_FOLDER = "_logs"; Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); @@ -330,10 +340,13 @@ public class TestLoadIncrementalHFiles { Table table = null; try { - final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName"; - table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY); + if (preCreateTable) { + table = util.createTable(TableName.valueOf(tableName), FAMILY); + } else { + table = util.getConnection().getTable(TableName.valueOf(tableName)); + } - final String[] args = {dir.toString(), TABLE_NAME}; + final String[] args = {dir.toString(), tableName}; new LoadIncrementalHFiles(util.getConfiguration()).run(args); assertEquals(500, util.countRows(table)); } finally {