This is an automated email from the ASF dual-hosted git repository.

Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new c08b1e9502c [fix][broker] Move pending acks cleanup to selected 
mark-delete callbacks (#25592)
c08b1e9502c is described below

commit c08b1e9502c27b5044f7b243d40f3660be9e34ee
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 18:52:00 2026 +0800

    [fix][broker] Move pending acks cleanup to selected mark-delete callbacks 
(#25592)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 132 ++++++++++++---------
 .../apache/pulsar/broker/service/Dispatcher.java   |   9 ++
 .../pulsar/broker/service/PendingAcksMap.java      |  35 +++++-
 .../PersistentDispatcherMultipleConsumers.java     |  24 ++--
 .../persistent/PersistentMessageExpiryMonitor.java |  10 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   1 +
 ...tickyKeyDispatcherMultipleConsumersClassic.java |   4 +-
 .../service/persistent/PersistentSubscription.java |   6 +-
 .../client/impl/TransactionEndToEndTest.java       |   8 --
 9 files changed, 140 insertions(+), 89 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d37cecc2767..8d897c80062 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -253,6 +253,7 @@ public class Consumer {
         OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
     }
 
+
     @VisibleForTesting
     Consumer(String consumerName, int availablePermits) {
         this.subscription = null;
@@ -378,9 +379,18 @@ public class Consumer {
                     } else {
                         stickyKeyHash = stickyKeyHashes.get(i);
                     }
-                    boolean sendingAllowed =
-                            
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), 
batchSize,
-                                    stickyKeyHash);
+                    boolean sendingAllowed;
+                    long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
+                    int remainingUnacked;
+                    if (ackSet != null) {
+                        remainingUnacked = 
BitSet.valueOf(ackSet).cardinality();
+                        unackedMessages -= (batchSize - remainingUnacked);
+                    } else {
+                        remainingUnacked = batchSize;
+                    }
+                    sendingAllowed =
+                            
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(),
+                                    remainingUnacked, stickyKeyHash);
                     if (!sendingAllowed) {
                         // sending isn't allowed when pending acks doesn't 
accept adding the entry
                         // this happens when Key_Shared draining hashes 
contains the stickyKeyHash
@@ -395,10 +405,6 @@ public class Consumer {
                                     consumerId);
                         }
                     } else {
-                        long[] ackSet = batchIndexesAcks == null ? null : 
batchIndexesAcks.getAckSet(i);
-                        if (ackSet != null) {
-                            unackedMessages -= (batchSize - 
BitSet.valueOf(ackSet).cardinality());
-                        }
                         if (log.isDebugEnabled()) {
                             log.debug("[{}-{}] Added {}:{} ledger entry with 
batchSize of {} to pendingAcks in"
                                             + " broker.service.Consumer for 
consumerId: {}",
@@ -578,7 +584,7 @@ public class Consumer {
             ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize =
                     getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), 
msgId.getEntryId());
             Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left();
-            long ackedCount;
+            long ackedCount = 0;
             int batchSize = ackOwnerConsumerAndBatchSize.rightInt();
             if (msgId.getAckSetsCount() > 0) {
                 long[] ackSets = new long[msgId.getAckSetsCount()];
@@ -594,11 +600,19 @@ public class Consumer {
                                 
.syncBatchPositionBitSetForPendingAck(position);
                     }
                 }
-                addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+                if (ackedCount > 0) {
+                    boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+                            position.getLedgerId(), position.getEntryId(), 
(int) ackedCount);
+                    if (updated) {
+                        addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) 
ackedCount);
+                    }
+                }
             } else {
                 position = PositionFactory.create(msgId.getLedgerId(), 
msgId.getEntryId());
-                ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
-                if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, 
position, msgId)) {
+                IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+                        position.getLedgerId(), position.getEntryId());
+                if (removed != null) {
+                    ackedCount = removed.leftInt();
                     addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
                     updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
                 }
@@ -676,12 +690,22 @@ public class Consumer {
                 }
                 AckSetStateUtil.getAckSetState(position).setAckSet(ackSets);
                 ackedCount = getAckedCountForTransactionAck(batchSize, 
ackSets);
+                if (ackedCount > 0) {
+                    boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+                            position.getLedgerId(), position.getEntryId(), 
(int) ackedCount);
+                    if (updated) {
+                        addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) 
ackedCount);
+                    }
+                }
+            } else {
+                IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+                        position.getLedgerId(), position.getEntryId());
+                if (removed != null) {
+                    addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt());
+                    updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
+                }
             }
 
-            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
-            checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, 
msgId);
-
             checkAckValidationError(ack, position);
 
             totalAckCount.add(ackedCount);
