Repository: hbase
Updated Branches:
  refs/heads/0.98 2d37ad9da -> c6608e652


HBASE-14501 NPE in replication with TDE

Conflicts:
        hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
        hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java

Conflicts:
        hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
        
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Conflicts:
        hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6608e65
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6608e65
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6608e65

Branch: refs/heads/0.98
Commit: c6608e652a118b1c5dc683e5e5b9694724e1d8f5
Parents: 2d37ad9
Author: Enis Soztutar <e...@apache.org>
Authored: Mon Oct 12 20:37:34 2015 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Mon Oct 12 21:05:54 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  8 +++--
 .../org/apache/hadoop/hbase/KeyValueUtil.java   |  3 +-
 .../apache/hadoop/hbase/codec/BaseDecoder.java  | 38 +++++++++++++++-----
 .../apache/hadoop/hbase/codec/CellCodec.java    |  1 +
 .../regionserver/wal/SecureWALCellCodec.java    |  9 ++---
 .../regionserver/ReplicationSource.java         |  1 +
 6 files changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 66f18d0..8e96ef1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -2853,8 +2854,7 @@ public class KeyValue implements Cell, HeapSize, 
Cloneable {
    * Create a KeyValue reading from the raw InputStream.
    * Named <code>iscreate</code> so doesn't clash with {@link 
#create(DataInput)}
    * @param in
-   * @return Created KeyValue OR if we find a length of zero, we will return 
null which
-   * can be useful marking a stream as done.
+   * @return Created KeyValue or throws an exception
    * @throws IOException
    */
   public static KeyValue iscreate(final InputStream in) throws IOException {
@@ -2863,7 +2863,9 @@ public class KeyValue implements Cell, HeapSize, 
Cloneable {
     while (bytesRead < intBytes.length) {
       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
       if (n < 0) {
-        if (bytesRead == 0) return null; // EOF at start is ok
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
         throw new IOException("Failed read of int, read " + bytesRead + " 
bytes");
       }
       bytesRead += n;

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index d35b12e..cc517b9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -148,7 +148,7 @@ public class KeyValueUtil {
    * position to the start of the next KeyValue. Does not allocate a new array 
or copy data.
    * @param bb
    * @param includesMvccVersion
-   * @param includesTags 
+   * @param includesTags
    */
   public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean 
includesMvccVersion,
       boolean includesTags) {
@@ -226,6 +226,7 @@ public class KeyValueUtil {
 
   public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
     List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, 
KeyValue>() {
+      @Override
       public KeyValue apply(Cell arg0) {
         return KeyValueUtil.ensureKeyValue(arg0);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 3776c08..83549a8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.annotation.Nonnull;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.Cell;
 @InterfaceAudience.Private
 public abstract class BaseDecoder implements Codec.Decoder {
   protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
-  protected final InputStream in;
-  private boolean hasNext = true;
+
+  protected final PBIS in;
   private Cell current = null;
 
+  protected static class PBIS extends PushbackInputStream {
+    public PBIS(InputStream in, int size) {
+      super(in, size);
+    }
+
+    public void resetBuf(int size) {
+      this.buf = new byte[size];
+      this.pos = size;
+    }
+  }
+
   public BaseDecoder(final InputStream in) {
-    this.in = in;
+    this.in = new PBIS(in, 1);
   }
 
   @Override
   public boolean advance() throws IOException {
-    if (!this.hasNext) return this.hasNext;
-    if (this.in.available() == 0) {
-      this.hasNext = false;
-      return this.hasNext;
+    int firstByte = in.read();
+    if (firstByte == -1) {
+      return false;
+    } else {
+      in.unread(firstByte);
     }
+
     try {
       this.current = parseCell();
     } catch (IOException ioEx) {
+      in.resetBuf(1); // reset the buffer in case the underlying stream is 
read from upper layers
       rethrowEofException(ioEx);
     }
-    return this.hasNext;
+    return true;
   }
 
   private void rethrowEofException(IOException ioEx) throws IOException {
@@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder {
   }
 
   /**
-   * @return extract a Cell
+   * Extract a Cell.
+   * @return a parsed Cell or throws an Exception. EOFException or a generic 
IOException maybe
+   * thrown if EOF is reached prematurely. Does not return null.
    * @throws IOException
    */
+  @Nonnull
   protected abstract Cell parseCell() throws IOException;
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
----------------------------------------------------------------------
diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index 77cf80a..6691606 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -78,6 +78,7 @@ public class CellCodec implements Codec {
       super(in);
     }
 
+    @Override
     protected Cell parseCell() throws IOException {
       byte [] row = readByteArray(this.in);
       byte [] family = readByteArray(in);

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index ac242bf..821dd09 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -83,12 +82,8 @@ public class SecureWALCellCodec extends WALCellCodec {
         return super.parseCell();
       }
       int ivLength = 0;
-      try {
-        ivLength = StreamUtils.readRawVarint32(in);
-      } catch (EOFException e) {
-        // EOF at start is OK
-        return null;
-      }
+
+      ivLength = StreamUtils.readRawVarint32(in);
 
       // TODO: An IV length of 0 could signify an unwrapped cell, when the
       // encoder supports that just read the remainder in directly

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 02ddfbd..ebf5996 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -639,6 +639,7 @@ public class ReplicationSource extends Thread
       if (!kvs.get(i).matchingRow(lastKV)) {
         distinctRowKeys++;
       }
+      lastKV = kvs.get(i);
     }
     return distinctRowKeys;
   }

Reply via email to