Repository: hbase
Updated Branches:
  refs/heads/master 7e4cd5982 -> 0f614a1c4


HBASE-12374 Change DBEs to work with new BB based cell.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0f614a1c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0f614a1c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0f614a1c

Branch: refs/heads/master
Commit: 0f614a1c44e1887ca7177b66bb6208b6e69db7e1
Parents: 7e4cd59
Author: anoopsjohn <[email protected]>
Authored: Mon Jul 20 23:28:45 2015 +0530
Committer: anoopsjohn <[email protected]>
Committed: Mon Jul 20 23:28:45 2015 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   6 +
 .../org/apache/hadoop/hbase/HConstants.java     |   3 +
 .../hadoop/hbase/OffheapKeyOnlyKeyValue.java    | 210 +++++++++
 .../hadoop/hbase/io/ByteBufferOutputStream.java |   5 +
 .../hadoop/hbase/io/TagCompressionContext.java  |   3 +-
 .../io/encoding/BufferedDataBlockEncoder.java   | 464 +++++++++++++------
 .../io/encoding/CopyKeyDataBlockEncoder.java    |  11 +-
 .../hbase/io/encoding/DataBlockEncoder.java     |   9 +-
 .../hbase/io/encoding/DiffKeyDeltaEncoder.java  |  25 +-
 .../hbase/io/encoding/FastDiffDeltaEncoder.java |  24 +-
 .../io/encoding/PrefixKeyDeltaEncoder.java      |  16 +-
 .../hadoop/hbase/io/util/StreamUtils.java       |   4 +-
 .../org/apache/hadoop/hbase/nio/ByteBuff.java   |  20 +
 .../hbase/io/TestTagCompressionContext.java     |   5 +-
 .../codec/prefixtree/PrefixTreeSeeker.java      |   7 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java  |   5 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  14 +-
 .../encoding/TestBufferedDataBlockEncoder.java  |   5 +-
 .../io/encoding/TestDataBlockEncoders.java      |  84 ++--
 .../io/encoding/TestPrefixTreeEncoding.java     |  23 +-
 .../encoding/TestSeekToBlockWithEncoders.java   |  24 +-
 21 files changed, 716 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 4d0d3b3..89b4eb1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -100,6 +100,12 @@ public final class CellUtil {
     return output;
   }
 
+  public static byte[] cloneTags(Cell cell) {
+    byte[] output = new byte[cell.getTagsLength()];
+    copyTagTo(cell, output, 0);
+    return output;
+  }
+
   /**
    * Returns tag value in a new byte array. If server-side, use
    * {@link Tag#getBuffer()} with appropriate {@link Tag#getTagOffset()} and

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 704b90f..0fddd3c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase;
 
 import static org.apache.hadoop.hbase.io.hfile.BlockType.MAGIC_LENGTH;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
@@ -495,6 +496,8 @@ public final class HConstants {
    */
   public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
 
