Repository: hbase Updated Branches: refs/heads/branch-1 5fcfdcdc5 -> efc203a5a refs/heads/branch-1.2 56ae22869 -> 3d9b0409c refs/heads/branch-1.3 484b651fc -> 6bb7b4cde refs/heads/branch-1.4 3f1fb46ca -> 58dfaab4f refs/heads/branch-2 e5fb2f968 -> dcdebbffd refs/heads/branch-2.0 5fff00419 -> 6214e7801 refs/heads/branch-2.1 0ec9f81bc -> 3a13088a2 refs/heads/master 2153d2c0c -> a8ad61ec8
HBASE-20604 ProtobufLogReader#readNext can incorrectly loop to the same position in the stream until the the WAL is rolled Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/efc203a5 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/efc203a5 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/efc203a5 Branch: refs/heads/branch-1 Commit: efc203a5a2d735723135f4f16b72075acb631ce0 Parents: 5fcfdcd Author: Esteban Gutierrez <este...@apache.org> Authored: Fri May 18 15:11:13 2018 -0500 Committer: Andrew Purtell <apurt...@apache.org> Committed: Wed Nov 7 18:24:13 2018 -0800 ---------------------------------------------------------------------- .../regionserver/wal/ProtobufLogReader.java | 30 +++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/efc203a5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 5643174..3edbc85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -336,6 +336,7 @@ public class ProtobufLogReader extends ReaderBase { } WALKey.Builder builder = WALKey.newBuilder(); long size = 0; + boolean resetPosition = false; try { long available = -1; try { @@ -354,6 +355,7 @@ public class ProtobufLogReader extends ReaderBase { ProtobufUtil.mergeFrom(builder, new LimitInputStream(this.inputStream, size), (int)size); } catch (InvalidProtocolBufferException ipbe) { + resetPosition = true; throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition=" + originalPosition + ", currentPosition=" + this.inputStream.getPos() + ", messageSize=" + size + ", currentAvailable=" + available).initCause(ipbe); @@ -370,13 +372,15 @@ public class ProtobufLogReader extends ReaderBase { if (LOG.isTraceEnabled()) { LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos()); } - continue; + seekOnFs(originalPosition); + return false; } int expectedCells = walKey.getFollowingKvCount(); long posBefore = this.inputStream.getPos(); try { int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells); if (expectedCells != actualCells) { + resetPosition = true; throw new EOFException("Only read " + actualCells); // other info added in catch } } catch (Exception ex) { @@ -404,16 +408,28 @@ public class ProtobufLogReader extends ReaderBase { // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs) if (originalPosition < 0) { if (LOG.isTraceEnabled()) { - LOG.trace("Encountered a malformed edit, but can't seek back to last good position because originalPosition is negative. last offset=" + this.inputStream.getPos(), eof); + LOG.trace("Encountered a malformed edit, but can't seek back to last good position " + + "because originalPosition is negative. last offset=" + + this.inputStream.getPos(), eof); } throw eof; } - // Else restore our position to original location in hope that next time through we will - // read successfully. - if (LOG.isTraceEnabled()) { - LOG.trace("Encountered a malformed edit, seeking back to last good position in file, from "+ inputStream.getPos()+" to " + originalPosition, eof); + // If stuck at the same place and we got and exception, lets go back at the beginning. + if (inputStream.getPos() == originalPosition && resetPosition) { + if (LOG.isTraceEnabled()) { + LOG.trace("Encountered a malformed edit, seeking to the beginning of the WAL since " + + "current position and original position match at " + originalPosition); + } + seekOnFs(0); + } else { + // Else restore our position to original location in hope that next time through we will + // read successfully. + if (LOG.isTraceEnabled()) { + LOG.trace("Encountered a malformed edit, seeking back to last good position in file, " + + "from " + inputStream.getPos()+" to " + originalPosition, eof); + } + seekOnFs(originalPosition); } - seekOnFs(originalPosition); return false; } return true;