Repository: hbase
Updated Branches:
  refs/heads/0.98 beb76b920 -> 6e5a773d3


HBASE-16052 Improve HBaseFsck Scalability (Ben Lau)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e5a773d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e5a773d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e5a773d

Branch: refs/heads/0.98
Commit: 6e5a773d359d1ea28d2e4042f4d9b0b6cbb448eb
Parents: beb76b9
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Jul 19 13:56:43 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Tue Jul 19 13:56:43 2016 -0700

----------------------------------------------------------------------
 .../hbase/util/AbstractFileStatusFilter.java    |  67 ++++
 .../org/apache/hadoop/hbase/util/FSUtils.java   | 327 +++++++++++++++----
 .../org/apache/hadoop/hbase/util/FSVisitor.java |   5 +-
 .../hadoop/hbase/util/FileStatusFilter.java     |  36 ++
 .../org/apache/hadoop/hbase/util/HBaseFsck.java | 172 ++++++----
 .../hbase/util/hbck/HFileCorruptionChecker.java |  27 +-
 6 files changed, 482 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java
new file mode 100644
index 0000000..289cf89
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import edu.umd.cs.findbugs.annotations.CheckForNull;
+
+/**
+ * Typical base class for file status filters.  Works more efficiently when
+ * filtering FileStatuses directly; otherwise the implementation will need to
+ * look up the FileStatus for the path, which is expensive.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractFileStatusFilter implements PathFilter, FileStatusFilter {
+
+  /**
+   * Filters out a path.  Can be given an optional directory hint to avoid
+   * filestatus lookup.
+   *
+   * @param p       A filesystem path
+   * @param isDir   An optional boolean indicating whether the path is a directory or not
+   * @return        true if the path is accepted, false if the path is filtered out
+   */
+  protected abstract boolean accept(Path p, @CheckForNull Boolean isDir);
+
+  @Override
+  public boolean accept(FileStatus f) {
+    return accept(f.getPath(), f.isDirectory());
+  }
+
+  @Override
+  public boolean accept(Path p) {
+    return accept(p, null);
+  }
+
+  protected boolean isFile(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException {
+    return !isDirectory(fs, isDir, p);
+  }
+
+  protected boolean isDirectory(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException {
+    return isDir != null ? isDir : fs.isDirectory(p);
+  }
+}
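
The base class is small, but the contract is the point: accept(FileStatus) reuses the directory bit the caller already holds, while accept(Path) must fall back to a filesystem lookup. A minimal sketch of a subclass follows; the PrefixDirFilter name and the prefix criterion are hypothetical, not part of this patch.

    // Hypothetical subclass: accept only directories whose names start with a prefix.
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.AbstractFileStatusFilter;

    import edu.umd.cs.findbugs.annotations.CheckForNull;

    public class PrefixDirFilter extends AbstractFileStatusFilter {
      private final FileSystem fs;
      private final String prefix;

      public PrefixDirFilter(FileSystem fs, String prefix) {
        this.fs = fs;
        this.prefix = prefix;
      }

      @Override
      protected boolean accept(Path p, @CheckForNull Boolean isDir) {
        if (!p.getName().startsWith(prefix)) {
          return false;  // cheap name test first; no filesystem call needed
        }
        try {
          // Uses the isDir hint when the caller had a FileStatus in hand; only
          // the PathFilter entry point pays for an fs.isDirectory() lookup.
          return isDirectory(fs, isDir, p);
        } catch (IOException e) {
          return false;  // mirror the in-tree filters: lookup failure means rejection
        }
      }
    }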

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 74dad43..218cc18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -31,14 +31,21 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -82,8 +89,12 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
 import com.google.common.primitives.Ints;
 
+import edu.umd.cs.findbugs.annotations.CheckForNull;
+
 /**
  * Utility methods for interacting with the underlying file system.
  */
@@ -1154,7 +1165,7 @@ public abstract class FSUtils {
   /**
    * A {@link PathFilter} that returns only regular files.
    */
-  static class FileFilter implements PathFilter {
+  static class FileFilter extends AbstractFileStatusFilter {
     private final FileSystem fs;
 
     public FileFilter(final FileSystem fs) {
@@ -1162,11 +1173,11 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path p) {
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
       try {
-        return fs.isFile(p);
+        return isFile(fs, isDir, p);
       } catch (IOException e) {
-        LOG.debug("unable to verify if path=" + p + " is a regular file", e);
+        LOG.warn("unable to verify if path=" + p + " is a regular file", e);
         return false;
       }
     }
@@ -1175,7 +1186,7 @@ public abstract class FSUtils {
   /**
   * Directory filter that doesn't include any of the directories in the specified blacklist
    */
-  public static class BlackListDirFilter implements PathFilter {
+  public static class BlackListDirFilter extends AbstractFileStatusFilter {
     private final FileSystem fs;
     private List<String> blacklist;
 
@@ -1194,19 +1205,18 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path p) {
-      boolean isValid = false;
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
+      if (!isValidName(p.getName())) {
+        return false;
+      }
+
       try {
-        if (isValidName(p.getName())) {
-          isValid = fs.getFileStatus(p).isDir();
-        } else {
-          isValid = false;
-        }
+        return isDirectory(fs, isDir, p);
       } catch (IOException e) {
         LOG.warn("An error occurred while verifying if [" + p.toString()
             + "] is a valid directory. Returning 'not valid' and continuing.", 
e);
+        return false;
       }
-      return isValid;
     }
 
     protected boolean isValidName(final String name) {
@@ -1344,7 +1354,7 @@ public abstract class FSUtils {
   /**
    * Filter for all dirs that don't start with '.'
    */
-  public static class RegionDirFilter implements PathFilter {
+  public static class RegionDirFilter extends AbstractFileStatusFilter {
    // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names.
    final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$");
     final FileSystem fs;
@@ -1354,16 +1364,16 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path rd) {
-      if (!regionDirPattern.matcher(rd.getName()).matches()) {
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
+      if (!regionDirPattern.matcher(p.getName()).matches()) {
         return false;
       }
 
       try {
-        return fs.getFileStatus(rd).isDir();
+        return isDirectory(fs, isDir, p);
       } catch (IOException ioe) {
         // Maybe the file was moved or the fs was disconnected.
-        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        LOG.warn("Skipping file " + p +" due to IOException", ioe);
         return false;
       }
     }
@@ -1379,8 +1389,11 @@ public abstract class FSUtils {
    */
  public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException {
     // assumes we are in a table dir.
-    FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
-    List<Path> regionDirs = new ArrayList<Path>(rds.length);
+    List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
+    if (rds == null) {
+      return new ArrayList<Path>();
+    }
+    List<Path> regionDirs = new ArrayList<Path>(rds.size());
     for (FileStatus rdfs: rds) {
       Path rdPath = rdfs.getPath();
       regionDirs.add(rdPath);
@@ -1392,7 +1405,7 @@ public abstract class FSUtils {
   * Filter for all dirs that are legal column family names.  This is generally used for colfam
    * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
    */
-  public static class FamilyDirFilter implements PathFilter {
+  public static class FamilyDirFilter extends AbstractFileStatusFilter {
     final FileSystem fs;
 
     public FamilyDirFilter(FileSystem fs) {
@@ -1400,20 +1413,20 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path rd) {
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
       try {
         // throws IAE if invalid
-        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName()));
+        HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName()));
       } catch (IllegalArgumentException iae) {
         // path name is an invalid family name and thus is excluded.
         return false;
       }
 
       try {
-        return fs.getFileStatus(rd).isDir();
+        return isDirectory(fs, isDir, p);
       } catch (IOException ioe) {
         // Maybe the file was moved or the fs was disconnected.
-        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        LOG.warn("Skipping file " + p +" due to IOException", ioe);
         return false;
       }
     }
@@ -1439,8 +1452,11 @@ public abstract class FSUtils {
   }
 
  public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException {
-    FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs));
-    List<Path> referenceFiles = new ArrayList<Path>(fds.length);
+    List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs));
+    if (fds == null) {
+      return new ArrayList<Path>();
+    }
+    List<Path> referenceFiles = new ArrayList<Path>(fds.size());
     for (FileStatus fdfs: fds) {
       Path fdPath = fdfs.getPath();
       referenceFiles.add(fdPath);
@@ -1451,7 +1467,7 @@ public abstract class FSUtils {
   /**
    * Filter for HFiles that excludes reference files.
    */
-  public static class HFileFilter implements PathFilter {
+  public static class HFileFilter extends AbstractFileStatusFilter {
     final FileSystem fs;
 
     public HFileFilter(FileSystem fs) {
@@ -1459,19 +1475,22 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path rd) {
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
+      if (!StoreFileInfo.isHFile(p)) {
+        return false;
+      }
+
       try {
-        // only files
-        return !fs.getFileStatus(rd).isDir() && StoreFileInfo.isHFile(rd);
+        return isFile(fs, isDir, p);
       } catch (IOException ioe) {
         // Maybe the file was moved or the fs was disconnected.
-        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        LOG.warn("Skipping file " + p +" due to IOException", ioe);
         return false;
       }
     }
   }
 
-  public static class ReferenceFileFilter implements PathFilter {
+  public static class ReferenceFileFilter extends AbstractFileStatusFilter {
 
     private final FileSystem fs;
 
@@ -1480,13 +1499,17 @@ public abstract class FSUtils {
     }
 
     @Override
-    public boolean accept(Path rd) {
+    protected boolean accept(Path p, @CheckForNull Boolean isDir) {
+      if (!StoreFileInfo.isReference(p)) {
+        return false;
+      }
+
       try {
         // only files can be references.
-        return !fs.getFileStatus(rd).isDir() && StoreFileInfo.isReference(rd);
+        return isFile(fs, isDir, p);
       } catch (IOException ioe) {
         // Maybe the file was moved or the fs was disconnected.
-        LOG.warn("Skipping file " + rd +" due to IOException", ioe);
+        LOG.warn("Skipping file " + p +" due to IOException", ioe);
         return false;
       }
     }
@@ -1518,67 +1541,149 @@ public abstract class FSUtils {
    * @param tableName name of the table to scan.
    * @return Map keyed by StoreFile name with a value of the full Path.
    * @throws IOException When scanning the directory fails.
+   * @throws InterruptedException
    */
  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
  final FileSystem fs, final Path hbaseRootDir, TableName tableName)
-  throws IOException {
-    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null);
+  throws IOException, InterruptedException {
+    return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null);
   }
 
   /**
   * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
-   * table StoreFile names to the full Path.
+   * table StoreFile names to the full Path.  Note that because this method can be called
+   * on a 'live' HBase system, we will skip files that no longer exist by the time
+   * we traverse them; similarly, the user of the result needs to consider that some
+   * entries in this map may not exist by the time this call completes.
    * <br>
    * Example...<br>
    * Key = 3944417774205889744  <br>
   * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744
    *
-   * @param map map to add values.  If null, this method will create and populate one to return
+   * @param resultMap map to add values.  If null, this method will create and populate one to return
    * @param fs  The file system to use.
    * @param hbaseRootDir  The root directory to scan.
    * @param tableName name of the table to scan.
+   * @param sfFilter optional path filter to apply to store files
+   * @param executor optional executor service to parallelize this operation
    * @param errors ErrorReporter instance or null
    * @return Map keyed by StoreFile name with a value of the full Path.
    * @throws IOException When scanning the directory fails.
+   * @throws InterruptedException
    */
-  public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map,
-  final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors)
-  throws IOException {
-    if (map == null) {
-      map = new HashMap<String, Path>();
-    }
+  public static Map<String, Path> getTableStoreFilePathMap(
+      Map<String, Path> resultMap,
+      final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter,
+      ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException {
+
+    final Map<String, Path> finalResultMap =
+        resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap;
 
     // only include the directory paths to tables
     Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName);
    // Inside a table, there are compaction.dir directories to skip.  Otherwise, all else
     // should be regions.
-    PathFilter familyFilter = new FamilyDirFilter(fs);
-    FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs));
-    for (FileStatus regionDir : regionDirs) {
-      if (null != errors) {
-        errors.progress();
+    final FamilyDirFilter familyFilter = new FamilyDirFilter(fs);
+    final Vector<Exception> exceptions = new Vector<Exception>();
+
+    try {
+      List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
+      if (regionDirs == null) {
+        return finalResultMap;
       }
-      Path dd = regionDir.getPath();
-      // else its a region name, now look in region for families
-      FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
-      for (FileStatus familyDir : familyDirs) {
+
+      final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size());
+
+      for (FileStatus regionDir : regionDirs) {
         if (null != errors) {
           errors.progress();
         }
-        Path family = familyDir.getPath();
-        // now in family, iterate over the StoreFiles and
-        // put in map
-        FileStatus[] familyStatus = fs.listStatus(family);
-        for (FileStatus sfStatus : familyStatus) {
-          if (null != errors) {
-            errors.progress();
+        final Path dd = regionDir.getPath();
+
+        if (!exceptions.isEmpty()) {
+          break;
+        }
+
+        Runnable getRegionStoreFileMapCall = new Runnable() {
+          @Override
+          public void run() {
+            try {
+              HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>();
+              List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter);
+              if (familyDirs == null) {
+                if (!fs.exists(dd)) {
+                  LOG.warn("Skipping region because it no longer exists: " + 
dd);
+                } else {
+                  LOG.warn("Skipping region because it has no family dirs: " + 
dd);
+                }
+                return;
+              }
+              for (FileStatus familyDir : familyDirs) {
+                if (null != errors) {
+                  errors.progress();
+                }
+                Path family = familyDir.getPath();
+                if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
+                  continue;
+                }
+                // now in family, iterate over the StoreFiles and
+                // put in map
+                FileStatus[] familyStatus = fs.listStatus(family);
+                for (FileStatus sfStatus : familyStatus) {
+                  if (null != errors) {
+                    errors.progress();
+                  }
+                  Path sf = sfStatus.getPath();
+                  if (sfFilter == null || sfFilter.accept(sf)) {
+                    regionStoreFileMap.put( sf.getName(), sf);
+                  }
+                }
+              }
+              finalResultMap.putAll(regionStoreFileMap);
+            } catch (Exception e) {
+              LOG.error("Could not get region store file map for region: " + 
dd, e);
+              exceptions.add(e);
+            }
           }
-          Path sf = sfStatus.getPath();
-          map.put( sf.getName(), sf);
+        };
+
+        // If executor is available, submit async tasks to exec concurrently, otherwise
+        // just do serial sync execution
+        if (executor != null) {
+          Future<?> future = executor.submit(getRegionStoreFileMapCall);
+          futures.add(future);
+        } else {
+          FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null);
+          future.run();
+          futures.add(future);
+        }
+      }
+
+      // Ensure all pending tasks are complete (or that we run into an exception)
+      for (Future<?> f : futures) {
+        if (!exceptions.isEmpty()) {
+          break;
         }
+        try {
+          f.get();
+        } catch (ExecutionException e) {
+          LOG.error("Unexpected exec exception!  Should've been caught 
already.  (Bug?)", e);
+          // Shouldn't happen, we already logged/caught any exceptions in the 
Runnable
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e);
+      exceptions.add(e);
+    } finally {
+      if (!exceptions.isEmpty()) {
+        // Just throw the first exception as an indication something bad happened
+        // Don't need to propagate all the exceptions, we already logged them all anyway
+        Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class);
+        throw Throwables.propagate(exceptions.firstElement());
       }
     }
-    return map;
+
+    return finalResultMap;
   }
 
  public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) {
@@ -1605,11 +1710,12 @@ public abstract class FSUtils {
    * @param hbaseRootDir  The root directory to scan.
    * @return Map keyed by StoreFile name with a value of the full Path.
    * @throws IOException When scanning the directory fails.
+   * @throws InterruptedException
    */
   public static Map<String, Path> getTableStoreFilePathMap(
     final FileSystem fs, final Path hbaseRootDir)
-  throws IOException {
-    return getTableStoreFilePathMap(fs, hbaseRootDir, null);
+  throws IOException, InterruptedException {
+    return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null);
   }
 
   /**
@@ -1622,14 +1728,18 @@ public abstract class FSUtils {
    *
    * @param fs  The file system to use.
    * @param hbaseRootDir  The root directory to scan.
+   * @param sfFilter optional path filter to apply to store files
+   * @param executor optional executor service to parallelize this operation
    * @param errors ErrorReporter instance or null
    * @return Map keyed by StoreFile name with a value of the full Path.
    * @throws IOException When scanning the directory fails.
+   * @throws InterruptedException
    */
   public static Map<String, Path> getTableStoreFilePathMap(
-    final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors)
-  throws IOException {
-    Map<String, Path> map = new HashMap<String, Path>();
+    final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter,
+    ExecutorService executor, ErrorReporter errors)
+  throws IOException, InterruptedException {
+    ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32);
 
     // if this method looks similar to 'getTableFragmentation' that is because
     // it was borrowed from it.
@@ -1637,12 +1747,45 @@ public abstract class FSUtils {
     // only include the directory paths to tables
     for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) {
       getTableStoreFilePathMap(map, fs, hbaseRootDir,
-          FSUtils.getTableName(tableDir), errors);
+          FSUtils.getTableName(tableDir), sfFilter, executor, errors);
     }
     return map;
   }
 
   /**
+   * Filters FileStatuses in an array and returns a list
+   *
+   * @param input   An array of FileStatuses
+   * @param filter  A required filter to filter the array
+   * @return        A list of FileStatuses
+   */
+  public static List<FileStatus> filterFileStatuses(FileStatus[] input,
+      FileStatusFilter filter) {
+    if (input == null) return null;
+    return filterFileStatuses(Iterators.forArray(input), filter);
+  }
+
+  /**
+   * Filters FileStatuses in an iterator and returns a list
+   *
+   * @param input   An iterator of FileStatuses
+   * @param filter  A required filter to filter the iterator
+   * @return        A list of FileStatuses
+   */
+  public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input,
+      FileStatusFilter filter) {
+    if (input == null) return null;
+    ArrayList<FileStatus> results = new ArrayList<FileStatus>();
+    while (input.hasNext()) {
+      FileStatus f = input.next();
+      if (filter.accept(f)) {
+        results.add(f);
+      }
+    }
+    return results;
+  }
+
+  /**
    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
    * This accommodates differences between hadoop versions, where hadoop 1
    * does not throw a FileNotFoundException, and return an empty FileStatus[]
@@ -1650,6 +1793,48 @@ public abstract class FSUtils {
    *
    * @param fs file system
    * @param dir directory
+   * @param filter file status filter
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus list
+   */
+  public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs,
+      final Path dir, final FileStatusFilter filter) throws IOException {
+    FileStatus [] status = null;
+    try {
+      status = fs.listStatus(dir);
+    } catch (FileNotFoundException fnfe) {
+      // if directory doesn't exist, return null
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+    }
+
+    if (status == null || status.length < 1)  {
+      return null;
+    }
+
+    if (filter == null) {
+      return Arrays.asList(status);
+    } else {
+      List<FileStatus> status2 = filterFileStatuses(status, filter);
+      if (status2 == null || status2.isEmpty()) {
+        return null;
+      } else {
+        return status2;
+      }
+    }
+  }
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+   * This accommodates differences between hadoop versions, where hadoop 1
+   * does not throw a FileNotFoundException and returns an empty FileStatus[],
+   * while Hadoop 2 will throw FileNotFoundException.
+   *
+   * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
+   * Path, FileStatusFilter)} instead.
+   *
+   * @param fs file system
+   * @param dir directory
    * @param filter path filter
    * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
index 5e14db6..a43af76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.commons.logging.Log;
@@ -94,7 +95,7 @@ public final class FSVisitor {
    */
  public static void visitTableStoreFiles(final FileSystem fs, final Path tableDir,
       final StoreFileVisitor visitor) throws IOException {
-    FileStatus[] regions = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
+    List<FileStatus> regions = FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
     if (regions == null) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("No regions under directory:" + tableDir);
@@ -117,7 +118,7 @@ public final class FSVisitor {
    */
  public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir,
       final StoreFileVisitor visitor) throws IOException {
-    FileStatus[] families = FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
+    List<FileStatus> families = FSUtils.listStatusWithStatusFilter(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
     if (families == null) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("No families under region directory:" + regionDir);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java
new file mode 100644
index 0000000..c81db75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface FileStatusFilter {
+  /**
+   * Tests whether or not the specified filestatus should be
+   * included in a filestatus list.
+   *
+   * @param  f  The filestatus to be tested
+   * @return  <code>true</code> if and only if the filestatus
+   *          should be included
+   */
+  boolean accept(FileStatus f);
+}
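
Because the interface has a single method, one-off filters can be written inline and combined with the new FSUtils.filterFileStatuses helper: one listStatus() call fetches the statuses, and the filter runs against them in memory with no further namenode round trips. A small sketch, where the non-empty-file criterion is only an example:

    // Sketch: filter a directory listing in memory.
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hbase.util.FileStatusFilter;

    public class InlineFilterExample {
      static List<FileStatus> nonEmptyFiles(FileSystem fs, Path dir) throws IOException {
        FileStatus[] statuses = fs.listStatus(dir);  // one RPC for the whole listing
        return FSUtils.filterFileStatuses(statuses, new FileStatusFilter() {
          @Override
          public boolean accept(FileStatus f) {
            // Both facts are already on the FileStatus; no extra lookups occur.
            return !f.isDirectory() && f.getLen() > 0;
          }
        });
      }
    }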

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index feb75b7..5a1f84f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -40,6 +40,7 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.Block
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -878,26 +880,16 @@ public class HBaseFsck extends Configured {
    * Lingering reference file prevents a region from opening. It has to
    * be fixed before a cluster can start properly.
    */
-  private void offlineReferenceFileRepair() throws IOException {
+  private void offlineReferenceFileRepair() throws IOException, InterruptedException {
     Configuration conf = getConf();
     Path hbaseRoot = FSUtils.getRootDir(conf);
     FileSystem fs = hbaseRoot.getFileSystem(conf);
     LOG.info("Computing mapping of all store files");
-    Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot, errors);
+    Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot,
+      new FSUtils.ReferenceFileFilter(fs), executor, errors);
     errors.print("");
     LOG.info("Validating mapping using HDFS state");
     for (Path path: allFiles.values()) {
-      boolean isReference = false;
-      try {
-        isReference = StoreFileInfo.isReference(path);
-      } catch (Throwable t) {
-        // Ignore. Some files may not be store files at all.
-        // For example, files under .oldlogs folder in hbase:meta
-        // Warning message is already logged by
-        // StoreFile#isReference.
-      }
-      if (!isReference) continue;
-
       Path referredToFile = StoreFileInfo.getReferredToFile(path);
       if (fs.exists(referredToFile)) continue;  // good, expected
 
@@ -1573,23 +1565,22 @@ public class HBaseFsck extends Configured {
       }
     }
 
-    // level 1:  <HBASE_DIR>/*
-    List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size());
-    List<Future<Void>> dirsFutures;
-
+    // Avoid multithreading at table-level because already multithreaded internally at
+    // region-level.  Additionally multithreading at table-level can lead to deadlock
+    // if there are many tables in the cluster.  Since there are a limited # of threads
+    // in the executor's thread pool and if we multithread at the table-level by putting
+    // WorkItemHdfsDir callables into the executor, then we will have some threads in the
+    // executor tied up solely in waiting for the tables' region-level calls to complete.
+    // If there are enough tables then there will be no actual threads in the pool left
+    // for the region-level callables to be serviced.
     for (FileStatus tableDir : tableDirs) {
       LOG.debug("Loading region dirs from " +tableDir.getPath());
-      dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir));
-    }
-
-    // Invoke and wait for Callables to complete
-    dirsFutures = executor.invokeAll(dirs);
-
-    for(Future<Void> f: dirsFutures) {
+      WorkItemHdfsDir item = new WorkItemHdfsDir(fs, errors, tableDir);
       try {
-        f.get();
-      } catch(ExecutionException e) {
-        LOG.warn("Could not load region dir " , e.getCause());
+        item.call();
+      } catch (ExecutionException e) {
+        LOG.warn("Could not completely load table dir " +
+            tableDir.getPath(), e.getCause());
       }
     }
     errors.print("");
@@ -3737,70 +3728,117 @@ public class HBaseFsck extends Configured {
    * Contact hdfs and get all information about specified table directory into
    * regioninfo list.
    */
-  static class WorkItemHdfsDir implements Callable<Void> {
-    private HBaseFsck hbck;
+  class WorkItemHdfsDir implements Callable<Void> {
     private FileStatus tableDir;
     private ErrorReporter errors;
     private FileSystem fs;
 
-    WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors,
+    WorkItemHdfsDir(FileSystem fs, ErrorReporter errors,
                     FileStatus status) {
-      this.hbck = hbck;
       this.fs = fs;
       this.tableDir = status;
       this.errors = errors;
     }
 
     @Override
-    public synchronized Void call() throws IOException {
+    public synchronized Void call() throws InterruptedException, ExecutionException {
+      final Vector<Exception> exceptions = new Vector<Exception>();
+
       try {
-        // level 2: <HBASE_DIR>/<table>/*
-        FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
-        for (FileStatus regionDir : regionDirs) {
+        final FileStatus[] regionDirs = fs.listStatus(tableDir.getPath());
+        final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.length);
+
+        for (final FileStatus regionDir : regionDirs) {
           errors.progress();
-          String encodedName = regionDir.getPath().getName();
+          final String encodedName = regionDir.getPath().getName();
           // ignore directories that aren't hexadecimal
           if (!encodedName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) {
             continue;
           }
 
-          LOG.debug("Loading region info from hdfs:"+ regionDir.getPath());
-          HbckInfo hbi = hbck.getOrCreateInfo(encodedName);
-          HdfsEntry he = new HdfsEntry();
-          synchronized (hbi) {
-            if (hbi.getHdfsRegionDir() != null) {
-              errors.print("Directory " + encodedName + " duplicate??" +
-                           hbi.getHdfsRegionDir());
-            }
+          if (!exceptions.isEmpty()) {
+            break;
+          }
+
+          futures.add(executor.submit(new Runnable() {
+            @Override
+            public void run() {
+              try {
+                LOG.debug("Loading region info from hdfs:"+ 
regionDir.getPath());
+
+                Path regioninfoFile = new Path(regionDir.getPath(), HRegionFileSystem.REGION_INFO_FILE);
+                boolean regioninfoFileExists = fs.exists(regioninfoFile);
+
+                if (!regioninfoFileExists) {
+                  // As tables become larger it is more and more likely that by the time you
+                  // reach a given region, it will be gone due to region splits/merges.
+                  if (!fs.exists(regionDir.getPath())) {
+                    LOG.warn("By the time we tried to process this region dir it was already gone: "
+                        + regionDir.getPath());
+                    return;
+                  }
+                }
 
-            he.hdfsRegionDir = regionDir.getPath();
-            he.hdfsRegionDirModTime = regionDir.getModificationTime();
-            Path regioninfoFile = new Path(he.hdfsRegionDir, HRegionFileSystem.REGION_INFO_FILE);
-            he.hdfsRegioninfoFilePresent = fs.exists(regioninfoFile);
-            // we add to orphan list when we attempt to read .regioninfo
-
-            // Set a flag if this region contains only edits
-            // This is special case if a region is left after split
-            he.hdfsOnlyEdits = true;
-            FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
-            Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
-            for (FileStatus subDir : subDirs) {
-              errors.progress();
-              String sdName = subDir.getPath().getName();
-              if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
-                he.hdfsOnlyEdits = false;
-                break;
+                HbckInfo hbi = HBaseFsck.this.getOrCreateInfo(encodedName);
+                HdfsEntry he = new HdfsEntry();
+                synchronized (hbi) {
+                  if (hbi.getHdfsRegionDir() != null) {
+                    errors.print("Directory " + encodedName + " duplicate??" +
+                                 hbi.getHdfsRegionDir());
+                  }
+
+                  he.hdfsRegionDir = regionDir.getPath();
+                  he.hdfsRegionDirModTime = regionDir.getModificationTime();
+                  he.hdfsRegioninfoFilePresent = regioninfoFileExists;
+                  // we add to orphan list when we attempt to read .regioninfo
+
+                  // Set a flag if this region contains only edits
+                  // This is special case if a region is left after split
+                  he.hdfsOnlyEdits = true;
+                  FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
+                  Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
+                  for (FileStatus subDir : subDirs) {
+                    errors.progress();
+                    String sdName = subDir.getPath().getName();
+                    if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) {
+                      he.hdfsOnlyEdits = false;
+                      break;
+                    }
+                  }
+                  hbi.hdfsEntry = he;
+                }
+              } catch (Exception e) {
+                LOG.error("Could not load region dir", e);
+                exceptions.add(e);
               }
             }
-            hbi.hdfsEntry = he;
+          }));
+        }
+
+        // Ensure all pending tasks are complete (or that we run into an exception)
+        for (Future<?> f : futures) {
+          if (!exceptions.isEmpty()) {
+            break;
           }
+          try {
+            f.get();
+          } catch (ExecutionException e) {
+            LOG.error("Unexpected exec exception!  Should've been caught 
already.  (Bug?)", e);
+            // Shouldn't happen, we already logged/caught any exceptions in 
the Runnable
+          }
         }
       } catch (IOException e) {
-        // unable to connect to the region server.
-        errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
-            + tableDir.getPath().getName()
-            + " Unable to fetch region information. " + e);
-        throw e;
+        LOG.error("Cannot execute WorkItemHdfsDir for " + tableDir, e);
+        exceptions.add(e);
+      } finally {
+        if (!exceptions.isEmpty()) {
+          errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: "
+              + tableDir.getPath().getName()
+              + " Unable to fetch all HDFS region information. ");
+          // Just throw the first exception as an indication something bad happened
+          // Don't need to propagate all the exceptions, we already logged them all anyway
+          throw new ExecutionException("First exception in WorkItemHdfsDir", exceptions.firstElement());
+        }
       }
       return null;
     }
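
The submit-and-collect idiom above (one Runnable per region dir, a Vector as a thread-safe error sink, fail-fast checks before each submit and each join) is the same one used in FSUtils.getTableStoreFilePathMap, and it is what lets hbck keep a single shared executor without the table-level deadlock described earlier. Reduced to a standalone sketch with placeholder names:

    // Standalone sketch of the pattern; runAll and the work list are placeholders.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public class SubmitAndCollect {
      static void runAll(ExecutorService executor, List<Runnable> work)
          throws InterruptedException, ExecutionException {
        final Vector<Exception> exceptions = new Vector<Exception>();
        List<Future<?>> futures = new ArrayList<Future<?>>(work.size());
        for (final Runnable r : work) {
          if (!exceptions.isEmpty()) {
            break;                      // something failed; stop scheduling
          }
          futures.add(executor.submit(new Runnable() {
            @Override
            public void run() {
              try {
                r.run();
              } catch (Exception e) {
                exceptions.add(e);      // record, keep the worker thread alive
              }
            }
          }));
        }
        for (Future<?> f : futures) {
          if (!exceptions.isEmpty()) {
            break;
          }
          f.get();                      // tasks trapped their own exceptions above
        }
        if (!exceptions.isEmpty()) {
          // Surface only the first failure; the rest were already logged.
          throw new ExecutionException("first failure", exceptions.firstElement());
        }
      }
    }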

http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
index 23dc570..db700c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java
@@ -152,9 +152,9 @@ public class HFileCorruptionChecker {
    * @throws IOException
    */
   protected void checkColFamDir(Path cfDir) throws IOException {
-    FileStatus[] hfs = null;
+    FileStatus[] statuses = null;
     try {
-      hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner.
+      statuses = fs.listStatus(cfDir); // use same filter as scanner.
     } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
       LOG.warn("Colfam Directory " + cfDir +
@@ -163,8 +163,9 @@ public class HFileCorruptionChecker {
       return;
     }
 
+    List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs));
     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
-    if (hfs.length == 0 && !fs.exists(cfDir)) {
+    if (hfs.size() == 0 && !fs.exists(cfDir)) {
       LOG.warn("Colfam Directory " + cfDir +
           " does not exist.  Likely due to concurrent split/compaction. 
Skipping.");
       missing.add(cfDir);
@@ -184,9 +185,9 @@ public class HFileCorruptionChecker {
    * @throws IOException
    */
   protected void checkRegionDir(Path regionDir) throws IOException {
-    FileStatus[] cfs = null;
+    FileStatus[] statuses = null;
     try {
-      cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+      statuses = fs.listStatus(regionDir);
     } catch (FileNotFoundException fnfe) {
      // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist.
       LOG.warn("Region Directory " + regionDir +
@@ -195,8 +196,9 @@ public class HFileCorruptionChecker {
       return;
     }
 
+    List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs));
     // Hadoop 1.0 listStatus does not throw an exception if the path does not exist.
-    if (cfs.length == 0 && !fs.exists(regionDir)) {
+    if (cfs.size() == 0 && !fs.exists(regionDir)) {
       LOG.warn("Region Directory " + regionDir +
           " does not exist.  Likely due to concurrent split/compaction. 
Skipping.");
       missing.add(regionDir);
@@ -217,12 +219,13 @@ public class HFileCorruptionChecker {
    * @throws IOException
    */
   void checkTableDir(Path tableDir) throws IOException {
-    FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs));
-    if (rds.length == 0 && !fs.exists(tableDir)) {
-      // interestingly listStatus does not throw an exception if the path does not exist.
-      LOG.warn("Table Directory " + tableDir +
-          " does not exist.  Likely due to concurrent delete. Skipping.");
-      missing.add(tableDir);
+    List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs));
+    if (rds == null) {
+      if (!fs.exists(tableDir)) {
+        LOG.warn("Table Directory " + tableDir +
+            " does not exist.  Likely due to concurrent delete. Skipping.");
+        missing.add(tableDir);
+      }
       return;
     }
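
These hunks all follow the same shape: list once, filter the FileStatuses in memory, and only consult fs.exists() to disambiguate. One subtlety worth spelling out is that listStatusWithStatusFilter returns null both when the directory is gone and when nothing passes the filter, which is why checkTableDir above re-checks existence. A hedged caller-side sketch (names are illustrative):

    // Sketch of the null contract for listStatusWithStatusFilter.
    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class NullContractExample {
      static void visitRegionDirs(FileSystem fs, Path tableDir) throws IOException {
        List<FileStatus> rds =
            FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs));
        if (rds == null) {
          // null covers "missing dir" and "nothing matched"; exists() tells them apart.
          if (!fs.exists(tableDir)) {
            System.out.println("table dir deleted concurrently: " + tableDir);
          }
          return;
        }
        for (FileStatus rd : rds) {
          // Every entry already passed RegionDirFilter without extra lookups.
          System.out.println("region dir: " + rd.getPath());
        }
      }
    }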
 