+  public static final ByteBuffer EMPTY_BYTE_BUFFER = 
ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+
   /**
    * Used by scanners, etc when they want to start at the beginning of a region
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
new file mode 100644
index 0000000..477d16e
--- /dev/null
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyOnlyKeyValue.java
@@ -0,0 +1,210 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This is a key only Cell implementation which is identical to {@link 
KeyValue.KeyOnlyKeyValue}
+ * with respect to key serialization but has its data in off heap memory.
+ */
+@InterfaceAudience.Private
+public class OffheapKeyOnlyKeyValue extends ByteBufferedCell {
+
+  private ByteBuffer buf;
+  private int offset = 0; // offset into buffer where key starts at
+  private int length = 0; // length of this.
+  private short rowLen;
+
+  public OffheapKeyOnlyKeyValue(ByteBuffer buf, int offset, int length) {
+    assert buf.isDirect();
+    this.buf = buf;
+    this.offset = offset;
+    this.length = length;
+    this.rowLen = ByteBufferUtils.toShort(this.buf, this.offset);
+  }
+
+  @Override
+  public byte[] getRowArray() {
+    return CellUtil.cloneRow(this);
+  }
+
+  @Override
+  public int getRowOffset() {
+    return 0;
+  }
+
+  @Override
+  public short getRowLength() {
+    return this.rowLen;
+  }
+
+  @Override
+  public byte[] getFamilyArray() {
+    return CellUtil.cloneFamily(this);
+  }
+
+  @Override
+  public int getFamilyOffset() {
+    return 0;
+  }
+
+  @Override
+  public byte getFamilyLength() {
+    return getFamilyLength(getFamilyLengthPosition());
+  }
+
+  private byte getFamilyLength(int famLenPos) {
+    return ByteBufferUtils.toByte(this.buf, famLenPos);
+  }
+
+  @Override
+  public byte[] getQualifierArray() {
+    return CellUtil.cloneQualifier(this);
+  }
+
+  @Override
+  public int getQualifierOffset() {
+    return 0;
+  }
+
+  @Override
+  public int getQualifierLength() {
+    return getQualifierLength(getRowLength(), getFamilyLength());
+  }
+
+  private int getQualifierLength(int rlength, int flength) {
+    return this.length - (int) KeyValue.getKeyDataStructureSize(rlength, 
flength, 0);
+  }
+
+  @Override
+  public long getTimestamp() {
+    return ByteBufferUtils.toLong(this.buf, getTimestampOffset());
+  }
+
+  private int getTimestampOffset() {
+    return this.offset + this.length - KeyValue.TIMESTAMP_TYPE_SIZE;
+  }
+
+  @Override
+  public byte getTypeByte() {
+    return ByteBufferUtils.toByte(this.buf, this.offset + this.length - 1);
+  }
+
+  @Override
+  public long getSequenceId() {
+    return 0;
+  }
+
+  @Override
+  public byte[] getValueArray() {
+    throw new IllegalArgumentException("This is a key only Cell");
+  }
+
+  @Override
+  public int getValueOffset() {
+    return 0;
+  }
+
+  @Override
+  public int getValueLength() {
+    return 0;
+  }
+
+  @Override
+  public byte[] getTagsArray() {
+    throw new IllegalArgumentException("This is a key only Cell");
+  }
+
+  @Override
+  public int getTagsOffset() {
+    return 0;
+  }
+
+  @Override
+  public int getTagsLength() {
+    return 0;
+  }
+
+  @Override
+  public ByteBuffer getRowByteBuffer() {
+    return this.buf;
+  }
+
+  @Override
+  public int getRowPositionInByteBuffer() {
+    return this.offset + Bytes.SIZEOF_SHORT;
+  }
+
+  @Override
+  public ByteBuffer getFamilyByteBuffer() {
+    return this.buf;
+  }
+
+  @Override
+  public int getFamilyPositionInByteBuffer() {
+    return getFamilyLengthPosition() + Bytes.SIZEOF_BYTE;
+  }
+
+  // The position in BB where the family length is added.
+  private int getFamilyLengthPosition() {
+    return this.offset + Bytes.SIZEOF_SHORT + getRowLength();
+  }
+
+  @Override
+  public ByteBuffer getQualifierByteBuffer() {
+    return this.buf;
+  }
+
+  @Override
+  public int getQualifierPositionInByteBuffer() {
+    int famLenPos = getFamilyLengthPosition();
+    return famLenPos + Bytes.SIZEOF_BYTE + getFamilyLength(famLenPos);
+  }
+
+  @Override
+  public ByteBuffer getValueByteBuffer() {
+    throw new IllegalArgumentException("This is a key only Cell");
+  }
+
+  @Override
+  public int getValuePositionInByteBuffer() {
+    return 0;
+  }
+
+  @Override
+  public ByteBuffer getTagsByteBuffer() {
+    throw new IllegalArgumentException("This is a key only Cell");
+  }
+
+  @Override
+  public int getTagsPositionInByteBuffer() {
+    return 0;
+  }
+
+  @Override
+  public String toString() {
+    return CellUtil.toString(this, false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
index 1a18b2d..1b2ab5d 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferOutputStream.java
@@ -127,6 +127,11 @@ public class ByteBufferOutputStream extends OutputStream {
     ByteBufferUtils.copyFromArrayToBuffer(buf, b, off, len);
   }
 
+  public void write(ByteBuffer b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+    ByteBufferUtils.copyFromBufferToBuffer(b, buf, off, len);
+  }
+
   /**
    * Writes an <code>int</code> to the underlying output stream as four
    * bytes, high byte first.

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 26e7b50..05c4ad1 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IOUtils;
@@ -133,7 +134,7 @@ public class TagCompressionContext {
    * @return bytes count read from source to uncompress all tags.
    * @throws IOException
    */
-  public int uncompressTags(ByteBuffer src, byte[] dest, int offset, int 
length)
+  public int uncompressTags(ByteBuff src, byte[] dest, int offset, int length)
       throws IOException {
     int srcBeginPos = src.position();
     int endOffset = offset + length;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index 966c59b..1a65223 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.hbase.ByteBufferedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.OffheapKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.SettableSequenceId;
@@ -38,9 +41,11 @@ import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
 import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 
 /**
@@ -102,8 +107,8 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
             - qualCommonPrefix);
   }
 
-  protected static class SeekerState implements Cell {
-    protected ByteBuffer currentBuffer;
+  protected static class SeekerState {
+    protected ByteBuff currentBuffer;
     protected TagCompressionContext tagCompressionContext;
     protected int valueOffset = -1;
     protected int keyLength;
@@ -121,6 +126,15 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     protected long memstoreTS;
     protected int nextKvOffset;
     protected KeyValue.KeyOnlyKeyValue currentKey = new 
KeyValue.KeyOnlyKeyValue();
+    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer 
calls. This avoids too
+    // many object creations.
+    private final Pair<ByteBuffer, Integer> tmpPair;
+    private final boolean includeTags;
+
+    public SeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean includeTags) 
{
+      this.tmpPair = tmpPair;
+      this.includeTags = includeTags;
+    }
 
     protected boolean isValid() {
       return valueOffset != -1;
@@ -200,74 +214,190 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
       }
     }
 
+    public Cell toCell() {
+      // Buffer backing the value and tags part from the HFileBlock's buffer
+      // When tag compression in use, this will be only the value bytes area.
+      ByteBuffer valAndTagsBuffer;
+      int vOffset;
+      int valAndTagsLength = this.valueLength;
+      int tagsLenSerializationSize = 0;
+      if (this.includeTags && this.tagCompressionContext == null) {
+        // Include the tags part also. This will be the tags bytes + 2 bytes 
for storing tags
+        // length
+        tagsLenSerializationSize = this.tagsOffset - (this.valueOffset + 
this.valueLength);
+        valAndTagsLength += tagsLenSerializationSize + this.tagsLength;
+      }
+      this.currentBuffer.asSubByteBuffer(this.valueOffset, valAndTagsLength, 
this.tmpPair);
+      valAndTagsBuffer = this.tmpPair.getFirst();
+      vOffset = this.tmpPair.getSecond();// This is the offset to value part 
in the BB
+      if (valAndTagsBuffer.hasArray()) {
+        return toOnheapCell(valAndTagsBuffer, vOffset, 
tagsLenSerializationSize);
+      } else {
+        return toOffheapCell(valAndTagsBuffer, vOffset, 
tagsLenSerializationSize);
+      }
+    }
+
+    private Cell toOnheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
+        int tagsLenSerializationSize) {
+      byte[] tagsArray = HConstants.EMPTY_BYTE_ARRAY;
+      int tOffset = 0;
+      if (this.includeTags) {
+        if (this.tagCompressionContext == null) {
+          tagsArray = valAndTagsBuffer.array();
+          tOffset = valAndTagsBuffer.arrayOffset() + vOffset + this.valueLength
+              + tagsLenSerializationSize;
+        } else {
+          tagsArray = Bytes.copy(tagsBuffer, 0, this.tagsLength);
+          tOffset = 0;
+        }
+      }
+      return new OnheapDecodedCell(Bytes.copy(keyBuffer, 0, this.keyLength),
+          currentKey.getRowLength(), currentKey.getFamilyOffset(), 
currentKey.getFamilyLength(),
+          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+          currentKey.getTimestamp(), currentKey.getTypeByte(), 
valAndTagsBuffer.array(),
+          valAndTagsBuffer.arrayOffset() + vOffset, this.valueLength, 
memstoreTS, tagsArray,
+          tOffset, this.tagsLength);
+    }
+
+    private Cell toOffheapCell(ByteBuffer valAndTagsBuffer, int vOffset,
+        int tagsLenSerializationSize) {
+      ByteBuffer tagsBuf =  HConstants.EMPTY_BYTE_BUFFER;
+      int tOffset = 0;
+      if (this.includeTags) {
+        if (this.tagCompressionContext == null) {
+          tagsBuf = valAndTagsBuffer;
+          tOffset = vOffset + this.valueLength + tagsLenSerializationSize;
+        } else {
+          tagsBuf = ByteBuffer.wrap(Bytes.copy(tagsBuffer, 0, 
this.tagsLength));
+          tOffset = 0;
+        }
+      }
+      return new OffheapDecodedCell(ByteBuffer.wrap(Bytes.copy(keyBuffer, 0, 
this.keyLength)),
+          currentKey.getRowLength(), currentKey.getFamilyOffset(), 
currentKey.getFamilyLength(),
+          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
+          currentKey.getTimestamp(), currentKey.getTypeByte(), 
valAndTagsBuffer, vOffset,
+          this.valueLength, memstoreTS, tagsBuf, tOffset, this.tagsLength);
+    }
+  }
+
+  /**
+   * Copies only the key part of the keybuffer by doing a deep copy and passes 
the
+   * seeker state members for taking a clone.
+   * Note that the value byte[] part is still pointing to the currentBuffer 
and is
+   * represented by the valueOffset and valueLength
+   */
+  // We return this as a Cell to the upper layers of read flow and might try 
setting a new SeqId
+  // there. So this has to be an instance of SettableSequenceId.
+  protected static class OnheapDecodedCell implements Cell, HeapSize, 
SettableSequenceId,
+      Streamable {
+    private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * 
Bytes.SIZEOF_INT)
+        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * 
ClassSize.ARRAY));
+    private byte[] keyOnlyBuffer;
+    private short rowLength;
+    private int familyOffset;
+    private byte familyLength;
+    private int qualifierOffset;
+    private int qualifierLength;
+    private long timestamp;
+    private byte typeByte;
+    private byte[] valueBuffer;
+    private int valueOffset;
+    private int valueLength;
+    private byte[] tagsBuffer;
+    private int tagsOffset;
+    private int tagsLength;
+    private long seqId;
+
+    protected OnheapDecodedCell(byte[] keyBuffer, short rowLength, int 
familyOffset,
+        byte familyLength, int qualOffset, int qualLength, long timeStamp, 
byte typeByte,
+        byte[] valueBuffer, int valueOffset, int valueLen, long seqId, byte[] 
tagsBuffer,
+        int tagsOffset, int tagsLength) {
+      this.keyOnlyBuffer = keyBuffer;
+      this.rowLength = rowLength;
+      this.familyOffset = familyOffset;
+      this.familyLength = familyLength;
+      this.qualifierOffset = qualOffset;
+      this.qualifierLength = qualLength;
+      this.timestamp = timeStamp;
+      this.typeByte = typeByte;
+      this.valueBuffer = valueBuffer;
+      this.valueOffset = valueOffset;
+      this.valueLength = valueLen;
+      this.tagsBuffer = tagsBuffer;
+      this.tagsOffset = tagsOffset;
+      this.tagsLength = tagsLength;
+      setSequenceId(seqId);
+    }
+
     @Override
     public byte[] getRowArray() {
-      return currentKey.getRowArray();
+      return keyOnlyBuffer;
     }
 
     @Override
-    public int getRowOffset() {
-      return Bytes.SIZEOF_SHORT;
+    public byte[] getFamilyArray() {
+      return keyOnlyBuffer;
     }
 
     @Override
-    public short getRowLength() {
-      return currentKey.getRowLength();
+    public byte[] getQualifierArray() {
+      return keyOnlyBuffer;
     }
 
     @Override
-    public byte[] getFamilyArray() {
-      return currentKey.getFamilyArray();
+    public int getRowOffset() {
+      return Bytes.SIZEOF_SHORT;
     }
 
     @Override
-    public int getFamilyOffset() {
-      return currentKey.getFamilyOffset();
+    public short getRowLength() {
+      return rowLength;
     }
 
     @Override
-    public byte getFamilyLength() {
-      return currentKey.getFamilyLength();
+    public int getFamilyOffset() {
+      return familyOffset;
     }
 
     @Override
-    public byte[] getQualifierArray() {
-      return currentKey.getQualifierArray();
+    public byte getFamilyLength() {
+      return familyLength;
     }
 
     @Override
     public int getQualifierOffset() {
-      return currentKey.getQualifierOffset();
+      return qualifierOffset;
     }
 
     @Override
     public int getQualifierLength() {
-      return currentKey.getQualifierLength();
+      return qualifierLength;
     }
 
     @Override
     public long getTimestamp() {
-      return currentKey.getTimestamp();
+      return timestamp;
     }
 
     @Override
     public byte getTypeByte() {
-      return currentKey.getTypeByte();
+      return typeByte;
     }
 
     @Override
     public long getSequenceId() {
-      return memstoreTS;
+      return seqId;
     }
 
     @Override
     public byte[] getValueArray() {
-      return currentBuffer.array();
+      return this.valueBuffer;
     }
 
     @Override
     public int getValueOffset() {
-      return currentBuffer.arrayOffset() + valueOffset;
+      return valueOffset;
     }
 
     @Override
@@ -277,18 +407,12 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
 
     @Override
     public byte[] getTagsArray() {
-      if (tagCompressionContext != null) {
-        return tagsBuffer;
-      }
-      return currentBuffer.array();
+      return this.tagsBuffer;
     }
 
     @Override
     public int getTagsOffset() {
-      if (tagCompressionContext != null) {
-        return 0;
-      }
-      return currentBuffer.arrayOffset() + tagsOffset;
+      return this.tagsOffset;
     }
 
     @Override
@@ -298,36 +422,54 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
 
     @Override
     public String toString() {
-      return KeyValue.keyToString(this.keyBuffer, 0, 
KeyValueUtil.keyLength(this)) + "/vlen="
-          + getValueLength() + "/seqid=" + memstoreTS;
+      return KeyValue.keyToString(this.keyOnlyBuffer, 0, 
KeyValueUtil.keyLength(this)) + "/vlen="
+          + getValueLength() + "/seqid=" + seqId;
     }
 
-    public Cell shallowCopy() {
-      return new ClonedSeekerState(currentBuffer, keyBuffer, 
currentKey.getRowLength(),
-          currentKey.getFamilyOffset(), currentKey.getFamilyLength(), 
keyLength,
-          currentKey.getQualifierOffset(), currentKey.getQualifierLength(),
-          currentKey.getTimestamp(), currentKey.getTypeByte(), valueLength, 
valueOffset,
-          memstoreTS, tagsOffset, tagsLength, tagCompressionContext, 
tagsBuffer);
+    @Override
+    public void setSequenceId(long seqId) {
+      this.seqId = seqId;
+    }
+
+    @Override
+    public long heapSize() {
+      return FIXED_OVERHEAD + rowLength + familyLength + qualifierLength + 
valueLength + tagsLength;
+    }
+
+    @Override
+    public int write(OutputStream out) throws IOException {
+      return write(out, true);
+    }
+
+    @Override
+    public int write(OutputStream out, boolean withTags) throws IOException {
+      int lenToWrite = KeyValueUtil.length(rowLength, familyLength, 
qualifierLength, valueLength,
+          tagsLength, withTags);
+      writeInt(out, lenToWrite);
+      writeInt(out, keyOnlyBuffer.length);
+      writeInt(out, valueLength);
+      // Write key
+      out.write(keyOnlyBuffer);
+      // Write value
+      out.write(this.valueBuffer, this.valueOffset, this.valueLength);
+      if (withTags) {
+        // 2 bytes tags length followed by tags bytes
+        // tags length is serialized with 2 bytes only(short way) even if the 
type is int.
+        // As this is non -ve numbers, we save the sign bit. See HBASE-11437
+        out.write((byte) (0xff & (this.tagsLength >> 8)));
+        out.write((byte) (0xff & this.tagsLength));
+        out.write(this.tagsBuffer, this.tagsOffset, this.tagsLength);
+      }
+      return lenToWrite + Bytes.SIZEOF_INT;
     }
   }
 
-  /**
-   * Copies only the key part of the keybuffer by doing a deep copy and passes 
the
-   * seeker state members for taking a clone.
-   * Note that the value byte[] part is still pointing to the currentBuffer 
and the
-   * represented by the valueOffset and valueLength
-   */
-  // We return this as a Cell to the upper layers of read flow and might try 
setting a new SeqId
-  // there. So this has to be an instance of SettableSequenceId. SeekerState 
need not be
-  // SettableSequenceId as we never return that to top layers. When we have 
to, we make
-  // ClonedSeekerState from it.
-  protected static class ClonedSeekerState implements Cell, HeapSize, 
SettableSequenceId,
-      Streamable {
+  protected static class OffheapDecodedCell extends ByteBufferedCell 
implements HeapSize,
+      SettableSequenceId, Streamable {
     private static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-        + (4 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * 
Bytes.SIZEOF_INT)
-        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (2 * 
ClassSize.ARRAY));
-    private byte[] keyOnlyBuffer;
-    private ByteBuffer currentBuffer;
+        + (3 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG) + (7 * 
Bytes.SIZEOF_INT)
+        + (Bytes.SIZEOF_SHORT) + (2 * Bytes.SIZEOF_BYTE) + (3 * 
ClassSize.BYTE_BUFFER));
+    private ByteBuffer keyBuffer;
     private short rowLength;
     private int familyOffset;
     private byte familyLength;
@@ -335,22 +477,22 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     private int qualifierLength;
     private long timestamp;
     private byte typeByte;
+    private ByteBuffer valueBuffer;
     private int valueOffset;
     private int valueLength;
-    private int tagsLength;
+    private ByteBuffer tagsBuffer;
     private int tagsOffset;
-    private byte[] cloneTagsBuffer;
+    private int tagsLength;
     private long seqId;
-    private TagCompressionContext tagCompressionContext;
-
-    protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, 
short rowLength,
-        int familyOffset, byte familyLength, int keyLength, int qualOffset, 
int qualLength,
-        long timeStamp, byte typeByte, int valueLen, int valueOffset, long 
seqId,
-        int tagsOffset, int tagsLength, TagCompressionContext 
tagCompressionContext,
-        byte[] tagsBuffer) {
-      this.currentBuffer = currentBuffer;
-      keyOnlyBuffer = new byte[keyLength];
-      this.tagCompressionContext = tagCompressionContext;
+
+    protected OffheapDecodedCell(ByteBuffer keyBuffer, short rowLength, int 
familyOffset,
+        byte familyLength, int qualOffset, int qualLength, long timeStamp, 
byte typeByte,
+        ByteBuffer valueBuffer, int valueOffset, int valueLen, long seqId, 
ByteBuffer tagsBuffer,
+        int tagsOffset, int tagsLength) {
+      // The keyBuffer is always onheap
+      assert keyBuffer.hasArray();
+      assert keyBuffer.arrayOffset() == 0;
+      this.keyBuffer = keyBuffer;
       this.rowLength = rowLength;
       this.familyOffset = familyOffset;
       this.familyLength = familyLength;
@@ -358,123 +500,153 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
       this.qualifierLength = qualLength;
       this.timestamp = timeStamp;
       this.typeByte = typeByte;
-      this.valueLength = valueLen;
+      this.valueBuffer = valueBuffer;
       this.valueOffset = valueOffset;
+      this.valueLength = valueLen;
+      this.tagsBuffer = tagsBuffer;
       this.tagsOffset = tagsOffset;
       this.tagsLength = tagsLength;
-      System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
-      if (tagCompressionContext != null) {
-        this.cloneTagsBuffer = new byte[tagsLength];
-        System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
-      }
       setSequenceId(seqId);
     }
 
     @Override
     public byte[] getRowArray() {
-      return keyOnlyBuffer;
+      return this.keyBuffer.array();
     }
 
     @Override
-    public byte[] getFamilyArray() {
-      return keyOnlyBuffer;
+    public int getRowOffset() {
+      return getRowPositionInByteBuffer();
     }
 
     @Override
-    public byte[] getQualifierArray() {
-      return keyOnlyBuffer;
+    public short getRowLength() {
+      return this.rowLength;
     }
 
     @Override
-    public int getRowOffset() {
-      return Bytes.SIZEOF_SHORT;
+    public byte[] getFamilyArray() {
+      return this.keyBuffer.array();
     }
 
     @Override
-    public short getRowLength() {
-      return rowLength;
+    public int getFamilyOffset() {
+      return getFamilyPositionInByteBuffer();
     }
 
     @Override
-    public int getFamilyOffset() {
-      return familyOffset;
+    public byte getFamilyLength() {
+      return this.familyLength;
     }
 
     @Override
-    public byte getFamilyLength() {
-      return familyLength;
+    public byte[] getQualifierArray() {
+      return this.keyBuffer.array();
     }
 
     @Override
     public int getQualifierOffset() {
-      return qualifierOffset;
+      return getQualifierPositionInByteBuffer();
     }
 
     @Override
     public int getQualifierLength() {
-      return qualifierLength;
+      return this.qualifierLength;
     }
 
     @Override
     public long getTimestamp() {
-      return timestamp;
+      return this.timestamp;
     }
 
     @Override
     public byte getTypeByte() {
-      return typeByte;
+      return this.typeByte;
     }
 
     @Override
     public long getSequenceId() {
-      return seqId;
+      return this.seqId;
     }
 
     @Override
     public byte[] getValueArray() {
-      return currentBuffer.array();
+      return CellUtil.cloneValue(this);
     }
 
     @Override
     public int getValueOffset() {
-      return currentBuffer.arrayOffset() + valueOffset;
+      return 0;
     }
 
     @Override
     public int getValueLength() {
-      return valueLength;
+      return this.valueLength;
     }
 
     @Override
     public byte[] getTagsArray() {
-      if (tagCompressionContext != null) {
-        return cloneTagsBuffer;
-      }
-      return currentBuffer.array();
+      return CellUtil.cloneTags(this);
     }
 
     @Override
     public int getTagsOffset() {
-      if (tagCompressionContext != null) {
-        return 0;
-      }
-      return currentBuffer.arrayOffset() + tagsOffset;
+      return 0;
     }
 
     @Override
     public int getTagsLength() {
-      return tagsLength;
+      return this.tagsLength;
     }
 
     @Override
-    public String toString() {
-      return KeyValue.keyToString(this.keyOnlyBuffer, 0, 
KeyValueUtil.keyLength(this)) + "/vlen="
-          + getValueLength() + "/seqid=" + seqId;
+    public ByteBuffer getRowByteBuffer() {
+      return this.keyBuffer;
     }
 
     @Override
-    public void setSequenceId(long seqId) {
-      this.seqId = seqId;
+    public int getRowPositionInByteBuffer() {
+      return Bytes.SIZEOF_SHORT;
+    }
+
+    @Override
+    public ByteBuffer getFamilyByteBuffer() {
+      return this.keyBuffer;
+    }
+
+    @Override
+    public int getFamilyPositionInByteBuffer() {
+      return this.familyOffset;
+    }
+
+    @Override
+    public ByteBuffer getQualifierByteBuffer() {
+      return this.keyBuffer;
+    }
+
+    @Override
+    public int getQualifierPositionInByteBuffer() {
+      return this.qualifierOffset;
+    }
+
+    @Override
+    public ByteBuffer getValueByteBuffer() {
+      return this.valueBuffer;
+    }
+
+    @Override
+    public int getValuePositionInByteBuffer() {
+      return this.valueOffset;
+    }
+
+    @Override
+    public ByteBuffer getTagsByteBuffer() {
+      return this.tagsBuffer;
+    }
+
+    @Override
+    public int getTagsPositionInByteBuffer() {
+      return this.tagsOffset;
     }
 
     @Override
@@ -483,6 +655,11 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     }
 
     @Override
+    public void setSequenceId(long seqId) {
+      this.seqId = seqId;
+    }
+
+    @Override
     public int write(OutputStream out) throws IOException {
       return write(out, true);
     }
@@ -492,26 +669,19 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
       int lenToWrite = KeyValueUtil.length(rowLength, familyLength, 
qualifierLength, valueLength,
           tagsLength, withTags);
       writeInt(out, lenToWrite);
-      writeInt(out, keyOnlyBuffer.length);
+      writeInt(out, keyBuffer.capacity());
       writeInt(out, valueLength);
       // Write key
-      out.write(keyOnlyBuffer);
+      out.write(keyBuffer.array());
       // Write value
-      assert this.currentBuffer.hasArray();
-      out.write(this.currentBuffer.array(), this.currentBuffer.arrayOffset() + 
this.valueOffset,
-          this.valueLength);
+      writeByteBuffer(out, this.valueBuffer, this.valueOffset, 
this.valueLength);
       if (withTags) {
         // 2 bytes tags length followed by tags bytes
         // tags length is serialized with 2 bytes only(short way) even if the 
type is int.
         // As this is non -ve numbers, we save the sign bit. See HBASE-11437
         out.write((byte) (0xff & (this.tagsLength >> 8)));
         out.write((byte) (0xff & this.tagsLength));
-        if (this.tagCompressionContext != null) {
-          out.write(cloneTagsBuffer);
-        } else {
-          out.write(this.currentBuffer.array(), 
this.currentBuffer.arrayOffset() + this.tagsOffset,
-              this.tagsLength);
-        }
+        writeByteBuffer(out, this.tagsBuffer, this.tagsOffset, 
this.tagsLength);
       }
       return lenToWrite + Bytes.SIZEOF_INT;
     }
@@ -527,16 +697,30 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     }
   }
 
+  private static void writeByteBuffer(OutputStream out, ByteBuffer b, int offset, int length)
+      throws IOException {
+    // ByteBufferOutputStream offers a write that takes a ByteBuffer, so bytes can go directly
+    // from the src ByteBuffer to the destination ByteBuffer. That avoids the need for a temp
+    // array creation and copy; any other stream goes through ByteBufferUtils instead.
+    if (!(out instanceof ByteBufferOutputStream)) {
+      ByteBufferUtils.copyBufferToStream(out, b, offset, length);
+    } else {
+      ((ByteBufferOutputStream) out).write(b, offset, length);
+    }
+  }
+
   protected abstract static class
       BufferedEncodedSeeker<STATE extends SeekerState>
       implements EncodedSeeker {
     protected HFileBlockDecodingContext decodingCtx;
     protected final CellComparator comparator;
-    protected ByteBuffer currentBuffer;
-    protected STATE current = createSeekerState(); // always valid
-    protected STATE previous = createSeekerState(); // may not be valid
+    protected ByteBuff currentBuffer;
     protected TagCompressionContext tagCompressionContext = null;
     protected  KeyValue.KeyOnlyKeyValue keyOnlyKV = new 
KeyValue.KeyOnlyKeyValue();
+    // A temp pair object which will be reused by ByteBuff#asSubByteBuffer 
calls. This avoids too
+    // many object creations.
+    protected final Pair<ByteBuffer, Integer> tmpPair = new Pair<ByteBuffer, 
Integer>();
+    protected STATE current, previous;
 
     public BufferedEncodedSeeker(CellComparator comparator,
         HFileBlockDecodingContext decodingCtx) {
@@ -549,6 +733,8 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
           throw new RuntimeException("Failed to initialize 
TagCompressionContext", e);
         }
       }
+      current = createSeekerState(); // always valid
+      previous = createSeekerState(); // may not be valid
     }
 
     protected boolean includesMvcc() {
@@ -566,7 +752,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     }
 
     @Override
-    public void setCurrentBuffer(ByteBuffer buffer) {
+    public void setCurrentBuffer(ByteBuff buffer) {
       if (this.tagCompressionContext != null) {
         this.tagCompressionContext.clear();
       }
@@ -589,9 +775,10 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
 
     @Override
     public ByteBuffer getValueShallowCopy() {
-      ByteBuffer dup = currentBuffer.duplicate();
-      dup.position(current.valueOffset);
-      dup.limit(current.valueOffset + current.valueLength);
+      currentBuffer.asSubByteBuffer(current.valueOffset, current.valueLength, 
tmpPair);
+      ByteBuffer dup = tmpPair.getFirst().duplicate();
+      dup.position(tmpPair.getSecond());
+      dup.limit(tmpPair.getSecond() + current.valueLength);
       return dup.slice();
     }
 
@@ -601,8 +788,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
       kvBuffer.putInt(current.keyLength);
       kvBuffer.putInt(current.valueLength);
       kvBuffer.put(current.keyBuffer, 0, current.keyLength);
-      ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, 
current.valueOffset,
-          current.valueLength);
+      currentBuffer.get(kvBuffer, current.valueOffset, current.valueLength);
       if (current.tagsLength > 0) {
         // Put short as unsigned
         kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
@@ -610,8 +796,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
         if (current.tagsOffset != -1) {
           // the offset of the tags bytes in the underlying buffer is marked. 
So the temp
           // buffer,tagsBuffer was not been used.
-          ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, 
current.tagsOffset,
-              current.tagsLength);
+          currentBuffer.get(kvBuffer, current.tagsOffset, current.tagsLength);
         } else {
           // When tagsOffset is marked as -1, tag compression was present and 
so the tags were
           // uncompressed into temp buffer, tagsBuffer. Let us copy it from 
there
@@ -630,8 +815,8 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     }
 
     @Override
-    public Cell getKeyValue() {
-      return current.shallowCopy();
+    public Cell getCell() {
+      return current.toCell();
     }
 
     @Override
@@ -657,7 +842,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     }
 
     protected void decodeTags() {
-      current.tagsLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+      current.tagsLength = ByteBuff.readCompressedInt(currentBuffer);
       if (tagCompressionContext != null) {
         if (current.uncompressTags) {
           // Tag compression is been used. uncompress it into tagsBuffer
@@ -669,7 +854,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
             throw new RuntimeException("Exception while uncompressing tags", 
e);
           }
         } else {
-          ByteBufferUtils.skip(currentBuffer, current.tagsCompressedLength);
+          currentBuffer.skip(current.tagsCompressedLength);
           current.uncompressTags = true;// Reset this.
         }
         current.tagsOffset = -1;
@@ -677,7 +862,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
         // When tag compress is not used, let us not do copying of tags bytes 
into tagsBuffer.
         // Just mark the tags Offset so as to create the KV buffer later in 
getKeyValueBuffer()
         current.tagsOffset = currentBuffer.position();
-        ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+        currentBuffer.skip(current.tagsLength);
       }
     }
 
@@ -844,7 +1029,7 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
     protected STATE createSeekerState() {
       // This will fail for non-default seeker state if the subclass does not
       // override this method.
-      return (STATE) new SeekerState();
+      return (STATE) new SeekerState(this.tmpPair, this.includesTags());
     }
 
     abstract protected void decodeFirst();
@@ -1017,4 +1202,13 @@ abstract class BufferedDataBlockEncoder implements 
DataBlockEncoder {
       encodingCtx.postEncoding(BlockType.DATA);
     }
   }
+
+  /** Wraps the key bytes at key's current position in a key-only Cell (array or BB backed). */
+  protected Cell createFirstKeyCell(ByteBuffer key, int keyLength) {
+    int keyStart = key.position();
+    if (!key.hasArray()) {
+      return new OffheapKeyOnlyKeyValue(key, keyStart, keyLength);
+    }
+    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + keyStart, keyLength);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
index 662be29..9310f32 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java
@@ -71,8 +71,7 @@ public class CopyKeyDataBlockEncoder extends 
BufferedDataBlockEncoder {
     int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT);
     int pos = 3 * Bytes.SIZEOF_INT;
     ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate();
-    // TODO : to be changed here for BBCell
-    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, 
keyLength);
+    return createFirstKeyCell(key, keyLength);
   }
 
   @Override
@@ -91,14 +90,14 @@ public class CopyKeyDataBlockEncoder extends 
BufferedDataBlockEncoder {
         current.ensureSpaceForKey();
         currentBuffer.get(current.keyBuffer, 0, current.keyLength);
         current.valueOffset = currentBuffer.position();
-        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        currentBuffer.skip(current.valueLength);
         if (includesTags()) {
           // Read short as unsigned, high byte first
           current.tagsLength = ((currentBuffer.get() & 0xff) << 8) ^ 
(currentBuffer.get() & 0xff);
-          ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+          currentBuffer.skip(current.tagsLength);
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
@@ -107,7 +106,7 @@ public class CopyKeyDataBlockEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected void decodeFirst() {
-        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        currentBuffer.skip(Bytes.SIZEOF_INT);
         current.lastCommonPrefix = 0;
         decodeNext();
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
index ce71308..ed4528c 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java
@@ -131,14 +131,14 @@ public interface DataBlockEncoder {
    * An interface which enable to seek while underlying data is encoded.
    *
    * It works on one HFileBlock, but it is reusable. See
-   * {@link #setCurrentBuffer(ByteBuffer)}.
+   * {@link #setCurrentBuffer(ByteBuff)}.
    */
   interface EncodedSeeker {
     /**
      * Set on which buffer there will be done seeking.
      * @param buffer Used for seeking.
      */
-    void setCurrentBuffer(ByteBuffer buffer);
+    void setCurrentBuffer(ByteBuff buffer);
 
     /**
      * From the current position creates a cell using the key part
@@ -160,10 +160,9 @@ public interface DataBlockEncoder {
     ByteBuffer getKeyValueBuffer();
 
     /**
-     * @return the KeyValue object at the current position. Includes memstore
-     *         timestamp.
+     * @return the Cell at the current position. Includes memstore timestamp.
      */
-    Cell getKeyValue();
+    Cell getCell();
 
     /** Set position to beginning of given block */
     void rewind();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
index 90b8e6e..4587352 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java
@@ -30,6 +30,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Compress using:
@@ -362,9 +363,14 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
   }
 
   protected static class DiffSeekerState extends SeekerState {
+
     private int rowLengthWithSize;
     private long timestamp;
 
+    public DiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean 
includeTags) {
+      super(tmpPair, includeTags);
+    }
+
     @Override
     protected void copyFromNext(SeekerState that) {
       super.copyFromNext(that);
@@ -389,14 +395,12 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
           if (!isFirst) {
             type = current.keyBuffer[current.keyLength - Bytes.SIZEOF_BYTE];
           }
-          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+          current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
         }
         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-          current.valueLength =
-              ByteBufferUtils.readCompressedInt(currentBuffer);
+          current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
         }
-        current.lastCommonPrefix =
-            ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
 
         current.ensureSpaceForKey();
 
@@ -446,8 +450,7 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
         int pos = current.keyLength - TIMESTAMP_WITH_TYPE_LENGTH;
         int timestampFitInBytes = 1 +
             ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH);
-        long timestampOrDiff =
-            ByteBufferUtils.readLong(currentBuffer, timestampFitInBytes);
+        long timestampOrDiff = ByteBuff.readLong(currentBuffer, 
timestampFitInBytes);
         if ((flag & FLAG_TIMESTAMP_SIGN) != 0) {
           timestampOrDiff = -timestampOrDiff;
         }
@@ -467,13 +470,13 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
         }
 
         current.valueOffset = currentBuffer.position();
-        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        currentBuffer.skip(current.valueLength);
 
         if (includesTags()) {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
@@ -482,7 +485,7 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected void decodeFirst() {
-        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        currentBuffer.skip(Bytes.SIZEOF_INT);
 
         // read column family
         byte familyNameLength = currentBuffer.get();
@@ -500,7 +503,7 @@ public class DiffKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected DiffSeekerState createSeekerState() {
-        return new DiffSeekerState();
+        return new DiffSeekerState(this.tmpPair, this.includesTags());
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
index fa4adbd..79e2792 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java
@@ -30,6 +30,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Encoder similar to {@link DiffKeyDeltaEncoder} but supposedly faster.
@@ -364,8 +365,7 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
     ByteBuff.readCompressedInt(block); // commonLength
     ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    // TODO : Change to BBCell.
-    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + 
key.position(), keyLength);
+    return createFirstKeyCell(key, keyLength);
   }
 
   @Override
@@ -379,6 +379,10 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
     private int rowLengthWithSize;
     private int familyLengthWithSize;
 
+    public FastDiffSeekerState(Pair<ByteBuffer, Integer> tmpPair, boolean 
includeTags) {
+      super(tmpPair, includeTags);
+    }
+
     @Override
     protected void copyFromNext(SeekerState that) {
       super.copyFromNext(that);
@@ -404,14 +408,12 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
                 current.prevTimestampAndType, 0,
                 current.prevTimestampAndType.length);
           }
-          current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
+          current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
         }
         if ((flag & FLAG_SAME_VALUE_LENGTH) == 0) {
-          current.valueLength =
-              ByteBufferUtils.readCompressedInt(currentBuffer);
+          current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
         }
-        current.lastCommonPrefix =
-            ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
 
         current.ensureSpaceForKey();
 
@@ -491,14 +493,14 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
         // handle value
         if ((flag & FLAG_SAME_VALUE) == 0) {
           current.valueOffset = currentBuffer.position();
-          ByteBufferUtils.skip(currentBuffer, current.valueLength);
+          currentBuffer.skip(current.valueLength);
         }
 
         if (includesTags()) {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
@@ -507,7 +509,7 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected void decodeFirst() {
-        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        currentBuffer.skip(Bytes.SIZEOF_INT);
         decode(true);
       }
 
@@ -518,7 +520,7 @@ public class FastDiffDeltaEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected FastDiffSeekerState createSeekerState() {
-        return new FastDiffSeekerState();
+        return new FastDiffSeekerState(this.tmpPair, this.includesTags());
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
index 6e89de4..6f5acd5 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java
@@ -186,8 +186,7 @@ public class PrefixKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
     }
     ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate();
     block.reset();
-    // TODO : Change to BBCell
-    return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + 
key.position(), keyLength);
+    return createFirstKeyCell(key, keyLength);
   }
 
   @Override
@@ -201,21 +200,20 @@ public class PrefixKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
     return new BufferedEncodedSeeker<SeekerState>(comparator, decodingCtx) {
       @Override
       protected void decodeNext() {
-        current.keyLength = ByteBufferUtils.readCompressedInt(currentBuffer);
-        current.valueLength = ByteBufferUtils.readCompressedInt(currentBuffer);
-        current.lastCommonPrefix =
-            ByteBufferUtils.readCompressedInt(currentBuffer);
+        current.keyLength = ByteBuff.readCompressedInt(currentBuffer);
+        current.valueLength = ByteBuff.readCompressedInt(currentBuffer);
+        current.lastCommonPrefix = ByteBuff.readCompressedInt(currentBuffer);
         current.keyLength += current.lastCommonPrefix;
         current.ensureSpaceForKey();
         currentBuffer.get(current.keyBuffer, current.lastCommonPrefix,
             current.keyLength - current.lastCommonPrefix);
         current.valueOffset = currentBuffer.position();
-        ByteBufferUtils.skip(currentBuffer, current.valueLength);
+        currentBuffer.skip(current.valueLength);
         if (includesTags()) {
           decodeTags();
         }
         if (includesMvcc()) {
-          current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+          current.memstoreTS = ByteBuff.readVLong(currentBuffer);
         } else {
           current.memstoreTS = 0;
         }
@@ -224,7 +222,7 @@ public class PrefixKeyDeltaEncoder extends 
BufferedDataBlockEncoder {
 
       @Override
       protected void decodeFirst() {
-        ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_INT);
+        currentBuffer.skip(Bytes.SIZEOF_INT);
         decodeNext();
       }
     };

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 0b442a5..6e13b44 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hbase.io.util;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.base.Preconditions;
@@ -84,7 +84,7 @@ public class StreamUtils {
     return result;
   }
 
-  public static int readRawVarint32(ByteBuffer input) throws IOException {
+  public static int readRawVarint32(ByteBuff input) throws IOException {
     byte tmp = input.get();
     if (tmp >= 0) {
       return tmp;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
index 14e77a7..4398f5d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
 
 /**
  * An abstract class that abstracts out as to how the byte buffers are used,
@@ -435,4 +436,23 @@ public abstract class ByteBuff {
     }
     return tmpLength;
   }
+
+  /**
+   * Decodes one variable-length long from the given {@link ByteBuff}, consuming its bytes via
+   * relative gets. Similar to {@link WritableUtils#readVLong(DataInput)}.
+   */
+  public static long readVLong(ByteBuff in) {
+    final byte header = in.get();
+    final int totalBytes = WritableUtils.decodeVIntSize(header);
+    if (totalBytes == 1) {
+      // single-byte encoding: the byte just read is the value itself
+      return header;
+    }
+    long magnitude = 0;
+    for (int remaining = totalBytes - 1; remaining > 0; remaining--) {
+      magnitude = (magnitude << 8) | (in.get() & 0xFF);
+    }
+    // when the header marks a negative value, the accumulated bits are inverted
+    return WritableUtils.isNegativeVInt(header) ? ~magnitude : magnitude;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
index 841c468..f4c4afe 100644
--- 
a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
+++ 
b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestTagCompressionContext.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -60,11 +61,11 @@ public class TestTagCompressionContext {
 
     byte[] dest = new byte[tagsLength1];
     ByteBuffer ob = ByteBuffer.wrap(baos.toByteArray());
-    context.uncompressTags(ob, dest, 0, tagsLength1);
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength1);
     assertTrue(Bytes.equals(kv1.getTagsArray(), kv1.getTagsOffset(), 
tagsLength1, dest, 0,
         tagsLength1));
     dest = new byte[tagsLength2];
-    context.uncompressTags(ob, dest, 0, tagsLength2);
+    context.uncompressTags(new SingleByteBuff(ob), dest, 0, tagsLength2);
     assertTrue(Bytes.equals(kv2.getTagsArray(), kv2.getTagsOffset(), 
tagsLength2, dest, 0,
         tagsLength2));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
----------------------------------------------------------------------
diff --git 
a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
 
b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
index 93610c4..d77bb24 100644
--- 
a/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
+++ 
b/hbase-prefix-tree/src/main/java/org/apache/hadoop/hbase/codec/prefixtree/PrefixTreeSeeker.java
@@ -32,6 +32,7 @@ import 
org.apache.hadoop.hbase.codec.prefixtree.decode.PrefixTreeArraySearcher;
 import org.apache.hadoop.hbase.codec.prefixtree.scanner.CellScannerPosition;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -55,8 +56,8 @@ public class PrefixTreeSeeker implements EncodedSeeker {
   }
 
   @Override
-  public void setCurrentBuffer(ByteBuffer fullBlockBuffer) {
-    block = fullBlockBuffer;
+  public void setCurrentBuffer(ByteBuff fullBlockBuffer) {
+    block = fullBlockBuffer.asSubByteBuffer(fullBlockBuffer.limit());
     // TODO : change to Bytebuff
     ptSearcher = DecoderFactory.checkOut(block, includeMvccVersion);
     rewind();
@@ -97,7 +98,7 @@ public class PrefixTreeSeeker implements EncodedSeeker {
    * currently must do deep copy into new array
    */
   @Override
-  public Cell getKeyValue() {
+  public Cell getCell() {
     Cell cell = ptSearcher.current();
     if (cell == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index ae2f6c1..b285f01 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1454,8 +1454,7 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
           + DataBlockEncoding.getNameFromId(dataBlockEncoderId));
       }
       ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
-      // TODO : Change the DBEs to work with ByteBuffs
-      
seeker.setCurrentBuffer(encodedBuffer.asSubByteBuffer(encodedBuffer.limit()));
+      seeker.setCurrentBuffer(encodedBuffer);
       blockFetches++;
 
       // Reset the next indexed key
@@ -1528,7 +1527,7 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
       if (block == null) {
         return null;
       }
-      return seeker.getKeyValue();
+      return seeker.getCell();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e8b79a8..feec5f8 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionLocator;
@@ -249,6 +248,19 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility {
     return Collections.unmodifiableList(configurations);
   }
 
+  public static List<Object[]> memStoreTSTagsAndOffheapCombination() {
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    configurations.add(new Object[] { false, false, true });
+    configurations.add(new Object[] { false, false, false });
+    configurations.add(new Object[] { false, true, true });
+    configurations.add(new Object[] { false, true, false });
+    configurations.add(new Object[] { true, false, true });
+    configurations.add(new Object[] { true, false, false });
+    configurations.add(new Object[] { true, true, true });
+    configurations.add(new Object[] { true, true, false });
+    return Collections.unmodifiableList(configurations);
+  }
+
   public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
       bloomAndCompressionCombinations();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
index 238e6da..5e61c8f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestBufferedDataBlockEncoder.java
@@ -19,10 +19,13 @@ package org.apache.hadoop.hbase.io.encoding;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -47,7 +50,7 @@ public class TestBufferedDataBlockEncoder {
   @Test
   public void testEnsureSpaceForKey() {
     BufferedDataBlockEncoder.SeekerState state =
-        new BufferedDataBlockEncoder.SeekerState();
+        new BufferedDataBlockEncoder.SeekerState(new Pair<ByteBuffer, Integer>(), false);
     for (int i = 1; i <= 65536; ++i) {
       state.keyLength = i;
       state.ensureSpaceForKey();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
index f4160db..075c6fe 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestDataBlockEncoders.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import 
org.apache.hadoop.hbase.io.hfile.HFileBlock.Writer.BufferGrabbingByteArrayOutputStream;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.nio.MultiByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -74,14 +74,18 @@ public class TestDataBlockEncoders {
 
   private final boolean includesMemstoreTS;
   private final boolean includesTags;
+  private final boolean useOffheapData;
 
   @Parameters
   public static Collection<Object[]> parameters() {
-    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
+    return HBaseTestingUtility.memStoreTSTagsAndOffheapCombination();
   }
-  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean 
includesTag) {
+
+  public TestDataBlockEncoders(boolean includesMemstoreTS, boolean includesTag,
+      boolean useOffheapData) {
     this.includesMemstoreTS = includesMemstoreTS;
     this.includesTags = includesTag;
+    this.useOffheapData = useOffheapData;
   }
   
   private HFileBlockEncodingContext getEncodingContext(Compression.Algorithm 
algo,
@@ -178,12 +182,15 @@ public class TestDataBlockEncoders {
     List<DataBlockEncoder.EncodedSeeker> encodedSeekers = 
         new ArrayList<DataBlockEncoder.EncodedSeeker>();
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      // Off heap block data support not added for PREFIX_TREE DBE yet.
+      // TODO remove this once support is added. HBASE-12298
+      if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) continue;
       DataBlockEncoder encoder = encoding.getEncoder();
       if (encoder == null) {
         continue;
       }
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
-          getEncodingContext(Compression.Algorithm.NONE, encoding));
+          getEncodingContext(Compression.Algorithm.NONE, encoding), 
this.useOffheapData);
       HFileContext meta = new HFileContextBuilder()
                           .withHBaseCheckSum(false)
                           .withIncludesMvcc(includesMemstoreTS)
@@ -192,7 +199,7 @@ public class TestDataBlockEncoders {
                           .build();
       DataBlockEncoder.EncodedSeeker seeker = 
encoder.createSeeker(CellComparator.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
-      seeker.setCurrentBuffer(encodedBuffer);
+      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
       encodedSeekers.add(seeker);
     }
     // test it!
@@ -222,7 +229,7 @@ public class TestDataBlockEncoders {
   }
 
   static ByteBuffer encodeKeyValues(DataBlockEncoding encoding, List<KeyValue> 
kvs,
-      HFileBlockEncodingContext encodingContext) throws IOException {
+      HFileBlockEncodingContext encodingContext, boolean useOffheapData) throws IOException {
     DataBlockEncoder encoder = encoding.getEncoder();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
@@ -236,6 +243,12 @@ public class TestDataBlockEncoders {
     encoder.endBlockEncoding(encodingContext, dos, stream.getBuffer());
     byte[] encodedData = new byte[baos.size() - ENCODED_DATA_OFFSET];
     System.arraycopy(baos.toByteArray(), ENCODED_DATA_OFFSET, encodedData, 0, 
encodedData.length);
+    if (useOffheapData) {
+      ByteBuffer bb = ByteBuffer.allocateDirect(encodedData.length);
+      bb.put(encodedData);
+      bb.rewind();
+      return bb;
+    }
     return ByteBuffer.wrap(encodedData);
   }
 
@@ -244,12 +257,15 @@ public class TestDataBlockEncoders {
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, 
includesTags);
 
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      // Off heap block data support not added for PREFIX_TREE DBE yet.
+      // TODO remove this once support is added. HBASE-12298
+      if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) 
continue;
       if (encoding.getEncoder() == null) {
         continue;
       }
       DataBlockEncoder encoder = encoding.getEncoder();
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
-          getEncodingContext(Compression.Algorithm.NONE, encoding));
+          getEncodingContext(Compression.Algorithm.NONE, encoding), 
this.useOffheapData);
       HFileContext meta = new HFileContextBuilder()
                           .withHBaseCheckSum(false)
                           .withIncludesMvcc(includesMemstoreTS)
@@ -258,31 +274,19 @@ public class TestDataBlockEncoders {
                           .build();
       DataBlockEncoder.EncodedSeeker seeker = 
encoder.createSeeker(CellComparator.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
-      seeker.setCurrentBuffer(encodedBuffer);
+      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
       int i = 0;
       do {
         KeyValue expectedKeyValue = sampleKv.get(i);
-        ByteBuffer keyValue = seeker.getKeyValueBuffer();
-        if (0 != Bytes.compareTo(keyValue.array(), keyValue.arrayOffset(), 
keyValue.limit(),
-            expectedKeyValue.getBuffer(), expectedKeyValue.getOffset(),
-            expectedKeyValue.getLength())) {
-
-          int commonPrefix = 0;
-          byte[] left = keyValue.array();
-          byte[] right = expectedKeyValue.getBuffer();
-          int leftOff = keyValue.arrayOffset();
-          int rightOff = expectedKeyValue.getOffset();
-          int length = Math.min(keyValue.limit(), 
expectedKeyValue.getLength());
-          while (commonPrefix < length
-              && left[commonPrefix + leftOff] == right[commonPrefix + 
rightOff]) {
-            commonPrefix++;
-          }
-
+        Cell cell = seeker.getCell();
+        if (CellComparator.COMPARATOR.compareKeyIgnoresMvcc(expectedKeyValue, cell) != 0) {
+          int commonPrefix = CellUtil
+              .findCommonPrefixInFlatKey(expectedKeyValue, cell, false, true);
           fail(String.format("next() produces wrong results "
               + "encoder: %s i: %d commonPrefix: %d" + "\n expected %s\n 
actual      %s", encoder
               .toString(), i, commonPrefix, 
Bytes.toStringBinary(expectedKeyValue.getBuffer(),
-              expectedKeyValue.getOffset(), expectedKeyValue.getLength()), 
Bytes
-              .toStringBinary(keyValue)));
+              expectedKeyValue.getKeyOffset(), 
expectedKeyValue.getKeyLength()), CellUtil.toString(
+              cell, false)));
         }
         i++;
       } while (seeker.next());
@@ -298,33 +302,19 @@ public class TestDataBlockEncoders {
     List<KeyValue> sampleKv = generator.generateTestKeyValues(NUMBER_OF_KV, 
includesTags);
 
     for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      // Off heap block data support not added for PREFIX_TREE DBE yet.
+      // TODO remove this once support is added. HBASE-12298
+      if (this.useOffheapData && encoding == DataBlockEncoding.PREFIX_TREE) 
continue;
       if (encoding.getEncoder() == null) {
         continue;
       }
       DataBlockEncoder encoder = encoding.getEncoder();
       ByteBuffer encodedBuffer = encodeKeyValues(encoding, sampleKv,
-          getEncodingContext(Compression.Algorithm.NONE, encoding));
-      Cell key = encoder.getFirstKeyCellInBlock(new MultiByteBuff(
-          encodedBuffer));
-      KeyValue keyBuffer = null;
-      if(encoding == DataBlockEncoding.PREFIX_TREE) {
-        // This is not an actual case. So the Prefix tree block is not loaded 
in case of Prefix_tree
-        // Just copy only the key part to form a keyBuffer
-        byte[] serializedKey = CellUtil.getCellKeySerializedAsKeyValueKey(key);
-        keyBuffer = KeyValueUtil.createKeyValueFromKey(serializedKey);
-      } else {
-        keyBuffer = KeyValueUtil.ensureKeyValue(key);
-      }
+          getEncodingContext(Compression.Algorithm.NONE, encoding), 
this.useOffheapData);
+      Cell key = encoder.getFirstKeyCellInBlock(new SingleByteBuff(encodedBuffer));
       KeyValue firstKv = sampleKv.get(0);
-      if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(keyBuffer, 
firstKv)) {
-
-        int commonPrefix = 0;
-        int length = Math.min(keyBuffer.getKeyLength(), 
firstKv.getKeyLength());
-        while (commonPrefix < length
-            && keyBuffer.getBuffer()[keyBuffer.getKeyOffset() + commonPrefix] 
== firstKv
-                .getBuffer()[firstKv.getKeyOffset() + commonPrefix]) {
-          commonPrefix++;
-        }
+      if (0 != CellComparator.COMPARATOR.compareKeyIgnoresMvcc(key, firstKv)) {
+        int commonPrefix = CellUtil.findCommonPrefixInFlatKey(key, firstKv, false, true);
         fail(String.format("Bug in '%s' commonPrefix %d", encoder.toString(), 
commonPrefix));
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
index 7e9b5a8..031bf25 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestPrefixTreeEncoding.java
@@ -46,6 +46,7 @@ import 
org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -117,27 +118,27 @@ public class TestPrefixTreeEncoding {
     byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, 
DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
-    seeker.setCurrentBuffer(readBuffer);
+    seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
 
     // Seek before the first keyvalue;
     Cell seekKey = 
CellUtil.createFirstDeleteFamilyCellOnRow(getRowKey(batchId, 0), CF_BYTES);
     seeker.seekToKeyInBlock(seekKey, true);
-    assertEquals(null, seeker.getKeyValue());
+    assertEquals(null, seeker.getCell());
 
     // Seek before the middle keyvalue;
     seekKey = CellUtil.createFirstDeleteFamilyCellOnRow(getRowKey(batchId, 
NUM_ROWS_PER_BATCH / 3),
         CF_BYTES);
     seeker.seekToKeyInBlock(seekKey, true);
-    assertNotNull(seeker.getKeyValue());
+    assertNotNull(seeker.getCell());
     assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH / 3 - 1),
-      CellUtil.cloneRow(seeker.getKeyValue()));
+      CellUtil.cloneRow(seeker.getCell()));
 
     // Seek before the last keyvalue;
     seekKey = CellUtil.createFirstDeleteFamilyCellOnRow(Bytes.toBytes("zzzz"), 
CF_BYTES);
     seeker.seekToKeyInBlock(seekKey, true);
-    assertNotNull(seeker.getKeyValue());
+    assertNotNull(seeker.getCell());
     assertArrayEquals(getRowKey(batchId, NUM_ROWS_PER_BATCH - 1),
-      CellUtil.cloneRow(seeker.getKeyValue()));
+      CellUtil.cloneRow(seeker.getCell()));
   }
 
   @Test
@@ -160,10 +161,10 @@ public class TestPrefixTreeEncoding {
     byte[] onDiskBytes = baosInMemory.toByteArray();
     ByteBuffer readBuffer = ByteBuffer.wrap(onDiskBytes, 
DataBlockEncoding.ID_SIZE,
         onDiskBytes.length - DataBlockEncoding.ID_SIZE);
-    seeker.setCurrentBuffer(readBuffer);
+    seeker.setCurrentBuffer(new SingleByteBuff(readBuffer));
     Cell previousKV = null;
     do {
-      Cell currentKV = seeker.getKeyValue();
+      Cell currentKV = seeker.getCell();
       System.out.println(currentKV);
       if (previousKV != null && CellComparator.COMPARATOR.compare(currentKV, 
previousKV) < 0) {
         dumpInputKVSet();
@@ -229,7 +230,7 @@ public class TestPrefixTreeEncoding {
     List<KeyValue> kvList = new ArrayList<KeyValue>();
     for (int i = 0; i < NUM_ROWS_PER_BATCH; ++i) {
       kvList.clear();
-      encodeSeeker.setCurrentBuffer(encodedData);
+      encodeSeeker.setCurrentBuffer(new SingleByteBuff(encodedData));
       KeyValue firstOnRow = KeyValueUtil.createFirstOnRow(getRowKey(batchId, 
i));
       encodeSeeker.seekToKeyInBlock(
           new KeyValue.KeyOnlyKeyValue(firstOnRow.getBuffer(), 
firstOnRow.getKeyOffset(),
@@ -243,11 +244,11 @@ public class TestPrefixTreeEncoding {
         fail("Get error result after seeking " + firstOnRow);
       }
       if (hasMoreOfEncodeScanner) {
-        if (CellComparator.COMPARATOR.compare(encodeSeeker.getKeyValue(),
+        if (CellComparator.COMPARATOR.compare(encodeSeeker.getCell(),
             collectionScanner.peek()) != 0) {
           dumpInputKVSet();
           fail("Expected " + collectionScanner.peek() + " actual "
-              + encodeSeeker.getKeyValue() + ", after seeking " + firstOnRow);
+              + encodeSeeker.getCell() + ", after seeking " + firstOnRow);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0f614a1c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index cf94c6b..783f58e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -21,25 +21,43 @@ import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 @Category({IOTests.class, SmallTests.class})
+@RunWith(Parameterized.class)
 public class TestSeekToBlockWithEncoders {
 
+  private final boolean useOffheapData;
+
+  @Parameters
+  public static Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
+  public TestSeekToBlockWithEncoders(boolean useOffheapData) {
+    this.useOffheapData = useOffheapData;
+  }
+
   /**
    * Test seeking while file is encoded.
    */
@@ -265,10 +283,10 @@ public class TestSeekToBlockWithEncoders {
       HFileBlockEncodingContext encodingContext = 
encoder.newDataBlockEncodingContext(encoding,
           HConstants.HFILEBLOCK_DUMMY_HEADER, meta);
       ByteBuffer encodedBuffer = 
TestDataBlockEncoders.encodeKeyValues(encoding, kvs,
-          encodingContext);
+          encodingContext, this.useOffheapData);
       DataBlockEncoder.EncodedSeeker seeker = 
encoder.createSeeker(CellComparator.COMPARATOR,
           encoder.newDataBlockDecodingContext(meta));
-      seeker.setCurrentBuffer(encodedBuffer);
+      seeker.setCurrentBuffer(new SingleByteBuff(encodedBuffer));
       encodedSeekers.add(seeker);
     }
     // test it!
@@ -280,7 +298,7 @@ public class TestSeekToBlockWithEncoders {
       Cell keyValue, KeyValue expected) {
     for (DataBlockEncoder.EncodedSeeker seeker : encodedSeekers) {
       seeker.seekToKeyInBlock(keyValue, false);
-      Cell keyValue2 = seeker.getKeyValue();
+      Cell keyValue2 = seeker.getCell();
       assertEquals(expected, keyValue2);
       seeker.rewind();
     }

Reply via email to