Repository: hbase Updated Branches: refs/heads/0.98 2d37ad9da -> c6608e652
HBASE-14501 NPE in replication with TDE Conflicts: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java Conflicts: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Conflicts: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6608e65 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6608e65 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6608e65 Branch: refs/heads/0.98 Commit: c6608e652a118b1c5dc683e5e5b9694724e1d8f5 Parents: 2d37ad9 Author: Enis Soztutar <e...@apache.org> Authored: Mon Oct 12 20:37:34 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Oct 12 21:05:54 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/KeyValue.java | 8 +++-- .../org/apache/hadoop/hbase/KeyValueUtil.java | 3 +- .../apache/hadoop/hbase/codec/BaseDecoder.java | 38 +++++++++++++++----- .../apache/hadoop/hbase/codec/CellCodec.java | 1 + .../regionserver/wal/SecureWALCellCodec.java | 9 ++--- .../regionserver/ReplicationSource.java | 1 + 6 files changed, 40 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 66f18d0..8e96ef1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -2853,8 +2854,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable { * Create a KeyValue reading from the raw InputStream. * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)} * @param in - * @return Created KeyValue OR if we find a length of zero, we will return null which - * can be useful marking a stream as done. + * @return Created KeyValue or throws an exception * @throws IOException */ public static KeyValue iscreate(final InputStream in) throws IOException { @@ -2863,7 +2863,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable { while (bytesRead < intBytes.length) { int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead); if (n < 0) { - if (bytesRead == 0) return null; // EOF at start is ok + if (bytesRead == 0) { + throw new EOFException(); + } throw new IOException("Failed read of int, read " + bytesRead + " bytes"); } bytesRead += n; http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index d35b12e..cc517b9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -148,7 +148,7 @@ public class KeyValueUtil { * position to the start of the next KeyValue. Does not allocate a new array or copy data. * @param bb * @param includesMvccVersion - * @param includesTags + * @param includesTags */ public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion, boolean includesTags) { @@ -226,6 +226,7 @@ public class KeyValueUtil { public static List<KeyValue> ensureKeyValues(List<Cell> cells) { List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() { + @Override public KeyValue apply(Cell arg0) { return KeyValueUtil.ensureKeyValue(arg0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java index 3776c08..83549a8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.PushbackInputStream; + +import javax.annotation.Nonnull; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.Cell; @InterfaceAudience.Private public abstract class BaseDecoder implements Codec.Decoder { protected static final Log LOG = LogFactory.getLog(BaseDecoder.class); - protected final InputStream in; - private boolean hasNext = true; + + protected final PBIS in; private Cell current = null; + protected static class PBIS extends PushbackInputStream { + public PBIS(InputStream in, int size) { + super(in, size); + } + + public void resetBuf(int size) { + this.buf = new byte[size]; + this.pos = size; + } + } + public BaseDecoder(final InputStream in) { - this.in = in; + this.in = new PBIS(in, 1); } @Override public boolean advance() throws IOException { - if (!this.hasNext) return this.hasNext; - if (this.in.available() == 0) { - this.hasNext = false; - return this.hasNext; + int firstByte = in.read(); + if (firstByte == -1) { + return false; + } else { + in.unread(firstByte); } + try { this.current = parseCell(); } catch (IOException ioEx) { + in.resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers rethrowEofException(ioEx); } - return this.hasNext; + return true; } private void rethrowEofException(IOException ioEx) throws IOException { @@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder { } /** - * @return extract a Cell + * Extract a Cell. + * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe + * thrown if EOF is reached prematurely. Does not return null. * @throws IOException */ + @Nonnull protected abstract Cell parseCell() throws IOException; @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index 77cf80a..6691606 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -78,6 +78,7 @@ public class CellCodec implements Codec { super(in); } + @Override protected Cell parseCell() throws IOException { byte [] row = readByteArray(this.in); byte [] family = readByteArray(in); http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java index ac242bf..821dd09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -83,12 +82,8 @@ public class SecureWALCellCodec extends WALCellCodec { return super.parseCell(); } int ivLength = 0; - try { - ivLength = StreamUtils.readRawVarint32(in); - } catch (EOFException e) { - // EOF at start is OK - return null; - } + + ivLength = StreamUtils.readRawVarint32(in); // TODO: An IV length of 0 could signify an unwrapped cell, when the // encoder supports that just read the remainder in directly http://git-wip-us.apache.org/repos/asf/hbase/blob/c6608e65/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 02ddfbd..ebf5996 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -639,6 +639,7 @@ public class ReplicationSource extends Thread if (!kvs.get(i).matchingRow(lastKV)) { distinctRowKeys++; } + lastKV = kvs.get(i); } return distinctRowKeys; }