@@ -705,16 +729,6 @@ public class Consumer {
         return completableFuture.thenApply(__ -> totalAckCount.sum());
     }
 
-    private long getAckedCountForMsgIdNoAckSets(int batchSize, Position 
position, Consumer consumer) {
-        if (isAcknowledgmentAtBatchIndexLevelEnabled && 
Subscription.isIndividualAckMode(subType)) {
-            long[] cursorAckSet = getCursorAckSet(position);
-            if (cursorAckSet != null) {
-                return getAckedCountForBatchIndexLevelEnabled(position, 
batchSize, EMPTY_ACK_SET, consumer);
-            }
-        }
-        return batchSize;
-    }
-
     private long getAckedCountForBatchIndexLevelEnabled(Position position, int 
batchSize, long[] ackSets,
                                                         Consumer consumer) {
         long ackedCount = 0;
@@ -744,19 +758,6 @@ public class Consumer {
         return ackedCount;
     }
 
-    private long getUnAckedCountForBatchIndexLevelEnabled(Position position, 
int batchSize) {
-        long unAckedCount = batchSize;
-        if (isAcknowledgmentAtBatchIndexLevelEnabled) {
-            long[] cursorAckSet = getCursorAckSet(position);
-            if (cursorAckSet != null) {
-                BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet);
-                unAckedCount = cursorBitSet.cardinality();
-                cursorBitSet.recycle();
-            }
-        }
-        return unAckedCount;
-    }
-
     private void checkAckValidationError(CommandAck ack, Position position) {
         if (ack.hasValidationError()) {
             log.warn("[{}] [{}] Received ack for corrupted message at {} - 
Reason: {}", subscription,
@@ -764,14 +765,6 @@ public class Consumer {
         }
     }
 
-    private boolean checkCanRemovePendingAcksAndHandle(Consumer 
ackOwnedConsumer,
-                                                       Position position, 
MessageIdData msgId) {
-        if (Subscription.isIndividualAckMode(subType) && 
msgId.getAckSetsCount() == 0) {
-            return removePendingAcks(ackOwnedConsumer, position);
-        }
-        return false;
-    }
-
     /**
      * Retrieves the acknowledgment owner consumer and batch size for the 
specified ledgerId and entryId.
      *
@@ -1113,6 +1106,37 @@ public class Consumer {
         return pendingAcks;
     }
 
+    /**
+     * Atomically decrement the remaining unacked count for the specified 
position
+     * by the given acknowledged delta.
+     *
+     * <p>No-op if {@code pendingAcks} is not initialized.
+     *
+     * @return {@code true} if the update succeeds or pendingAcks is null;
+     *         {@code false} otherwise
+     */
+    public boolean updateRemainingUnacked(long ledgerId, long entryId, int 
ackedDelta) {
+        if (pendingAcks != null) {
+            return pendingAcks.updateRemainingUnacked(ledgerId, entryId, 
ackedDelta);
+        }
+        return true;
+    }
+
+    /**
+     * Atomically remove the pending ack entry and return its stored values.
+     *
+     * <p>No-op if {@code pendingAcks} is not initialized.
+     *
+     * @return the removed {@link IntIntPair#leftInt() remainingUnacked} and
+     *         {@link IntIntPair#rightInt() stickyKeyHash}, or {@code null} if 
not found
+     */
+    public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) {
+        if (pendingAcks != null) {
+            return pendingAcks.removeAndGet(ledgerId, entryId);
+        }
+        return null;
+    }
+
     /**
      * Remove all pending acks up to the given mark-delete position and 
decrement the consumer's unacked message
      * counter by the remaining unacked count for each removed entry.
@@ -1131,9 +1155,8 @@ public class Consumer {
 
         MutableInt mutableTotalUnacked = new MutableInt(0);
         pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
-                (ledgerId, entryId, batchSize, stickyKeyHash) -> {
-                    mutableTotalUnacked.add((int) 
getUnAckedCountForBatchIndexLevelEnabled(
-                            PositionFactory.create(ledgerId, entryId), 
batchSize));
+                (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> {
+                    mutableTotalUnacked.add(remainingUnacked);
                 });
         int totalUnacked = mutableTotalUnacked.intValue();
         if (totalUnacked > 0) {
@@ -1155,11 +1178,8 @@ public class Consumer {
         if (pendingAcks != null) {
             List<Position> pendingPositions = new ArrayList<>((int) 
pendingAcks.size());
             MutableInt totalRedeliveryMessages = new MutableInt(0);
-            pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, 
stickyKeyHash) -> {
-                int unAckedCount =
-                        (int) 
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, 
entryId),
-                                batchSize);
-                totalRedeliveryMessages.add(unAckedCount);
+            pendingAcks.forEachAndClear((ledgerId, entryId, remainingUnacked, 
stickyKeyHash) -> {
+                totalRedeliveryMessages.add(remainingUnacked);
                 pendingPositions.add(PositionFactory.create(ledgerId, 
entryId));
             });
 
@@ -1188,9 +1208,7 @@ public class Consumer {
             Position position = PositionFactory.create(msg.getLedgerId(), 
msg.getEntryId());
             IntIntPair pendingAck = 
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
             if (pendingAck != null) {
-                int unAckedCount = (int) 
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
-                pendingAcks.remove(position.getLedgerId(), 
position.getEntryId());
-                totalRedeliveryMessages += unAckedCount;
+                totalRedeliveryMessages += pendingAck.leftInt();
                 pendingPositions.add(position);
             }
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index e19deb34e31..9f59d4cd175 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -131,6 +131,15 @@ public interface Dispatcher {
         //No-op
     }
 
+    /**
+     * This hook is invoked after cursor mark-delete operations triggered by
+     * message removal flows such as expiry, skip, or clear backlog, but not 
for
+     * regular ack-driven mark-delete operations due to their higher frequency.
+     *
+     * <p>Since the cursor ack set may no longer be available after 
mark-delete,
+     * the cleanup logic relies on the remaining unacked count stored in
+     * {@code PendingAcksMap} entries.
+     */
     default void markDeletePositionMoveForward() {
         // No-op
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 549aef36cec..a0e48e5a2df 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -124,11 +124,12 @@ public class PendingAcksMap {
      *
      * @param ledgerId the ledger ID
      * @param entryId the entry ID
-     * @param batchSize the batch size
+     * @param remainingUnacked the number of remaining unacked messages in 
this entry
+     *   (for batch entries with some indexes already acked, this may be less 
than batchSize)
      * @param stickyKeyHash the sticky key hash
      * @return true if the pending ack was added, and it's allowed to send a 
message, false otherwise
      */
-    public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int 
batchSize, int stickyKeyHash) {
+    public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int 
remainingUnacked, int stickyKeyHash) {
         try {
             writeLock.lock();
             // prevent adding sticky hash to pending acks if the 
PendingAcksMap has already been closed
@@ -145,7 +146,7 @@ public class PendingAcksMap {
             }
             Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks =
                     pendingAcks.computeIfAbsent(ledgerId, k -> new 
Long2ObjectRBTreeMap<>());
-            ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize, 
stickyKeyHash));
+            ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, 
stickyKeyHash));
             return true;
         } finally {
             writeLock.unlock();
@@ -312,6 +313,34 @@ public class PendingAcksMap {
         }
     }
 
