HBASE-16372 References to previous cell in read path should be avoided
(Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58e843da
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58e843da
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58e843da

Branch: refs/heads/hbase-12439
Commit: 58e843dae22963e119d6bdc5563921a3e1712812
Parents: eb33b60
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Thu Oct 6 14:19:12 2016 +0530
Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Committed: Thu Oct 6 14:19:12 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/TestPut.java |   5 +
 .../exceptions/TestClientExceptionsUtil.java    |   4 +
 .../org/apache/hadoop/hbase/KeyValueUtil.java   |  17 +-
 .../io/hfile/CompoundBloomFilterWriter.java     |  19 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java |   6 +-
 .../hadoop/hbase/io/hfile/HFileWriterImpl.java  |  15 +
 .../hbase/mob/DefaultMobStoreCompactor.java     |   1 +
 .../regionserver/AbstractMultiFileWriter.java   |  13 +-
 .../hadoop/hbase/regionserver/CellSink.java     |  40 ++
 .../hadoop/hbase/regionserver/HStore.java       |   2 +
 .../hbase/regionserver/ShipperListener.java     |  36 ++
 .../hbase/regionserver/StoreFileWriter.java     |  24 +-
 .../hadoop/hbase/regionserver/StoreFlusher.java |   3 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |  10 +
 .../regionserver/compactions/Compactor.java     |  12 +-
 .../querymatcher/ColumnTracker.java             |   3 +-
 .../querymatcher/ExplicitColumnTracker.java     |   5 +
 .../querymatcher/ScanQueryMatcher.java          |  15 +-
 .../querymatcher/ScanWildcardColumnTracker.java |   8 +
 .../apache/hadoop/hbase/util/BloomContext.java  |  23 +-
 .../hadoop/hbase/util/BloomFilterWriter.java    |  10 +-
 .../hadoop/hbase/util/RowBloomContext.java      |  13 +-
 .../hadoop/hbase/util/RowColBloomContext.java   |  13 +-
 ...estAvoidCellReferencesIntoShippedBlocks.java | 447 +++++++++++++++++++
 24 files changed, 697 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java
index 8603fe1..452f40f 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestPut.java
@@ -19,12 +19,17 @@
 
 package org.apache.hadoop.hbase.client;
 
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category({ SmallTests.class, ClientTests.class })
 public class TestPut {
   @Test
   public void testCopyConstructor() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
index 97e9574..9c01cd9 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/exceptions/TestClientExceptionsUtil.java
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hbase.exceptions;
 
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 
 import static org.junit.Assert.*;
 
 @SuppressWarnings("ThrowableInstanceNeverThrown")
+@Category({ SmallTests.class, ClientTests.class })
 public class TestClientExceptionsUtil {
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index b723f58..7b9bcb1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -97,7 +97,7 @@ public class KeyValueUtil {
   }
 
 
-  /**************** copy key only *********************/
+  /**************** copy the cell to create a new keyvalue *********************/
 
   public static KeyValue copyToNewKeyValue(final Cell cell) {
     byte[] bytes = copyToNewByteArray(cell);
@@ -118,6 +118,21 @@ public class KeyValueUtil {
     return buffer;
   }
 
+  /**
+   * Copies the key to a new KeyValue
+   * @param cell
+   * @return a KeyValue that consists of only the key part of the incoming cell
+   */
+  public static KeyValue toNewKeyCell(final Cell cell) {
+    byte[] bytes = new byte[keyLength(cell)];
+    appendKeyTo(cell, bytes, 0);
+    KeyValue kv = new KeyValue.KeyOnlyKeyValue(bytes, 0, bytes.length);
+    // Set the seq id. The new key cell could be used in comparisons, so it
+    // is important that it carries the seqid too; otherwise the comparison would fail.
+    kv.setSequenceId(cell.getSequenceId());
+    return kv;
+  }
+
   public static byte[] copyToNewByteArray(final Cell cell) {
     int v1Length = length(cell);
     byte[] backingBytes = new byte[v1Length];
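
An illustrative sketch of the intended usage of the new KeyValueUtil.toNewKeyCell helper (the class name and sample values below are made up, not part of the patch): copying only the key bytes into a fresh array means a caller that caches a "previous cell" no longer pins the block backing the original cell.

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ToNewKeyCellSketch {
      public static void main(String[] args) {
        // Stand-in for a cell whose backing array belongs to a cached block.
        Cell cellBackedByBlock = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));
        // Copy only the key portion into a new array; the copy carries the sequence id,
        // so it still compares exactly like the original key.
        Cell detachedKey = KeyValueUtil.toNewKeyCell(cellBackedByBlock);
        // The original cell (and the block behind it) can now be released or evicted
        // while the detached key is retained for later comparisons.
        System.out.println(detachedKey.getSequenceId() == cellBackedByBlock.getSequenceId());
      }
    }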

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
index 3193a17..a50566a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilterWriter.java
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -29,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.BloomFilterChunk;
@@ -60,6 +60,8 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
 
   /** The size of individual Bloom filter chunks to create */
   private int chunkByteSize;
+  /** The prev Cell that was processed  */
+  private Cell prevCell;
 
   /** A Bloom filter chunk enqueued for writing */
   private static class ReadyChunk {
@@ -159,7 +161,7 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
   }
 
   @Override
-  public void add(Cell cell) {
+  public void append(Cell cell) throws IOException {
     if (cell == null)
       throw new NullPointerException();
 
@@ -181,9 +183,22 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
     }
 
     chunk.add(cell);
+    this.prevCell = cell;
     ++totalKeyCount;
   }
 
+  @Override
+  public void beforeShipped() throws IOException {
+    if (this.prevCell != null) {
+      this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
+    }
+  }
+
+  @Override
+  public Cell getPrevCell() {
+    return this.prevCell;
+  }
+
   private void allocateNewChunk() {
     if (prevChunk == null) {
       // First chunk

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index d3669f4..0b6ceef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -194,15 +196,13 @@ public class HFile {
   }
 
   /** API required to write an {@link HFile} */
-  public interface Writer extends Closeable {
+  public interface Writer extends Closeable, CellSink, ShipperListener {
     /** Max memstore (mvcc) timestamp in FileInfo */
     public static final byte [] MAX_MEMSTORE_TS_KEY = Bytes.toBytes("MAX_MEMSTORE_TS_KEY");
 
     /** Add an element to the file info map. */
     void appendFileInfo(byte[] key, byte[] value) throws IOException;
 
-    void append(Cell cell) throws IOException;
-
     /** @return the path to this {@link HFile} */
     Path getPath();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
index c57ecf7..c27bf0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.CellComparator.MetaCellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -701,6 +702,20 @@ public class HFileWriterImpl implements HFile.Writer {
     }
   }
 
+  @Override
+  public void beforeShipped() throws IOException {
+    // Clone each of the cells we are holding on to so the backing blocks can be released
+    if (this.lastCell != null) {
+      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
+    }
+    if (this.firstCellInBlock != null) {
+      this.firstCellInBlock = KeyValueUtil.toNewKeyCell(this.firstCellInBlock);
+    }
+    if (this.lastCellOfPreviousBlock != null) {
+      this.lastCellOfPreviousBlock = KeyValueUtil.toNewKeyCell(this.lastCellOfPreviousBlock);
+    }
+  }
+
   protected void finishFileInfo() throws IOException {
     if (lastCell != null) {
       // Make a copy. The copy is stuffed into our fileinfo map. Needs a clean

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 55aea00..d75e448 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index a4e0285..0735629 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -26,13 +26,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 
 /**
  * Base class for cell sink that separates the provided cells into multiple files.
  */
 @InterfaceAudience.Private
-public abstract class AbstractMultiFileWriter implements CellSink {
+public abstract class AbstractMultiFileWriter implements CellSink, ShipperListener {
 
  private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class);
 
@@ -116,4 +116,13 @@ public abstract class AbstractMultiFileWriter implements CellSink {
    */
   protected void preCloseWriter(StoreFileWriter writer) throws IOException {
   }
+
+  @Override
+  public void beforeShipped() throws IOException {
+    if (this.writers() != null) {
+      for (StoreFileWriter writer : writers()) {
+        writer.beforeShipped();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java
new file mode 100644
index 0000000..65b119f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSink.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+
+/**
+ * A sink of cells that allows appending cells to the Writers that implement it.
+ * {@link org.apache.hadoop.hbase.io.hfile.HFile.Writer},
+ * {@link StoreFileWriter}, {@link AbstractMultiFileWriter},
+ * {@link BloomFilterWriter} are some implementors of this.
+ */
+@InterfaceAudience.Private
+public interface CellSink {
+  /**
+   * Append the given cell
+   * @param cell the cell to be added
+   * @throws IOException
+   */
+  void append(Cell cell) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7a419e4..ce5c91d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -971,6 +971,8 @@ public class HStore implements Store {
    * @param includesTag - includesTag or not
    * @return Writer for a new StoreFile in the tmp dir.
    */
+  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
+  // compaction
   @Override
   public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
       boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java
new file mode 100644
index 0000000..657bae0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ShipperListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Implementors of this interface need to take some action when
+ * {@link Shipper#shipped()} is called.
+@InterfaceAudience.Private
+public interface ShipperListener {
+
+  /**
+   * The action that needs to be performed before {@link Shipper#shipped()} is invoked
+   * @throws IOException
+   */
+  void beforeShipped() throws IOException;
+}
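
Taken together with CellSink, the listener contract is: anything that caches a cell between append() calls should swap that reference for a detached copy in beforeShipped(), before the shipper releases the underlying blocks. An illustrative implementor, modelled on what CompoundBloomFilterWriter does in this patch (the class itself is hypothetical, not part of the diff):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValueUtil;
    import org.apache.hadoop.hbase.regionserver.CellSink;
    import org.apache.hadoop.hbase.regionserver.ShipperListener;

    /** Hypothetical sink that remembers the last appended cell. */
    public class LastCellTrackingSink implements CellSink, ShipperListener {
      private Cell prevCell;

      @Override
      public void append(Cell cell) throws IOException {
        // The incoming cell may be backed by a shared block buffer.
        this.prevCell = cell;
      }

      @Override
      public void beforeShipped() throws IOException {
        // Detach from the block before it is shipped: keep only a key-only copy,
        // which is all that later comparisons need.
        if (this.prevCell != null) {
          this.prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
        }
      }
    }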

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index cb5d12c..bd1d62e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomContext;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -51,7 +50,7 @@ import com.google.common.base.Preconditions;
  * local because it is an implementation detail of the HBase regionserver.
  */
 @InterfaceAudience.Private
-public class StoreFileWriter implements Compactor.CellSink {
+public class StoreFileWriter implements CellSink, ShipperListener {
  private static final Log LOG = LogFactory.getLog(StoreFileWriter.class.getName());
 
   private final BloomFilterWriter generalBloomFilterWriter;
@@ -120,6 +119,7 @@ public class StoreFileWriter implements Compactor.CellSink {
     // it no longer writable.
     this.timeRangeTrackerSet = trt != null;
     this.timeRangeTracker = this.timeRangeTrackerSet? trt: new TimeRangeTracker();
+    // TODO : Change all writers to be specifically created for compaction context
     writer = HFile.getWriterFactory(conf, cacheConf)
         .withPath(fs, path)
         .withComparator(comparator)
@@ -140,10 +140,10 @@ public class StoreFileWriter implements Compactor.CellSink {
       // init bloom context
       switch (bloomType) {
       case ROW:
-        bloomContext = new RowBloomContext(generalBloomFilterWriter);
+        bloomContext = new RowBloomContext(generalBloomFilterWriter, comparator);
         break;
       case ROWCOL:
-        bloomContext = new RowColBloomContext(generalBloomFilterWriter);
+        bloomContext = new RowColBloomContext(generalBloomFilterWriter, comparator);
         break;
       default:
         throw new IOException(
@@ -160,7 +160,7 @@ public class StoreFileWriter implements Compactor.CellSink {
       this.deleteFamilyBloomFilterWriter = BloomFilterFactory
           .createDeleteBloomAtWrite(conf, cacheConf,
               (int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
-      deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter);
+      deleteFamilyBloomContext = new RowBloomContext(deleteFamilyBloomFilterWriter, comparator);
     } else {
       deleteFamilyBloomFilterWriter = null;
     }
@@ -251,6 +251,7 @@ public class StoreFileWriter implements Compactor.CellSink {
     }
   }
 
+  @Override
   public void append(final Cell cell) throws IOException {
     appendGeneralBloomfilter(cell);
     appendDeleteFamilyBloomFilter(cell);
@@ -258,6 +259,19 @@ public class StoreFileWriter implements Compactor.CellSink {
     trackTimestamps(cell);
   }
 
+  @Override
+  public void beforeShipped() throws IOException {
+    // For now these writers will always implement ShipperListener.
+    // TODO : Change all writers to be specifically created for compaction context
+    writer.beforeShipped();
+    if (generalBloomFilterWriter != null) {
+      generalBloomFilterWriter.beforeShipped();
+    }
+    if (deleteFamilyBloomFilterWriter != null) {
+      deleteFamilyBloomFilterWriter.beforeShipped();
+    }
+  }
+
   public Path getPath() {
     return this.writer.getPath();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 5ba7d33..056cdcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 
@@ -111,7 +110,7 @@ abstract class StoreFlusher {
    * @param smallestReadPoint Smallest read point used for the flush.
    * @param throughputController A controller to avoid flush too fast
    */
-  protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
+  protected void performFlush(InternalScanner scanner, CellSink sink,
       long smallestReadPoint, ThroughputController throughputController) throws IOException {
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index e008a40..cb40909 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Scan;
@@ -991,6 +992,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public void shipped() throws IOException {
+    if (prevCell != null) {
+      // Do the copy here so that if the prevCell ref is pointing into previous
+      // blocks we can safely release those blocks.
+      // This applies to blocks obtained from the bucket cache, the L1 cache and the
+      // blocks fetched from HDFS. Copying the cell ensures that we let go of the
+      // references to these blocks so that they can be GCed safely (in the case of the
+      // bucket cache).
+      prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
+    }
+    matcher.beforeShipped();
     for (KeyValueHeap h : this.heapsForDelayedClose) {
       h.close();// There won't be further fetch of Cells from these scanners. Just close.
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index c695788..f4bd9a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,11 +38,13 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
+import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
@@ -51,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
@@ -91,10 +91,6 @@ public abstract class Compactor<T extends CellSink> {
       HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD);
   }
 
-  public interface CellSink {
-    void append(Cell cell) throws IOException;
-  }
-
   protected interface CellSinkFactory<S> {
     S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind)
         throws IOException;
@@ -391,6 +387,7 @@ public abstract class Compactor<T extends CellSink> {
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
+    assert writer instanceof ShipperListener;
     long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -443,6 +440,9 @@ public abstract class Compactor<T extends CellSink> {
             }
           }
           if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
+            // Clone the cells held by the writer so that any references they
+            // carry to shipped blocks are released.
+            ((ShipperListener) writer).beforeShipped();
             // The SHARED block references, being read for compaction, will be kept in prevBlocks
             // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells
             // being returned to client, we will call shipped() which can clear this list. Here by
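
The ordering above is the point of the change: the writer gets a chance to detach its cached cells before the scanner's shipped() call releases the accumulated blocks. A stand-alone restatement of that ordering (the helper class and its parameters are assumptions for illustration, not code from this patch):

    import java.io.IOException;

    import org.apache.hadoop.hbase.regionserver.CellSink;
    import org.apache.hadoop.hbase.regionserver.Shipper;
    import org.apache.hadoop.hbase.regionserver.ShipperListener;

    public class ShippedCallOrderSketch {
      // 'writer' is the CellSink the compaction writes to; 'shipper' owns the
      // prevBlocks list that shipped() will release.
      static void shipAfterSizeLimit(CellSink writer, Shipper shipper) throws IOException {
        // 1. Let the writer replace any cell references that point into the
        //    blocks about to be released with detached copies.
        if (writer instanceof ShipperListener) {
          ((ShipperListener) writer).beforeShipped();
        }
        // 2. Only then release the accumulated blocks.
        shipper.shipped();
      }
    }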

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
index 17c6afe..7a2a1e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ColumnTracker.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 
 /**
@@ -50,7 +51,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchC
  * This class is NOT thread-safe as queries are never multi-threaded
  */
 @InterfaceAudience.Private
-public interface ColumnTracker {
+public interface ColumnTracker extends ShipperListener {
 
   /**
    * Checks if the column is present in the list of requested columns by returning the match code

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
index da65c78..b4825f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ExplicitColumnTracker.java
@@ -248,4 +248,9 @@ public class ExplicitColumnTracker implements ColumnTracker {
   public boolean isDone(long timestamp) {
     return minVersions <= 0 && isExpired(timestamp);
   }
+
+  @Override
+  public void beforeShipped() throws IOException {
+    // do nothing
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
index b5469d3..82aae6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -31,10 +31,11 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * A query matcher that is specifically designed for the scan case.
  */
 @InterfaceAudience.Private
-public abstract class ScanQueryMatcher {
+public abstract class ScanQueryMatcher implements ShipperListener {
 
   /**
    * {@link #match} return codes. These instruct the scanner moving through memstores and StoreFiles
@@ -334,6 +335,16 @@ public abstract class ScanQueryMatcher {
    */
   public abstract Cell getNextKeyHint(Cell cell) throws IOException;
 
+  @Override
+  public void beforeShipped() throws IOException {
+    if (this.currentRow != null) {
+      this.currentRow = CellUtil.createFirstOnRow(CellUtil.copyRow(this.currentRow));
+    }
+    if (columns != null) {
+      columns.beforeShipped();
+    }
+  }
+
  protected static DeleteTracker instantiateDeleteTracker(RegionCoprocessorHost host)
       throws IOException {
     DeleteTracker tracker = new ScanDeleteTracker();

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
index e3994b6..a73cc0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -193,6 +194,13 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
     return MatchCode.SEEK_NEXT_COL;
   }
 
+  @Override
+  public void beforeShipped() {
+    if (columnCell != null) {
+      this.columnCell = KeyValueUtil.toNewKeyCell(this.columnCell);
+    }
+  }
+
   public boolean isDone(long timestamp) {
     return minVersions <= 0 && isExpired(timestamp);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java
index fc40aaf..8a1c6cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 
@@ -30,17 +31,16 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 @InterfaceAudience.Private
 public abstract class BloomContext {
 
-  // TODO : Avoid holding references to lastCell
-  protected Cell lastCell;
-
   protected BloomFilterWriter bloomFilterWriter;
+  protected CellComparator comparator;
 
-  public BloomContext(BloomFilterWriter bloomFilterWriter) {
+  public BloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) {
     this.bloomFilterWriter = bloomFilterWriter;
+    this.comparator = comparator;
   }
 
   public Cell getLastCell() {
-    return this.lastCell;
+    return this.bloomFilterWriter.getPrevCell();
   }
 
   /**
@@ -51,8 +51,17 @@ public abstract class BloomContext {
   public void writeBloom(Cell cell) throws IOException {
     // only add to the bloom filter on a new, unique key
     if (isNewKey(cell)) {
-      bloomFilterWriter.add(cell);
-      this.lastCell = cell;
+      sanityCheck(cell);
+      bloomFilterWriter.append(cell);
+    }
+  }
+
+  private void sanityCheck(Cell cell) throws IOException {
+    if (this.getLastCell() != null) {
+      if (comparator.compare(cell, this.getLastCell()) <= 0) {
+        throw new IOException("Added a key not lexically larger than" + " 
previous. Current cell = "
+            + cell + ", prevCell = " + this.getLastCell());
+      }
     }
   }
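
BloomContext now gets its "last cell" from the Bloom filter writer itself and verifies ordering with the store's comparator before appending. A stand-alone restatement of that check, with made-up cells (the class below is illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellComparator;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BloomOrderCheckSketch {
      static void sanityCheck(CellComparator comparator, Cell prev, Cell current) throws IOException {
        // Cells must arrive in strictly increasing key order.
        if (prev != null && comparator.compare(current, prev) <= 0) {
          throw new IOException("Added a key not lexically larger than previous. Current cell = "
              + current + ", prevCell = " + prev);
        }
      }

      public static void main(String[] args) throws IOException {
        Cell a = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q"), new byte[0]);
        Cell b = new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"), Bytes.toBytes("q"), new byte[0]);
        sanityCheck(CellComparator.COMPARATOR, a, b);   // ok: b sorts after a
        // sanityCheck(CellComparator.COMPARATOR, b, a); // would throw IOException
      }
    }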
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java
index 32a9ff4..82d6d16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BloomFilterWriter.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.util;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.io.Writable;
 
 /**
@@ -28,7 +30,7 @@ import org.apache.hadoop.io.Writable;
  * resulting Bloom filter as a sequence of bytes.
  */
 @InterfaceAudience.Private
-public interface BloomFilterWriter extends BloomFilterBase {
+public interface BloomFilterWriter extends BloomFilterBase, CellSink, ShipperListener {
 
   /** Compact the Bloom filter before writing metadata &amp; data to disk. */
   void compactBloom();
@@ -48,8 +50,8 @@ public interface BloomFilterWriter extends BloomFilterBase {
   Writable getDataWriter();
 
   /**
-   * Add the specified binary to the bloom filter.
-   * @param cell the cell data to be added to the bloom
+   * Returns the previous cell written by this writer
+   * @return the previous cell
    */
-  void add(Cell cell);
+  Cell getPrevCell();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java
index f6e36d4..592f177 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowBloomContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
@@ -31,21 +32,21 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 @InterfaceAudience.Private
 public class RowBloomContext extends BloomContext {
 
-  public RowBloomContext(BloomFilterWriter bloomFilterWriter) {
-    super(bloomFilterWriter);
+  public RowBloomContext(BloomFilterWriter bloomFilterWriter, CellComparator comparator) {
+    super(bloomFilterWriter, comparator);
   }
 
   public void addLastBloomKey(Writer writer) throws IOException {
-    if (lastCell != null) {
-      byte[] key = CellUtil.copyRow(this.lastCell);
+    if (this.getLastCell() != null) {
+      byte[] key = CellUtil.copyRow(this.getLastCell());
       writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key);
     }
   }
 
   @Override
   protected boolean isNewKey(Cell cell) {
-    if (this.lastCell != null) {
-      return !CellUtil.matchingRows(cell, this.lastCell);
+    if (this.getLastCell() != null) {
+      return !CellUtil.matchingRows(cell, this.getLastCell());
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java
index c1b47af..eb0f721 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/RowColBloomContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
@@ -32,14 +33,14 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 @InterfaceAudience.Private
 public class RowColBloomContext extends BloomContext {
 
-  public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter) {
-    super(generalBloomFilterWriter);
+  public RowColBloomContext(BloomFilterWriter generalBloomFilterWriter, CellComparator comparator) {
+    super(generalBloomFilterWriter, comparator);
   }
 
   @Override
   public void addLastBloomKey(Writer writer) throws IOException {
-    if (this.lastCell != null) {
-      Cell firstOnRow = CellUtil.createFirstOnRowCol(this.lastCell);
+    if (this.getLastCell() != null) {
+      Cell firstOnRow = CellUtil.createFirstOnRowCol(this.getLastCell());
       // This copy happens only once when the writer is closed
       byte[] key = CellUtil.getCellKeySerializedAsKeyValueKey(firstOnRow);
       writer.appendFileInfo(StoreFile.LAST_BLOOM_KEY, key);
@@ -48,8 +49,8 @@ public class RowColBloomContext extends BloomContext {
 
   @Override
   protected boolean isNewKey(Cell cell) {
-    if (this.lastCell != null) {
-      return !CellUtil.matchingRowColumn(cell, this.lastCell);
+    if (this.getLastCell() != null) {
+      return !CellUtil.matchingRowColumn(cell, this.getLastCell());
     }
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/58e843da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
new file mode 100644
index 0000000..cca0eb7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+@SuppressWarnings("deprecation")
+public class TestAvoidCellReferencesIntoShippedBlocks {
+  private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class);
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  static byte[][] ROWS = new byte[2][];
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static byte[] ROW1 = Bytes.toBytes("testRow1");
+  private static byte[] ROW2 = Bytes.toBytes("testRow2");
+  private static byte[] ROW3 = Bytes.toBytes("testRow3");
+  private static byte[] ROW4 = Bytes.toBytes("testRow4");
+  private static byte[] ROW5 = Bytes.toBytes("testRow5");
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[][] FAMILIES_1 = new byte[1][0];
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+  private static byte[] data = new byte[1000];
+  protected static int SLAVES = 1;
+  private CountDownLatch latch = new CountDownLatch(1);
+  private static CountDownLatch compactReadLatch = new CountDownLatch(1);
+  private static AtomicBoolean doScan = new AtomicBoolean(false);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    ROWS[0] = ROW;
+    ROWS[1] = ROW1;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      MultiRowMutationEndpoint.class.getName());
+    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below
+                                                        // tests
+    conf.setInt("hbase.regionserver.handler.count", 20);
+    conf.setInt("hbase.bucketcache.size", 400);
+    conf.setStrings("hbase.bucketcache.ioengine", "offheap");
+    conf.setInt("hbase.hstore.compactionThreshold", 7);
+    conf.setFloat("hfile.block.cache.size", 0.2f);
+    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
+    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 500000);
+    FAMILIES_1[0] = FAMILY;
+    TEST_UTIL.startMiniCluster(SLAVES);
+    compactReadLatch = new CountDownLatch(1);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testHBase16372InCompactionWritePath() throws Exception {
+    TableName tableName = TableName.valueOf("testHBase16372InCompactionWritePath");
+    // Create a table with block size as 1024
+    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
+      CompactorRegionObserver.class.getName());
+    try {
+      // get the block cache and region
+      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
+      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+      Region region =
+          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
+      Store store = region.getStores().iterator().next();
+      CacheConfig cacheConf = store.getCacheConfig();
+      cacheConf.setCacheDataOnWrite(true);
+      cacheConf.setEvictOnClose(true);
+      final BlockCache cache = cacheConf.getBlockCache();
+      // insert data. 5 Rows are added
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      // data was in memstore so don't expect any changes
+      region.flush(true);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW2);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW2);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      // data was in memstore so don't expect any changes
+      region.flush(true);
+      put = new Put(ROW3);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW3);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW4);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      // data was in memstore so don't expect any changes
+      region.flush(true);
+      put = new Put(ROW4);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW5);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW5);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      // data was in memstore so don't expect any changes
+      region.flush(true);
+      // Load cache
+      Scan s = new Scan();
+      s.setMaxResultSize(1000);
+      ResultScanner scanner = table.getScanner(s);
+      int count = 0;
+      for (Result result : scanner) {
+        count++;
+      }
+      assertEquals("Count all the rows ", count, 6);
+      // all the cache is loaded
+      // trigger a major compaction
+      ScannerThread scannerThread = new ScannerThread(table, cache);
+      scannerThread.start();
+      region.compact(true);
+      s = new Scan();
+      s.setMaxResultSize(1000);
+      scanner = table.getScanner(s);
+      count = 0;
+      for (Result result : scanner) {
+        count++;
+      }
+      assertEquals("Count all the rows ", count, 6);
+    } finally {
+      table.close();
+    }
+  }
+
+  private static class ScannerThread extends Thread {
+    private final Table table;
+    private final BlockCache cache;
+
+    public ScannerThread(Table table, BlockCache cache) {
+      this.table = table;
+      this.cache = cache;
+    }
+
+    public void run() {
+      Scan s = new Scan();
+      s.setCaching(1);
+      s.setStartRow(ROW4);
+      s.setStopRow(ROW5);
+      try {
+        while(!doScan.get()) {
+          try {
+            // Sleep till you start scan
+            Thread.sleep(1);
+          } catch (InterruptedException e) {
+          }
+        }
+        List<BlockCacheKey> cacheList = new ArrayList<BlockCacheKey>();
+        Iterator<CachedBlock> iterator = cache.iterator();
+        // evict all the blocks
+        while (iterator.hasNext()) {
+          CachedBlock next = iterator.next();
+          BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+          cacheList.add(cacheKey);
+          // evict what ever is available
+          cache.evictBlock(cacheKey);
+        }
+        ResultScanner scanner = table.getScanner(s);
+        for (Result res : scanner) {
+
+        }
+        compactReadLatch.countDown();
+      } catch (IOException e) {
+      }
+    }
+  }
+
+  public static class CompactorRegionObserver extends BaseRegionObserver {
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+        Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+        long earliestPutTs, InternalScanner s) throws IOException {
+      return createCompactorScanner(store, scanners, scanType, earliestPutTs);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+        Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+        long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
+      return createCompactorScanner(store, scanners, scanType, earliestPutTs);
+    }
+
+    private InternalScanner createCompactorScanner(Store store,
+        List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
+        throws IOException {
+      Scan scan = new Scan();
+      scan.setMaxVersions(store.getFamily().getMaxVersions());
+      return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+          store.getSmallestReadPoint(), earliestPutTs);
+    }
+  }
+
+  private static class CompactorStoreScanner extends StoreScanner {
+
+    public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+        List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+        long earliestPutTs) throws IOException {
+      super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
+    }
+
+    @Override
+    public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
+      boolean next = super.next(outResult, scannerContext);
+      for (Cell cell : outResult) {
+        if (CellComparator.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
+          try {
+            // hold the compaction
+            // set doscan to true
+            doScan.compareAndSet(false, true);
+            compactReadLatch.await();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+      return next;
+    }
+  }
+
+  @Test
+  public void testHBASE16372InReadPath() throws Exception {
+    TableName tableName = TableName.valueOf("testHBASE16372");
+    // Create a table with block size as 1024
+    final Table table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024, null);
+    try {
+      // get the block cache and region
+      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
+      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+      Region region =
+          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
+      Store store = region.getStores().iterator().next();
+      CacheConfig cacheConf = store.getCacheConfig();
+      cacheConf.setCacheDataOnWrite(true);
+      cacheConf.setEvictOnClose(true);
+      final BlockCache cache = cacheConf.getBlockCache();
+      // insert data. 5 Rows are added
+      Put put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW1);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW2);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW2);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW3);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW3);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW4);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW4);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      put = new Put(ROW5);
+      put.addColumn(FAMILY, QUALIFIER, data);
+      table.put(put);
+      put = new Put(ROW5);
+      put.addColumn(FAMILY, QUALIFIER1, data);
+      table.put(put);
+      // data was in memstore so don't expect any changes
+      region.flush(true);
+      // Load cache
+      Scan s = new Scan();
+      s.setMaxResultSize(1000);
+      ResultScanner scanner = table.getScanner(s);
+      int count = 0;
+      for (Result result : scanner) {
+        count++;
+      }
+      assertEquals("Count all the rows ", count, 6);
+
+      // Scan from cache
+      s = new Scan();
+      // Start a scan from row3
+      s.setCaching(1);
+      s.setStartRow(ROW1);
+      // set partial as true so that the scan can send partial columns also
+      s.setAllowPartialResults(true);
+      s.setMaxResultSize(1000);
+      scanner = table.getScanner(s);
+      Thread evictorThread = new Thread() {
+        @Override
+        public void run() {
+          List<BlockCacheKey> cacheList = new ArrayList<BlockCacheKey>();
+          Iterator<CachedBlock> iterator = cache.iterator();
+          // evict all the blocks
+          while (iterator.hasNext()) {
+            CachedBlock next = iterator.next();
+            BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
+            cacheList.add(cacheKey);
+            cache.evictBlock(cacheKey);
+          }
+          try {
+            Thread.sleep(1);
+          } catch (InterruptedException e1) {
+          }
+          iterator = cache.iterator();
+          int refBlockCount = 0;
+          while (iterator.hasNext()) {
+            iterator.next();
+            refBlockCount++;
+          }
+          assertEquals("One block should be there ", refBlockCount, 1);
+          // Rescan to prepopulate the data
+          // cache this row.
+          Scan s1 = new Scan();
+          // This scan will start from ROW1 and it will populate the cache with a
+          // row that is lower than ROW3.
+          s1.setStartRow(ROW3);
+          s1.setStopRow(ROW5);
+          s1.setCaching(1);
+          ResultScanner scanner;
+          try {
+            scanner = table.getScanner(s1);
+            int count = 0;
+            for (Result result : scanner) {
+              count++;
+            }
+            assertEquals("Count the rows", count, 2);
+            iterator = cache.iterator();
+            List<BlockCacheKey> newCacheList = new ArrayList<BlockCacheKey>();
+            while (iterator.hasNext()) {
+              CachedBlock next = iterator.next();
+              BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), 
next.getOffset());
+              newCacheList.add(cacheKey);
+            }
+            int newBlockRefCount = 0;
+            for (BlockCacheKey key : cacheList) {
+              if (newCacheList.contains(key)) {
+                newBlockRefCount++;
+              }
+            }
+
+            assertEquals("old blocks should still be found ", 
newBlockRefCount, 6);
+            latch.countDown();
+
+          } catch (IOException e) {
+          }
+        }
+      };
+      count = 0;
+      for (Result result : scanner) {
+        count++;
+        if (count == 2) {
+          evictorThread.start();
+          latch.await();
+        }
+      }
+      assertEquals("Count should give all rows ", count, 10);
+    } finally {
+      table.close();
+    }
+  }
+}
