Repository: hbase Updated Branches: refs/heads/0.98 beb76b920 -> 6e5a773d3
HBASE-16052 Improve HBaseFsck Scalability (Ben Lau) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e5a773d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e5a773d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e5a773d Branch: refs/heads/0.98 Commit: 6e5a773d359d1ea28d2e4042f4d9b0b6cbb448eb Parents: beb76b9 Author: tedyu <yuzhih...@gmail.com> Authored: Tue Jul 19 13:56:43 2016 -0700 Committer: tedyu <yuzhih...@gmail.com> Committed: Tue Jul 19 13:56:43 2016 -0700 ---------------------------------------------------------------------- .../hbase/util/AbstractFileStatusFilter.java | 67 ++++ .../org/apache/hadoop/hbase/util/FSUtils.java | 327 +++++++++++++++---- .../org/apache/hadoop/hbase/util/FSVisitor.java | 5 +- .../hadoop/hbase/util/FileStatusFilter.java | 36 ++ .../org/apache/hadoop/hbase/util/HBaseFsck.java | 172 ++++++---- .../hbase/util/hbck/HFileCorruptionChecker.java | 27 +- 6 files changed, 482 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java new file mode 100644 index 0000000..289cf89 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/AbstractFileStatusFilter.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; + +import edu.umd.cs.findbugs.annotations.CheckForNull; + +/** + * Typical base class for file status filter. Works more efficiently when + * filtering file statuses, otherwise implementation will need to lookup filestatus + * for the path which will be expensive. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractFileStatusFilter implements PathFilter, FileStatusFilter { + + /** + * Filters out a path. Can be given an optional directory hint to avoid + * filestatus lookup. + * + * @param p A filesystem path + * @param isDir An optional boolean indicating whether the path is a directory or not + * @return true if the path is accepted, false if the path is filtered out + */ + protected abstract boolean accept(Path p, @CheckForNull Boolean isDir); + + @Override + public boolean accept(FileStatus f) { + return accept(f.getPath(), f.isDirectory()); + } + + @Override + public boolean accept(Path p) { + return accept(p, null); + } + + protected boolean isFile(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException { + return !isDirectory(fs, isDir, p); + } + + protected boolean isDirectory(FileSystem fs, @CheckForNull Boolean isDir, Path p) throws IOException { + return isDir != null ? isDir : fs.isDirectory(p); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 74dad43..218cc18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -31,14 +31,21 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Vector; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -82,8 +89,12 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterators; import com.google.common.primitives.Ints; +import edu.umd.cs.findbugs.annotations.CheckForNull; + /** * Utility methods for interacting with the underlying file system. */ @@ -1154,7 +1165,7 @@ public abstract class FSUtils { /** * A {@link PathFilter} that returns only regular files. */ - static class FileFilter implements PathFilter { + static class FileFilter extends AbstractFileStatusFilter { private final FileSystem fs; public FileFilter(final FileSystem fs) { @@ -1162,11 +1173,11 @@ public abstract class FSUtils { } @Override - public boolean accept(Path p) { + protected boolean accept(Path p, @CheckForNull Boolean isDir) { try { - return fs.isFile(p); + return isFile(fs, isDir, p); } catch (IOException e) { - LOG.debug("unable to verify if path=" + p + " is a regular file", e); + LOG.warn("unable to verify if path=" + p + " is a regular file", e); return false; } } @@ -1175,7 +1186,7 @@ public abstract class FSUtils { /** * Directory filter that doesn't include any of the directories in the specified blacklist */ - public static class BlackListDirFilter implements PathFilter { + public static class BlackListDirFilter extends AbstractFileStatusFilter { private final FileSystem fs; private List<String> blacklist; @@ -1194,19 +1205,18 @@ public abstract class FSUtils { } @Override - public boolean accept(Path p) { - boolean isValid = false; + protected boolean accept(Path p, @CheckForNull Boolean isDir) { + if (!isValidName(p.getName())) { + return false; + } + try { - if (isValidName(p.getName())) { - isValid = fs.getFileStatus(p).isDir(); - } else { - isValid = false; - } + return isDirectory(fs, isDir, p); } catch (IOException e) { LOG.warn("An error occurred while verifying if [" + p.toString() + "] is a valid directory. Returning 'not valid' and continuing.", e); + return false; } - return isValid; } protected boolean isValidName(final String name) { @@ -1344,7 +1354,7 @@ public abstract class FSUtils { /** * Filter for all dirs that don't start with '.' */ - public static class RegionDirFilter implements PathFilter { + public static class RegionDirFilter extends AbstractFileStatusFilter { // This pattern will accept 0.90+ style hex region dirs and older numeric region dir names. final public static Pattern regionDirPattern = Pattern.compile("^[0-9a-f]*$"); final FileSystem fs; @@ -1354,16 +1364,16 @@ public abstract class FSUtils { } @Override - public boolean accept(Path rd) { - if (!regionDirPattern.matcher(rd.getName()).matches()) { + protected boolean accept(Path p, @CheckForNull Boolean isDir) { + if (!regionDirPattern.matcher(p.getName()).matches()) { return false; } try { - return fs.getFileStatus(rd).isDir(); + return isDirectory(fs, isDir, p); } catch (IOException ioe) { // Maybe the file was moved or the fs was disconnected. - LOG.warn("Skipping file " + rd +" due to IOException", ioe); + LOG.warn("Skipping file " + p +" due to IOException", ioe); return false; } } @@ -1379,8 +1389,11 @@ public abstract class FSUtils { */ public static List<Path> getRegionDirs(final FileSystem fs, final Path tableDir) throws IOException { // assumes we are in a table dir. - FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs)); - List<Path> regionDirs = new ArrayList<Path>(rds.length); + List<FileStatus> rds = listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); + if (rds == null) { + return new ArrayList<Path>(); + } + List<Path> regionDirs = new ArrayList<Path>(rds.size()); for (FileStatus rdfs: rds) { Path rdPath = rdfs.getPath(); regionDirs.add(rdPath); @@ -1392,7 +1405,7 @@ public abstract class FSUtils { * Filter for all dirs that are legal column family names. This is generally used for colfam * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. */ - public static class FamilyDirFilter implements PathFilter { + public static class FamilyDirFilter extends AbstractFileStatusFilter { final FileSystem fs; public FamilyDirFilter(FileSystem fs) { @@ -1400,20 +1413,20 @@ public abstract class FSUtils { } @Override - public boolean accept(Path rd) { + protected boolean accept(Path p, @CheckForNull Boolean isDir) { try { // throws IAE if invalid - HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(rd.getName())); + HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(p.getName())); } catch (IllegalArgumentException iae) { // path name is an invalid family name and thus is excluded. return false; } try { - return fs.getFileStatus(rd).isDir(); + return isDirectory(fs, isDir, p); } catch (IOException ioe) { // Maybe the file was moved or the fs was disconnected. - LOG.warn("Skipping file " + rd +" due to IOException", ioe); + LOG.warn("Skipping file " + p +" due to IOException", ioe); return false; } } @@ -1439,8 +1452,11 @@ public abstract class FSUtils { } public static List<Path> getReferenceFilePaths(final FileSystem fs, final Path familyDir) throws IOException { - FileStatus[] fds = fs.listStatus(familyDir, new ReferenceFileFilter(fs)); - List<Path> referenceFiles = new ArrayList<Path>(fds.length); + List<FileStatus> fds = listStatusWithStatusFilter(fs, familyDir, new ReferenceFileFilter(fs)); + if (fds == null) { + return new ArrayList<Path>(); + } + List<Path> referenceFiles = new ArrayList<Path>(fds.size()); for (FileStatus fdfs: fds) { Path fdPath = fdfs.getPath(); referenceFiles.add(fdPath); @@ -1451,7 +1467,7 @@ public abstract class FSUtils { /** * Filter for HFiles that excludes reference files. */ - public static class HFileFilter implements PathFilter { + public static class HFileFilter extends AbstractFileStatusFilter { final FileSystem fs; public HFileFilter(FileSystem fs) { @@ -1459,19 +1475,22 @@ public abstract class FSUtils { } @Override - public boolean accept(Path rd) { + protected boolean accept(Path p, @CheckForNull Boolean isDir) { + if (!StoreFileInfo.isHFile(p)) { + return false; + } + try { - // only files - return !fs.getFileStatus(rd).isDir() && StoreFileInfo.isHFile(rd); + return isFile(fs, isDir, p); } catch (IOException ioe) { // Maybe the file was moved or the fs was disconnected. - LOG.warn("Skipping file " + rd +" due to IOException", ioe); + LOG.warn("Skipping file " + p +" due to IOException", ioe); return false; } } } - public static class ReferenceFileFilter implements PathFilter { + public static class ReferenceFileFilter extends AbstractFileStatusFilter { private final FileSystem fs; @@ -1480,13 +1499,17 @@ public abstract class FSUtils { } @Override - public boolean accept(Path rd) { + protected boolean accept(Path p, @CheckForNull Boolean isDir) { + if (!StoreFileInfo.isReference(p)) { + return false; + } + try { // only files can be references. - return !fs.getFileStatus(rd).isDir() && StoreFileInfo.isReference(rd); + return isFile(fs, isDir, p); } catch (IOException ioe) { // Maybe the file was moved or the fs was disconnected. - LOG.warn("Skipping file " + rd +" due to IOException", ioe); + LOG.warn("Skipping file " + p +" due to IOException", ioe); return false; } } @@ -1518,67 +1541,149 @@ public abstract class FSUtils { * @param tableName name of the table to scan. * @return Map keyed by StoreFile name with a value of the full Path. * @throws IOException When scanning the directory fails. + * @throws InterruptedException */ public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, final FileSystem fs, final Path hbaseRootDir, TableName tableName) - throws IOException { - return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null); + throws IOException, InterruptedException { + return getTableStoreFilePathMap(map, fs, hbaseRootDir, tableName, null, null, null); } /** * Runs through the HBase rootdir/tablename and creates a reverse lookup map for - * table StoreFile names to the full Path. + * table StoreFile names to the full Path. Note that because this method can be called + * on a 'live' HBase system that we will skip files that no longer exist by the time + * we traverse them and similarly the user of the result needs to consider that some + * entries in this map may not exist by the time this call completes. * <br> * Example...<br> * Key = 3944417774205889744 <br> * Value = hdfs://localhost:51169/user/userid/-ROOT-/70236052/info/3944417774205889744 * - * @param map map to add values. If null, this method will create and populate one to return + * @param resultMap map to add values. If null, this method will create and populate one to return * @param fs The file system to use. * @param hbaseRootDir The root directory to scan. * @param tableName name of the table to scan. + * @param sfFilter optional path filter to apply to store files + * @param executor optional executor service to parallelize this operation * @param errors ErrorReporter instance or null * @return Map keyed by StoreFile name with a value of the full Path. * @throws IOException When scanning the directory fails. + * @throws InterruptedException */ - public static Map<String, Path> getTableStoreFilePathMap(Map<String, Path> map, - final FileSystem fs, final Path hbaseRootDir, TableName tableName, ErrorReporter errors) - throws IOException { - if (map == null) { - map = new HashMap<String, Path>(); - } + public static Map<String, Path> getTableStoreFilePathMap( + Map<String, Path> resultMap, + final FileSystem fs, final Path hbaseRootDir, TableName tableName, final PathFilter sfFilter, + ExecutorService executor, final ErrorReporter errors) throws IOException, InterruptedException { + + final Map<String, Path> finalResultMap = + resultMap == null ? new ConcurrentHashMap<String, Path>(128, 0.75f, 32) : resultMap; // only include the directory paths to tables Path tableDir = FSUtils.getTableDir(hbaseRootDir, tableName); // Inside a table, there are compaction.dir directories to skip. Otherwise, all else // should be regions. - PathFilter familyFilter = new FamilyDirFilter(fs); - FileStatus[] regionDirs = fs.listStatus(tableDir, new RegionDirFilter(fs)); - for (FileStatus regionDir : regionDirs) { - if (null != errors) { - errors.progress(); + final FamilyDirFilter familyFilter = new FamilyDirFilter(fs); + final Vector<Exception> exceptions = new Vector<Exception>(); + + try { + List<FileStatus> regionDirs = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); + if (regionDirs == null) { + return finalResultMap; } - Path dd = regionDir.getPath(); - // else its a region name, now look in region for families - FileStatus[] familyDirs = fs.listStatus(dd, familyFilter); - for (FileStatus familyDir : familyDirs) { + + final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.size()); + + for (FileStatus regionDir : regionDirs) { if (null != errors) { errors.progress(); } - Path family = familyDir.getPath(); - // now in family, iterate over the StoreFiles and - // put in map - FileStatus[] familyStatus = fs.listStatus(family); - for (FileStatus sfStatus : familyStatus) { - if (null != errors) { - errors.progress(); + final Path dd = regionDir.getPath(); + + if (!exceptions.isEmpty()) { + break; + } + + Runnable getRegionStoreFileMapCall = new Runnable() { + @Override + public void run() { + try { + HashMap<String,Path> regionStoreFileMap = new HashMap<String, Path>(); + List<FileStatus> familyDirs = FSUtils.listStatusWithStatusFilter(fs, dd, familyFilter); + if (familyDirs == null) { + if (!fs.exists(dd)) { + LOG.warn("Skipping region because it no longer exists: " + dd); + } else { + LOG.warn("Skipping region because it has no family dirs: " + dd); + } + return; + } + for (FileStatus familyDir : familyDirs) { + if (null != errors) { + errors.progress(); + } + Path family = familyDir.getPath(); + if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) { + continue; + } + // now in family, iterate over the StoreFiles and + // put in map + FileStatus[] familyStatus = fs.listStatus(family); + for (FileStatus sfStatus : familyStatus) { + if (null != errors) { + errors.progress(); + } + Path sf = sfStatus.getPath(); + if (sfFilter == null || sfFilter.accept(sf)) { + regionStoreFileMap.put( sf.getName(), sf); + } + } + } + finalResultMap.putAll(regionStoreFileMap); + } catch (Exception e) { + LOG.error("Could not get region store file map for region: " + dd, e); + exceptions.add(e); + } } - Path sf = sfStatus.getPath(); - map.put( sf.getName(), sf); + }; + + // If executor is available, submit async tasks to exec concurrently, otherwise + // just do serial sync execution + if (executor != null) { + Future<?> future = executor.submit(getRegionStoreFileMapCall); + futures.add(future); + } else { + FutureTask<?> future = new FutureTask<Object>(getRegionStoreFileMapCall, null); + future.run(); + futures.add(future); + } + } + + // Ensure all pending tasks are complete (or that we run into an exception) + for (Future<?> f : futures) { + if (!exceptions.isEmpty()) { + break; } + try { + f.get(); + } catch (ExecutionException e) { + LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); + // Shouldn't happen, we already logged/caught any exceptions in the Runnable + } + } + } catch (IOException e) { + LOG.error("Cannot execute getTableStoreFilePathMap for " + tableName, e); + exceptions.add(e); + } finally { + if (!exceptions.isEmpty()) { + // Just throw the first exception as an indication something bad happened + // Don't need to propagate all the exceptions, we already logged them all anyway + Throwables.propagateIfInstanceOf(exceptions.firstElement(), IOException.class); + throw Throwables.propagate(exceptions.firstElement()); } } - return map; + + return finalResultMap; } public static int getRegionReferenceFileCount(final FileSystem fs, final Path p) { @@ -1605,11 +1710,12 @@ public abstract class FSUtils { * @param hbaseRootDir The root directory to scan. * @return Map keyed by StoreFile name with a value of the full Path. * @throws IOException When scanning the directory fails. + * @throws InterruptedException */ public static Map<String, Path> getTableStoreFilePathMap( final FileSystem fs, final Path hbaseRootDir) - throws IOException { - return getTableStoreFilePathMap(fs, hbaseRootDir, null); + throws IOException, InterruptedException { + return getTableStoreFilePathMap(fs, hbaseRootDir, null, null, null); } /** @@ -1622,14 +1728,18 @@ public abstract class FSUtils { * * @param fs The file system to use. * @param hbaseRootDir The root directory to scan. + * @param sfFilter optional path filter to apply to store files + * @param executor optional executor service to parallelize this operation * @param errors ErrorReporter instance or null * @return Map keyed by StoreFile name with a value of the full Path. * @throws IOException When scanning the directory fails. + * @throws InterruptedException */ public static Map<String, Path> getTableStoreFilePathMap( - final FileSystem fs, final Path hbaseRootDir, ErrorReporter errors) - throws IOException { - Map<String, Path> map = new HashMap<String, Path>(); + final FileSystem fs, final Path hbaseRootDir, PathFilter sfFilter, + ExecutorService executor, ErrorReporter errors) + throws IOException, InterruptedException { + ConcurrentHashMap<String, Path> map = new ConcurrentHashMap<String, Path>(1024, 0.75f, 32); // if this method looks similar to 'getTableFragmentation' that is because // it was borrowed from it. @@ -1637,12 +1747,45 @@ public abstract class FSUtils { // only include the directory paths to tables for (Path tableDir : FSUtils.getTableDirs(fs, hbaseRootDir)) { getTableStoreFilePathMap(map, fs, hbaseRootDir, - FSUtils.getTableName(tableDir), errors); + FSUtils.getTableName(tableDir), sfFilter, executor, errors); } return map; } /** + * Filters FileStatuses in an array and returns a list + * + * @param input An array of FileStatuses + * @param filter A required filter to filter the array + * @return A list of FileStatuses + */ + public static List<FileStatus> filterFileStatuses(FileStatus[] input, + FileStatusFilter filter) { + if (input == null) return null; + return filterFileStatuses(Iterators.forArray(input), filter); + } + + /** + * Filters FileStatuses in an iterator and returns a list + * + * @param input An iterator of FileStatuses + * @param filter A required filter to filter the array + * @return A list of FileStatuses + */ + public static List<FileStatus> filterFileStatuses(Iterator<FileStatus> input, + FileStatusFilter filter) { + if (input == null) return null; + ArrayList<FileStatus> results = new ArrayList<FileStatus>(); + while (input.hasNext()) { + FileStatus f = input.next(); + if (filter.accept(f)) { + results.add(f); + } + } + return results; + } + + /** * Calls fs.listStatus() and treats FileNotFoundException as non-fatal * This accommodates differences between hadoop versions, where hadoop 1 * does not throw a FileNotFoundException, and return an empty FileStatus[] @@ -1650,6 +1793,48 @@ public abstract class FSUtils { * * @param fs file system * @param dir directory + * @param filter file status filter + * @return null if dir is empty or doesn't exist, otherwise FileStatus list + */ + public static List<FileStatus> listStatusWithStatusFilter(final FileSystem fs, + final Path dir, final FileStatusFilter filter) throws IOException { + FileStatus [] status = null; + try { + status = fs.listStatus(dir); + } catch (FileNotFoundException fnfe) { + // if directory doesn't exist, return null + if (LOG.isTraceEnabled()) { + LOG.trace(dir + " doesn't exist"); + } + } + + if (status == null || status.length < 1) { + return null; + } + + if (filter == null) { + return Arrays.asList(status); + } else { + List<FileStatus> status2 = filterFileStatuses(status, filter); + if (status2 == null || status2.isEmpty()) { + return null; + } else { + return status2; + } + } + } + + /** + * Calls fs.listStatus() and treats FileNotFoundException as non-fatal + * This accommodates differences between hadoop versions, where hadoop 1 + * does not throw a FileNotFoundException, and return an empty FileStatus[] + * while Hadoop 2 will throw FileNotFoundException. + * + * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem, + * Path, FileStatusFilter)} instead. + * + * @param fs file system + * @param dir directory * @param filter path filter * @return null if dir is empty or doesn't exist, otherwise FileStatus array */ http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java index 5e14db6..a43af76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSVisitor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.util.List; import java.util.NavigableSet; import org.apache.commons.logging.Log; @@ -94,7 +95,7 @@ public final class FSVisitor { */ public static void visitTableStoreFiles(final FileSystem fs, final Path tableDir, final StoreFileVisitor visitor) throws IOException { - FileStatus[] regions = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); + List<FileStatus> regions = FSUtils.listStatusWithStatusFilter(fs, tableDir, new FSUtils.RegionDirFilter(fs)); if (regions == null) { if (LOG.isTraceEnabled()) { LOG.trace("No regions under directory:" + tableDir); @@ -117,7 +118,7 @@ public final class FSVisitor { */ public static void visitRegionStoreFiles(final FileSystem fs, final Path regionDir, final StoreFileVisitor visitor) throws IOException { - FileStatus[] families = FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs)); + List<FileStatus> families = FSUtils.listStatusWithStatusFilter(fs, regionDir, new FSUtils.FamilyDirFilter(fs)); if (families == null) { if (LOG.isTraceEnabled()) { LOG.trace("No families under region directory:" + regionDir); http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java new file mode 100644 index 0000000..c81db75 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FileStatusFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileStatus; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FileStatusFilter { + /** + * Tests whether or not the specified filestatus should be + * included in a filestatus list. + * + * @param f The filestatus to be tested + * @return <code>true</code> if and only if the filestatus + * should be included + */ + boolean accept(FileStatus f); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index feb75b7..5a1f84f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -40,6 +40,7 @@ import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; @@ -99,6 +100,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.Block import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; @@ -878,26 +880,16 @@ public class HBaseFsck extends Configured { * Lingering reference file prevents a region from opening. It has to * be fixed before a cluster can start properly. */ - private void offlineReferenceFileRepair() throws IOException { + private void offlineReferenceFileRepair() throws IOException, InterruptedException { Configuration conf = getConf(); Path hbaseRoot = FSUtils.getRootDir(conf); FileSystem fs = hbaseRoot.getFileSystem(conf); LOG.info("Computing mapping of all store files"); - Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot, errors); + Map<String, Path> allFiles = FSUtils.getTableStoreFilePathMap(fs, hbaseRoot, + new FSUtils.ReferenceFileFilter(fs), executor, errors); errors.print(""); LOG.info("Validating mapping using HDFS state"); for (Path path: allFiles.values()) { - boolean isReference = false; - try { - isReference = StoreFileInfo.isReference(path); - } catch (Throwable t) { - // Ignore. Some files may not be store files at all. - // For example, files under .oldlogs folder in hbase:meta - // Warning message is already logged by - // StoreFile#isReference. - } - if (!isReference) continue; - Path referredToFile = StoreFileInfo.getReferredToFile(path); if (fs.exists(referredToFile)) continue; // good, expected @@ -1573,23 +1565,22 @@ public class HBaseFsck extends Configured { } } - // level 1: <HBASE_DIR>/* - List<WorkItemHdfsDir> dirs = new ArrayList<WorkItemHdfsDir>(tableDirs.size()); - List<Future<Void>> dirsFutures; - + // Avoid multithreading at table-level because already multithreaded internally at + // region-level. Additionally multithreading at table-level can lead to deadlock + // if there are many tables in the cluster. Since there are a limited # of threads + // in the executor's thread pool and if we multithread at the table-level by putting + // WorkItemHdfsDir callables into the executor, then we will have some threads in the + // executor tied up solely in waiting for the tables' region-level calls to complete. + // If there are enough tables then there will be no actual threads in the pool left + // for the region-level callables to be serviced. for (FileStatus tableDir : tableDirs) { LOG.debug("Loading region dirs from " +tableDir.getPath()); - dirs.add(new WorkItemHdfsDir(this, fs, errors, tableDir)); - } - - // Invoke and wait for Callables to complete - dirsFutures = executor.invokeAll(dirs); - - for(Future<Void> f: dirsFutures) { + WorkItemHdfsDir item = new WorkItemHdfsDir(fs, errors, tableDir); try { - f.get(); - } catch(ExecutionException e) { - LOG.warn("Could not load region dir " , e.getCause()); + item.call(); + } catch (ExecutionException e) { + LOG.warn("Could not completely load table dir " + + tableDir.getPath(), e.getCause()); } } errors.print(""); @@ -3737,70 +3728,117 @@ public class HBaseFsck extends Configured { * Contact hdfs and get all information about specified table directory into * regioninfo list. */ - static class WorkItemHdfsDir implements Callable<Void> { - private HBaseFsck hbck; + class WorkItemHdfsDir implements Callable<Void> { private FileStatus tableDir; private ErrorReporter errors; private FileSystem fs; - WorkItemHdfsDir(HBaseFsck hbck, FileSystem fs, ErrorReporter errors, + WorkItemHdfsDir(FileSystem fs, ErrorReporter errors, FileStatus status) { - this.hbck = hbck; this.fs = fs; this.tableDir = status; this.errors = errors; } @Override - public synchronized Void call() throws IOException { + public synchronized Void call() throws InterruptedException, ExecutionException { + final Vector<Exception> exceptions = new Vector<Exception>(); + try { - // level 2: <HBASE_DIR>/<table>/* - FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); - for (FileStatus regionDir : regionDirs) { + final FileStatus[] regionDirs = fs.listStatus(tableDir.getPath()); + final List<Future<?>> futures = new ArrayList<Future<?>>(regionDirs.length); + + for (final FileStatus regionDir : regionDirs) { errors.progress(); - String encodedName = regionDir.getPath().getName(); + final String encodedName = regionDir.getPath().getName(); // ignore directories that aren't hexadecimal if (!encodedName.toLowerCase(Locale.ROOT).matches("[0-9a-f]+")) { continue; } - LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); - HbckInfo hbi = hbck.getOrCreateInfo(encodedName); - HdfsEntry he = new HdfsEntry(); - synchronized (hbi) { - if (hbi.getHdfsRegionDir() != null) { - errors.print("Directory " + encodedName + " duplicate??" + - hbi.getHdfsRegionDir()); - } + if (!exceptions.isEmpty()) { + break; + } + + futures.add(executor.submit(new Runnable() { + @Override + public void run() { + try { + LOG.debug("Loading region info from hdfs:"+ regionDir.getPath()); + + Path regioninfoFile = new Path(regionDir.getPath(), HRegionFileSystem.REGION_INFO_FILE); + boolean regioninfoFileExists = fs.exists(regioninfoFile); + + if (!regioninfoFileExists) { + // As tables become larger it is more and more likely that by the time you + // reach a given region that it will be gone due to region splits/merges. + if (!fs.exists(regionDir.getPath())) { + LOG.warn("By the time we tried to process this region dir it was already gone: " + + regionDir.getPath()); + return; + } + } - he.hdfsRegionDir = regionDir.getPath(); - he.hdfsRegionDirModTime = regionDir.getModificationTime(); - Path regioninfoFile = new Path(he.hdfsRegionDir, HRegionFileSystem.REGION_INFO_FILE); - he.hdfsRegioninfoFilePresent = fs.exists(regioninfoFile); - // we add to orphan list when we attempt to read .regioninfo - - // Set a flag if this region contains only edits - // This is special case if a region is left after split - he.hdfsOnlyEdits = true; - FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); - Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath()); - for (FileStatus subDir : subDirs) { - errors.progress(); - String sdName = subDir.getPath().getName(); - if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { - he.hdfsOnlyEdits = false; - break; + HbckInfo hbi = HBaseFsck.this.getOrCreateInfo(encodedName); + HdfsEntry he = new HdfsEntry(); + synchronized (hbi) { + if (hbi.getHdfsRegionDir() != null) { + errors.print("Directory " + encodedName + " duplicate??" + + hbi.getHdfsRegionDir()); + } + + he.hdfsRegionDir = regionDir.getPath(); + he.hdfsRegionDirModTime = regionDir.getModificationTime(); + he.hdfsRegioninfoFilePresent = regioninfoFileExists; + // we add to orphan list when we attempt to read .regioninfo + + // Set a flag if this region contains only edits + // This is special case if a region is left after split + he.hdfsOnlyEdits = true; + FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); + Path ePath = HLogUtil.getRegionDirRecoveredEditsDir(regionDir.getPath()); + for (FileStatus subDir : subDirs) { + errors.progress(); + String sdName = subDir.getPath().getName(); + if (!sdName.startsWith(".") && !sdName.equals(ePath.getName())) { + he.hdfsOnlyEdits = false; + break; + } + } + hbi.hdfsEntry = he; + } + } catch (Exception e) { + LOG.error("Could not load region dir", e); + exceptions.add(e); } } - hbi.hdfsEntry = he; + })); + } + + // Ensure all pending tasks are complete (or that we run into an exception) + for (Future<?> f : futures) { + if (!exceptions.isEmpty()) { + break; } + try { + f.get(); + } catch (ExecutionException e) { + LOG.error("Unexpected exec exception! Should've been caught already. (Bug?)", e); + // Shouldn't happen, we already logged/caught any exceptions in the Runnable + }; } } catch (IOException e) { - // unable to connect to the region server. - errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " - + tableDir.getPath().getName() - + " Unable to fetch region information. " + e); - throw e; + LOG.error("Cannot execute WorkItemHdfsDir for " + tableDir, e); + exceptions.add(e); + } finally { + if (!exceptions.isEmpty()) { + errors.reportError(ERROR_CODE.RS_CONNECT_FAILURE, "Table Directory: " + + tableDir.getPath().getName() + + " Unable to fetch all HDFS region information. "); + // Just throw the first exception as an indication something bad happened + // Don't need to propagate all the exceptions, we already logged them all anyway + throw new ExecutionException("First exception in WorkItemHdfsDir", exceptions.firstElement()); + } } return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6e5a773d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java index 23dc570..db700c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/HFileCorruptionChecker.java @@ -152,9 +152,9 @@ public class HFileCorruptionChecker { * @throws IOException */ protected void checkColFamDir(Path cfDir) throws IOException { - FileStatus[] hfs = null; + FileStatus[] statuses = null; try { - hfs = fs.listStatus(cfDir, new HFileFilter(fs)); // use same filter as scanner. + statuses = fs.listStatus(cfDir); // use same filter as scanner. } catch (FileNotFoundException fnfe) { // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. LOG.warn("Colfam Directory " + cfDir + @@ -163,8 +163,9 @@ public class HFileCorruptionChecker { return; } + List<FileStatus> hfs = FSUtils.filterFileStatuses(statuses, new HFileFilter(fs)); // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. - if (hfs.length == 0 && !fs.exists(cfDir)) { + if (hfs.size() == 0 && !fs.exists(cfDir)) { LOG.warn("Colfam Directory " + cfDir + " does not exist. Likely due to concurrent split/compaction. Skipping."); missing.add(cfDir); @@ -184,9 +185,9 @@ public class HFileCorruptionChecker { * @throws IOException */ protected void checkRegionDir(Path regionDir) throws IOException { - FileStatus[] cfs = null; + FileStatus[] statuses = null; try { - cfs = fs.listStatus(regionDir, new FamilyDirFilter(fs)); + statuses = fs.listStatus(regionDir); } catch (FileNotFoundException fnfe) { // Hadoop 0.23+ listStatus semantics throws an exception if the path does not exist. LOG.warn("Region Directory " + regionDir + @@ -195,8 +196,9 @@ public class HFileCorruptionChecker { return; } + List<FileStatus> cfs = FSUtils.filterFileStatuses(statuses, new FamilyDirFilter(fs)); // Hadoop 1.0 listStatus does not throw an exception if the path does not exist. - if (cfs.length == 0 && !fs.exists(regionDir)) { + if (cfs.size() == 0 && !fs.exists(regionDir)) { LOG.warn("Region Directory " + regionDir + " does not exist. Likely due to concurrent split/compaction. Skipping."); missing.add(regionDir); @@ -217,12 +219,13 @@ public class HFileCorruptionChecker { * @throws IOException */ void checkTableDir(Path tableDir) throws IOException { - FileStatus[] rds = fs.listStatus(tableDir, new RegionDirFilter(fs)); - if (rds.length == 0 && !fs.exists(tableDir)) { - // interestingly listStatus does not throw an exception if the path does not exist. - LOG.warn("Table Directory " + tableDir + - " does not exist. Likely due to concurrent delete. Skipping."); - missing.add(tableDir); + List<FileStatus> rds = FSUtils.listStatusWithStatusFilter(fs, tableDir, new RegionDirFilter(fs)); + if (rds == null) { + if (!fs.exists(tableDir)) { + LOG.warn("Table Directory " + tableDir + + " does not exist. Likely due to concurrent delete. Skipping."); + missing.add(tableDir); + } return; }