+    /**
+     * Atomically update the remaining unacked count for a pending ack entry 
by subtracting the given delta.
+     * Called from the ack handler after computing the number of batch indexes 
acknowledged in a partial ack.
+     *
+     * @param ledgerId the ledger ID
+     * @param entryId the entry ID
+     * @param ackedDelta the number of batch indexes that were just 
acknowledged
+     * @return true if the entry was found and updated, false otherwise
+     */
+    public boolean updateRemainingUnacked(long ledgerId, long entryId, int 
ackedDelta) {
+        try {
+            writeLock.lock();
+            Long2ObjectSortedMap<IntIntPair> ledgerMap = 
pendingAcks.get(ledgerId);
+            if (ledgerMap == null) {
+                return false;
+            }
+            IntIntPair current = ledgerMap.get(entryId);
+            if (current == null) {
+                return false;
+            }
+            int newRemaining = current.leftInt() - ackedDelta;
+            ledgerMap.put(entryId, IntIntPair.of(newRemaining, 
current.rightInt()));
+            return true;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     /**
      * Remove the pending ack for the given ledger ID and entry ID.
      *
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index cfabc9333ad..ce56ea3689d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -142,7 +142,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
     protected enum ReadType {
         Normal, Replay
     }
-    private Position lastMarkDeletePositionBeforeReadMoreEntries;
     private volatile long readMoreEntriesCallCount;
 
     public PersistentDispatcherMultipleConsumers(PersistentTopic topic, 
ManagedCursor cursor,
@@ -358,17 +357,6 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         // increment the counter for readMoreEntries calls, to track the 
number of times readMoreEntries is called
         readMoreEntriesCallCount++;
 
-        // remove possible expired messages from redelivery tracker and 
pending acks
-        Position markDeletePosition = cursor.getMarkDeletedPosition();
-        if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) 
{
-            redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
-            for (Consumer consumer : consumerList) {
-                consumer.removePendingAcksUpToPositionAndDecrementUnacked(
-                        markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
-            }
-            lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
-        }
-
         // totalAvailablePermits may be updated by other threads
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, 
firstAvailableConsumerPermits);
@@ -605,6 +593,18 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
         return consumerList;
     }
 
+    @Override
+    public void markDeletePositionMoveForward() {
+        Position markDeletePosition = cursor.getMarkDeletedPosition();
+        if (markDeletePosition != null) {
+            redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
+            for (Consumer consumer : consumerList) {
+                consumer.removePendingAcksUpToPositionAndDecrementUnacked(
+                        markDeletePosition.getLedgerId(), 
markDeletePosition.getEntryId());
+            }
+        }
+    }
+
     @Override
     public synchronized boolean canUnsubscribe(Consumer consumer) {
         return consumerList.size() == 1 && consumerSet.contains(consumer);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 929261783df..0ad8e8cfe78 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -37,9 +37,9 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
+import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.MessageExpirer;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.stats.Rate;
 import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
@@ -210,9 +210,11 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
             long numMessagesExpired = (long) ctx - 
cursor.getNumberOfEntriesInBacklog(false);
             msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value 
stats */);
             totalMsgExpired.add(numMessagesExpired);
-            // If the subscription is a Key_Shared subscription, we should to 
trigger message dispatch.
-            if (subscription != null && subscription.getType() == 
SubType.Key_Shared) {
-                subscription.getDispatcher().markDeletePositionMoveForward();
+            if (subscription != null) {
+                Dispatcher dispatcher = subscription.getDispatcher();
+                if (dispatcher != null) {
+                    dispatcher.markDeletePositionMoveForward();
+                }
             }
             expirationCheckInProgress = FALSE;
             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c2afc35c619..86c4329521b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -622,6 +622,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers 
extends PersistentDi
 
     @Override
     public void markDeletePositionMoveForward() {
+        super.markDeletePositionMoveForward();
         // reschedule a read with a backoff after moving the mark-delete 
position forward since there might have
         // been consumers that were blocked by hash and couldn't make progress
         reScheduleReadWithKeySharedUnblockingInterval();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index 1f29cb73626..748b57af5c2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -481,11 +481,13 @@ public class 
PersistentStickyKeyDispatcherMultipleConsumersClassic
     }
 
     @Override
-    public void markDeletePositionMoveForward() {
+    public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) 
{
+        super.afterAckMessages(exOfDeletion, ctxOfDeletion);
         // Execute the notification in different thread to avoid a mutex chain 
here
         // from the delete operation that was completed
         topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
             synchronized 
(PersistentStickyKeyDispatcherMultipleConsumersClassic.this) {
+                super.markDeletePositionMoveForward();
                 if (recentlyJoinedConsumers != null && 
!recentlyJoinedConsumers.isEmpty()
                         && removeConsumersFromRecentJoinedConsumers()) {
                     // After we process acks, we need to check whether the 
mark-delete position was advanced and we
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a5cd832e99f..ce7795a917a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -553,10 +553,6 @@ public class PersistentSubscription extends 
AbstractSubscription {
         if (newMD.compareTo(oldPosition) != 0) {
             updateLastMarkDeleteAdvancedTimestamp();
             handleReplicatedSubscriptionsUpdate(newMD);
-
-            if (dispatcher != null) {
-                dispatcher.markDeletePositionMoveForward();
-            }
         }
     }
 
@@ -778,6 +774,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                             future.complete(null);
                         }
                     });
+                    dispatcher.markDeletePositionMoveForward();
                     dispatcher.afterAckMessages(null, ctx);
                 } else {
                     future.complete(null);
@@ -815,6 +812,7 @@ public class PersistentSubscription extends 
AbstractSubscription {
                         }
                         future.complete(null);
                         if (dispatcher != null) {
+                            dispatcher.markDeletePositionMoveForward();
                             dispatcher.afterAckMessages(null, ctx);
                         }
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 6741cb5c786..f1bbe6415db 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1518,14 +1518,6 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, 
TimeUnit.SECONDS).getMessageId());
         }
 
-        MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
-
-
-        // remove the message from the pendingAcks, in fact redeliver will 
remove the messageId from the pendingAck
-        getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
-                
.get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
-                .remove(messageId.ledgerId, messageId.entryId);
-
         Transaction txn = getTxn();
         consumer.acknowledgeAsync(messageIds.get(1), txn).get();
 

Reply via email to