HBASE-13253 LoadIncrementalHFiles unify hfiles discovery

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a46666d7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a46666d7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a46666d7

Branch: refs/heads/0.98
Commit: a46666d761a8a4d8a7577410eea514f52caf0f30
Parents: b428f10
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Tue Mar 17 19:38:39 2015 +0000
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Tue Mar 17 20:37:00 2015 +0000

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 202 ++++++++++---------
 .../mapreduce/TestLoadIncrementalHFiles.java    |  25 ++-
 2 files changed, 124 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
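
The patch below replaces the two hand-rolled directory walks in LoadIncrementalHFiles (one in discoverLoadQueue(), one in createTable()) with a single private helper, visitBulkHFiles(), driven by a small BulkHFileVisitor callback, so the skip rules (paths starting with "_", references, HFileLinks, non-hfiles) live in one place. The standalone sketch below only illustrates the shape of that callback; the names BulkHFileVisitorSketch, Visitor and visit are made up for the example, the real interface is private to LoadIncrementalHFiles, and the reference/HFileLink/format checks from the patch are omitted for brevity.

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BulkHFileVisitorSketch {

  /** Callback: once per family directory, then once per hfile found under it. */
  interface Visitor<TFamily> {
    TFamily bulkFamily(byte[] familyName) throws IOException;
    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /** Walk bulkDir/<family>/<hfile>, delegating per-family and per-file work to the visitor. */
  static <TFamily> void visit(FileSystem fs, Path bulkDir, Visitor<TFamily> visitor)
      throws IOException {
    FileStatus[] families = fs.listStatus(bulkDir);
    if (families == null) {
      throw new FileNotFoundException("No families found in " + bulkDir);
    }
    for (FileStatus familyStat : families) {
      if (!familyStat.isDir()) continue;   // only family directories are interesting
      TFamily family = visitor.bulkFamily(familyStat.getPath().getName().getBytes());
      for (FileStatus hfileStat : fs.listStatus(familyStat.getPath())) {
        // the real helper also skips references, HFileLinks and files failing the HFile check
        if (hfileStat.getPath().getName().startsWith("_")) continue;
        visitor.bulkHFile(family, hfileStat);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.get(new Configuration());
    // Print <family> -> <hfile>, the same callback shape discoverLoadQueue now uses.
    visit(fs, new Path(args[0]), new Visitor<String>() {
      @Override
      public String bulkFamily(byte[] familyName) {
        return new String(familyName);
      }
      @Override
      public void bulkHFile(String family, FileStatus hfileStatus) {
        System.out.println(family + " -> " + hfileStatus.getPath());
      }
    });
  }
}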


http://git-wip-us.apache.org/repos/asf/hbase/blob/a46666d7/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 7657571..e3bc9ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -72,6 +71,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -152,6 +153,75 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         + "\n");
   }
 
+  private static interface BulkHFileVisitor<TFamily> {
+    TFamily bulkFamily(final byte[] familyName)
+      throws IOException;
+    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
+      throws IOException;
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles.
+   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+   */
+  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+    final BulkHFileVisitor<TFamily> visitor) throws IOException {
+    if (!fs.exists(bulkDir)) {
+      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
+    }
+
+    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+    if (familyDirStatuses == null) {
+      throw new FileNotFoundException("No families found in " + bulkDir);
+    }
+
+    for (FileStatus familyStat : familyDirStatuses) {
+      if (!familyStat.isDir()) {
+        LOG.warn("Skipping non-directory " + familyStat.getPath());
+        continue;
+      }
+      Path familyDir = familyStat.getPath();
+      byte[] familyName = familyDir.getName().getBytes();
+      TFamily family = visitor.bulkFamily(familyName);
+
+      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+      for (FileStatus hfileStatus : hfileStatuses) {
+        if (!fs.isFile(hfileStatus.getPath())) {
+          LOG.warn("Skipping non-file " + hfileStatus);
+          continue;
+        }
+
+        Path hfile = hfileStatus.getPath();
+        // Skip "_", reference, HFileLink
+        String fileName = hfile.getName();
+        if (fileName.startsWith("_")) {
+          continue;
+        }
+        if (StoreFileInfo.isReference(fileName)) {
+          LOG.warn("Skipping reference " + fileName);
+          continue;
+        }
+        if (HFileLink.isHFileLink(fileName)) {
+          LOG.warn("Skipping HFileLink " + fileName);
+          continue;
+        }
+
+        // Validate HFile Format
+        try {
+          if (!HFile.isHFileFormat(fs, hfile)) {
+            LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+            continue;
+          }
+        } catch (FileNotFoundException e) {
+          LOG.warn("the file " + hfile + " was removed");
+          continue;
+        }
+
+        visitor.bulkHFile(family, hfileStatus);
+      }
+    }
+  }
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used
    * in this class in order to support the case where a region has
@@ -178,50 +248,25 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
-  private void discoverLoadQueue(Deque<LoadQueueItem> ret, Path hfofDir)
+  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir)
   throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDir()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
-      }
-      Path familyDir = stat.getPath();
-      if (familyDir.getName().equals("_logs")) {
-        // Family name of "_logs" is not supported.
-        // This is due to the presence of history directory under _logs directory
-        // when hadoop-1 is used
-        continue;
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
+      @Override
+      public byte[] bulkFamily(final byte[] familyName) {
+        return familyName;
       }
-      byte[] family = familyDir.getName().getBytes();
-      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
-      for (FileStatus hfileStatus : hfileStatuses) {
-        if (!fs.isFile(hfileStatus.getPath())) {
-          LOG.warn("Skipping non-file " + hfileStatus);
-          continue;
+      @Override
+      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+        long length = hfile.getLen();
+        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+            HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
+              length + " bytes can be problematic as it may lead to oversplitting.");
         }
-        long length = hfileStatus.getLen();
-        Path hfile = hfileStatus.getPath();
-        if (hfile.getName().startsWith("_")) continue;
-        if(length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-             HConstants.DEFAULT_MAX_FILE_SIZE)) {
-           LOG.warn("Trying to bulk load hfile " + hfofDir.toString() + " with size: " +
-               length + " bytes can be problematic as it may lead to oversplitting.");
-         }
-        ret.add(new LoadQueueItem(family, hfile));
+        ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }
+    });
   }
 
   /**
@@ -273,20 +318,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         LoadQueueItem lqi = queueIter.next();
         String familyNameInHFile = Bytes.toString(lqi.family);
         if (!familyNames.contains(familyNameInHFile)) {
-          boolean isValid = false;
-          try {
-            isValid = HFile.isHFileFormat(lqi.hfilePath.getFileSystem(getConf()), lqi.hfilePath);
-            if (!isValid) {
-              LOG.warn("the file " + lqi + " doesn't seems to be an hfile. skipping");
-            }
-          } catch (FileNotFoundException e) {
-            LOG.warn("the file " + lqi + " was removed");
-          }
-          if (isValid) {
-            unmatchedFamilies.add(familyNameInHFile);
-          } else {
-            queueIter.remove();
-          }
+          unmatchedFamilies.add(familyNameInHFile);
         }
       }
       if (unmatchedFamilies.size() > 0) {
@@ -838,50 +870,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * More modifications necessary if we want to avoid doing it.
    */
   private void createTable(TableName tableName, String dirPath) throws Exception {
-    Path hfofDir = new Path(dirPath);
-    FileSystem fs = hfofDir.getFileSystem(getConf());
-
-    if (!fs.exists(hfofDir)) {
-      throw new FileNotFoundException("HFileOutputFormat dir " +
-          hfofDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + hfofDir);
-    }
-
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor hcd;
+    final Path hfofDir = new Path(dirPath);
+    final FileSystem fs = hfofDir.getFileSystem(getConf());
 
     // Add column families
     // Build a set of keys
-    byte[][] keys;
-    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-
-    for (FileStatus stat : familyDirStatuses) {
-      if (!stat.isDir()) {
-        LOG.warn("Skipping non-directory " + stat.getPath());
-        continue;
-      }
-      Path familyDir = stat.getPath();
-      if (familyDir.getName().equals("_logs")) {
-        // Family name of "_logs" is not supported.
-        // This is due to the presence of history directory under _logs directory
-        // when hadoop-1 is used
-        continue;
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
+      @Override
+      public HColumnDescriptor bulkFamily(final byte[] familyName) {
+        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
+        htd.addFamily(hcd);
+        return hcd;
       }
-      byte[] family = familyDir.getName().getBytes();
-
-      hcd = new HColumnDescriptor(family);
-      htd.addFamily(hcd);
-
-      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
-      for (Path hfile : hfiles) {
-        if (hfile.getName().startsWith("_")) continue;
+      @Override
+      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
+          throws IOException {
+        Path hfile = hfileStatus.getPath();
         HFile.Reader reader = HFile.createReader(fs, hfile,
             new CacheConfig(getConf()), getConf());
-        final byte[] first, last;
         try {
           if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
             hcd.setCompressionType(reader.getFileContext().getCompression());
@@ -889,8 +897,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
                      " for family " + hcd.toString());
           }
           reader.loadFileInfo();
-          first = reader.getFirstRowKey();
-          last =  reader.getLastRowKey();
+          byte[] first = reader.getFirstRowKey();
+          byte[] last  = reader.getLastRowKey();
 
           LOG.info("Trying to figure out region boundaries hfile=" + hfile +
             " first=" + Bytes.toStringBinary(first) +
@@ -902,13 +910,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
           value = map.containsKey(last)? map.get(last):0;
           map.put(last, value-1);
-        }  finally {
+        } finally {
           reader.close();
         }
       }
-    }
+    });
 
-    keys = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
     this.hbAdmin.createTable(htd,keys);
 
     LOG.info("Table "+ tableName +" is available!!");

http://git-wip-us.apache.org/repos/asf/hbase/blob/a46666d7/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 00fe6d3..28bf85d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -306,20 +306,30 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  @Test(timeout = 60000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
   /**
    * Write a random data file and a non-file in a dir with a valid family name
    * but not part of the table families. we should we able to bulkload without
    * getting the unmatched family exception. HBASE-13037/HBASE-13227
    */
-  @Test(timeout = 60000)
-  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("testNonHfileFolderWithUnmatchedFamilyName");
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tableName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
 
     Path familyDir = new Path(dir, Bytes.toString(FAMILY));
     HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
         FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
 
     final String NON_FAMILY_FOLDER = "_logs";
     Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
@@ -329,10 +339,13 @@ public class TestLoadIncrementalHFiles {
 
     HTable table = null;
     try {
-      final String TABLE_NAME = "mytable_testNonHfileFolderWithUnmatchedFamilyName";
-      table = util.createTable(TableName.valueOf(TABLE_NAME), FAMILY);
+      if (preCreateTable) {
+        table = util.createTable(TableName.valueOf(tableName), FAMILY);
+      } else {
+        table = new HTable(util.getConfiguration(), TableName.valueOf(tableName));
+      }
 
-      final String[] args = {dir.toString(), TABLE_NAME};
+      final String[] args = {dir.toString(), tableName};
       new LoadIncrementalHFiles(util.getConfiguration()).run(args);
       assertEquals(500, util.countRows(table));
     } finally {
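
The test above calls a createRandomDataFile() helper whose body is not part of the hunks shown here. The sketch below is only a guess at what such a helper might look like, written so the example is self-contained: it writes size random bytes to path, giving the loader a file that fails the HFile format check and is skipped.

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class RandomDataFileSketch {
  /** Write 'size' bytes of random data to 'path'; such a file is not a valid hfile. */
  static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
    FSDataOutputStream stream = fs.create(path);
    try {
      byte[] chunk = new byte[1024];
      Random rng = new Random();
      while (size > 0) {
        rng.nextBytes(chunk);
        int toWrite = Math.min(size, chunk.length);
        stream.write(chunk, 0, toWrite);
        size -= toWrite;
      }
    } finally {
      stream.close();
    }
  }
}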
