Repository: hbase Updated Branches: refs/heads/branch-1.2 9201e0d07 -> b7b18e09f
HBASE-14501 NPE in replication with TDE Conflicts: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7b18e09 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7b18e09 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7b18e09 Branch: refs/heads/branch-1.2 Commit: b7b18e09f4f12b117d37b929127e8f52927b9d98 Parents: 9201e0d Author: Enis Soztutar <e...@apache.org> Authored: Mon Oct 12 20:37:34 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Mon Oct 12 20:57:57 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/KeyValue.java | 4 +-- .../org/apache/hadoop/hbase/KeyValueUtil.java | 11 +++--- .../apache/hadoop/hbase/codec/BaseDecoder.java | 38 +++++++++++++++----- .../apache/hadoop/hbase/codec/CellCodec.java | 1 + .../regionserver/wal/SecureWALCellCodec.java | 9 ++--- .../regionserver/ReplicationSource.java | 1 + 6 files changed, 42 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index bebbb94..3c1745e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len; import java.io.DataInput; import java.io.DataOutput; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -2497,8 +2498,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * Create a KeyValue reading from the raw InputStream. * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)} * @param in - * @return Created KeyValue OR if we find a length of zero, we will return null which - * can be useful marking a stream as done. + * @return Created KeyValue or throws an exception * @throws IOException * @deprecated Use {@link KeyValueUtil#iscreate(InputStream, boolean)} */ http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java index bbdece0..4412da4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase; import java.io.DataInput; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -187,7 +188,7 @@ public class KeyValueUtil { * position to the start of the next KeyValue. Does not allocate a new array or copy data. * @param bb * @param includesMvccVersion - * @param includesTags + * @param includesTags */ public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion, boolean includesTags) { @@ -251,7 +252,7 @@ public class KeyValueUtil { return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in), CellUtil.cloneQualifier(in), in.getTimestamp() - 1); } - + /** * Create a KeyValue for the specified row, family and qualifier that would be @@ -546,6 +547,7 @@ public class KeyValueUtil { @Deprecated public static List<KeyValue> ensureKeyValues(List<Cell> cells) { List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() { + @Override public KeyValue apply(Cell arg0) { return KeyValueUtil.ensureKeyValue(arg0); } @@ -571,8 +573,9 @@ public class KeyValueUtil { while (bytesRead < intBytes.length) { int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead); if (n < 0) { - if (bytesRead == 0) - return null; // EOF at start is ok + if (bytesRead == 0) { + throw new EOFException(); + } throw new IOException("Failed read of int, read " + bytesRead + " bytes"); } bytesRead += n; http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java index 3776c08..83549a8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java @@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.PushbackInputStream; + +import javax.annotation.Nonnull; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.Cell; @InterfaceAudience.Private public abstract class BaseDecoder implements Codec.Decoder { protected static final Log LOG = LogFactory.getLog(BaseDecoder.class); - protected final InputStream in; - private boolean hasNext = true; + + protected final PBIS in; private Cell current = null; + protected static class PBIS extends PushbackInputStream { + public PBIS(InputStream in, int size) { + super(in, size); + } + + public void resetBuf(int size) { + this.buf = new byte[size]; + this.pos = size; + } + } + public BaseDecoder(final InputStream in) { - this.in = in; + this.in = new PBIS(in, 1); } @Override public boolean advance() throws IOException { - if (!this.hasNext) return this.hasNext; - if (this.in.available() == 0) { - this.hasNext = false; - return this.hasNext; + int firstByte = in.read(); + if (firstByte == -1) { + return false; + } else { + in.unread(firstByte); } + try { this.current = parseCell(); } catch (IOException ioEx) { + in.resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers rethrowEofException(ioEx); } - return this.hasNext; + return true; } private void rethrowEofException(IOException ioEx) throws IOException { @@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder { } /** - * @return extract a Cell + * Extract a Cell. + * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe + * thrown if EOF is reached prematurely. Does not return null. * @throws IOException */ + @Nonnull protected abstract Cell parseCell() throws IOException; @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java index 77cf80a..6691606 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java @@ -78,6 +78,7 @@ public class CellCodec implements Codec { super(in); } + @Override protected Cell parseCell() throws IOException { byte [] row = readByteArray(this.in); byte [] family = readByteArray(in); http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java index 46f3b88..69181e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -84,12 +83,8 @@ public class SecureWALCellCodec extends WALCellCodec { return super.parseCell(); } int ivLength = 0; - try { - ivLength = StreamUtils.readRawVarint32(in); - } catch (EOFException e) { - // EOF at start is OK - return null; - } + + ivLength = StreamUtils.readRawVarint32(in); // TODO: An IV length of 0 could signify an unwrapped cell, when the // encoder supports that just read the remainder in directly http://git-wip-us.apache.org/repos/asf/hbase/blob/b7b18e09/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2126f6d..697968e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -649,6 +649,7 @@ public class ReplicationSource extends Thread if (!CellUtil.matchingRow(cells.get(i), lastCell)) { distinctRowKeys++; } + lastCell = cells.get(i); } return distinctRowKeys; }