This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14508a6e018369c5438cd3d9af3edd533e65129e
Author: liudezhi <[email protected]>
AuthorDate: Wed Feb 26 19:26:15 2020 +0800

    Consumer received duplicated delayed messages upon restart
    
    Fix a case where, after a delayed message is sent, a consumer that restarts 
pulls duplicate messages. #6403
    (cherry picked from commit e71b9fc4e256f24c9b6c0edd14e40e8af1f24374)
---
 .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java    | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 158a2ce..755314f 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1112,7 +1112,14 @@ public class ManagedCursorImpl implements ManagedCursor {
         };
 
         positions.stream().filter(position -> 
!alreadyAcknowledgedPositions.contains(position))
-                .forEach(p -> ledger.asyncReadEntry((PositionImpl) p, cb, 
ctx));
+                .forEach(p ->{
+                    if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
+                        this.setReadPosition(this.readPosition.getNext());
+                        log.warn("[{}][{}] replayPosition{} equals 
readPosition{}," + " need set next readPositio",
+                                ledger.getName(), name, (PositionImpl) p, 
this.readPosition);
+                    }
+                    ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
+                });
 
         return alreadyAcknowledgedPositions;
     }

Reply via email to