http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
new file mode 100644
index 0000000..d682ccc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/HFileArchiver.java
@@ -0,0 +1,690 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.io.MultipleIOException;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Utility class to handle the removal of HFiles (or the respective {@link StoreFile StoreFiles})
+ * for a HRegion from the {@link FileSystem}. The hfiles will be archived or deleted, depending on
+ * the state of the system.
+ */
+public class HFileArchiver {
+  private static final Log LOG = LogFactory.getLog(HFileArchiver.class);
+  private static final String SEPARATOR = ".";
+
+  /** Number of retries in case of fs operation failure */
+  private static final int DEFAULT_RETRIES_NUMBER = 3;
+
+  private HFileArchiver() {
+    // hidden ctor since this is just a util
+  }
+
+  /**
+   * Cleans up all the files for a HRegion by archiving the HFiles to the
+   * archive directory
+   * @param conf the configuration to use
+   * @param fs the file system object
+   * @param info HRegionInfo for region to be deleted
+   * @throws IOException
+   */
+  public static void archiveRegion(Configuration conf, FileSystem fs, HRegionInfo info)
+      throws IOException {
+    Path rootDir = FSUtils.getRootDir(conf);
+    archiveRegion(fs, rootDir, FSUtils.getTableDir(rootDir, info.getTable()),
+      HRegion.getRegionDir(rootDir, info));
+  }
+
+  /**
+   * Remove an entire region from the table directory via archiving the region's hfiles.
+   * @param fs {@link FileSystem} from which to remove the region
+   * @param rootdir {@link Path} to the root directory where hbase files are stored (for building
+   *          the archive path)
+   * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+   * @param regionDir {@link Path} to where a region is being stored (for building the archive path)
+   * @return <tt>true</tt> if the region was successfully deleted. <tt>false</tt> if the filesystem
+   *         operations could not complete.
+   * @throws IOException if the request cannot be completed
+   */
+  public static boolean archiveRegion(FileSystem fs, Path rootdir, Path tableDir, Path regionDir)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ARCHIVING " + regionDir.toString());
+    }
+
+    // otherwise, we archive the files
+    // make sure we can archive
+    if (tableDir == null || regionDir == null) {
+      LOG.error("No archive directory could be found because tabledir (" + tableDir
+          + ") or regiondir (" + regionDir + ") was null. Deleting files instead.");
+      deleteRegionWithoutArchiving(fs, regionDir);
+      // we should have archived, but failed to. Doesn't matter if we deleted
+      // the archived files correctly or not.
+      return false;
+    }
+
+    // make sure the regiondir lives under the tabledir
+    Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString()));
+    Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir,
+        FSUtils.getTableName(tableDir),
+        regionDir.getName());
+
+    FileStatusConverter getAsFile = new FileStatusConverter(fs);
+    // otherwise, we attempt to archive the store files
+
+    // build collection of just the store directories to archive
+    Collection<File> toArchive = new ArrayList<File>();
+    final PathFilter dirFilter = new FSUtils.DirFilter(fs);
+    PathFilter nonHidden = new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        return dirFilter.accept(file) && !file.getName().toString().startsWith(".");
+      }
+    };
+    FileStatus[] storeDirs = FSUtils.listStatus(fs, regionDir, nonHidden);
+    // if there are no files, we can just delete the directory and return;
+    if (storeDirs == null) {
+      LOG.debug("Region directory (" + regionDir + ") was empty, just deleting and returning!");
+      return deleteRegionWithoutArchiving(fs, regionDir);
+    }
+
+    // convert the files in the region to a File
+    toArchive.addAll(Lists.transform(Arrays.asList(storeDirs), getAsFile));
+    LOG.debug("Archiving " + toArchive);
+    boolean success = false;
+    try {
+      success = resolveAndArchive(fs, regionArchiveDir, toArchive);
+    } catch (IOException e) {
+      LOG.error("Failed to archive " + toArchive, e);
+      success = false;
+    }
+
+    // if that was successful, then we delete the region
+    if (success) {
+      return deleteRegionWithoutArchiving(fs, regionDir);
+    }
+
+    throw new IOException("Received error when attempting to archive files (" + toArchive
+        + "), cannot delete region directory. ");
+  }
+
+  /**
+   * Remove from the specified region the store files of the specified column family,
+   * either by archiving them or outright deletion
+   * @param fs the filesystem where the store files live
+   * @param conf {@link Configuration} to examine to determine the archive directory
+   * @param parent Parent region hosting the store files
+   * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+   * @param family the family hosting the store files
+   * @throws IOException if the files could not be correctly disposed.
+   */
+  public static void archiveFamily(FileSystem fs, Configuration conf,
+      HRegionInfo parent, Path tableDir, byte[] family) throws IOException {
+    Path familyDir = new Path(tableDir, new Path(parent.getEncodedName(), Bytes.toString(family)));
+    FileStatus[] storeFiles = FSUtils.listStatus(fs, familyDir);
+    if (storeFiles == null) {
+      LOG.debug("No store files to dispose for region=" + parent.getRegionNameAsString() +
+          ", family=" + Bytes.toString(family));
+      return;
+    }
+
+    FileStatusConverter getAsFile = new FileStatusConverter(fs);
+    Collection<File> toArchive = Lists.transform(Arrays.asList(storeFiles), getAsFile);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, tableDir, family);
+
+    // do the actual archive
+    if (!resolveAndArchive(fs, storeArchiveDir, toArchive)) {
+      throw new IOException("Failed to archive/delete all the files for region:"
+          + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family)
+          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+    }
+  }
+
+  /**
+   * Remove the store files, either by archiving them or outright deletion
+   * @param conf {@link Configuration} to examine to determine the archive directory
+   * @param fs the filesystem where the store files live
+   * @param regionInfo {@link HRegionInfo} of the region hosting the store files
+   * @param family the family hosting the store files
+   * @param compactedFiles files to be disposed of. No further reading of these files should be
+   *          attempted; otherwise likely to cause an {@link IOException}
+   * @throws IOException if the files could not be correctly disposed.
+   */
+  public static void archiveStoreFiles(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+      Path tableDir, byte[] family, Collection<StoreFile> compactedFiles) throws IOException {
+
+    // sometimes in testing, we don't have rss, so we need to check for that
+    if (fs == null) {
+      LOG.warn("Passed filesystem is null, so just deleting the files without archiving for region:"
+          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family));
+      deleteStoreFilesWithoutArchiving(compactedFiles);
+      return;
+    }
+
+    // short circuit if we don't have any files to delete
+    if (compactedFiles.size() == 0) {
+      LOG.debug("No store files to dispose, done!");
+      return;
+    }
+
+    // build the archive path
+    if (regionInfo == null || family == null) throw new IOException(
+        "Need to have a region and a family to archive from.");
+
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+
+    // make sure we don't archive if we can't and that the archive dir exists
+    if (!fs.mkdirs(storeArchiveDir)) {
+      throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+          + Bytes.toString(family) + ", deleting compacted files instead.");
+    }
+
+    // otherwise we attempt to archive the store files
+    if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
+
+    // Wrap the storefile into a File
+    StoreToFile getStorePath = new StoreToFile(fs);
+    Collection<File> storeFiles = Collections2.transform(compactedFiles, getStorePath);
+
+    // do the actual archive
+    if (!resolveAndArchive(fs, storeArchiveDir, storeFiles)) {
+      throw new IOException("Failed to archive/delete all the files for region:"
+          + Bytes.toString(regionInfo.getRegionName()) + ", family:" + Bytes.toString(family)
+          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+    }
+  }
+
+  /**
+   * Archive the store file
+   * @param fs the filesystem where the store files live
+   * @param regionInfo region hosting the store files
+   * @param conf {@link Configuration} to examine to determine the archive directory
+   * @param tableDir {@link Path} to where the table is being stored (for building the archive path)
+   * @param family the family hosting the store files
+   * @param storeFile file to be archived
+   * @throws IOException if the files could not be correctly disposed.
+   */
+  public static void archiveStoreFile(Configuration conf, FileSystem fs, HRegionInfo regionInfo,
+      Path tableDir, byte[] family, Path storeFile) throws IOException {
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, regionInfo, tableDir, family);
+    // make sure we don't archive if we can't and that the archive dir exists
+    if (!fs.mkdirs(storeArchiveDir)) {
+      throw new IOException("Could not make archive directory (" + storeArchiveDir + ") for store:"
+          + Bytes.toString(family) + ", deleting compacted files instead.");
+    }
+
+    // do the actual archive
+    long start = EnvironmentEdgeManager.currentTime();
+    File file = new FileablePath(fs, storeFile);
+    if (!resolveAndArchiveFile(storeArchiveDir, file, Long.toString(start))) {
+      throw new IOException("Failed to archive/delete the file for region:"
+          + regionInfo.getRegionNameAsString() + ", family:" + Bytes.toString(family)
+          + " into " + storeArchiveDir + ". Something is probably awry on the filesystem.");
+    }
+  }
+
+  /**
+   * Archive the given files and resolve any conflicts with existing files via appending the time
+   * archiving started (so all conflicts in the same group have the same timestamp appended).
+   * <p>
+   * If any of the passed files to archive are directories, archives all the files under that
+   * directory. Archive directory structure for children is the base archive directory name + the
+   * parent directory and is built recursively if the passed files are directories themselves.
+   * @param fs {@link FileSystem} on which to archive the files
+   * @param baseArchiveDir base archive directory to archive the given files
+   * @param toArchive files to be archived
+   * @return <tt>true</tt> on success, <tt>false</tt> otherwise
+   * @throws IOException on unexpected failure
+   */
+  private static boolean resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+      Collection<File> toArchive) throws IOException {
+    if (LOG.isTraceEnabled()) LOG.trace("Starting to archive " + toArchive);
+    long start = EnvironmentEdgeManager.currentTime();
+    List<File> failures = resolveAndArchive(fs, baseArchiveDir, toArchive, start);
+
+    // notify that some files were not archived.
+    // We can't delete the files otherwise snapshots or other backup system
+    // that relies on the archiver end up with data loss.
+    if (failures.size() > 0) {
+      LOG.warn("Failed to complete archive of: " + failures +
+        ". Those files are still in the original location, and they may slow down reads.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Resolve any conflict with an existing archive file via timestamp-append
+   * renaming of the existing file and then archive the passed in files.
+   * @param fs {@link FileSystem} on which to archive the files
+   * @param baseArchiveDir base archive directory to store the files. If any of
+   *          the files to archive are directories, will append the name of the
+   *          directory to the base archive directory name, creating a parallel
+   *          structure.
+   * @param toArchive files/directories that need to be archived
+   * @param start time the archiving started - used for resolving archive
+   *          conflicts.
+   * @return the list of files that failed to be archived.
+   * @throws IOException if an unexpected file operation exception occurred
+   */
+  private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
+      Collection<File> toArchive, long start) throws IOException {
+    // short circuit if no files to move
+    if (toArchive.size() == 0) return Collections.emptyList();
+
+    if (LOG.isTraceEnabled()) LOG.trace("moving files to the archive directory: " + baseArchiveDir);
+
+    // make sure the archive directory exists
+    if (!fs.exists(baseArchiveDir)) {
+      if (!fs.mkdirs(baseArchiveDir)) {
+        throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+            + ", quitting archive attempt.");
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("Created archive directory:" + baseArchiveDir);
+    }
+
+    List<File> failures = new ArrayList<File>();
+    String startTime = Long.toString(start);
+    for (File file : toArchive) {
+      // if it's a file, archive it
+      try {
+        if (LOG.isTraceEnabled()) LOG.trace("Archiving: " + file);
+        if (file.isFile()) {
+          // attempt to archive the file
+          if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
+            LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
+            failures.add(file);
+          }
+        } else {
+          // otherwise it's a directory and we need to archive all files
+          if (LOG.isTraceEnabled()) LOG.trace(file + " is a directory, archiving children files");
+          // so we add the directory name to the one base archive
+          Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
+          // and then get all the files from that directory and attempt to
+          // archive those too
+          Collection<File> children = file.getChildren();
+          failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to archive " + file, e);
+        failures.add(file);
+      }
+    }
+    return failures;
+  }
+
+  /**
+   * Attempt to archive the passed in file to the archive directory.
+   * <p>
+   * If the same file already exists in the archive, it is moved to a timestamped directory under
+   * the archive directory and the new file is put in its place.
+   * @param archiveDir {@link Path} to the directory that stores the archives of the hfiles
+   * @param currentFile {@link Path} to the original HFile that will be archived
+   * @param archiveStartTime time the archiving started, to resolve naming conflicts
+   * @return <tt>true</tt> if the file is successfully archived. <tt>false</tt> if there was a
+   *         problem, but the operation still completed.
+   * @throws IOException on failure to complete {@link FileSystem} operations.
+   */
+  private static boolean resolveAndArchiveFile(Path archiveDir, File currentFile,
+      String archiveStartTime) throws IOException {
+    // build path as it should be in the archive
+    String filename = currentFile.getName();
+    Path archiveFile = new Path(archiveDir, filename);
+    FileSystem fs = currentFile.getFileSystem();
+
+    // if the file already exists in the archive, move that one to a timestamped backup. This is a
+    // really, really unlikely situation, where we get the same name for the existing file, but
+    // is included just for that 1 in trillion chance.
+    if (fs.exists(archiveFile)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("File:" + archiveFile + " already exists in archive, moving to "
+            + "timestamped backup and overwriting current.");
+      }
+
+      // move the archive file to the stamped backup
+      Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime);
+      if (!fs.rename(archiveFile, backedupArchiveFile)) {
+        LOG.error("Could not rename archive file to backup: " + backedupArchiveFile
+            + ", deleting existing file in favor of newer.");
+        // try to delete the existing file, if we can't rename it
+        if (!fs.delete(archiveFile, false)) {
+          throw new IOException("Couldn't delete existing archive file (" + archiveFile
+              + ") or rename it to the backup file (" + backedupArchiveFile
+              + ") to make room for similarly named file.");
+        }
+      }
+      LOG.debug("Backed up archive file from " + archiveFile);
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("No existing file in archive for: " + archiveFile +
+        ", free to archive original file.");
+    }
+
+    // at this point, we should have a free spot for the archive file
+    boolean success = false;
+    for (int i = 0; !success && i < DEFAULT_RETRIES_NUMBER; ++i) {
+      if (i > 0) {
+        // Ensure that the archive directory exists.
+        // The previous "move to archive" operation has failed probably because
+        // the cleaner has removed our archive directory (HBASE-7643).
+        // (we're in a retry loop, so don't worry too much about the exception)
+        try {
+          if (!fs.exists(archiveDir)) {
+            if (fs.mkdirs(archiveDir)) {
+              LOG.debug("Created archive directory:" + archiveDir);
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to create directory: " + archiveDir, e);
+        }
+      }
+
+      try {
+        success = currentFile.moveAndClose(archiveFile);
+      } catch (IOException e) {
+        LOG.warn("Failed to archive " + currentFile + " on try #" + i, e);
+        success = false;
+      }
+    }
+
+    if (!success) {
+      LOG.error("Failed to archive " + currentFile);
+      return false;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Finished archiving from " + currentFile + ", to " + archiveFile);
+    }
+    return true;
+  }
+
+  /**
+   * Without regard for backup, delete a region. Should be used with caution.
+   * @param regionDir {@link Path} to the region to be deleted.
+   * @param fs FileSystem from which to delete the region
+   * @return <tt>true</tt> on successful deletion, <tt>false</tt> otherwise
+   * @throws IOException on filesystem operation failure
+   */
+  private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir)
+      throws IOException {
+    if (fs.delete(regionDir, true)) {
+      LOG.debug("Deleted all region files in: " + regionDir);
+      return true;
+    }
+    LOG.debug("Failed to delete region directory:" + regionDir);
+    return false;
+  }
+
+  /**
+   * Just do a simple delete of the given store files
+   * <p>
+   * A best effort is made to delete each of the files, rather than bailing on the first failure.
+   * <p>
+   * This method is preferable to {@link #deleteFilesWithoutArchiving(Collection)} since it consumes
+   * less resources, but is limited in terms of usefulness
+   * @param compactedFiles store files to delete from the file system.
+   * @throws IOException if a file cannot be deleted. An attempt will be made to delete all files
+   *           before throwing the exception, rather than failing at the first file.
+   */
+  private static void deleteStoreFilesWithoutArchiving(Collection<StoreFile> compactedFiles)
+      throws IOException {
+    LOG.debug("Deleting store files without archiving.");
+    List<IOException> errors = new ArrayList<IOException>(0);
+    for (StoreFile hsf : compactedFiles) {
+      try {
+        hsf.deleteReader();
+      } catch (IOException e) {
+        LOG.error("Failed to delete store file:" + hsf.getPath());
+        errors.add(e);
+      }
+    }
+    if (errors.size() > 0) {
+      throw MultipleIOException.createIOException(errors);
+    }
+  }
+
+  /**
+   * Adapt a type to match the {@link File} interface, which is used internally for handling
+   * archival/removal of files
+   * @param <T> type to adapt to the {@link File} interface
+   */
+  private static abstract class FileConverter<T> implements Function<T, File> {
+    protected final FileSystem fs;
+
+    public FileConverter(FileSystem fs) {
+      this.fs = fs;
+    }
+  }
+
+  /**
+   * Convert a FileStatus to something we can manage in the archiving
+   */
+  private static class FileStatusConverter extends FileConverter<FileStatus> {
+    public FileStatusConverter(FileSystem fs) {
+      super(fs);
+    }
+
+    @Override
+    public File apply(FileStatus input) {
+      return new FileablePath(fs, input.getPath());
+    }
+  }
+
+  /**
+   * Convert the {@link StoreFile} into something we can manage in the archive
+   * methods
+   */
+  private static class StoreToFile extends FileConverter<StoreFile> {
+    public StoreToFile(FileSystem fs) {
+      super(fs);
+    }
+
+    @Override
+    public File apply(StoreFile input) {
+      return new FileableStoreFile(fs, input);
+    }
+  }
+
+  /**
+   * Wrapper to handle file operations uniformly
+   */
+  private static abstract class File {
+    protected final FileSystem fs;
+
+    public File(FileSystem fs) {
+      this.fs = fs;
+    }
+
+    /**
+     * Delete the file
+     * @throws IOException on failure
+     */
+    abstract void delete() throws IOException;
+
+    /**
+     * Check to see if this is a file or a directory
+     * @return <tt>true</tt> if it is a file, <tt>false</tt> otherwise
+     * @throws IOException on {@link FileSystem} connection error
+     */
+    abstract boolean isFile() throws IOException;
+
+    /**
+     * @return if this is a directory, returns all the children in the
+     *         directory, otherwise returns an empty list
+     * @throws IOException
+     */
+    abstract Collection<File> getChildren() throws IOException;
+
+    /**
+     * close any outside readers of the file
+     * @throws IOException
+     */
+    abstract void close() throws IOException;
+
+    /**
+     * @return the name of the file (not the full fs path, just the individual
+     *         file name)
+     */
+    abstract String getName();
+
+    /**
+     * @return the path to this file
+     */
+    abstract Path getPath();
+
+    /**
+     * Move the file to the given destination
+     * @param dest
+     * @return <tt>true</tt> on success
+     * @throws IOException
+     */
+    public boolean moveAndClose(Path dest) throws IOException {
+      this.close();
+      Path p = this.getPath();
+      return FSUtils.renameAndSetModifyTime(fs, p, dest);
+    }
+
+    /**
+     * @return the {@link FileSystem} on which this file resides
+     */
+    public FileSystem getFileSystem() {
+      return this.fs;
+    }
+
+    @Override
+    public String toString() {
+      return this.getClass() + ", file:" + getPath().toString();
+    }
+  }
+
+  /**
+   * A {@link File} that wraps a simple {@link Path} on a {@link FileSystem}.
+   */
+  private static class FileablePath extends File {
+    private final Path file;
+    private final FileStatusConverter getAsFile;
+
+    public FileablePath(FileSystem fs, Path file) {
+      super(fs);
+      this.file = file;
+      this.getAsFile = new FileStatusConverter(fs);
+    }
+
+    @Override
+    public void delete() throws IOException {
+      if (!fs.delete(file, true)) throw new IOException("Failed to delete:" + this.file);
+    }
+
+    @Override
+    public String getName() {
+      return file.getName();
+    }
+
+    @Override
+    public Collection<File> getChildren() throws IOException {
+      if (fs.isFile(file)) return Collections.emptyList();
+      return Collections2.transform(Arrays.asList(fs.listStatus(file)), getAsFile);
+    }
+
+    @Override
+    public boolean isFile() throws IOException {
+      return fs.isFile(file);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // NOOP - files are implicitly closed on removal
+    }
+
+    @Override
+    Path getPath() {
+      return file;
+    }
+  }
+
+  /**
+   * {@link File} adapter for a {@link StoreFile} living on a {@link FileSystem}.
+   */
+  private static class FileableStoreFile extends File {
+    StoreFile file;
+
+    public FileableStoreFile(FileSystem fs, StoreFile store) {
+      super(fs);
+      this.file = store;
+    }
+
+    @Override
+    public void delete() throws IOException {
+      file.deleteReader();
+    }
+
+    @Override
+    public String getName() {
+      return file.getPath().getName();
+    }
+
+    @Override
+    public boolean isFile() {
+      return true;
+    }
+
+    @Override
+    public Collection<File> getChildren() throws IOException {
+      // storefiles don't have children
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void close() throws IOException {
+      file.closeReader(true);
+    }
+
+    @Override
+    Path getPath() {
+      return file.getPath();
+    }
+  }
+}
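
[Usage sketch, illustrative only and not part of the commit: the archiver above is normally driven
from region/store cleanup code. Assuming a HRegionInfo "regionInfo", a table dir "tableDir", a
byte[] "family" and a Collection<StoreFile> "compactedFiles" are already in scope:

    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    // archive everything for a region that is being removed
    HFileArchiver.archiveRegion(conf, fs, regionInfo);
    // or archive only the compacted files of one store
    HFileArchiver.archiveStoreFiles(conf, fs, regionInfo, tableDir, family, compactedFiles);
]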

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
new file mode 100644
index 0000000..1386fe4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyArchiver.java
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+
+public final class LegacyArchiver {
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
index 8f54bae..8c26cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyLayout.java
@@ -23,10 +23,20 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mob.MobConstants;
 
 public final class LegacyLayout {
   /** Name of the region info file that resides just under the region directory. */
-  public final static String REGION_INFO_FILE = ".regioninfo";
+  private final static String REGION_INFO_FILE = ".regioninfo";
+
+  /** Temporary subdirectory of the region directory used for merges. */
+  public static final String REGION_MERGES_DIR = ".merges";
+
+  /** Temporary subdirectory of the region directory used for splits. */
+  public static final String REGION_SPLITS_DIR = ".splits";
+
+  /** Temporary subdirectory of the region directory used for compaction output. */
+  private static final String REGION_TEMP_DIR = ".tmp";
 
   private LegacyLayout() {}
 
@@ -63,8 +73,44 @@ public final class LegacyLayout {
     return new Path(nsDir, table.getQualifierAsString());
   }
 
-  public static Path getRegionDir(Path baseDataDir, TableName table, HRegionInfo hri) {
-    return new Path(getTableDir(baseDataDir, table), hri.getEncodedName());
+  public static Path getRegionDir(Path tableDir, HRegionInfo hri) {
+    return new Path(tableDir, hri.getEncodedName());
+  }
+
+  public static Path getFamilyDir(Path regionDir, String familyName) {
+    return new Path(regionDir, familyName);
+  }
+
+  public static Path getStoreFile(Path familyDir, String fileName) {
+    return new Path(familyDir, fileName);
+  }
+
+  public static Path getRegionInfoFile(Path regionDir) {
+    return new Path(regionDir, REGION_INFO_FILE);
+  }
+
+  public static Path getRegionTempDir(Path regionDir) {
+    return new Path(regionDir, REGION_TEMP_DIR);
+  }
+
+  public static Path getRegionMergesDir(Path regionDir) {
+    return new Path(regionDir, REGION_MERGES_DIR);
+  }
+
+  public static Path getRegionMergesDir(Path mergeDir, HRegionInfo hri) {
+    return new Path(mergeDir, hri.getEncodedName());
+  }
+
+  public static Path getRegionSplitsDir(Path regionDir) {
+    return new Path(regionDir, REGION_SPLITS_DIR);
+  }
+
+  public static Path getRegionSplitsDir(Path splitDir, HRegionInfo hri) {
+    return new Path(splitDir, hri.getEncodedName());
+  }
+
+  public static Path getMobDir(Path rootDir) {
+    return new Path(rootDir, MobConstants.MOB_DIR_NAME);
   }
 
   public static Path getBulkDir(Path rootDir) {
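
[Illustrative sketch of how the new LegacyLayout helpers compose into a store file path; rootDir,
tableName and hri are assumed to be in scope, and "f" / "abc123" are made-up names:

    Path dataDir = LegacyLayout.getDataDir(rootDir);
    Path tableDir = LegacyLayout.getTableDir(dataDir, tableName);
    Path regionDir = LegacyLayout.getRegionDir(tableDir, hri);
    Path familyDir = LegacyLayout.getFamilyDir(regionDir, "f");
    Path storeFile = LegacyLayout.getStoreFile(familyDir, "abc123");
]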

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
index 992dce1..b2675a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyMasterFileSystem.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -186,6 +187,32 @@ public class LegacyMasterFileSystem extends MasterFileSystem {
   }
 
   // ==========================================================================
+  //  PUBLIC Methods - Table Regions related
+  // ==========================================================================
+  @Override
+  public Collection<HRegionInfo> getRegions(FsContext ctx, TableName tableName)
+      throws IOException {
+    FileStatus[] stats = FSUtils.listStatus(getFileSystem(),
+        getTableDir(ctx, tableName), new FSUtils.RegionDirFilter(getFileSystem()));
+    if (stats == null) return Collections.emptyList();
+
+    ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>(stats.length);
+    for (int i = 0; i < stats.length; ++i) {
+      regions.add(loadRegionInfo(stats[i].getPath()));
+    }
+    return regions;
+  }
+
+  protected HRegionInfo loadRegionInfo(Path regionDir) throws IOException {
+    FSDataInputStream in = getFileSystem().open(LegacyLayout.getRegionInfoFile(regionDir));
+    try {
+      return HRegionInfo.parseFrom(in);
+    } finally {
+      in.close();
+    }
+  }
+
+  // ==========================================================================
   //  PROTECTED Methods - Bootstrap
   // ==========================================================================
   @Override
@@ -365,7 +392,7 @@ public class LegacyMasterFileSystem extends MasterFileSystem {
   }
 
   protected Path getRegionDir(FsContext ctx, TableName table, HRegionInfo hri) {
-    return LegacyLayout.getRegionDir(getBaseDirFromContext(ctx), table, hri);
+    return LegacyLayout.getRegionDir(getTableDir(ctx, table), hri);
   }
 
   public Path getTempDir() {
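
[Small usage sketch for the new getRegions() entry point, illustrative only; "mfs" stands for a
LegacyMasterFileSystem instance, and ctx / tableName are whatever the caller already has:

    Collection<HRegionInfo> regions = mfs.getRegions(ctx, tableName);
    for (HRegionInfo hri : regions) {
      // each HRegionInfo is parsed back from the region's .regioninfo file
      System.out.println(hri.getEncodedName());
    }
]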

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
new file mode 100644
index 0000000..d07a1f1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/legacy/LegacyRegionFileSystem.java
@@ -0,0 +1,758 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs.legacy;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.fs.FSUtilsWithRetries;
+import org.apache.hadoop.hbase.fs.FsContext;
+import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobUtils;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+
+@InterfaceAudience.Private
+public class LegacyRegionFileSystem extends RegionFileSystem {
+  private static final Log LOG = LogFactory.getLog(LegacyRegionFileSystem.class);
+
+  private final Path tableDir;
+  private final Path regionDir;
+  private final Path mobDir;
+
+  // regionInfo for interacting with FS (getting encodedName, etc)
+  private final HRegionInfo regionInfoForFs;
+
+  private final FSUtilsWithRetries fsWithRetries;
+
+  public LegacyRegionFileSystem(Configuration conf, FileSystem fs, Path rootDir, HRegionInfo hri) {
+    super(conf, fs, rootDir, hri);
+
+    Path dataDir = LegacyLayout.getDataDir(rootDir);
+    this.tableDir = LegacyLayout.getTableDir(dataDir, hri.getTable());
+    this.regionDir = LegacyLayout.getRegionDir(tableDir, hri);
+    this.mobDir = LegacyLayout.getDataDir(LegacyLayout.getMobDir(rootDir));
+    this.fsWithRetries = new FSUtilsWithRetries(conf, fs);
+
+    this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(hri);
+  }
+
+  private Path getRegionDir() {
+    return regionDir;
+  }
+
+  // ==========================================================================
+  //  PUBLIC Methods - Families Related
+  // ==========================================================================
+  @Override
+  public Collection<String> getFamilies() throws IOException {
+    FileSystem fs = getFileSystem();
+    FileStatus[] fds = FSUtils.listStatus(fs, regionDir, new FSUtils.FamilyDirFilter(fs));
+    if (fds == null) return null;
+
+    ArrayList<String> families = new ArrayList<String>(fds.length);
+    for (FileStatus status: fds) {
+      families.add(status.getPath().getName());
+    }
+    return families;
+  }
+
+  @Override
+  public void deleteFamily(String familyName, boolean hasMob) throws IOException {
+    // archive family store files
+    byte[] bFamilyName = Bytes.toBytes(familyName);
+
+    FileSystem fs = getFileSystem();
+    HFileArchiver.archiveFamily(fs, getConfiguration(), getRegionInfo(), tableDir, bFamilyName);
+
+    // delete the family folder
+    HRegionInfo region = getRegionInfo();
+    Path familyDir = new Path(tableDir, new Path(region.getEncodedName(), familyName));
+    if (!fsWithRetries.deleteDir(familyDir)) {
+      throw new IOException("Could not delete family "
+          + familyName + " from FileSystem for region "
+          + region.getRegionNameAsString() + "(" + region.getEncodedName()
+          + ")");
+    }
+
+    // archive and delete mob files
+    if (hasMob) {
+      Path mobTableDir = LegacyLayout.getTableDir(mobDir, getTable());
+      HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(getTable());
+      Path mobRegionDir = LegacyLayout.getRegionDir(mobTableDir, mobRegionInfo);
+      Path mobFamilyDir = LegacyLayout.getFamilyDir(mobRegionDir, familyName);
+      // archive mob family store files
+      MobUtils.archiveMobStoreFiles(getConfiguration(), getFileSystem(),
+          mobRegionInfo, mobFamilyDir, bFamilyName);
+
+      if (!fsWithRetries.deleteDir(mobFamilyDir)) {
+        throw new IOException("Could not delete mob store files for family "
+            + familyName + " from FileSystem region "
+            + mobRegionInfo.getRegionNameAsString() + "(" + mobRegionInfo.getEncodedName() + ")");
+      }
+    }
+  }
+
+  // ===========================================================================
+  //  Temp Helpers
+  // ===========================================================================
+  /** @return {@link Path} to the region's temp directory, used for file creations */
+  Path getTempDir() {
+    return LegacyLayout.getRegionTempDir(regionDir);
+  }
+
+  /**
+   * Clean up any temp detritus that may have been left around from previous operation attempts.
+   */
+  void cleanupTempDir() throws IOException {
+    fsWithRetries.deleteDir(getTempDir());
+  }
+
+  // ===========================================================================
+  //  Store/StoreFile Helpers
+  // ===========================================================================
+  /**
+   * Returns the directory path of the specified family
+   * @param familyName Column Family Name
+   * @return {@link Path} to the directory of the specified family
+   */
+  public Path getStoreDir(final String familyName) {
+    return LegacyLayout.getFamilyDir(getRegionDir(), familyName);
+  }
+
+  /**
+   * Create the store directory for the specified family name
+   * @param familyName Column Family Name
+   * @return {@link Path} to the directory of the specified family
+   * @throws IOException if the directory creation fails.
+   */
+  Path createStoreDir(final String familyName) throws IOException {
+    Path storeDir = getStoreDir(familyName);
+    if (!fsWithRetries.createDir(storeDir))
+      throw new IOException("Failed creating "+storeDir);
+    return storeDir;
+  }
+
+  // ==========================================================================
+  //  PUBLIC Methods - Store Files related
+  // ==========================================================================
+
+  @Override
+  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+      throws IOException {
+    Path familyDir = getStoreDir(familyName);
+    FileStatus[] files = FSUtils.listStatus(getFileSystem(), familyDir);
+    if (files == null) {
+      LOG.debug("No StoreFiles for: " + familyDir);
+      return null;
+    }
+
+    ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
+    for (FileStatus status: files) {
+      if (validate && !StoreFileInfo.isValid(status)) {
+        LOG.warn("Invalid StoreFile: " + status.getPath());
+        continue;
+      }
+      StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(),
+          getFileSystem(), getRegionInfo(), regionInfoForFs, familyName, status.getPath());
+      storeFiles.add(info);
+
+    }
+    return storeFiles;
+  }
+
+
+  /**
+   * Return Qualified Path of the specified family/file
+   *
+   * @param familyName Column Family Name
+   * @param fileName File Name
+   * @return The qualified Path for the specified family/file
+   */
+  Path getStoreFilePath(final String familyName, final String fileName) {
+    Path familyDir = getStoreDir(familyName);
+    return LegacyLayout.getStoreFile(familyDir, fileName).makeQualified(getFileSystem());
+  }
+
+  /**
+   * Return the store file information of the specified family/file.
+   *
+   * @param familyName Column Family Name
+   * @param fileName File Name
+   * @return The {@link StoreFileInfo} for the specified family/file
+   */
+  StoreFileInfo getStoreFileInfo(final String familyName, final String fileName)
+      throws IOException {
+    Path familyDir = getStoreDir(familyName);
+    return ServerRegionReplicaUtil.getStoreFileInfo(getConfiguration(),
+      getFileSystem(), getRegionInfo(), regionInfoForFs, familyName,
+      LegacyLayout.getStoreFile(familyDir, fileName));
+  }
+
+  /**
+   * Returns true if the specified family has reference files
+   * @param familyName Column Family Name
+   * @return true if family contains reference files
+   * @throws IOException
+   */
+  public boolean hasReferences(final String familyName) throws IOException {
+    FileStatus[] files = FSUtils.listStatus(getFileSystem(),
+        getStoreDir(familyName), new FSUtils.ReferenceFileFilter(getFileSystem()));
+    return files != null && files.length > 0;
+  }
+
+  /**
+   * Check whether region has Reference file
+   * @param htd table descriptor of the region
+   * @return true if region has reference file
+   * @throws IOException
+   */
+  public boolean hasReferences(final HTableDescriptor htd) throws IOException {
+    for (HColumnDescriptor family : htd.getFamilies()) {
+      if (hasReferences(family.getNameAsString())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+
+  /**
+   * Generate a unique file name, used by createTempName() and commitStoreFile()
+   * @param suffix extra information to append to the generated name
+   * @return Unique file name
+   */
+  private static String generateUniqueName(final String suffix) {
+    String name = UUID.randomUUID().toString().replaceAll("-", "");
+    if (suffix != null) name += suffix;
+    return name;
+  }
+
+  /**
+   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
+   * to get a safer file creation.
+   * <code>
+   * Path file = fs.createTempName();
+   * ...StoreFile.Writer(file)...
+   * fs.commitStoreFile("family", file);
+   * </code>
+   *
+   * @return Unique {@link Path} of the temporary file
+   */
+  public Path createTempName() {
+    return createTempName(null);
+  }
+
+  /**
+   * Generate a unique temporary Path. Used in conjunction with commitStoreFile()
+   * to get a safer file creation.
+   * <code>
+   * Path file = fs.createTempName();
+   * ...StoreFile.Writer(file)...
+   * fs.commitStoreFile("family", file);
+   * </code>
+   *
+   * @param suffix extra information to append to the generated name
+   * @return Unique {@link Path} of the temporary file
+   */
+  public Path createTempName(final String suffix) {
+    return new Path(getTempDir(), generateUniqueName(suffix));
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @return The new {@link Path} of the committed file
+   * @throws IOException
+   */
+  public Path commitStoreFile(final String familyName, final Path buildPath) throws IOException {
+    return commitStoreFile(familyName, buildPath, -1, false);
+  }
+
+  /**
+   * Move the file from a build/temp location to the main family store directory.
+   * @param familyName Family that will gain the file
+   * @param buildPath {@link Path} to the file to commit.
+   * @param seqNum Sequence Number to append to the file name (less than 0 if no sequence number)
+   * @param generateNewName False if you want to keep the buildPath name
+   * @return The new {@link Path} of the committed file
+   * @throws IOException
+   */
+  private Path commitStoreFile(final String familyName, final Path buildPath,
+      final long seqNum, final boolean generateNewName) throws IOException {
+    Path storeDir = getStoreDir(familyName);
+    if(!fsWithRetries.createDir(storeDir))
+      throw new IOException("Failed creating " + storeDir);
+
+    String name = buildPath.getName();
+    if (generateNewName) {
+      name = generateUniqueName((seqNum < 0) ? null : "_SeqId_" + seqNum + "_");
+    }
+    Path dstPath = new Path(storeDir, name);
+    if (!fsWithRetries.exists(buildPath)) {
+      throw new FileNotFoundException(buildPath.toString());
+    }
+    LOG.debug("Committing store file " + buildPath + " as " + dstPath);
+    // buildPath exists, therefore not doing an exists() check.
+    if (!fsWithRetries.rename(buildPath, dstPath)) {
+      throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
+    }
+    return dstPath;
+  }
+
+
+  /**
+   * Moves multiple store files to the relative region's family store directory.
+   * @param storeFiles list of store files divided by family
+   * @throws IOException
+   */
+  void commitStoreFiles(final Map<byte[], List<StoreFile>> storeFiles) throws IOException {
+    for (Map.Entry<byte[], List<StoreFile>> es: storeFiles.entrySet()) {
+      String familyName = Bytes.toString(es.getKey());
+      for (StoreFile sf: es.getValue()) {
+        commitStoreFile(familyName, sf.getPath());
+      }
+    }
+  }
+
+  /**
+   * Archives the specified store file from the specified family.
+   * @param familyName Family that contains the store files
+   * @param filePath {@link Path} to the store file to remove
+   * @throws IOException if the archiving fails
+   */
+  public void removeStoreFile(final String familyName, final Path filePath)
+      throws IOException {
+    HFileArchiver.archiveStoreFile(getConfiguration(), getFileSystem(), this.regionInfoForFs,
+        this.tableDir, Bytes.toBytes(familyName), filePath);
+  }
+
+  /**
+   * Closes and archives the specified store files from the specified family.
+   * @param familyName Family that contains the store files
+   * @param storeFiles set of store files to remove
+   * @throws IOException if the archiving fails
+   */
+  public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles)
+      throws IOException {
+    HFileArchiver.archiveStoreFiles(getConfiguration(), getFileSystem(), this.regionInfoForFs,
+        this.tableDir, Bytes.toBytes(familyName), storeFiles);
+  }
+
+  /**
+   * Bulk load: Add a specified store file to the specified family.
+   * If the source file is on the same file-system it is moved from the
+   * source location to the destination location, otherwise it is copied over.
+   *
+   * @param familyName Family that will gain the file
+   * @param srcPath {@link Path} to the file to import
+   * @param seqNum Bulk Load sequence number
+   * @return The destination {@link Path} of the bulk loaded file
+   * @throws IOException
+   */
+  Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+      throws IOException {
+    // Copy the file if it's on another filesystem
+    FileSystem fs = getFileSystem();
+    FileSystem srcFs = srcPath.getFileSystem(getConfiguration());
+    FileSystem desFs = fs instanceof HFileSystem ? ((HFileSystem)fs).getBackingFs() : fs;
+
+    // We can't compare FileSystem instances as equals() includes UGI instance
+    // as part of the comparison and won't work when doing SecureBulkLoad
+    // TODO deal with viewFS
+    if (!FSHDFSUtils.isSameHdfs(getConfiguration(), srcFs, desFs)) {
+      LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
+          "the destination store. Copying file over to destination filesystem.");
+      Path tmpPath = createTempName();
+      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, getConfiguration());
+      LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
+      srcPath = tmpPath;
+    }
+
+    return commitStoreFile(familyName, srcPath, seqNum, true);
+  }
+
+  // ===========================================================================
+  //  Splits Helpers
+  // ===========================================================================
+  /** @return {@link Path} to the temp directory used during split operations */
+  Path getSplitsDir() {
+    return LegacyLayout.getRegionSplitsDir(getRegionDir());
+  }
+
+  Path getSplitsDir(final HRegionInfo hri) {
+    return LegacyLayout.getRegionSplitsDir(getSplitsDir(), hri);
+  }
+
+  /**
+   * Clean up any split detritus that may have been left around from previous split attempts.
+   */
+  void cleanupSplitsDir() throws IOException {
+    fsWithRetries.deleteDir(getSplitsDir());
+  }
+
+  /**
+   * Clean up any split detritus that may have been left around from previous
+   * split attempts.
+   * Call this method on initial region deploy.
+   * @throws IOException
+   */
+  void cleanupAnySplitDetritus() throws IOException {
+    Path splitdir = this.getSplitsDir();
+    if (!fsWithRetries.exists(splitdir)) return;
+    // Look at the splitdir.  It could have the encoded names of the daughter
+    // regions we tried to make.  See if the daughter regions actually got made
+    // out under the tabledir.  If here under splitdir still, then the split did
+    // not complete.  Try and do cleanup.  This code WILL NOT catch the case
+    // where we successfully created daughter a but regionserver crashed during
+    // the creation of region b.  In this case, there'll be an orphan daughter
+    // dir in the filesystem.  TODO: Fix.
+    FileSystem fs = getFileSystem();
+    FileStatus[] daughters = FSUtils.listStatus(fs, splitdir, new FSUtils.DirFilter(fs));
+    if (daughters != null) {
+      for (int i = 0; i < daughters.length; ++i) {
+        Path daughterDir = new Path(this.tableDir, daughters[i].getPath().getName());
+        if (!fsWithRetries.deleteDir(daughterDir)) {
+          throw new IOException("Failed delete of " + daughterDir);
+        }
+      }
+    }
+    cleanupSplitsDir();
+    LOG.info("Cleaned up old failed split transaction detritus: " + splitdir);
+  }
+
+  /**
+   * Remove daughter region
+   * @param regionInfo daughter {@link HRegionInfo}
+   * @throws IOException
+   */
+  void cleanupDaughterRegion(final HRegionInfo regionInfo) throws IOException {
+    Path regionDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+    if (!fsWithRetries.deleteDir(regionDir)) {
+      throw new IOException("Failed delete of " + regionDir);
+    }
+  }
+
+  /**
+   * Commit a daughter region, moving it from the split temporary directory
+   * to the proper location in the filesystem.
+   *
+   * @param regionInfo                 daughter {@link org.apache.hadoop.hbase.HRegionInfo}
+   * @throws IOException
+   */
+  Path commitDaughterRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    Path regionDir = LegacyLayout.getRegionDir(tableDir, regionInfo);
+    Path daughterTmpDir = this.getSplitsDir(regionInfo);
+
+    if (fsWithRetries.exists(daughterTmpDir)) {
+
+      // Write HRI to a file in case we need to recover hbase:meta
+      Path regionInfoFile = LegacyLayout.getRegionInfoFile(daughterTmpDir);
+      byte[] regionInfoContent = getRegionInfoFileContent(regionInfo);
+      writeRegionInfoFileContent(getConfiguration(), getFileSystem(),
+          regionInfoFile, regionInfoContent);
+
+      // Move the daughter temp dir to the table dir
+      if (!fsWithRetries.rename(daughterTmpDir, regionDir)) {
+        throw new IOException("Unable to rename " + daughterTmpDir + " to " + 
regionDir);
+      }
+    }
+
+    return regionDir;
+  }
+
+  /**
+   * Create the region splits directory.
+   */
+  void createSplitsDir() throws IOException {
+    createTempDir(getSplitsDir());
+  }
+
+  void createTempDir(Path dir) throws IOException {
+    if (fsWithRetries.exists(dir)) {
+      LOG.info("The " + dir + " directory exists.  Hence deleting it to 
recreate it");
+      if (!fsWithRetries.deleteDir(dir)) {
+        throw new IOException("Failed deletion of " + dir + " before creating 
them again.");
+      }
+    }
+    // dir doesn't exists now. No need to do an exists() call for it.
+    if (!fsWithRetries.createDir(dir)) {
+      throw new IOException("Failed create of " + dir);
+    }
+  }
+
+  /**
+   * Write out a split reference. Package-local so it doesn't leak out of
+   * the regionserver.
+   * @param hri {@link HRegionInfo} of the destination
+   * @param familyName Column Family Name
+   * @param f File to split.
+   * @param splitRow Split Row
+   * @param top True if we are referring to the top half of the hfile.
+   * @param splitPolicy the {@link RegionSplitPolicy} in effect for the region; may be null
+   * @return Path to created reference.
+   * @throws IOException
+   */
+  Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
+      final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy)
+          throws IOException {
+
+    if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
+      // Check whether the split row lies in the range of the store file
+      // If it is outside the range, return directly.
+      try {
+        if (top) {
+          //check if larger than last key.
+          KeyValue splitKey = KeyValueUtil.createFirstOnRow(splitRow);
+          Cell lastKey = f.createReader().getLastKey();
+          // If lastKey is null, the storefile is empty.
+          if (lastKey == null) {
+            return null;
+          }
+          if (f.getReader().getComparator().compare(splitKey, lastKey) > 0) {
+            return null;
+          }
+        } else {
+          //check if smaller than first key
+          KeyValue splitKey = KeyValueUtil.createLastOnRow(splitRow);
+          Cell firstKey = f.createReader().getFirstKey();
+          // If firstKey is null, the storefile is empty.
+          if (firstKey == null) {
+            return null;
+          }
+          if (f.getReader().getComparator().compare(splitKey, firstKey) < 0) {
+            return null;
+          }
+        }
+      } finally {
+        f.closeReader(true);
+      }
+    }
+
+    Path splitDir = new Path(getSplitsDir(hri), familyName);
+    // A reference to the top or bottom half of the store file, depending on the split.
+    Reference r =
+      top ? Reference.createTopReference(splitRow) : Reference.createBottomReference(splitRow);
+    // Add the referred-to region's name as a dot-separated suffix.
+    // See REF_NAME_REGEX regex above.  The referred-to region's name is
+    // up in the path of the passed-in <code>f</code> -- parentdir is family,
+    // then the directory above is the region name.
+    String parentRegionName = regionInfoForFs.getEncodedName();
+    // Write reference with same file id only with the other region name as
+    // suffix and into the new region location (under same family).
+    Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
+    return r.write(getFileSystem(), p);
+  }
+
+  // ===========================================================================
+  //  Merge Helpers
+  // ===========================================================================
+  /** @return {@link Path} to the temp directory used during merge operations */
+  Path getMergesDir() {
+    return LegacyLayout.getRegionMergesDir(getRegionDir());
+  }
+
+  Path getMergesDir(final HRegionInfo hri) {
+    return LegacyLayout.getRegionMergesDir(getMergesDir(), hri);
+  }
+
+  /**
+   * Clean up any merge detritus that may have been left around from previous merge attempts.
+   */
+  void cleanupMergesDir() throws IOException {
+    fsWithRetries.deleteDir(getMergesDir());
+  }
+
+  /**
+   * Remove merged region
+   * @param mergedRegion {@link HRegionInfo}
+   * @throws IOException
+   */
+  void cleanupMergedRegion(final HRegionInfo mergedRegion) throws IOException {
+    Path regionDir = LegacyLayout.getRegionDir(tableDir, mergedRegion);
+    if (!fsWithRetries.deleteDir(regionDir)) {
+      throw new IOException("Failed delete of " + regionDir);
+    }
+  }
+
+  /**
+   * Create the region merges directory.
+   * @throws IOException if cleanup of an existing merges dir, or creation of a new one, fails.
+   * @see #cleanupMergesDir()
+   */
+  void createMergesDir() throws IOException {
+    createTempDir(getMergesDir());
+  }
+
+  /**
+   * Write out a merge reference under the given merges directory. Package-local
+   * so it doesn't leak out of the regionserver.
+   * @param mergedRegion {@link HRegionInfo} of the merged region
+   * @param familyName Column Family Name
+   * @param f File to create reference.
+   * @param mergedDir the merges temporary directory under which the reference is written
+   * @return Path to created reference.
+   * @throws IOException
+   */
+  Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName,
+      final StoreFile f, final Path mergedDir)
+      throws IOException {
+    Path referenceDir = new Path(new Path(mergedDir,
+        mergedRegion.getEncodedName()), familyName);
+    // A whole reference to the store file.
+    Reference r = Reference.createTopReference(regionInfoForFs.getStartKey());
+    // Add the referred-to region's name as a dot-separated suffix.
+    // See REF_NAME_REGEX regex above. The referred-to region's name is
+    // up in the path of the passed-in <code>f</code> -- parentdir is family,
+    // then the directory above is the region name.
+    String mergingRegionName = regionInfoForFs.getEncodedName();
+    // Write reference with same file id only with the other region name as
+    // suffix and into the new region location (under same family).
+    Path p = new Path(referenceDir, f.getPath().getName() + "."
+        + mergingRegionName);
+    return r.write(getFileSystem(), p);
+  }
+
+  /**
+   * Commit a merged region, moving it from the merges temporary directory to
+   * the proper location in the filesystem.
+   * @param mergedRegionInfo merged region {@link HRegionInfo}
+   * @throws IOException
+   */
+  void commitMergedRegion(final HRegionInfo mergedRegionInfo) throws IOException {
+    Path regionDir = new Path(this.tableDir, mergedRegionInfo.getEncodedName());
+    Path mergedRegionTmpDir = this.getMergesDir(mergedRegionInfo);
+    // Move the tmp dir to the expected location
+    if (mergedRegionTmpDir != null && fsWithRetries.exists(mergedRegionTmpDir)) {
+      if (!fsWithRetries.rename(mergedRegionTmpDir, regionDir)) {
+        throw new IOException("Unable to rename " + mergedRegionTmpDir + " to "
+            + regionDir);
+      }
+    }
+  }
+
+  // ===========================================================================
+  //  Create/Open/Delete Helpers
+  // ===========================================================================
+  /**
+   * Log the current state of the region
+   * @param LOG log to output information
+   * @throws IOException if an unexpected exception occurs
+   */
+  void logFileSystemState(final Log LOG) throws IOException {
+    FSUtils.logFileSystemState(getFileSystem(), this.getRegionDir(), LOG);
+  }
+
+  /**
+   * @param hri
+   * @return Content of the file we write out to the filesystem under a region
+   * @throws IOException
+   */
+  private static byte[] getRegionInfoFileContent(final HRegionInfo hri) throws IOException {
+    return hri.toDelimitedByteArray();
+  }
+
+  /**
+   * Write the .regioninfo file on-disk.
+   */
+  private static void writeRegionInfoFileContent(final Configuration conf, final FileSystem fs,
+      final Path regionInfoFile, final byte[] content) throws IOException {
+    // First check to get the permissions
+    FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
+    // Write the RegionInfo file content
+    FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null);
+    try {
+      out.write(content);
+    } finally {
+      out.close();
+    }
+  }
+
+  // ==========================================================================
+  //  PUBLIC bootstrap
+  // ==========================================================================
+  @Override
+  protected void bootstrap() throws IOException {
+    // Cleanup temporary directories
+    cleanupTempDir();
+    cleanupSplitsDir();
+    cleanupMergesDir();
+
+    // If it doesn't exist, write the HRI to a file, in case we need to recover hbase:meta.
+    checkRegionInfoOnFilesystem();
+  }
+
+  private void checkRegionInfoOnFilesystem() throws IOException {
+    // TODO
+  }
+
+  @Override
+  protected void destroy() throws IOException {
+    fsWithRetries.deleteDir(regionDir);
+  }
+}
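
To make the reference-file naming used by splitStoreFile() and mergeStoreFile() above concrete: the reference is written under the daughter (or merged) region's family directory, named after the source hfile with the referred-to region's encoded name appended as a dot-separated suffix. A minimal stand-alone sketch of that convention follows; the hfile name, encoded region name, and directory below are hypothetical, not taken from this patch.

import org.apache.hadoop.fs.Path;

public class SplitReferenceNameSketch {
  public static void main(String[] args) {
    // Hypothetical hfile name and parent region encoded name, for illustration only.
    String hfileName = "0123456789abcdef0123456789abcdef";
    String parentRegionEncodedName = "fedcba9876543210fedcba9876543210";
    // Hypothetical family directory under the daughter region's temp split dir.
    Path splitFamilyDir = new Path("/hbase/data/default/t1/.splits/daughterA/cf");
    // Reference file path: <splitFamilyDir>/<hfileName>.<parentRegionEncodedName>
    Path referencePath = new Path(splitFamilyDir, hfileName + "." + parentRegionEncodedName);
    System.out.println(referencePath);
  }
}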

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index ba08a05..e875f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -677,10 +677,9 @@ public class RegionStates {
         // This is RPC to meta table. It is done while we have a synchronize on
        // regionstates. No progress will be made if meta is not available at this time.
         // This is a cleanup task. Not critical.
-        if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) ==
-            null) {
+        if (MetaTableAccessor.getRegion(server.getConnection(), hri.getEncodedNameAsBytes()) == null) {
           regionOffline(hri);
-          FSUtils.deleteRegionDir(server.getConfiguration(), hri);
+          server.getMasterFileSystem().deleteRegion(hri);
         }
       } catch (IOException e) {
         LOG.warn("Got exception while deleting " + hri + " directories from 
file system.", e);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 3c12045..d022c99 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -277,19 +278,48 @@ public class StoreFileInfo {
     return reader;
   }
 
+  private interface ComputeFileInfo<T> {
+    T compute(FileSystem fs, FileStatus status, long offset, long length)
+      throws IOException;
+  }
+
   /**
    * Compute the HDFS Block Distribution for this StoreFile
    */
   public HDFSBlocksDistribution computeHDFSBlocksDistribution(final FileSystem fs)
       throws IOException {
+    return computeFileInfo(fs, new ComputeFileInfo<HDFSBlocksDistribution>() {
+      @Override
+      public HDFSBlocksDistribution compute(FileSystem fs, FileStatus status,
+          long offset, long length) throws IOException {
+        return FSUtils.computeHDFSBlocksDistribution(fs, status, offset, length);
+      }
+    });
+  }
 
+  public BlockLocation[] getFileBlockLocations(final FileSystem fs)
+      throws IOException {
+    return computeFileInfo(fs, new ComputeFileInfo<BlockLocation[]>() {
+      @Override
+      public BlockLocation[] compute(FileSystem fs, FileStatus status,
+          long offset, long length) throws IOException {
+        return fs.getFileBlockLocations(status, offset, length);
+      }
+    });
+  }
+
+  /**
+   * Compute file info for this StoreFile via the given callback, retrying across the
+   * link's locations if the underlying file has been moved.
+   */
+  private <T> T computeFileInfo(final FileSystem fs,
+      final ComputeFileInfo<T> computeObj) throws IOException {
     // guard against the case where we get the FileStatus from link, but by the time we
     // call compute the file is moved again
     if (this.link != null) {
       FileNotFoundException exToThrow = null;
       for (int i = 0; i < this.link.getLocations().length; i++) {
         try {
-          return computeHDFSBlocksDistributionInternal(fs);
+          return computeFileInfoInternal(fs, computeObj);
         } catch (FileNotFoundException ex) {
           // try the other location
           exToThrow = ex;
@@ -297,21 +327,52 @@ public class StoreFileInfo {
       }
       throw exToThrow;
     } else {
-      return computeHDFSBlocksDistributionInternal(fs);
+      return computeFileInfoInternal(fs, computeObj);
     }
   }
 
-  private HDFSBlocksDistribution computeHDFSBlocksDistributionInternal(final FileSystem fs)
+  private <T> T computeFileInfoInternal(final FileSystem fs, final ComputeFileInfo<T> computeObj)
       throws IOException {
     FileStatus status = getReferencedFileStatus(fs);
     if (this.reference != null) {
-      return computeRefFileHDFSBlockDistribution(fs, reference, status);
+      return computeRefFileInfo(fs, reference, status, computeObj);
     } else {
-      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+      return computeObj.compute(fs, status, 0, status.getLen());
     }
   }
 
   /**
+   * Helper function to compute file info (e.g. HDFS blocks distribution) of a given
+   * reference file. For reference files, we don't compute the exact value; we use an
+   * estimate instead, given it might be good enough. We assume the bottom part takes
+   * the first half of the reference file and the top part takes the second half.
+   * This is just an estimate, given that the midkey of a region != the midkey of the
+   * HFile, and the number and size of keys vary.
+   * If this estimate isn't good enough, we can improve it later.
+   * @param fs  The FileSystem
+   * @param reference  The reference
+   * @param status  The reference FileStatus
+   * @param computeObj  Callback invoked with the estimated offset and length
+   * @return the result computed by the callback over the estimated range
+   */
+  private static <T> T computeRefFileInfo(final FileSystem fs, final Reference reference,
+      final FileStatus status, final ComputeFileInfo<T> computeObj) throws IOException {
+    if (status == null) {
+      return null;
+    }
+
+    long start = 0;
+    long length = 0;
+    if (Reference.isTopFileRegion(reference.getFileRegion())) {
+      start = status.getLen()/2;
+      length = status.getLen() - status.getLen()/2;
+    } else {
+      start = 0;
+      length = status.getLen()/2;
+    }
+    return computeObj.compute(fs, status, start, length);
+  }
+
+  /**
    * Get the {@link FileStatus} of the file referenced by this StoreFileInfo
    * @param fs The current file system to use.
    * @return The {@link FileStatus} of the file referenced by this StoreFileInfo
@@ -496,39 +557,6 @@ public class StoreFileInfo {
     return validateStoreFileName(p.getName());
   }
 
-  /**
-   * helper function to compute HDFS blocks distribution of a given reference
-   * file.For reference file, we don't compute the exact value. We use some
-   * estimate instead given it might be good enough. we assume bottom part
-   * takes the first half of reference file, top part takes the second half
-   * of the reference file. This is just estimate, given
-   * midkey ofregion != midkey of HFile, also the number and size of keys vary.
-   * If this estimate isn't good enough, we can improve it later.
-   * @param fs  The FileSystem
-   * @param reference  The reference
-   * @param status  The reference FileStatus
-   * @return HDFS blocks distribution
-   */
-  private static HDFSBlocksDistribution computeRefFileHDFSBlockDistribution(
-      final FileSystem fs, final Reference reference, final FileStatus status)
-      throws IOException {
-    if (status == null) {
-      return null;
-    }
-
-    long start = 0;
-    long length = 0;
-
-    if (Reference.isTopFileRegion(reference.getFileRegion())) {
-      start = status.getLen()/2;
-      length = status.getLen() - status.getLen()/2;
-    } else {
-      start = 0;
-      length = status.getLen()/2;
-    }
-    return FSUtils.computeHDFSBlocksDistribution(fs, status, start, length);
-  }
-
   @Override
   public boolean equals(Object that) {
     if (this == that) return true;
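
To make the range estimate in computeRefFileInfo() above concrete: the bottom reference is assigned the first half of the referenced file's length and the top reference the second half, with the top half absorbing the odd byte when the length is odd. A small stand-alone sketch of that arithmetic, using plain longs instead of FileStatus/Reference and a made-up file length:

public class RefRangeEstimateSketch {
  /** Returns {start, length} for the estimated half of a file of the given length. */
  static long[] estimateRange(boolean top, long fileLen) {
    long half = fileLen / 2;
    return top ? new long[] { half, fileLen - half }   // top half: [len/2, len)
               : new long[] { 0L, half };              // bottom half: [0, len/2)
  }

  public static void main(String[] args) {
    long fileLen = 1001L;  // hypothetical hfile length in bytes
    long[] topRange = estimateRange(true, fileLen);      // -> {500, 501}
    long[] bottomRange = estimateRange(false, fileLen);  // -> {0, 500}
    System.out.println("top: start=" + topRange[0] + " length=" + topRange[1]);
    System.out.println("bottom: start=" + bottomRange[0] + " length=" + bottomRange[1]);
  }
}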

http://git-wip-us.apache.org/repos/asf/hbase/blob/7546190a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
index b0af52b..d38d5c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSRegionScanner.java
@@ -25,7 +25,13 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.fs.RegionFileSystem;
+import org.apache.hadoop.hbase.fs.RegionFileSystem.StoreFileVisitor;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -41,30 +47,18 @@ import org.apache.hadoop.hbase.util.FSUtils;
 class FSRegionScanner implements Runnable {
   static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
 
-  private Path regionPath;
-
-  /**
-   * The file system used
-   */
-  private FileSystem fs;
-
-  /**
-   * Maps each region to the RS with highest locality for that region.
-   */
-  private Map<String,String> regionToBestLocalityRSMapping;
+  private final RegionFileSystem rfs;
 
   /**
    * Maps region encoded names to maps of hostnames to fractional locality of
    * that region on that host.
    */
-  private Map<String, Map<String, Float>> regionDegreeLocalityMapping;
+  private final Map<String, Map<String, Float>> regionDegreeLocalityMapping;
 
-  FSRegionScanner(FileSystem fs, Path regionPath,
-                  Map<String, String> regionToBestLocalityRSMapping,
-                  Map<String, Map<String, Float>> regionDegreeLocalityMapping) {
-    this.fs = fs;
-    this.regionPath = regionPath;
-    this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
+  FSRegionScanner(Configuration conf, HRegionInfo hri,
+                  Map<String, Map<String, Float>> regionDegreeLocalityMapping)
+      throws IOException {
+    this.rfs = RegionFileSystem.open(conf, hri, true);
     this.regionDegreeLocalityMapping = regionDegreeLocalityMapping;
   }
 
@@ -72,38 +66,17 @@ class FSRegionScanner implements Runnable {
   public void run() {
     try {
       // empty the map for each region
-      Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
-
-      //get table name
-      String tableName = regionPath.getParent().getName();
-      int totalBlkCount = 0;
+      final Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
+      final AtomicInteger totalBlkCount = new AtomicInteger(0);
 
-      // ignore null
-      FileStatus[] cfList = fs.listStatus(regionPath, new FSUtils.FamilyDirFilter(fs));
-      if (null == cfList) {
-        return;
-      }
-
-      // for each cf, get all the blocks information
-      for (FileStatus cfStatus : cfList) {
-        if (!cfStatus.isDirectory()) {
-          // skip because this is not a CF directory
-          continue;
-        }
-
-        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
-        if (null == storeFileLists) {
-          continue;
-        }
-
-        for (FileStatus storeFile : storeFileLists) {
-          BlockLocation[] blkLocations =
-            fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
-          if (null == blkLocations) {
-            continue;
-          }
+      rfs.visitStoreFiles(new StoreFileVisitor() {
+        @Override
+        public void storeFile(HRegionInfo region, String family, StoreFileInfo storeFile)
+            throws IOException {
+          BlockLocation[] blkLocations = storeFile.getFileBlockLocations(rfs.getFileSystem());
+          if (blkLocations == null) return;
 
-          totalBlkCount += blkLocations.length;
+          totalBlkCount.addAndGet(blkLocations.length);
           for(BlockLocation blk: blkLocations) {
             for (String host: blk.getHosts()) {
               AtomicInteger count = blockCountMap.get(host);
@@ -115,36 +88,9 @@ class FSRegionScanner implements Runnable {
             }
           }
         }
-      }
-
-      if (regionToBestLocalityRSMapping != null) {
-        int largestBlkCount = 0;
-        String hostToRun = null;
-        for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
-          String host = entry.getKey();
-
-          int tmp = entry.getValue().get();
-          if (tmp > largestBlkCount) {
-            largestBlkCount = tmp;
-            hostToRun = host;
-          }
-        }
-
-        // empty regions could make this null
-        if (null == hostToRun) {
-          return;
-        }
-
-        if (hostToRun.endsWith(".")) {
-          hostToRun = hostToRun.substring(0, hostToRun.length()-1);
-        }
-        String name = tableName + ":" + regionPath.getName();
-        synchronized (regionToBestLocalityRSMapping) {
-          regionToBestLocalityRSMapping.put(name,  hostToRun);
-        }
-      }
+      });
 
-      if (regionDegreeLocalityMapping != null && totalBlkCount > 0) {
+      if (regionDegreeLocalityMapping != null && totalBlkCount.get() > 0) {
         Map<String, Float> hostLocalityMap = new HashMap<String, Float>();
        for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
           String host = entry.getKey();
@@ -152,12 +98,12 @@ class FSRegionScanner implements Runnable {
             host = host.substring(0, host.length() - 1);
           }
           // Locality is fraction of blocks local to this host.
-          float locality = ((float)entry.getValue().get()) / totalBlkCount;
+          float locality = ((float)entry.getValue().get()) / totalBlkCount.get();
           hostLocalityMap.put(host, locality);
         }
         // Put the locality map into the result map, keyed by the encoded name
         // of the region.
-        regionDegreeLocalityMapping.put(regionPath.getName(), hostLocalityMap);
+        regionDegreeLocalityMapping.put(rfs.getRegionInfo().getEncodedName(), hostLocalityMap);
       }
     } catch (IOException e) {
       LOG.warn("Problem scanning file system", e);
