http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
new file mode 100644
index 0000000..403051f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.io.Writable;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
+ */
+@InterfaceAudience.Private
+public class TableSnapshotInputFormatImpl {
+  // TODO: Snapshots files are owned in fs by the hbase user. There is no
+  // easy way to delegate access.
+
+  public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class);
+
+  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
+  // key for specifying the root dir of the restored snapshot
+  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
+
+  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
+  private static final String LOCALITY_CUTOFF_MULTIPLIER =
+    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
+  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
+
+  /**
+   * Implementation class for InputSplit logic common between mapred and mapreduce.
+   */
+  public static class InputSplit implements Writable {
+
+    private TableDescriptor htd;
+    private HRegionInfo regionInfo;
+    private String[] locations;
+    private String scan;
+    private String restoreDir;
+
+    // constructor for mapreduce framework / Writable
+    public InputSplit() {}
+
+    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
+        Scan scan, Path restoreDir) {
+      this.htd = htd;
+      this.regionInfo = regionInfo;
+      if (locations == null || locations.isEmpty()) {
+        this.locations = new String[0];
+      } else {
+        this.locations = locations.toArray(new String[locations.size()]);
+      }
+      try {
+        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
+      } catch (IOException e) {
+        LOG.warn("Failed to convert Scan to String", e);
+      }
+
+      this.restoreDir = restoreDir.toString();
+    }
+
+    public TableDescriptor getHtd() {
+      return htd;
+    }
+
+    public String getScan() {
+      return scan;
+    }
+
+    public String getRestoreDir() {
+      return restoreDir;
+    }
+
+    public long getLength() {
+      //TODO: We can obtain the file sizes of the snapshot here.
+      return 0;
+    }
+
+    public String[] getLocations() {
+      return locations;
+    }
+
+    public TableDescriptor getTableDescriptor() {
+      return htd;
+    }
+
+    public HRegionInfo getRegionInfo() {
+      return regionInfo;
+    }
+
+    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
+    // doing this wrapping with Writables.
+    @Override
+    public void write(DataOutput out) throws IOException {
+      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
+          .setTable(ProtobufUtil.toTableSchema(htd))
+          .setRegion(HRegionInfo.convert(regionInfo));
+
+      for (String location : locations) {
+        builder.addLocations(location);
+      }
+
+      TableSnapshotRegionSplit split = builder.build();
+
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      split.writeTo(baos);
+      baos.close();
+      byte[] buf = baos.toByteArray();
+      out.writeInt(buf.length);
+      out.write(buf);
+
+      Bytes.writeByteArray(out, Bytes.toBytes(scan));
+      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int len = in.readInt();
+      byte[] buf = new byte[len];
+      in.readFully(buf);
+      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
+      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
+      this.regionInfo = HRegionInfo.convert(split.getRegion());
+      List<String> locationsList = split.getLocationsList();
+      this.locations = locationsList.toArray(new String[locationsList.size()]);
+
+      this.scan = Bytes.toString(Bytes.readByteArray(in));
+      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
+    }
+  }
+
+  /**
+   * Implementation class for RecordReader logic common between mapred and mapreduce.
+   */
+  public static class RecordReader {
+    private InputSplit split;
+    private Scan scan;
+    private Result result = null;
+    private ImmutableBytesWritable row = null;
+    private ClientSideRegionScanner scanner;
+
+    public ClientSideRegionScanner getScanner() {
+      return scanner;
+    }
+
+    public void initialize(InputSplit split, Configuration conf) throws IOException {
+      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
+      this.split = split;
+      TableDescriptor htd = split.htd;
+      HRegionInfo hri = this.split.getRegionInfo();
+      FileSystem fs = FSUtils.getCurrentFileSystem(conf);
+
+
+      // region is immutable, this should be fine,
+      // otherwise we have to set the thread read point
+      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+      // disable caching of data blocks
+      scan.setCacheBlocks(false);
+
+      scanner =
+          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
+    }
+
+    public boolean nextKeyValue() throws IOException {
+      result = scanner.next();
+      if (result == null) {
+        //we are done
+        return false;
+      }
+
+      if (this.row == null) {
+        this.row = new ImmutableBytesWritable();
+      }
+      this.row.set(result.getRow());
+      return true;
+    }
+
+    public ImmutableBytesWritable getCurrentKey() {
+      return row;
+    }
+
+    public Result getCurrentValue() {
+      return result;
+    }
+
+    public long getPos() {
+      return 0;
+    }
+
+    public float getProgress() {
+      return 0; // TODO: use total bytes to estimate
+    }
+
+    public void close() {
+      if (this.scanner != null) {
+        this.scanner.close();
+      }
+    }
+  }
+
+  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
+    String snapshotName = getSnapshotName(conf);
+
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
+
+    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
+
+    // TODO: mapred does not support scan as input API. Work around for now.
+    Scan scan = extractScanFromConf(conf);
+    // the temp dir where the snapshot is restored
+    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
+
+    return getSplits(scan, manifest, regionInfos, restoreDir, conf);
+  }
+
+  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
+    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
+    if (regionManifests == null) {
+      throw new IllegalArgumentException("Snapshot seems empty");
+    }
+
+    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
+
+    for (SnapshotRegionManifest regionManifest : regionManifests) {
+      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
+      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
+        continue;
+      }
+      regionInfos.add(hri);
+    }
+    return regionInfos;
+  }
+
+  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
+      Path rootDir, FileSystem fs) throws IOException {
+    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
+    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
+    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
+  }
+
+  public static Scan extractScanFromConf(Configuration conf) throws IOException {
+    Scan scan = null;
+    if (conf.get(TableInputFormat.SCAN) != null) {
+      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
+    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
+      String[] columns =
+        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
+      scan = new Scan();
+      for (String col : columns) {
+        scan.addFamily(Bytes.toBytes(col));
+      }
+    } else {
+      throw new IllegalArgumentException("Unable to create scan");
+    }
+    return scan;
+  }
+
+  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
+      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
+    // load table descriptor
+    TableDescriptor htd = manifest.getTableDescriptor();
+
+    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
+
+    List<InputSplit> splits = new ArrayList<>();
+    for (HRegionInfo hri : regionManifests) {
+      // load region descriptor
+
+      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
+          hri.getEndKey())) {
+        // compute HDFS locations from snapshot files (which will get the locations for
+        // referred hfiles)
+        List<String> hosts = getBestLocations(conf,
+            HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
+
+        int len = Math.min(3, hosts.size());
+        hosts = hosts.subList(0, len);
+        splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
+      }
+    }
+
+    return splits;
+
+  }
+
+  /**
+   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
+   * weights into account, and will treat every location passed from the input split as equal. We
+   * do not want to blindly pass all the locations, since we are creating one split per region, and
+   * the region's blocks are distributed throughout the cluster unless favored node assignment
+   * is used. In the expected stable case, only one location will contain most of the blocks as
+   * local. On the other hand, with favored node assignment, 3 nodes will contain highly local
+   * blocks. Here we use a simple heuristic: we pass all hosts which have at least 80%
+   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
+   * host with the best locality.
+   */
+  public static List<String> getBestLocations(
+      Configuration conf, HDFSBlocksDistribution blockDistribution) {
+    List<String> locations = new ArrayList<>(3);
+
+    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
+
+    if (hostAndWeights.length == 0) {
+      return locations;
+    }
+
+    HostAndWeight topHost = hostAndWeights[0];
+    locations.add(topHost.getHost());
+
+    // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
+    double cutoffMultiplier
+      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
+
+    double filterWeight = topHost.getWeight() * cutoffMultiplier;
+
+    for (int i = 1; i < hostAndWeights.length; i++) {
+      if (hostAndWeights[i].getWeight() >= filterWeight) {
+        locations.add(hostAndWeights[i].getHost());
+      } else {
+        break;
+      }
+    }
+
+    return locations;
+  }
+
+  private static String getSnapshotName(Configuration conf) {
+    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
+    if (snapshotName == null) {
+      throw new IllegalArgumentException("Snapshot name must be provided");
+    }
+    return snapshotName;
+  }
+
+  /**
+   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
+   * @param conf the job configuration
+   * @param snapshotName the name of the snapshot to read from
+   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
+   * have write permissions to this directory, and it should not be a subdirectory of rootdir.
+   * After the job is finished, restoreDir can be deleted.
+   * @throws IOException if an error occurs
+   */
+  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
+      throws IOException {
+    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
+
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
+
+    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
+
+    // TODO: restore from record readers to parallelize.
+    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
+
+    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
+  }
+}

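A minimal driver sketch for the class above; the snapshot name and restore path are placeholders, and it assumes TableMapReduceUtil.convertScanToString is accessible to the caller. setInput() restores the snapshot under a random UUID beneath the given directory, and getSplits() then yields one split per region overlapping the configured Scan, with locations trimmed to at most three hosts whose block weight is at least 80% (the default cutoff multiplier) of the best host's.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;

public class SnapshotSplitsDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The Scan decides which regions qualify; a full-table scan here (placeholder).
    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
    // Restore the snapshot into a scratch dir outside the HBase root dir;
    // setInput() appends a random UUID under this path.
    TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/snapshot-restore"));
    for (TableSnapshotInputFormatImpl.InputSplit split
        : TableSnapshotInputFormatImpl.getSplits(conf)) {
      System.out.println(split.getRegionInfo().getRegionNameAsString()
          + " -> " + Arrays.toString(split.getLocations()));
    }
  }
}
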
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
new file mode 100644
index 0000000..13c7c67
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
@@ -0,0 +1,395 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * A table split corresponds to a key range (low, high) and an optional scanner.
+ * All references to row below refer to the key of the row.
+ */
+@InterfaceAudience.Public
+public class TableSplit extends InputSplit
+implements Writable, Comparable<TableSplit> {
+  /** @deprecated LOG variable would be made private. fix in hbase 3.0 */
+  @Deprecated
+  public static final Log LOG = LogFactory.getLog(TableSplit.class);
+
+  // should be < 0 (@see #readFields(DataInput))
+  // version 1 supports Scan data member
+  enum Version {
+    UNVERSIONED(0),
+    // Initial number we put on TableSplit when we introduced versioning.
+    INITIAL(-1),
+    // Added an encoded region name field for easier identification of split -> region
+    WITH_ENCODED_REGION_NAME(-2);
+
+    final int code;
+    static final Version[] byCode;
+    static {
+      byCode = Version.values();
+      for (int i = 0; i < byCode.length; i++) {
+        if (byCode[i].code != -1 * i) {
+          throw new AssertionError("Values in this enum should be descending 
by one");
+        }
+      }
+    }
+
+    Version(int code) {
+      this.code = code;
+    }
+
+    boolean atLeast(Version other) {
+      return code <= other.code;
+    }
+
+    static Version fromCode(int code) {
+      return byCode[code * -1];
+    }
+  }
+
+  private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME;
+  private TableName tableName;
+  private byte [] startRow;
+  private byte [] endRow;
+  private String regionLocation;
+  private String encodedRegionName = "";
+  private String scan = ""; // stores the serialized form of the Scan
+  private long length; // Contains estimation of region size in bytes
+
+  /** Default constructor. */
+  public TableSplit() {
+    this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY,
+      HConstants.EMPTY_BYTE_ARRAY, "");
+  }
+
+  /**
+   * Creates a new instance while assigning all variables.
+   * Length of region is set to 0
+   * Encoded name of the region is set to blank
+   *
+   * @param tableName  The name of the current table.
+   * @param scan The scan associated with this split.
+   * @param startRow  The start row of the split.
+   * @param endRow  The end row of the split.
+   * @param location  The location of the region.
+   */
+  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
+                    final String location) {
+    this(tableName, scan, startRow, endRow, location, 0L);
+  }
+
+  /**
+   * Creates a new instance while assigning all variables.
+   * Encoded name of region is set to blank
+   *
+   * @param tableName  The name of the current table.
+   * @param scan The scan associated with this split.
+   * @param startRow  The start row of the split.
+   * @param endRow  The end row of the split.
+   * @param location  The location of the region.
+   */
+  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
+      final String location, long length) {
+    this(tableName, scan, startRow, endRow, location, "", length);
+  }
+
+  /**
+   * Creates a new instance while assigning all variables.
+   *
+   * @param tableName  The name of the current table.
+   * @param scan The scan associated with this split.
+   * @param startRow  The start row of the split.
+   * @param endRow  The end row of the split.
+   * @param encodedRegionName The region ID.
+   * @param location  The location of the region.
+   */
+  public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow,
+      final String location, final String encodedRegionName, long length) {
+    this.tableName = tableName;
+    try {
+      this.scan =
+        (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan);
+    } catch (IOException e) {
+      LOG.warn("Failed to convert Scan to String", e);
+    }
+    this.startRow = startRow;
+    this.endRow = endRow;
+    this.regionLocation = location;
+    this.encodedRegionName = encodedRegionName;
+    this.length = length;
+  }
+
+  /**
+   * Creates a new instance without a scanner.
+   * Length of region is set to 0
+   *
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
+   */
+  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
+      final String location) {
+    this(tableName, null, startRow, endRow, location);
+  }
+
+  /**
+   * Creates a new instance without a scanner.
+   *
+   * @param tableName The name of the current table.
+   * @param startRow The start row of the split.
+   * @param endRow The end row of the split.
+   * @param location The location of the region.
+   * @param length Size of region in bytes
+   */
+  public TableSplit(TableName tableName, byte[] startRow, byte[] endRow,
+                    final String location, long length) {
+    this(tableName, null, startRow, endRow, location, length);
+  }
+
+  /**
+   * Returns a Scan object from the stored string representation.
+   *
+   * @return Returns a Scan object based on the stored scanner.
+   * @throws IOException
+   */
+  public Scan getScan() throws IOException {
+    return TableMapReduceUtil.convertStringToScan(this.scan);
+  }
+
+  /**
+   * Returns the table name converted to a byte array.
+   * @see #getTable()
+   * @return The table name.
+   */
+  public byte [] getTableName() {
+    return tableName.getName();
+  }
+
+  /**
+   * Returns the table name.
+   *
+   * @return The table name.
+   */
+  public TableName getTable() {
+    // It is ugly that usually to get a TableName, the method is called getTableName.  We can't do
+    // that in here though because there was an existing getTableName in place already since
+    // deprecated.
+    return tableName;
+  }
+
+  /**
+   * Returns the start row.
+   *
+   * @return The start row.
+   */
+  public byte [] getStartRow() {
+    return startRow;
+  }
+
+  /**
+   * Returns the end row.
+   *
+   * @return The end row.
+   */
+  public byte [] getEndRow() {
+    return endRow;
+  }
+
+  /**
+   * Returns the region location.
+   *
+   * @return The region's location.
+   */
+  public String getRegionLocation() {
+    return regionLocation;
+  }
+
+  /**
+   * Returns the region's location as an array.
+   *
+   * @return The array containing the region location.
+   * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
+   */
+  @Override
+  public String[] getLocations() {
+    return new String[] {regionLocation};
+  }
+
+  /**
+   * Returns the region's encoded name.
+   *
+   * @return The region's encoded name.
+   */
+  public String getEncodedRegionName() {
+    return encodedRegionName;
+  }
+
+  /**
+   * Returns the length of the split.
+   *
+   * @return The length of the split.
+   * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
+   */
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  /**
+   * Reads the values of each field.
+   *
+   * @param in  The input to read from.
+   * @throws IOException When reading the input fails.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    Version version = Version.UNVERSIONED;
+    // TableSplit was not versioned in the beginning.
+    // In order to introduce it now, we make use of the fact
+    // that tableName was written with Bytes.writeByteArray,
+    // which encodes the array length as a vint which is >= 0.
+    // Hence if the vint is >= 0 we have an old version and the vint
+    // encodes the length of tableName.
+    // If < 0 we just read the version and the next vint is the length.
+    // @see Bytes#readByteArray(DataInput)
+    int len = WritableUtils.readVInt(in);
+    if (len < 0) {
+      // what we just read was the version
+      version = Version.fromCode(len);
+      len = WritableUtils.readVInt(in);
+    }
+    byte[] tableNameBytes = new byte[len];
+    in.readFully(tableNameBytes);
+    tableName = TableName.valueOf(tableNameBytes);
+    startRow = Bytes.readByteArray(in);
+    endRow = Bytes.readByteArray(in);
+    regionLocation = Bytes.toString(Bytes.readByteArray(in));
+    if (version.atLeast(Version.INITIAL)) {
+      scan = Bytes.toString(Bytes.readByteArray(in));
+    }
+    length = WritableUtils.readVLong(in);
+    if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) {
+      encodedRegionName = Bytes.toString(Bytes.readByteArray(in));
+    }
+  }
+
+  /**
+   * Writes the field values to the output.
+   *
+   * @param out  The output to write to.
+   * @throws IOException When writing the values to the output fails.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeVInt(out, VERSION.code);
+    Bytes.writeByteArray(out, tableName.getName());
+    Bytes.writeByteArray(out, startRow);
+    Bytes.writeByteArray(out, endRow);
+    Bytes.writeByteArray(out, Bytes.toBytes(regionLocation));
+    Bytes.writeByteArray(out, Bytes.toBytes(scan));
+    WritableUtils.writeVLong(out, length);
+    Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName));
+  }
+
+  /**
+   * Returns the details about this instance as a string.
+   *
+   * @return The values of this instance as a string.
+   * @see java.lang.Object#toString()
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("HBase table split(");
+    sb.append("table name: ").append(tableName);
+    // null scan input is represented by ""
+    String printScan = "";
+    if (!scan.equals("")) {
+      try {
+        // get the real scan here in toString, not the Base64 string
+        printScan = TableMapReduceUtil.convertStringToScan(scan).toString();
+      }
+      catch (IOException e) {
+        printScan = "";
+      }
+    }
+    sb.append(", scan: ").append(printScan);
+    sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
+    sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
+    sb.append(", region location: ").append(regionLocation);
+    sb.append(", encoded region name: ").append(encodedRegionName);
+    sb.append(")");
+    return sb.toString();
+  }
+
+  /**
+   * Compares this split against the given one.
+   *
+   * @param split  The split to compare to.
+   * @return The result of the comparison.
+   * @see java.lang.Comparable#compareTo(java.lang.Object)
+   */
+  @Override
+  public int compareTo(TableSplit split) {
+    // If The table name of the two splits is the same then compare start row
+    // otherwise compare based on table names
+    int tableNameComparison =
+        getTable().compareTo(split.getTable());
+    return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo(
+        getStartRow(), split.getStartRow());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null || !(o instanceof TableSplit)) {
+      return false;
+    }
+    return tableName.equals(((TableSplit)o).tableName) &&
+      Bytes.equals(startRow, ((TableSplit)o).startRow) &&
+      Bytes.equals(endRow, ((TableSplit)o).endRow) &&
+      regionLocation.equals(((TableSplit)o).regionLocation);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = tableName != null ? tableName.hashCode() : 0;
+    result = 31 * result + (scan != null ? scan.hashCode() : 0);
+    result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0);
+    result = 31 * result + (endRow != null ? Arrays.hashCode(endRow) : 0);
+    result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0);
+    result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0);
+    return result;
+  }
+}

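A quick round-trip sketch of the Writable contract above (table name, rows, and location are hypothetical). write() always emits the current VERSION code as a negative vint before the table name, which is how readFields() tells a versioned split from a legacy one whose first vint was the non-negative table-name length.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TableSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    TableSplit out = new TableSplit(TableName.valueOf("t1"),
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), "rs1.example.com", 1024L);

    DataOutputBuffer dob = new DataOutputBuffer();
    out.write(dob);  // version vint, table, start/end rows, location, scan, length, encoded name

    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(dob.getData(), dob.getLength());
    TableSplit in = new TableSplit();
    in.readFields(dib);

    System.out.println(in);  // toString() shows the recovered fields
  }
}
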
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
new file mode 100644
index 0000000..30cd461
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Emits sorted KeyValues. Parses the passed text, creates KeyValues, and sorts them before emitting.
+ * @see HFileOutputFormat2
+ * @see KeyValueSortReducer
+ * @see PutSortReducer
+ */
+@InterfaceAudience.Public
+public class TextSortReducer extends
+    Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {
+
+  /** Timestamp for all inserted rows */
+  private long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+
+  private Counter badLineCount;
+
+  private ImportTsv.TsvParser parser;
+
+  /** Cell visibility expr **/
+  private String cellVisibilityExpr;
+
+  /** Cell TTL */
+  private long ttl;
+
+  private CellCreator kvCreator;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    doSetup(context, conf);
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+    this.kvCreator = new CellCreator(conf);
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   * @param conf
+   */
+  protected void doSetup(Context context, Configuration conf) {
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  @Override
+  protected void reduce(
+      ImmutableBytesWritable rowKey,
+      java.lang.Iterable<Text> lines,
+      Reducer<ImmutableBytesWritable, Text,
+              ImmutableBytesWritable, KeyValue>.Context context)
+      throws java.io.IOException, InterruptedException
+  {
+    // although reduce() is called per-row, handle pathological case
+    long threshold = context.getConfiguration().getLong(
+        "reducer.row.threshold", 1L * (1<<30));
+    Iterator<Text> iter = lines.iterator();
+    while (iter.hasNext()) {
+      Set<KeyValue> kvs = new TreeSet<>(CellComparator.COMPARATOR);
+      long curSize = 0;
+      // stop at the end or the RAM threshold
+      while (iter.hasNext() && curSize < threshold) {
+        Text line = iter.next();
+        byte[] lineBytes = line.getBytes();
+        try {
+          ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength());
+          // Retrieve timestamp if exists
+          ts = parsed.getTimestamp(ts);
+          cellVisibilityExpr = parsed.getCellVisibility();
+          ttl = parsed.getCellTTL();
+
+          // create tags for the parsed line
+          List<Tag> tags = new ArrayList<>();
+          if (cellVisibilityExpr != null) {
+            tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
+              cellVisibilityExpr));
+          }
+          // Add TTL directly to the KV so we can vary them when packing more than one KV
+          // into puts
+          if (ttl > 0) {
+            tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+          }
+          for (int i = 0; i < parsed.getColumnCount(); i++) {
+            if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+                || i == parser.getCellTTLColumnIndex()) {
+              continue;
+            }
+            // Creating the KV which needs to be directly written to HFiles. Using the Facade
+            // KVCreator for creation of kvs.
+            Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
+                parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
+                parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
+                parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            kvs.add(kv);
+            curSize += kv.heapSize();
+          }
+        } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
+            | InvalidLabelException badLine) {
+          if (skipBadLines) {
+            System.err.println("Bad line." + badLine.getMessage());
+            incrementBadLineCount(1);
+            continue;
+          }
+          throw new IOException(badLine);
+        }
+      }
+      context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass()
+          + "(" + StringUtils.humanReadableInt(curSize) + ")");
+      int index = 0;
+      for (KeyValue kv : kvs) {
+        context.write(rowKey, kv);
+        if (++index > 0 && index % 100 == 0)
+          context.setStatus("Wrote " + index + " key values.");
+      }
+
+      // if we have more entries to process
+      if (iter.hasNext()) {
+        // force flush because we cannot guarantee intra-row sorted order
+        context.write(null, null);
+      }
+    }
+  }
+}

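A configuration sketch (values are placeholders) covering the knobs the setup path above reads: the column spec, the optional Base64-encoded separator, the bulk-load timestamp, bad-line handling, and the in-memory threshold at which reduce() flushes its sorted KeyValue set.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;

public class TextSortReducerConf {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:value");
    // Custom separators are carried Base64-encoded and decoded back in doSetup().
    conf.set(ImportTsv.SEPARATOR_CONF_KEY, Base64.encodeBytes(Bytes.toBytes("|")));
    // doSetup() expects a real timestamp here, never 0.
    conf.setLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
    conf.setBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
    // reduce() buffers and sorts KeyValues up to roughly this many heap bytes, then flushes.
    conf.setLong("reducer.row.threshold", 1L << 30);
    System.out.println(conf.get(ImportTsv.SEPARATOR_CONF_KEY));
  }
}
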
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
new file mode 100644
index 0000000..3c507b3
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
+import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.security.visibility.InvalidLabelException;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Write table content out to files in hdfs.
+ */
+@InterfaceAudience.Public
+public class TsvImporterMapper
+extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
+{
+
+  /** Timestamp for all inserted rows */
+  protected long ts;
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  /** Should skip empty columns*/
+  private boolean skipEmptyColumns;
+  private Counter badLineCount;
+  private boolean logBadLines;
+
+  protected ImportTsv.TsvParser parser;
+
+  protected Configuration conf;
+
+  protected String cellVisibilityExpr;
+
+  protected long ttl;
+
+  protected CellCreator kvCreator;
+
+  private String hfileOutPath;
+
+  /** List of cell tags */
+  private List<Tag> tags;
+
+  public long getTs() {
+    return ts;
+  }
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    conf = context.getConfiguration();
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
+                           separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+    this.kvCreator = new CellCreator(conf);
+    tags = new ArrayList<>();
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+    // Should never get 0 as we are setting this to a valid value in job
+    // configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
+
+    skipEmptyColumns = context.getConfiguration().getBoolean(
+        ImportTsv.SKIP_EMPTY_COLUMNS, false);
+    skipBadLines = context.getConfiguration().getBoolean(
+        ImportTsv.SKIP_LINES_CONF_KEY, true);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
+    hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY);
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value,
+    Context context)
+  throws IOException {
+    byte[] lineBytes = value.getBytes();
+
+    try {
+      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
+          lineBytes, value.getLength());
+      ImmutableBytesWritable rowKey =
+        new ImmutableBytesWritable(lineBytes,
+            parsed.getRowKeyOffset(),
+            parsed.getRowKeyLength());
+      // Retrieve timestamp if exists
+      ts = parsed.getTimestamp(ts);
+      cellVisibilityExpr = parsed.getCellVisibility();
+      ttl = parsed.getCellTTL();
+
+      // create tags for the parsed line
+      if (hfileOutPath != null) {
+        tags.clear();
+        if (cellVisibilityExpr != null) {
+          tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags(
+            cellVisibilityExpr));
+        }
+        // Add TTL directly to the KV so we can vary them when packing more than one KV
+        // into puts
+        if (ttl > 0) {
+          tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+        }
+      }
+      Put put = new Put(rowKey.copyBytes());
+      for (int i = 0; i < parsed.getColumnCount(); i++) {
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
+            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+            || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
+            && parsed.getColumnLength(i) == 0)) {
+          continue;
+        }
+        populatePut(lineBytes, parsed, put, i);
+      }
+      context.write(rowKey, put);
+    } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException
+        | InvalidLabelException badLine) {
+      if (logBadLines) {
+        System.err.println(value);
+      }
+      System.err.println("Bad line at offset: " + offset.get() + ":\n" + 
badLine.getMessage());
+      if (skipBadLines) {
+        incrementBadLineCount(1);
+        return;
+      }
+      throw new IOException(badLine);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put,
+      int i) throws BadTsvLineException, IOException {
+    Cell cell = null;
+    if (hfileOutPath == null) {
+      cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+          parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes,
+          parsed.getColumnOffset(i), parsed.getColumnLength(i));
+      if (cellVisibilityExpr != null) {
+        // We won't be validating the expression here. The Visibility CP will do
+        // the validation
+        put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
+      }
+      if (ttl > 0) {
+        put.setTTL(ttl);
+      }
+    } else {
+      // Creating the KV which needs to be directly written to HFiles. Using the Facade
+      // KVCreator for creation of kvs.
+      cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
+          parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
+          parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
+          parsed.getColumnLength(i), tags);
+    }
+    put.add(cell);
+  }
+}

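A hedged sketch of wiring this mapper into a map-only job that writes the emitted Puts straight to a table (no importtsv.bulk.output set, so populatePut() takes the Put branch above); the table name and input path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TsvImporterMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TsvPutImportJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,cf:c1");
    conf.setLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());

    Job job = Job.getInstance(conf, "tsv-import-puts");
    job.setJarByClass(TsvPutImportJob.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/input.tsv"));

    job.setMapperClass(TsvImporterMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    // No reducer: TableOutputFormat (configured here) sends the Puts to the table.
    TableMapReduceUtil.initTableReducerJob("my_table", null, job);
    job.setNumReduceTasks(0);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
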
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
new file mode 100644
index 0000000..a3b095c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Write table content out to map output files.
+ */
+@InterfaceAudience.Public
+public class TsvImporterTextMapper
+extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text>
+{
+
+  /** Column separator */
+  private String separator;
+
+  /** Should skip bad lines */
+  private boolean skipBadLines;
+  private Counter badLineCount;
+  private boolean logBadLines;
+
+  private ImportTsv.TsvParser parser;
+
+  public boolean getSkipBadLines() {
+    return skipBadLines;
+  }
+
+  public Counter getBadLineCount() {
+    return badLineCount;
+  }
+
+  public void incrementBadLineCount(int count) {
+    this.badLineCount.increment(count);
+  }
+
+  /**
+   * Handles initializing this class with objects specific to it (i.e., the parser).
+   * Common initialization that might be leveraged by a subclass is done in
+   * <code>doSetup</code>. Hence a subclass may choose to override this method
+   * and call <code>doSetup</code> as well before handling its own custom params.
+   *
+   * @param context
+   */
+  @Override
+  protected void setup(Context context) {
+    doSetup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
+    if (parser.getRowKeyColumnIndex() == -1) {
+      throw new RuntimeException("No row key column specified");
+    }
+  }
+
+  /**
+   * Handles common parameter initialization that a subclass might want to leverage.
+   * @param context
+   */
+  protected void doSetup(Context context) {
+    Configuration conf = context.getConfiguration();
+
+    // If a custom separator has been used,
+    // decode it back from Base64 encoding.
+    separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY);
+    if (separator == null) {
+      separator = ImportTsv.DEFAULT_SEPARATOR;
+    } else {
+      separator = new String(Base64.decode(separator));
+    }
+
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
+    logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false);
+    badLineCount = context.getCounter("ImportTsv", "Bad Lines");
+  }
+
+  /**
+   * Convert a line of TSV text into an HBase table row.
+   */
+  @Override
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+    try {
+      Pair<Integer,Integer> rowKeyOffsets = parser.parseRowKey(value.getBytes(), value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(
+          value.getBytes(), rowKeyOffsets.getFirst(), rowKeyOffsets.getSecond());
+      context.write(rowKey, value);
+    } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) {
+      if (logBadLines) {
+        System.err.println(value);
+      }
+      System.err.println("Bad line at offset: " + offset.get() + ":\n" + 
badLine.getMessage());
+      if (skipBadLines) {
+        incrementBadLineCount(1);
+        return;
+      }
+      throw new IOException(badLine);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Thread.currentThread().interrupt();
+    }
+  }
+}

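For the bulk-load path, this text mapper is typically paired with TextSortReducer and HFileOutputFormat2; a hedged sketch follows (table name and paths are placeholders, and configureIncrementalLoad's exact signature may differ by release).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.hbase.mapreduce.TsvImporterTextMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TsvBulkLoadJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,cf:c1");
    conf.setLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());

    Job job = Job.getInstance(conf, "tsv-bulk-load");
    job.setJarByClass(TsvBulkLoadJob.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/input.tsv"));

    // Mapper emits (row key, raw line); the reducer parses, sorts and writes KeyValues.
    job.setMapperClass(TsvImporterTextMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setReducerClass(TextSortReducer.class);

    TableName tn = TableName.valueOf("my_table");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(tn);
         RegionLocator locator = conn.getRegionLocator(tn)) {
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), locator);
    }
    FileOutputFormat.setOutputPath(job, new Path("/data/hfiles"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
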
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
new file mode 100644
index 0000000..a83a88f
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.hbase.Tag;
+
+/**
+ * Interface to convert visibility expressions into Tags for storing along with Cells in HFiles.
+ */
+@InterfaceAudience.Public
+public interface VisibilityExpressionResolver extends Configurable {
+
+  /**
+   * Gives the implementation a chance to initialize itself.
+   */
+  void init();
+
+  /**
+   * Convert visibility expression into tags to be serialized.
+   * @param visExpression the label expression
+   * @return The list of tags corresponding to the visibility expression. These tags will be
+   *         stored along with the Cells.
+   */
+  List<Tag> createVisibilityExpTags(String visExpression) throws IOException;
+}

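A skeletal, hypothetical implementation of the interface, purely to illustrate the contract: it receives the raw expression from the parsed line and returns the tags to store with each Cell. A real resolver (such as the default one obtained through CellCreator) resolves label ordinals against the visibility label store rather than storing the raw expression bytes as done here.

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.mapreduce.VisibilityExpressionResolver;
import org.apache.hadoop.hbase.util.Bytes;

public class PassThroughVisibilityResolver implements VisibilityExpressionResolver {
  private Configuration conf;

  @Override public void setConf(Configuration conf) { this.conf = conf; }
  @Override public Configuration getConf() { return conf; }

  @Override
  public void init() {
    // A real resolver would open connections or warm label caches here.
  }

  @Override
  public List<Tag> createVisibilityExpTags(String visExpression) throws IOException {
    if (visExpression == null || visExpression.isEmpty()) {
      return Collections.emptyList();
    }
    // Illustration only: wrap the raw expression bytes in a visibility tag.
    return Collections.singletonList(
        new ArrayBackedTag(TagType.VISIBILITY_TAG_TYPE, Bytes.toBytes(visExpression)));
  }
}
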
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
new file mode 100644
index 0000000..8b4e967
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files.
+ */
+@InterfaceAudience.Public
+public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
+  private static final Log LOG = LogFactory.getLog(WALInputFormat.class);
+
+  public static final String START_TIME_KEY = "wal.start.time";
+  public static final String END_TIME_KEY = "wal.end.time";
+
+  /**
+   * {@link InputSplit} for {@link WAL} files. Each split represents
+   * exactly one log file.
+   */
+  static class WALSplit extends InputSplit implements Writable {
+    private String logFileName;
+    private long fileSize;
+    private long startTime;
+    private long endTime;
+
+    /** for serialization */
+    public WALSplit() {}
+
+    /**
+     * Represents a WALSplit, i.e. a single WAL file.
+     * Start and end times are managed by the split, so that WAL files can be
+     * filtered before WALEdits are passed to the mapper(s).
+     * @param logFileName name of the WAL file
+     * @param fileSize size of the WAL file in bytes
+     * @param startTime earliest write time of edits to include
+     * @param endTime latest write time of edits to include
+     */
+    public WALSplit(String logFileName, long fileSize, long startTime, long endTime) {
+      this.logFileName = logFileName;
+      this.fileSize = fileSize;
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return fileSize;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      // TODO: Find the data node with the most blocks for this WAL?
+      return new String[] {};
+    }
+
+    public String getLogFileName() {
+      return logFileName;
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      logFileName = in.readUTF();
+      fileSize = in.readLong();
+      startTime = in.readLong();
+      endTime = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(logFileName);
+      out.writeLong(fileSize);
+      out.writeLong(startTime);
+      out.writeLong(endTime);
+    }
+
+    @Override
+    public String toString() {
+      return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize;
+    }
+  }
+
+  /**
+   * {@link RecordReader} for a {@link WAL} file.
+   * Implementation shared with deprecated HLogInputFormat.
+   */
+  static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
+    private Reader reader = null;
+    // visible until we can remove the deprecated HLogInputFormat
+    Entry currentEntry = new Entry();
+    private long startTime;
+    private long endTime;
+    private Configuration conf;
+    private Path logFile;
+    private long currentPos;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      WALSplit hsplit = (WALSplit)split;
+      logFile = new Path(hsplit.getLogFileName());
+      conf = context.getConfiguration();
+      LOG.info("Opening reader for "+split);
+      openReader(logFile);
+      this.startTime = hsplit.getStartTime();
+      this.endTime = hsplit.getEndTime();
+    }
+
+    private void openReader(Path path) throws IOException
+    {
+      closeReader();
+      reader = AbstractFSWALProvider.openReader(path, conf);
+      seek();
+      setCurrentPath(path);
+    }
+
+    private void setCurrentPath(Path path) {
+      this.logFile = path;
+    }
+
+    private void closeReader() throws IOException {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    private void seek() throws IOException {
+      if (currentPos != 0) {
+        reader.seek(currentPos);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (reader == null) return false;
+      this.currentPos = reader.getPosition();
+      Entry temp;
+      long i = -1;
+      try {
+        do {
+          // skip older entries
+          try {
+            temp = reader.next(currentEntry);
+            i++;
+          } catch (EOFException x) {
+            LOG.warn("Corrupted entry detected. Ignoring the rest of the file."
+                + " (This is normal when a RegionServer crashed.)");
+            return false;
+          }
+        } while (temp != null && temp.getKey().getWriteTime() < startTime);
+
+        if (temp == null) {
+          if (i > 0) LOG.info("Skipped " + i + " entries.");
+          LOG.info("Reached end of file.");
+          return false;
+        } else if (i > 0) {
+          LOG.info("Skipped " + i + " entries, until ts: " + 
temp.getKey().getWriteTime() + ".");
+        }
+        boolean res = temp.getKey().getWriteTime() <= endTime;
+        if (!res) {
+          LOG.info("Reached ts: " + temp.getKey().getWriteTime()
+              + " ignoring the rest of the file.");
+        }
+        return res;
+      } catch (IOException e) {
+        Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf);
+        if (logFile != archivedLog) {
+          openReader(archivedLog);
+          // Retry the read recursively against the archived WAL.
+          return nextKeyValue();
+        } else {
+          throw e;
+        }
+      }
+    }
+
+    @Override
+    public WALEdit getCurrentValue() throws IOException, InterruptedException {
+      return currentEntry.getEdit();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      // N/A depends on total number of entries, which is unknown
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      LOG.info("Closing reader");
+      if (reader != null) this.reader.close();
+    }
+  }
+
+  /**
+   * Handler for the non-deprecated WALKey version. Fold into WALRecordReader once we no longer
+   * need to support HLogInputFormat.
+   */
+  static class WALKeyRecordReader extends WALRecordReader<WALKey> {
+    @Override
+    public WALKey getCurrentKey() throws IOException, InterruptedException {
+      return currentEntry.getKey();
+    }
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException,
+      InterruptedException {
+    return getSplits(context, START_TIME_KEY, END_TIME_KEY);
+  }
+
+  /**
+   * Implementation shared with the deprecated HLogInputFormat.
+   */
+  List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false);
+    Path[] inputPaths = getInputPaths(conf);
+    long startTime = conf.getLong(startKey, Long.MIN_VALUE);
+    long endTime = conf.getLong(endKey, Long.MAX_VALUE);
+
+    List<FileStatus> allFiles = new ArrayList<FileStatus>();
+    for(Path inputPath: inputPaths){
+      FileSystem fs = inputPath.getFileSystem(conf);
+      try {
+        List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime);
+        allFiles.addAll(files);
+      } catch (FileNotFoundException e) {
+        if (ignoreMissing) {
+          LOG.warn("File "+ inputPath +" is missing. Skipping it.");
+          continue;
+        }
+        throw e;
+      }
+    }
+    List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size());
+    for (FileStatus file : allFiles) {
+      splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime));
+    }
+    return splits;
+  }
+
+  private Path[] getInputPaths(Configuration conf) {
+    String inpDirs = conf.get(FileInputFormat.INPUT_DIR);
+    return StringUtils.stringToPath(
+      inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ",")));
+  }
+
+  private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime)
+      throws IOException {
+    List<FileStatus> result = new ArrayList<>();
+    LOG.debug("Scanning " + dir.toString() + " for WAL files");
+
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir);
+    if (!iter.hasNext()) return Collections.emptyList();
+    while (iter.hasNext()) {
+      LocatedFileStatus file = iter.next();
+      if (file.isDirectory()) {
+        // recurse into sub directories
+        result.addAll(getFiles(fs, file.getPath(), startTime, endTime));
+      } else {
+        String name = file.getPath().toString();
+        int idx = name.lastIndexOf('.');
+        if (idx > 0) {
+          try {
+            long fileStartTime = Long.parseLong(name.substring(idx+1));
+            if (fileStartTime <= endTime) {
+              LOG.info("Found: " + file);
+              result.add(file);
+            }
+          } catch (NumberFormatException x) {
+            idx = 0;
+          }
+        }
+        if (idx == 0) {
+          LOG.warn("File " + name + " does not appear to be an WAL file. 
Skipping...");
+        }
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new WALKeyRecordReader();
+  }
+}
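
For context, a minimal sketch of how a custom job might consume this input format (illustrative only, not part of the patch). The mapper, counter names, output types and the one-hour window are assumptions; only WALInputFormat, its time-range keys and the FileInputFormat.INPUT_DIR wiring come from the code above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class WalEditCounter {

  /** Counts cells per WAL entry via a job counter; purely illustrative. */
  static class CountingMapper extends Mapper<WALKey, WALEdit, NullWritable, LongWritable> {
    @Override
    protected void map(WALKey key, WALEdit edit, Context context)
        throws IOException, InterruptedException {
      context.getCounter("wal", "cells").increment(edit.getCells().size());
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Only consider entries written in the last hour, using the keys WALInputFormat defines.
    conf.setLong(WALInputFormat.START_TIME_KEY, System.currentTimeMillis() - 3600_000L);
    conf.setLong(WALInputFormat.END_TIME_KEY, System.currentTimeMillis());

    Job job = Job.getInstance(conf, "wal-edit-counter");
    job.setJarByClass(WalEditCounter.class);
    job.setInputFormatClass(WALInputFormat.class);
    job.setMapperClass(CountingMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(LongWritable.class);
    // WALInputFormat picks up its input directories from FileInputFormat.INPUT_DIR.
    FileInputFormat.setInputPaths(job, args[0]);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}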

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
new file mode 100644
index 0000000..b1e655c
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * A tool to replay WAL files as a M/R job.
+ * The WAL can be replayed for a set of tables or all tables,
+ * and a time range can be provided (in milliseconds).
+ * The WAL is filtered to the passed set of tables and the output
+ * can optionally be mapped to another set of tables.
+ *
+ * WAL replay can also generate HFiles for later bulk importing,
+ * in that case the WAL is replayed for a single table only.
+ */
+@InterfaceAudience.Public
+public class WALPlayer extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(WALPlayer.class);
+  final static String NAME = "WALPlayer";
+  public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
+  public final static String TABLES_KEY = "wal.input.tables";
+  public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
+  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
+  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
+
+
+  // This relies on Hadoop Configuration to handle warning about deprecated configs and
+  // to set the correct non-deprecated configs when an old one shows up.
+  static {
+    Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
+    Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
+    Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
+  }
+
+  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+
+  public WALPlayer(){
+  }
+
+  protected WALPlayer(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * A mapper that just writes out KeyValues.
+   * This one can be used together with {@link KeyValueSortReducer}
+   */
+  static class WALKeyValueMapper
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
+    private byte[] table;
+
+    @Override
+    public void map(WALKey key, WALEdit value,
+      Context context)
+    throws IOException {
+      try {
+        // skip all other tables
+        if (Bytes.equals(table, key.getTablename().getName())) {
+          for (Cell cell : value.getCells()) {
+            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+            if (WALEdit.isMetaEditFamily(kv)) {
+              continue;
+            }
+            context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      // only a single table is supported when HFiles are generated with HFileOutputFormat
+      String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tables == null || tables.length != 1) {
+        // this can only happen when WALKeyValueMapper is used directly by a class other than WALPlayer
+        throw new IOException("Exactly one table must be specified for the bulk HFile case.");
+      }
+      table = Bytes.toBytes(tables[0]);
+
+    }
+
+  }
+
+  /**
+   * A mapper that writes out {@link Mutation} to be directly applied to
+   * a running HBase instance.
+   */
+  protected static class WALMapper
+  extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    private Map<TableName, TableName> tables = new TreeMap<>();
+
+    @Override
+    public void map(WALKey key, WALEdit value, Context context)
+    throws IOException {
+      try {
+        if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
+          TableName targetTable = tables.isEmpty() ?
+                key.getTablename() :
+                tables.get(key.getTablename());
+          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
+          Put put = null;
+          Delete del = null;
+          Cell lastCell = null;
+          for (Cell cell : value.getCells()) {
+            // filtering WAL meta entries
+            if (WALEdit.isMetaEditFamily(cell)) {
+              continue;
+            }
+
+            // Allow a subclass to filter out this cell.
+            if (filter(context, cell)) {
+              // A WALEdit may contain multiple operations (HBASE-3584) and/or
+              // multiple rows (HBASE-5229).
+              // Aggregate as much as possible into a single Put/Delete
+              // operation before writing to the context.
+              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
+                  || !CellUtil.matchingRow(lastCell, cell)) {
+                // row or type changed, write out aggregate KVs.
+                if (put != null) {
+                  context.write(tableOut, put);
+                }
+                if (del != null) {
+                  context.write(tableOut, del);
+                }
+                if (CellUtil.isDelete(cell)) {
+                  del = new Delete(CellUtil.cloneRow(cell));
+                } else {
+                  put = new Put(CellUtil.cloneRow(cell));
+                }
+              }
+              if (CellUtil.isDelete(cell)) {
+                del.add(cell);
+              } else {
+                put.add(cell);
+              }
+            }
+            lastCell = cell;
+          }
+          // write residual KVs
+          if (put != null) {
+            context.write(tableOut, put);
+          }
+          if (del != null) {
+            context.write(tableOut, del);
+          }
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
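+    /**
+     * Hook that lets a subclass decide whether a cell should be written out;
+     * returning false drops the cell. The default keeps every cell.
+     */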
+    protected boolean filter(Context context, final Cell cell) {
+      return true;
+    }
+
+    @Override
+    protected void
+        cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
+            throws IOException, InterruptedException {
+      super.cleanup(context);
+    }
+
+    @Override
+    public void setup(Context context) throws IOException {
+      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
+      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tableMap == null) {
+        tableMap = tablesToUse;
+      }
+      if (tablesToUse == null) {
+        // Then user wants all tables.
+      } else if (tablesToUse.length != tableMap.length) {
+        // this can only happen when WALMapper is used directly by a class other than WALPlayer
+        throw new IOException("Incorrect table mapping specified.");
+      }
+      int i = 0;
+      if (tablesToUse != null) {
+        for (String table : tablesToUse) {
+          tables.put(TableName.valueOf(table),
+            TableName.valueOf(tableMap[i++]));
+        }
+      }
+    }
+  }
+
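+  /**
+   * Parses the given time option, either as a date of the form
+   * 2001-02-20T16:35:06.99 or as plain milliseconds, and stores the resulting
+   * millisecond value back into the configuration.
+   */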
+  void setupTime(Configuration conf, String option) throws IOException {
+    String val = conf.get(option);
+    if (null == val) {
+      return;
+    }
+    long ms;
+    try {
+      // first try to parse in user friendly form
+      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
+    } catch (ParseException pe) {
+      try {
+        // then see if just a number of ms's was specified
+        ms = Long.parseLong(val);
+      } catch (NumberFormatException nfe) {
+        throw new IOException(option
+            + " must be specified either in the form 2001-02-20T16:35:06.99 "
+            + "or as number of milliseconds");
+      }
+    }
+    conf.setLong(option, ms);
+  }
+
+  /**
+   * Sets up the actual job.
+   *
+   * @param args  The command line parameters.
+   * @return The newly created job.
+   * @throws IOException When setting up the job fails.
+   */
+  public Job createSubmittableJob(String[] args) throws IOException {
+    Configuration conf = getConf();
+    setupTime(conf, WALInputFormat.START_TIME_KEY);
+    setupTime(conf, WALInputFormat.END_TIME_KEY);
+    String inputDirs = args[0];
+    String[] tables = args[1].split(",");
+    String[] tableMap;
+    if (args.length > 2) {
+      tableMap = args[2].split(",");
+      if (tableMap.length != tables.length) {
+        throw new IOException("The same number of tables and mappings must be provided.");
+      }
+    } else {
+      // if no mapping is specified, map each table to itself
+      tableMap = tables;
+    }
+    conf.setStrings(TABLES_KEY, tables);
+    conf.setStrings(TABLE_MAP_KEY, tableMap);
+    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
+    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
+    job.setJarByClass(WALPlayer.class);
+
+    job.setInputFormatClass(WALInputFormat.class);
+    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+
+    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    if (hfileOutPath != null) {
+      LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
+
+      // the bulk HFile case
+      if (tables.length != 1) {
+        throw new IOException("Exactly one table must be specified for the bulk export option");
+      }
+      TableName tableName = TableName.valueOf(tables[0]);
+      job.setMapperClass(WALKeyValueMapper.class);
+      job.setReducerClass(KeyValueSortReducer.class);
+      Path outputDir = new Path(hfileOutPath);
+      FileOutputFormat.setOutputPath(job, outputDir);
+      job.setMapOutputValueClass(KeyValue.class);
+      try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTable(tableName);
+          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
+        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
+      }
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+          org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
+    } else {
+      // output to live cluster
+      job.setMapperClass(WALMapper.class);
+      job.setOutputFormatClass(MultiTableOutputFormat.class);
+      TableMapReduceUtil.addDependencyJars(job);
+      TableMapReduceUtil.initCredentials(job);
+      // No reducers.
+      job.setNumReduceTasks(0);
+    }
+    String codecCls = WALCellCodec.getWALCellCodecClass(conf);
+    try {
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
+    } catch (Exception e) {
+      throw new IOException("Cannot determine wal codec class " + codecCls, e);
+    }
+    return job;
+  }
+
+
+  /**
+   * Print usage
+   * @param errorMsg Error message.  Can be null.
+   */
+  private void usage(final String errorMsg) {
+    if (errorMsg != null && errorMsg.length() > 0) {
+      System.err.println("ERROR: " + errorMsg);
+    }
+    System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
+    System.err.println("Read all WAL entries for <tables>.");
+    System.err.println("If no tables (\"\") are specified, all tables are imported.");
+    System.err.println("(Careful, even hbase:meta entries will be imported"+
+      " in that case.)");
+    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
+    System.err.println("The WAL entries can be mapped to a new set of tables via <tableMappings>.");
+    System.err.println("<tableMappings> is a comma separated list of target tables.");
+    System.err.println("If specified, each table in <tables> must have a mapping.\n");
+    System.err.println("By default " + NAME + " will load data directly into HBase.");
+    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
+    System.err.println("  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
+    System.err.println("  (Only one table can be specified, and no mapping is allowed!)");
+    System.err.println("Other options: (specify time range of WAL edits to consider)");
+    System.err.println("  -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
+    System.err.println("  -D" + JOB_NAME_CONF_KEY
+      + "=jobName - use the specified mapreduce job name for the wal player");
+    System.err.println("For performance also consider the following options:\n"
+      + "  -Dmapreduce.map.speculative=false\n"
+      + "  -Dmapreduce.reduce.speculative=false");
+  }
+
+  /**
+   * Main entry point.
+   *
+   * @param args  The command line parameters.
+   * @throws Exception When running the job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
+    System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage("Wrong number of arguments: " + args.length);
+      System.exit(-1);
+    }
+    Job job = createSubmittableJob(args);
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+}
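
A short sketch of driving WALPlayer programmatically (illustrative only, not part of the patch); it mirrors the command-line usage printed above. The WAL directory, table names and time window are made-up examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.util.ToolRunner;

public class ReplayWals {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Optional: restrict the replay to a time window (date form or plain milliseconds),
    // matching the -D options shown in WALPlayer's usage text.
    conf.set(WALInputFormat.START_TIME_KEY, "2017-08-01T00:00:00.00");
    conf.set(WALInputFormat.END_TIME_KEY, "2017-08-02T00:00:00.00");

    // Optional: write HFiles for a later bulk load instead of live puts/deletes
    // (only valid with exactly one input table).
    // conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/walplayer-out");

    // Arguments: <wal inputdir> <tables> [<tableMappings>]; paths and table names
    // here are hypothetical.
    int ret = ToolRunner.run(conf, new WALPlayer(),
        new String[] { "/hbase/oldWALs", "usertable", "usertable_copy" });
    System.exit(ret);
  }
}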

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
new file mode 100644
index 0000000..b1f15ba
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
+Input/OutputFormats, a table indexing MapReduce job, and utility methods.
+
+<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
+in the HBase Reference Guide for documentation on MapReduce over HBase.
+*/
+package org.apache.hadoop.hbase.mapreduce;
