Repository: hbase
Updated Branches:
  refs/heads/branch-1 35fa34191 -> c899897bc


HBASE-16213 A new HFileBlock structure for fast random get. (binlijin)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c899897b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c899897b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c899897b

Branch: refs/heads/branch-1
Commit: c899897bc8dc4a7eccc9e2a80fd05ad55654f18e
Parents: 35fa341
Author: anoopsamjohn <anoopsamj...@gmail.com>
Authored: Mon Aug 29 12:11:46 2016 +0530
Committer: anoopsamjohn <anoopsamj...@gmail.com>
Committed: Mon Aug 29 12:11:46 2016 +0530

----------------------------------------------------------------------
 .../hadoop/hbase/io/ByteArrayOutputStream.java  | 127 ++++++
 .../hbase/io/encoding/DataBlockEncoding.java    |   3 +-
 .../hbase/io/encoding/RowIndexCodecV1.java      | 165 +++++++
 .../hbase/io/encoding/RowIndexEncoderV1.java    | 115 +++++
 .../hbase/io/encoding/RowIndexSeekerV1.java     | 431 +++++++++++++++++++
 .../encoding/TestSeekToBlockWithEncoders.java   |   4 +-
 6 files changed, 842 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
new file mode 100644
index 0000000..a3c571f
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Our own implementation of ByteArrayOutputStream where all methods are NOT
+ * synchronized and which supports writing a ByteBuffer directly to it.
+ */
+@InterfaceAudience.Private
+public class ByteArrayOutputStream extends OutputStream {
+
+  // Borrowed from openJDK:
+  // http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/util/ArrayList.java#221
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private byte[] buf;
+  private int pos = 0;
+
+  public ByteArrayOutputStream() {
+    this(32);
+  }
+
+  public ByteArrayOutputStream(int capacity) {
+    this.buf = new byte[capacity];
+  }
+
+  /**
+   * Writes an <code>int</code> to this stream's buffer as four bytes,
+   * high byte first.
+   * @param i the <code>int</code> to write
+   * @throws IOException if an I/O error occurs.
+   */
+  public void writeInt(int i) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_INT);
+    Bytes.putInt(this.buf, this.pos, i);
+    this.pos += Bytes.SIZEOF_INT;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    checkSizeAndGrow(Bytes.SIZEOF_BYTE);
+    buf[this.pos] = (byte) b;
+    this.pos++;
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkSizeAndGrow(len);
+    System.arraycopy(b, off, this.buf, this.pos, len);
+    this.pos += len;
+  }
+
+  private void checkSizeAndGrow(int extra) {
+    long capacityNeeded = this.pos + (long) extra;
+    if (capacityNeeded > this.buf.length) {
+      // guarantee it's possible to fit
+      if (capacityNeeded > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      // double the capacity, capped at MAX_ARRAY_SIZE (cast to long avoids int overflow)
+      long nextCapacity = Math.min((long) this.buf.length << 1, MAX_ARRAY_SIZE);
+      // but make sure there is enough if twice the existing capacity is still
+      // too small
+      nextCapacity = Math.max(nextCapacity, capacityNeeded);
+      if (nextCapacity > MAX_ARRAY_SIZE) {
+        throw new BufferOverflowException();
+      }
+      byte[] newBuf = new byte[(int) nextCapacity];
+      System.arraycopy(buf, 0, newBuf, 0, buf.length);
+      buf = newBuf;
+    }
+  }
+
+  /**
+   * Resets the <code>pos</code> field of this byte array output stream to
+   * zero. The output stream can be used again.
+   */
+  public void reset() {
+    this.pos = 0;
+  }
+
+  /**
+   * Copies the content of this stream into a new byte array.
+   *
+   * @return the contents of this output stream, as a new byte array.
+   */
+  public byte[] toByteArray() {
+    return Arrays.copyOf(buf, pos);
+  }
+
+  /**
+   * @return the underlying array where the data gets accumulated
+   */
+  public byte[] getBuffer() {
+    return this.buf;
+  }
+
+  /**
+   * @return The current size of the buffer.
+   */
+  public int size() {
+    return this.pos;
+  }
+}
\ No newline at end of file
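
For reference, a minimal usage sketch of the stream added above; the values
and the demo() wrapper are illustrative, not part of the patch:

    // Sketch: exercising the new non-synchronized stream.
    static byte[] demo() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
      baos.writeInt(42);                       // fills the initial 4-byte capacity
      baos.write(new byte[] {1, 2, 3}, 0, 3);  // forces checkSizeAndGrow to double
      assert baos.size() == 7;
      byte[] copy = baos.toByteArray();        // defensive copy, length 7
      baos.reset();                            // pos = 0; the buffer is reused
      return copy;
    }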

http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
index 67d18ed..71b55e2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoding.java
@@ -43,7 +43,8 @@ public enum DataBlockEncoding {
   FAST_DIFF(4, "org.apache.hadoop.hbase.io.encoding.FastDiffDeltaEncoder"),
   // id 5 is reserved for the COPY_KEY algorithm for benchmarking
  // COPY_KEY(5, "org.apache.hadoop.hbase.io.encoding.CopyKeyDataBlockEncoder"),
-  PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec");
+  PREFIX_TREE(6, "org.apache.hadoop.hbase.codec.prefixtree.PrefixTreeCodec"),
+  ROW_INDEX_V1(7, "org.apache.hadoop.hbase.io.encoding.RowIndexCodecV1");
 
   private final short id;
   private final byte[] idInBytes;
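
A column family opts in to the new encoding through the usual descriptor
API; a minimal sketch (table and family names are illustrative, and an
existing Admin instance is assumed):

    // Sketch: enable ROW_INDEX_V1 on a column family.
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("t1"));
    HColumnDescriptor familyDesc = new HColumnDescriptor("f1");
    familyDesc.setDataBlockEncoding(DataBlockEncoding.ROW_INDEX_V1);
    tableDesc.addFamily(familyDesc);
    admin.createTable(tableDesc);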

http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
new file mode 100644
index 0000000..f18e094
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexCodecV1.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Store cells following every row's start offset, so we can binary-search to a row's cells.
+ *
+ * Format:
+ * flat cells
+ * integer: number of rows
+ * integer: row0's offset
+ * integer: row1's offset
+ * ....
+ * integer: dataSize
+ *
+ */
+@InterfaceAudience.Private
+public class RowIndexCodecV1 implements DataBlockEncoder {
+
+  private static class RowIndexEncodingState extends EncodingState {
+    RowIndexEncoderV1 encoder = null;
+  }
+
+  @Override
+  public void startBlockEncoding(HFileBlockEncodingContext blkEncodingCtx,
+      DataOutputStream out) throws IOException {
+    if (blkEncodingCtx.getClass() != HFileBlockDefaultEncodingContext.class) {
+      throw new IOException(this.getClass().getName() + " only accepts "
+          + HFileBlockDefaultEncodingContext.class.getName() + " as the "
+          + "encoding context.");
+    }
+
+    HFileBlockDefaultEncodingContext encodingCtx = (HFileBlockDefaultEncodingContext) blkEncodingCtx;
+    encodingCtx.prepareEncoding(out);
+
+    RowIndexEncoderV1 encoder = new RowIndexEncoderV1(out, encodingCtx);
+    RowIndexEncodingState state = new RowIndexEncodingState();
+    state.encoder = encoder;
+    blkEncodingCtx.setEncodingState(state);
+  }
+
+  @Override
+  public int encode(Cell cell, HFileBlockEncodingContext encodingCtx,
+      DataOutputStream out) throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
+        .getEncodingState();
+    RowIndexEncoderV1 encoder = state.encoder;
+    return encoder.write(cell);
+  }
+
+  @Override
+  public void endBlockEncoding(HFileBlockEncodingContext encodingCtx,
+      DataOutputStream out, byte[] uncompressedBytesWithHeader)
+      throws IOException {
+    RowIndexEncodingState state = (RowIndexEncodingState) encodingCtx
+        .getEncodingState();
+    RowIndexEncoderV1 encoder = state.encoder;
+    encoder.flush();
+    if (encodingCtx.getDataBlockEncoding() != DataBlockEncoding.NONE) {
+      encodingCtx.postEncoding(BlockType.ENCODED_DATA);
+    } else {
+      encodingCtx.postEncoding(BlockType.DATA);
+    }
+  }
+
+  @Override
+  public ByteBuffer decodeKeyValues(DataInputStream source,
+      HFileBlockDecodingContext decodingCtx) throws IOException {
+    if (!decodingCtx.getHFileContext().isIncludesTags()) {
+      ByteBuffer sourceAsBuffer = ByteBufferUtils
+          .drainInputStreamToBuffer(source); // wasteful: copies the whole stream
+      sourceAsBuffer.mark();
+      sourceAsBuffer.position(sourceAsBuffer.limit() - Bytes.SIZEOF_INT);
+      int onDiskSize = sourceAsBuffer.getInt();
+      sourceAsBuffer.reset();
+      ByteBuffer dup = sourceAsBuffer.duplicate();
+      dup.position(sourceAsBuffer.position());
+      dup.limit(sourceAsBuffer.position() + onDiskSize);
+      return dup.slice();
+    } else {
+      ByteBuffer sourceAsBuffer = ByteBufferUtils
+          .drainInputStreamToBuffer(source); // wasteful: copies the whole stream
+      sourceAsBuffer.mark();
+      RowIndexSeekerV1 seeker = new RowIndexSeekerV1(KeyValue.COMPARATOR,
+          decodingCtx);
+      seeker.setCurrentBuffer(sourceAsBuffer);
+      List<ByteBuffer> kvs = new ArrayList<ByteBuffer>();
+      kvs.add(seeker.getKeyValueBuffer());
+      while (seeker.next()) {
+        kvs.add(seeker.getKeyValueBuffer());
+      }
+      int totalLength = 0;
+      for (ByteBuffer buf : kvs) {
+        totalLength += buf.remaining();
+      }
+      byte[] keyValueBytes = new byte[totalLength];
+      ByteBuffer result = ByteBuffer.wrap(keyValueBytes);
+      for (ByteBuffer buf : kvs) {
+        result.put(buf);
+      }
+      return result;
+    }
+  }
+
+  @Override
+  public ByteBuffer getFirstKeyInBlock(ByteBuffer block) {
+    block.mark();
+    int keyLength = block.getInt();
+    block.getInt();
+    int pos = block.position();
+    block.reset();
+    ByteBuffer dup = block.duplicate();
+    dup.position(pos);
+    dup.limit(pos + keyLength);
+    return dup.slice();
+  }
+
+  @Override
+  public EncodedSeeker createSeeker(KVComparator comparator,
+      HFileBlockDecodingContext decodingCtx) {
+    return new RowIndexSeekerV1(comparator, decodingCtx);
+  }
+
+  @Override
+  public HFileBlockEncodingContext newDataBlockEncodingContext(
+      DataBlockEncoding encoding, byte[] header, HFileContext meta) {
+    return new HFileBlockDefaultEncodingContext(encoding, header, meta);
+  }
+
+  @Override
+  public HFileBlockDecodingContext newDataBlockDecodingContext(HFileContext meta) {
+    return new HFileBlockDefaultDecodingContext(meta);
+  }
+
+}
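
The layout described in the class javadoc can be made concrete with a small
decoding sketch that mirrors what RowIndexSeekerV1.setCurrentBuffer does
below (readRowOffsets is an illustrative helper, not part of the patch):

    // Sketch: parsing the index tail of a ROW_INDEX_V1 block.
    // Layout: [flat cells (dataSize bytes)][rowNumber:int][offset0:int]...[dataSize:int]
    static int[] readRowOffsets(ByteBuffer block) {
      int dataSize = block.getInt(block.limit() - Bytes.SIZEOF_INT);
      ByteBuffer idx = block.duplicate();
      idx.position(block.position() + dataSize); // skip over the flat cells
      int rowNumber = idx.getInt();
      int[] offsets = new int[rowNumber];
      for (int i = 0; i < rowNumber; i++) {
        offsets[i] = idx.getInt(); // offset of row i's first cell, relative to data start
      }
      return offsets;
    }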

http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
new file mode 100644
index 0000000..888ef9e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexEncoderV1.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexEncoderV1 {
+  private static final Log LOG = LogFactory.getLog(RowIndexEncoderV1.class);
+
+  /** The Cell previously appended. */
+  private Cell lastCell = null;
+
+  private DataOutputStream out;
+  private HFileBlockDefaultEncodingContext encodingCtx;
+  private int startOffset = -1;
+  private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(
+      64 * 4);
+
+  public RowIndexEncoderV1(DataOutputStream out,
+      HFileBlockDefaultEncodingContext encodingCtx) {
+    this.out = out;
+    this.encodingCtx = encodingCtx;
+  }
+
+  public int write(Cell cell) throws IOException {
+    // checkRow uses the comparator to check that rows are written in order.
+    if (!checkRow(cell)) {
+      if (startOffset < 0) {
+        startOffset = out.size();
+      }
+      rowsOffsetBAOS.writeInt(out.size() - startOffset);
+    }
+    int klength = KeyValueUtil.keyLength(cell);
+    int vlength = cell.getValueLength();
+    out.writeInt(klength);
+    out.writeInt(vlength);
+    CellUtil.writeFlatKey(cell, out);
+    // Write the value part
+    out.write(cell.getValueArray(), cell.getValueOffset(), vlength);
+    int encodedKvSize = klength + vlength
+        + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;
+    // Write the additional tag into the stream
+    if (encodingCtx.getHFileContext().isIncludesTags()) {
+      int tagsLength = cell.getTagsLength();
+      out.writeShort(tagsLength);
+      // There are some tags to be written
+      if (tagsLength > 0) {
+        out.write(cell.getTagsArray(), cell.getTagsOffset(), tagsLength);
+      }
+      encodedKvSize += tagsLength + KeyValue.TAGS_LENGTH_SIZE;
+    }
+    if (encodingCtx.getHFileContext().isIncludesMvcc()) {
+      WritableUtils.writeVLong(out, cell.getSequenceId());
+      encodedKvSize += WritableUtils.getVIntSize(cell.getSequenceId());
+    }
+    lastCell = cell;
+    return encodedKvSize;
+  }
+
+  protected boolean checkRow(final Cell cell) throws IOException {
+    boolean isDuplicateRow = false;
+    if (cell == null) {
+      throw new IOException("Key cannot be null or empty");
+    }
+    if (lastCell != null) {
+      int keyComp = KeyValue.COMPARATOR.compareRows(lastCell, cell);
+      if (keyComp > 0) {
+        throw new IOException("Added a key not lexically larger than"
+            + " previous. Current cell = " + cell + ", lastCell = " + 
lastCell);
+      } else if (keyComp == 0) {
+        isDuplicateRow = true;
+      }
+    }
+    return isDuplicateRow;
+  }
+
+  public void flush() throws IOException {
+    int onDiskDataSize = 0;
+    if (startOffset >= 0) {
+      onDiskDataSize = out.size() - startOffset;
+    }
+    // rowsOffsetBAOS.size() / 4
+    out.writeInt(rowsOffsetBAOS.size() >> 2);
+    if (rowsOffsetBAOS.size() > 0) {
+      out.write(rowsOffsetBAOS.getBuffer(), 0, rowsOffsetBAOS.size());
+    }
+    out.writeInt(onDiskDataSize);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("RowNumber: " + (rowsOffsetBAOS.size() >> 2)
+          + ", onDiskDataSize: " + onDiskDataSize + ", totalOnDiskSize: "
+          + (out.size() - startOffset));
+    }
+  }
+
+}
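
To make the index that flush() appends concrete, here is a standalone sketch
of the tail written after the flat cells for a block holding two rows whose
first cells start at byte offsets 0 and 57 (the numbers and the
writeIndexTail helper are illustrative):

    // Sketch: the tail flush() emits; rowsOffsetBAOS.size() >> 2 is just
    // size() / 4, i.e. the number of 4-byte row offsets recorded.
    static void writeIndexTail(DataOutputStream out, int onDiskDataSize)
        throws IOException {
      out.writeInt(2);              // number of rows
      out.writeInt(0);              // row 0 starts at offset 0
      out.writeInt(57);             // row 1 starts at offset 57
      out.writeInt(onDiskDataSize); // size of the flat-cells region
    }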

http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
new file mode 100644
index 0000000..a3289d6
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.encoding;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder.EncodedSeeker;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
+import org.apache.hadoop.io.WritableUtils;
+
+@InterfaceAudience.Private
+public class RowIndexSeekerV1 implements EncodedSeeker {
+
+  private HFileBlockDecodingContext decodingCtx;
+  private final KVComparator comparator;
+
+  private ByteBuffer currentBuffer;
+  private SeekerState current = new SeekerState(); // always valid
+  private SeekerState previous = new SeekerState(); // may not be valid
+
+  private int rowNumber;
+  private ByteBuffer rowOffsets = null;
+
+  public RowIndexSeekerV1(KVComparator comparator,
+      HFileBlockDecodingContext decodingCtx) {
+    this.comparator = comparator;
+    this.decodingCtx = decodingCtx;
+  }
+
+  @Override
+  public void setCurrentBuffer(ByteBuffer buffer) {
+    int onDiskSize = Bytes.toIntUnsafe(buffer.array(), buffer.arrayOffset()
+        + buffer.limit() - Bytes.SIZEOF_INT);
+    // int onDiskSize = buffer.getInt(buffer.limit() - Bytes.SIZEOF_INT);
+
+    // Data part
+    ByteBuffer dup = buffer.duplicate();
+    dup.position(buffer.position());
+    dup.limit(buffer.position() + onDiskSize);
+    currentBuffer = dup.slice();
+    current.currentBuffer = currentBuffer;
+    ByteBufferUtils.skip(buffer, onDiskSize);
+
+    // Row offset
+    rowNumber = buffer.getInt();
+    // equals Bytes.SIZEOF_INT * rowNumber
+    int totalRowOffsetsLength = rowNumber << 2;
+    ByteBuffer rowDup = buffer.duplicate();
+    rowDup.position(buffer.position());
+    rowDup.limit(buffer.position() + totalRowOffsetsLength);
+    rowOffsets = rowDup.slice();
+
+    decodeFirst();
+  }
+
+  @Override
+  public ByteBuffer getKeyDeepCopy() {
+    ByteBuffer keyBuffer = ByteBuffer.allocate(current.keyLength);
+    keyBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyLength);
+    keyBuffer.rewind();
+    return keyBuffer;
+  }
+
+  @Override
+  public ByteBuffer getValueShallowCopy() {
+    ByteBuffer dup = currentBuffer.duplicate();
+    dup.position(current.valueOffset);
+    dup.limit(current.valueOffset + current.valueLength);
+    return dup.slice();
+  }
+
+  ByteBuffer getKeyValueBuffer() {
+    ByteBuffer kvBuffer = createKVBuffer();
+    kvBuffer.putInt(current.keyLength);
+    kvBuffer.putInt(current.valueLength);
+    kvBuffer.put(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyLength);
+    ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
+        current.valueOffset, current.valueLength);
+    if (current.tagsLength > 0) {
+      // Put short as unsigned
+      kvBuffer.put((byte) (current.tagsLength >> 8 & 0xff));
+      kvBuffer.put((byte) (current.tagsLength & 0xff));
+      if (current.tagsOffset != -1) {
+        ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer,
+            current.tagsOffset, current.tagsLength);
+      }
+    }
+    if (includesMvcc()) {
+      ByteBufferUtils.writeVLong(kvBuffer, current.getSequenceId());
+    }
+    kvBuffer.rewind();
+    return kvBuffer;
+  }
+
+  protected ByteBuffer createKVBuffer() {
+    int kvBufSize = (int) KeyValue.getKeyValueDataStructureSize(
+        current.keyLength, current.valueLength, current.tagsLength);
+    if (includesMvcc()) {
+      kvBufSize += WritableUtils.getVIntSize(current.getSequenceId());
+    }
+    ByteBuffer kvBuffer = ByteBuffer.allocate(kvBufSize);
+    return kvBuffer;
+  }
+
+  @Override
+  public Cell getKeyValue() {
+    return current.toCell();
+  }
+
+  @Override
+  public void rewind() {
+    currentBuffer.rewind();
+    decodeFirst();
+  }
+
+  @Override
+  public boolean next() {
+    if (!currentBuffer.hasRemaining()) {
+      return false;
+    }
+    decodeNext();
+    previous.invalidate();
+    return true;
+  }
+
+  @Override
+  public int seekToKeyInBlock(byte[] key, int offset, int length,
+      boolean seekBefore) {
+    return seekToKeyInBlock(new KeyValue.KeyOnlyKeyValue(key, offset, length),
+        seekBefore);
+  }
+
+  private int binarySearch(Cell seekCell, boolean seekBefore) {
+    int low = 0;
+    int high = rowNumber - 1;
+    int mid = (low + high) >>> 1;
+    int comp = 0;
+    SimpleMutableByteRange row = new SimpleMutableByteRange();
+    while (low <= high) {
+      mid = (low + high) >>> 1;
+      getRow(mid, row);
+      comp = comparator.compareRows(row.getBytes(), row.getOffset(),
+          row.getLength(), seekCell.getRowArray(), seekCell.getRowOffset(),
+          seekCell.getRowLength());
+      if (comp < 0) {
+        low = mid + 1;
+      } else if (comp > 0) {
+        high = mid - 1;
+      } else {
+        // key found
+        if (seekBefore) {
+          return mid - 1;
+        } else {
+          return mid;
+        }
+      }
+    }
+    // key not found.
+    if (comp > 0) {
+      return mid - 1;
+    } else {
+      return mid;
+    }
+  }
+
+  private void getRow(int index, SimpleMutableByteRange row) {
+    int offset = Bytes.toIntUnsafe(rowOffsets.array(), rowOffsets.arrayOffset()
+        + (index << 2)); // index * Bytes.SIZEOF_INT
+    int position = currentBuffer.arrayOffset() + offset + Bytes.SIZEOF_LONG;
+    short rowLen = Bytes.toShortUnsafe(currentBuffer.array(), position);
+    row.set(currentBuffer.array(), position + Bytes.SIZEOF_SHORT, rowLen);
+  }
+
+  @Override
+  public int seekToKeyInBlock(Cell seekCell, boolean seekBefore) {
+    previous.invalidate();
+    int index = binarySearch(seekCell, seekBefore);
+    if (index < 0) {
+      return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+    } else {
+      int offset = Bytes.toIntUnsafe(rowOffsets.array(),
+          rowOffsets.arrayOffset() + (index << 2));
+      if (offset != 0) {
+        decodeAtPosition(offset);
+      }
+    }
+    do {
+      int comp;
+      comp = comparator.compareOnlyKeyPortion(seekCell, current.currentKey);
+      if (comp == 0) { // exact match
+        if (seekBefore) {
+          if (!previous.isValid()) {
+            // The caller (seekBefore) has to ensure that we are not at the
+            // first key in the block.
+            throw new IllegalStateException("Cannot seekBefore if "
+                + "positioned at the first key in the block: key="
+                + Bytes.toStringBinary(seekCell.getRowArray()));
+          }
+          moveToPrevious();
+          return 1;
+        }
+        return 0;
+      }
+
+      if (comp < 0) { // already too large, check previous
+        if (previous.isValid()) {
+          moveToPrevious();
+        } else {
+          return HConstants.INDEX_KEY_MAGIC; // using optimized index key
+        }
+        return 1;
+      }
+
+      // move to next, if more data is available
+      if (currentBuffer.hasRemaining()) {
+        previous.copyFromNext(current);
+        decodeNext();
+      } else {
+        break;
+      }
+    } while (true);
+
+    // we hit the end of the block, not an exact match
+    return 1;
+  }
+
+  private void moveToPrevious() {
+    if (!previous.isValid()) {
+      throw new IllegalStateException(
+          "Can move back only once and not in first key in the block.");
+    }
+
+    SeekerState tmp = previous;
+    previous = current;
+    current = tmp;
+
+    // move after last key value
+    currentBuffer.position(current.nextKvOffset);
+    previous.invalidate();
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, byte[] key, int offset,
+      int length) {
+    return comparator.compareFlatKey(key, offset, length,
+        current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength());
+  }
+
+  @Override
+  public int compareKey(KVComparator comparator, Cell key) {
+    return comparator.compareOnlyKeyPortion(key, new KeyValue.KeyOnlyKeyValue(
+        current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength()));
+  }
+
+  protected void decodeFirst() {
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeAtPosition(int position) {
+    currentBuffer.position(position);
+    decodeNext();
+    previous.invalidate();
+  }
+
+  protected void decodeNext() {
+    current.startOffset = currentBuffer.position();
+    int p = currentBuffer.position() + currentBuffer.arrayOffset();
+    long ll = Bytes.toLong(currentBuffer.array(), p);
+    // Read top half as an int of key length and bottom int as value length
+    current.keyLength = (int) (ll >> Integer.SIZE);
+    current.valueLength = (int) (Bytes.MASK_FOR_LOWER_INT_IN_LONG ^ ll);
+    ByteBufferUtils.skip(currentBuffer, Bytes.SIZEOF_LONG);
+    // key part
+    current.keyBuffer.set(currentBuffer.array(), currentBuffer.arrayOffset()
+        + currentBuffer.position(), current.keyLength);
+    ByteBufferUtils.skip(currentBuffer, current.keyLength);
+    // value part
+    current.valueOffset = currentBuffer.position();
+    ByteBufferUtils.skip(currentBuffer, current.valueLength);
+    if (includesTags()) {
+      decodeTags();
+    }
+    if (includesMvcc()) {
+      current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer);
+    } else {
+      current.memstoreTS = 0;
+    }
+    current.nextKvOffset = currentBuffer.position();
+    current.setKey(current.keyBuffer.getBytes(), current.keyBuffer.getOffset(),
+        current.keyBuffer.getLength());
+  }
+
+  protected boolean includesMvcc() {
+    return this.decodingCtx.getHFileContext().isIncludesMvcc();
+  }
+
+  protected boolean includesTags() {
+    return this.decodingCtx.getHFileContext().isIncludesTags();
+  }
+
+  protected void decodeTags() {
+    current.tagsLength = currentBuffer.getShort();
+    current.tagsOffset = currentBuffer.position();
+    ByteBufferUtils.skip(currentBuffer, current.tagsLength);
+  }
+
+  protected class SeekerState {
+    /**
+     * The size of a (key length, value length) tuple that prefixes each entry
+     * in a data block.
+     */
+    public final static int KEY_VALUE_LEN_SIZE = 2 * Bytes.SIZEOF_INT;
+
+    protected ByteBuffer currentBuffer;
+    protected int startOffset = -1;
+    protected int valueOffset = -1;
+    protected int keyLength;
+    protected int valueLength;
+    protected int tagsLength = 0;
+    protected int tagsOffset = -1;
+
+    protected SimpleMutableByteRange keyBuffer = new SimpleMutableByteRange();
+    protected long memstoreTS;
+    protected int nextKvOffset;
+    protected KeyValue.KeyOnlyKeyValue currentKey = new KeyValue.KeyOnlyKeyValue();
+
+    protected boolean isValid() {
+      return valueOffset != -1;
+    }
+
+    protected void invalidate() {
+      valueOffset = -1;
+      currentKey = new KeyValue.KeyOnlyKeyValue();
+      currentBuffer = null;
+    }
+
+    protected void setKey(byte[] key, int offset, int length) {
+      currentKey.setKey(key, offset, length);
+    }
+
+    protected long getSequenceId() {
+      return memstoreTS;
+    }
+
+    /**
+     * Copy the state from the next one into this instance (the previous state
+     * placeholder). Used to save the previous state when we are advancing the
+     * seeker to the next key/value.
+     */
+    protected void copyFromNext(SeekerState nextState) {
+      keyBuffer.set(nextState.keyBuffer.getBytes(),
+          nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
+      currentKey.setKey(nextState.keyBuffer.getBytes(),
+          nextState.keyBuffer.getOffset(), nextState.keyBuffer.getLength());
+
+      startOffset = nextState.startOffset;
+      valueOffset = nextState.valueOffset;
+      keyLength = nextState.keyLength;
+      valueLength = nextState.valueLength;
+      nextKvOffset = nextState.nextKvOffset;
+      memstoreTS = nextState.memstoreTS;
+      currentBuffer = nextState.currentBuffer;
+      tagsOffset = nextState.tagsOffset;
+      tagsLength = nextState.tagsLength;
+    }
+
+    @Override
+    public String toString() {
+      return CellUtil.getCellKeyAsString(toCell());
+    }
+
+    protected int getCellBufSize() {
+      int kvBufSize = KEY_VALUE_LEN_SIZE + keyLength + valueLength;
+      if (includesTags()) {
+        kvBufSize += Bytes.SIZEOF_SHORT + tagsLength;
+      }
+      return kvBufSize;
+    }
+
+    protected Cell formNoTagsKeyValue() {
+      NoTagsKeyValue ret = new NoTagsKeyValue(currentBuffer.array(),
+          currentBuffer.arrayOffset() + startOffset, getCellBufSize());
+      if (includesMvcc()) {
+        ret.setSequenceId(memstoreTS);
+      }
+      return ret;
+    }
+
+    public Cell toCell() {
+      if (tagsOffset > 0) {
+        KeyValue ret = new KeyValue(currentBuffer.array(),
+            currentBuffer.arrayOffset() + startOffset, getCellBufSize());
+        if (includesMvcc()) {
+          ret.setSequenceId(memstoreTS);
+        }
+        return ret;
+      } else {
+        return formNoTagsKeyValue();
+      }
+    }
+  }
+
+}
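
The search in binarySearch() above is a lower-bound scan over the row-offset
table: on a miss it lands on the last row whose key sorts before the seek
key. A standalone sketch of the same return convention, with a plain int[]
standing in for the getRow() lookup (illustrative; assumes at least one row):

    // Sketch: same return convention as RowIndexSeekerV1.binarySearch
    // (seekBefore omitted for brevity).
    static int binarySearch(int[] rows, int seek) {
      int low = 0;
      int high = rows.length - 1;
      int mid = 0;
      int comp = 0;
      while (low <= high) {
        mid = (low + high) >>> 1;
        comp = Integer.compare(rows[mid], seek);
        if (comp < 0) {
          low = mid + 1;
        } else if (comp > 0) {
          high = mid - 1;
        } else {
          return mid; // exact row match
        }
      }
      // miss: step back one slot if we overshot (may return -1)
      return comp > 0 ? mid - 1 : mid;
    }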

http://git-wip-us.apache.org/repos/asf/hbase/blob/c899897b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
index 914a37b..5f8c1ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestSeekToBlockWithEncoders.java
@@ -83,7 +83,7 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv4 = new KeyValue(Bytes.toBytes("aad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv4);
-    KeyValue kv5 = new KeyValue(Bytes.toBytes("aaaad"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
+    KeyValue kv5 = new KeyValue(Bytes.toBytes("aaddd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv5);
     KeyValue toSeek = new KeyValue(Bytes.toBytes("aaaa"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
@@ -106,7 +106,7 @@ public class TestSeekToBlockWithEncoders {
     KeyValue kv3 = new KeyValue(Bytes.toBytes("aac"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv3);
-    KeyValue kv4 = new KeyValue(Bytes.toBytes("aaae"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
+    KeyValue kv4 = new KeyValue(Bytes.toBytes("aade"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
         Bytes.toBytes("val"));
     sampleKv.add(kv4);
     KeyValue kv5 = new KeyValue(Bytes.toBytes("bbbcd"), Bytes.toBytes("f1"), Bytes.toBytes("q1"),
