This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 14508a6e018369c5438cd3d9af3edd533e65129e Author: liudezhi <[email protected]> AuthorDate: Wed Feb 26 19:26:15 2020 +0800 Consumer received duplicated deplayed messages upon restart Fix when send a delayed message ,there is a case when a consumer restarts and pull duplicate messages. #6403 (cherry picked from commit e71b9fc4e256f24c9b6c0edd14e40e8af1f24374) --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 158a2ce..755314f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1112,7 +1112,14 @@ public class ManagedCursorImpl implements ManagedCursor { }; positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)) - .forEach(p -> ledger.asyncReadEntry((PositionImpl) p, cb, ctx)); + .forEach(p ->{ + if (((PositionImpl) p).compareTo(this.readPosition) == 0) { + this.setReadPosition(this.readPosition.getNext()); + log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPositio", + ledger.getName(), name, (PositionImpl) p, this.readPosition); + } + ledger.asyncReadEntry((PositionImpl) p, cb, ctx); + }); return alreadyAcknowledgedPositions; }
