This is an automated email from the ASF dual-hosted git repository.
Technoboy- pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new c08b1e9502c [fix][broker] Move pending acks cleanup to selected
mark-delete callbacks (#25592)
c08b1e9502c is described below
commit c08b1e9502c27b5044f7b243d40f3660be9e34ee
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue May 12 18:52:00 2026 +0800
[fix][broker] Move pending acks cleanup to selected mark-delete callbacks
(#25592)
---
.../org/apache/pulsar/broker/service/Consumer.java | 132 ++++++++++++---------
.../apache/pulsar/broker/service/Dispatcher.java | 9 ++
.../pulsar/broker/service/PendingAcksMap.java | 35 +++++-
.../PersistentDispatcherMultipleConsumers.java | 24 ++--
.../persistent/PersistentMessageExpiryMonitor.java | 10 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 1 +
...tickyKeyDispatcherMultipleConsumersClassic.java | 4 +-
.../service/persistent/PersistentSubscription.java | 6 +-
.../client/impl/TransactionEndToEndTest.java | 8 --
9 files changed, 140 insertions(+), 89 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d37cecc2767..8d897c80062 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -253,6 +253,7 @@ public class Consumer {
OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null);
}
+
@VisibleForTesting
Consumer(String consumerName, int availablePermits) {
this.subscription = null;
@@ -378,9 +379,18 @@ public class Consumer {
} else {
stickyKeyHash = stickyKeyHashes.get(i);
}
- boolean sendingAllowed =
-
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(),
batchSize,
- stickyKeyHash);
+ boolean sendingAllowed;
+ long[] ackSet = batchIndexesAcks == null ? null :
batchIndexesAcks.getAckSet(i);
+ int remainingUnacked;
+ if (ackSet != null) {
+ remainingUnacked =
BitSet.valueOf(ackSet).cardinality();
+ unackedMessages -= (batchSize - remainingUnacked);
+ } else {
+ remainingUnacked = batchSize;
+ }
+ sendingAllowed =
+
pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(),
+ remainingUnacked, stickyKeyHash);
if (!sendingAllowed) {
// sending isn't allowed when pending acks doesn't
accept adding the entry
// this happens when Key_Shared draining hashes
contains the stickyKeyHash
@@ -395,10 +405,6 @@ public class Consumer {
consumerId);
}
} else {
- long[] ackSet = batchIndexesAcks == null ? null :
batchIndexesAcks.getAckSet(i);
- if (ackSet != null) {
- unackedMessages -= (batchSize -
BitSet.valueOf(ackSet).cardinality());
- }
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Added {}:{} ledger entry with
batchSize of {} to pendingAcks in"
+ " broker.service.Consumer for
consumerId: {}",
@@ -578,7 +584,7 @@ public class Consumer {
ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize =
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
msgId.getEntryId());
Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left();
- long ackedCount;
+ long ackedCount = 0;
int batchSize = ackOwnerConsumerAndBatchSize.rightInt();
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
@@ -594,11 +600,19 @@ public class Consumer {
.syncBatchPositionBitSetForPendingAck(position);
}
}
- addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+ if (ackedCount > 0) {
+ boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+ position.getLedgerId(), position.getEntryId(),
(int) ackedCount);
+ if (updated) {
+ addAndGetUnAckedMsgs(ackOwnerConsumer, -(int)
ackedCount);
+ }
+ }
} else {
position = PositionFactory.create(msgId.getLedgerId(),
msgId.getEntryId());
- ackedCount = getAckedCountForMsgIdNoAckSets(batchSize,
position, ackOwnerConsumer);
- if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer,
position, msgId)) {
+ IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+ position.getLedgerId(), position.getEntryId());
+ if (removed != null) {
+ ackedCount = removed.leftInt();
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
@@ -676,12 +690,22 @@ public class Consumer {
}
AckSetStateUtil.getAckSetState(position).setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize,
ackSets);
+ if (ackedCount > 0) {
+ boolean updated = ackOwnerConsumer.updateRemainingUnacked(
+ position.getLedgerId(), position.getEntryId(),
(int) ackedCount);
+ if (updated) {
+ addAndGetUnAckedMsgs(ackOwnerConsumer, -(int)
ackedCount);
+ }
+ }
+ } else {
+ IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet(
+ position.getLedgerId(), position.getEntryId());
+ if (removed != null) {
+ addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt());
+ updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
+ }
}
- addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
- checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position,
msgId);
-
checkAckValidationError(ack, position);
totalAckCount.add(ackedCount);
@@ -705,16 +729,6 @@ public class Consumer {
return completableFuture.thenApply(__ -> totalAckCount.sum());
}
- private long getAckedCountForMsgIdNoAckSets(int batchSize, Position
position, Consumer consumer) {
- if (isAcknowledgmentAtBatchIndexLevelEnabled &&
Subscription.isIndividualAckMode(subType)) {
- long[] cursorAckSet = getCursorAckSet(position);
- if (cursorAckSet != null) {
- return getAckedCountForBatchIndexLevelEnabled(position,
batchSize, EMPTY_ACK_SET, consumer);
- }
- }
- return batchSize;
- }
-
private long getAckedCountForBatchIndexLevelEnabled(Position position, int
batchSize, long[] ackSets,
Consumer consumer) {
long ackedCount = 0;
@@ -744,19 +758,6 @@ public class Consumer {
return ackedCount;
}
- private long getUnAckedCountForBatchIndexLevelEnabled(Position position,
int batchSize) {
- long unAckedCount = batchSize;
- if (isAcknowledgmentAtBatchIndexLevelEnabled) {
- long[] cursorAckSet = getCursorAckSet(position);
- if (cursorAckSet != null) {
- BitSetRecyclable cursorBitSet =
BitSetRecyclable.create().resetWords(cursorAckSet);
- unAckedCount = cursorBitSet.cardinality();
- cursorBitSet.recycle();
- }
- }
- return unAckedCount;
- }
-
private void checkAckValidationError(CommandAck ack, Position position) {
if (ack.hasValidationError()) {
log.warn("[{}] [{}] Received ack for corrupted message at {} -
Reason: {}", subscription,
@@ -764,14 +765,6 @@ public class Consumer {
}
}
- private boolean checkCanRemovePendingAcksAndHandle(Consumer
ackOwnedConsumer,
- Position position,
MessageIdData msgId) {
- if (Subscription.isIndividualAckMode(subType) &&
msgId.getAckSetsCount() == 0) {
- return removePendingAcks(ackOwnedConsumer, position);
- }
- return false;
- }
-
/**
* Retrieves the acknowledgment owner consumer and batch size for the
specified ledgerId and entryId.
*
@@ -1113,6 +1106,37 @@ public class Consumer {
return pendingAcks;
}
+ /**
+ * Atomically decrement the remaining unacked count for the specified
position
+ * by the given acknowledged delta.
+ *
+ * <p>No-op if {@code pendingAcks} is not initialized.
+ *
+ * @return {@code true} if the update succeeds or pendingAcks is null;
+ * {@code false} otherwise
+ */
+ public boolean updateRemainingUnacked(long ledgerId, long entryId, int
ackedDelta) {
+ if (pendingAcks != null) {
+ return pendingAcks.updateRemainingUnacked(ledgerId, entryId,
ackedDelta);
+ }
+ return true;
+ }
+
+ /**
+ * Atomically remove the pending ack entry and return its stored values.
+ *
+ * <p>No-op if {@code pendingAcks} is not initialized.
+ *
+ * @return the removed {@link IntIntPair#leftInt() remainingUnacked} and
+ * {@link IntIntPair#rightInt() stickyKeyHash}, or {@code null} if
not found
+ */
+ public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) {
+ if (pendingAcks != null) {
+ return pendingAcks.removeAndGet(ledgerId, entryId);
+ }
+ return null;
+ }
+
/**
* Remove all pending acks up to the given mark-delete position and
decrement the consumer's unacked message
* counter by the remaining unacked count for each removed entry.
@@ -1131,9 +1155,8 @@ public class Consumer {
MutableInt mutableTotalUnacked = new MutableInt(0);
pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId,
- (ledgerId, entryId, batchSize, stickyKeyHash) -> {
- mutableTotalUnacked.add((int)
getUnAckedCountForBatchIndexLevelEnabled(
- PositionFactory.create(ledgerId, entryId),
batchSize));
+ (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> {
+ mutableTotalUnacked.add(remainingUnacked);
});
int totalUnacked = mutableTotalUnacked.intValue();
if (totalUnacked > 0) {
@@ -1155,11 +1178,8 @@ public class Consumer {
if (pendingAcks != null) {
List<Position> pendingPositions = new ArrayList<>((int)
pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
- pendingAcks.forEachAndClear((ledgerId, entryId, batchSize,
stickyKeyHash) -> {
- int unAckedCount =
- (int)
getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId,
entryId),
- batchSize);
- totalRedeliveryMessages.add(unAckedCount);
+ pendingAcks.forEachAndClear((ledgerId, entryId, remainingUnacked,
stickyKeyHash) -> {
+ totalRedeliveryMessages.add(remainingUnacked);
pendingPositions.add(PositionFactory.create(ledgerId,
entryId));
});
@@ -1188,9 +1208,7 @@ public class Consumer {
Position position = PositionFactory.create(msg.getLedgerId(),
msg.getEntryId());
IntIntPair pendingAck =
pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId());
if (pendingAck != null) {
- int unAckedCount = (int)
getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt());
- pendingAcks.remove(position.getLedgerId(),
position.getEntryId());
- totalRedeliveryMessages += unAckedCount;
+ totalRedeliveryMessages += pendingAck.leftInt();
pendingPositions.add(position);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index e19deb34e31..9f59d4cd175 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -131,6 +131,15 @@ public interface Dispatcher {
//No-op
}
+ /**
+ * This hook is invoked after cursor mark-delete operations triggered by
+ * message removal flows such as expiry, skip, or clear backlog, but not
for
+ * regular ack-driven mark-delete operations due to their higher frequency.
+ *
+ * <p>Since the cursor ack set may no longer be available after
mark-delete,
+ * the cleanup logic relies on the remaining unacked count stored in
+ * {@code PendingAcksMap} entries.
+ */
default void markDeletePositionMoveForward() {
// No-op
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
index 549aef36cec..a0e48e5a2df 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java
@@ -124,11 +124,12 @@ public class PendingAcksMap {
*
* @param ledgerId the ledger ID
* @param entryId the entry ID
- * @param batchSize the batch size
+ * @param remainingUnacked the number of remaining unacked messages in
this entry
+ * (for batch entries with some indexes already acked, this may be less
than batchSize)
* @param stickyKeyHash the sticky key hash
* @return true if the pending ack was added, and it's allowed to send a
message, false otherwise
*/
- public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int
batchSize, int stickyKeyHash) {
+ public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int
remainingUnacked, int stickyKeyHash) {
try {
writeLock.lock();
// prevent adding sticky hash to pending acks if the
PendingAcksMap has already been closed
@@ -145,7 +146,7 @@ public class PendingAcksMap {
}
Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks =
pendingAcks.computeIfAbsent(ledgerId, k -> new
Long2ObjectRBTreeMap<>());
- ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize,
stickyKeyHash));
+ ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked,
stickyKeyHash));
return true;
} finally {
writeLock.unlock();
@@ -312,6 +313,34 @@ public class PendingAcksMap {
}
}
+ /**
+ * Atomically update the remaining unacked count for a pending ack entry
by subtracting the given delta.
+ * Called from the ack handler after computing the number of batch indexes
acknowledged in a partial ack.
+ *
+ * @param ledgerId the ledger ID
+ * @param entryId the entry ID
+ * @param ackedDelta the number of batch indexes that were just
acknowledged
+ * @return true if the entry was found and updated, false otherwise
+ */
+ public boolean updateRemainingUnacked(long ledgerId, long entryId, int
ackedDelta) {
+ try {
+ writeLock.lock();
+ Long2ObjectSortedMap<IntIntPair> ledgerMap =
pendingAcks.get(ledgerId);
+ if (ledgerMap == null) {
+ return false;
+ }
+ IntIntPair current = ledgerMap.get(entryId);
+ if (current == null) {
+ return false;
+ }
+ int newRemaining = current.leftInt() - ackedDelta;
+ ledgerMap.put(entryId, IntIntPair.of(newRemaining,
current.rightInt()));
+ return true;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
/**
* Remove the pending ack for the given ledger ID and entry ID.
*
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index cfabc9333ad..ce56ea3689d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -142,7 +142,6 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
protected enum ReadType {
Normal, Replay
}
- private Position lastMarkDeletePositionBeforeReadMoreEntries;
private volatile long readMoreEntriesCallCount;
public PersistentDispatcherMultipleConsumers(PersistentTopic topic,
ManagedCursor cursor,
@@ -358,17 +357,6 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
// increment the counter for readMoreEntries calls, to track the
number of times readMoreEntries is called
readMoreEntriesCallCount++;
- // remove possible expired messages from redelivery tracker and
pending acks
- Position markDeletePosition = cursor.getMarkDeletedPosition();
- if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition)
{
- redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
- for (Consumer consumer : consumerList) {
- consumer.removePendingAcksUpToPositionAndDecrementUnacked(
- markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
- }
- lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition;
- }
-
// totalAvailablePermits may be updated by other threads
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits,
firstAvailableConsumerPermits);
@@ -605,6 +593,18 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractPersistentDis
return consumerList;
}
+ @Override
+ public void markDeletePositionMoveForward() {
+ Position markDeletePosition = cursor.getMarkDeletedPosition();
+ if (markDeletePosition != null) {
+ redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
+ for (Consumer consumer : consumerList) {
+ consumer.removePendingAcksUpToPositionAndDecrementUnacked(
+ markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
+ }
+ }
+ }
+
@Override
public synchronized boolean canUnsubscribe(Consumer consumer) {
return consumerList.size() == 1 && consumerSet.contains(consumer);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 929261783df..0ad8e8cfe78 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -37,9 +37,9 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.proto.ManagedLedgerInfo;
+import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.stats.Rate;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
@@ -210,9 +210,11 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback, Messag
long numMessagesExpired = (long) ctx -
cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value
stats */);
totalMsgExpired.add(numMessagesExpired);
- // If the subscription is a Key_Shared subscription, we should to
trigger message dispatch.
- if (subscription != null && subscription.getType() ==
SubType.Key_Shared) {
- subscription.getDispatcher().markDeletePositionMoveForward();
+ if (subscription != null) {
+ Dispatcher dispatcher = subscription.getDispatcher();
+ if (dispatcher != null) {
+ dispatcher.markDeletePositionMoveForward();
+ }
}
expirationCheckInProgress = FALSE;
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c2afc35c619..86c4329521b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -622,6 +622,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers
extends PersistentDi
@Override
public void markDeletePositionMoveForward() {
+ super.markDeletePositionMoveForward();
// reschedule a read with a backoff after moving the mark-delete
position forward since there might have
// been consumers that were blocked by hash and couldn't make progress
reScheduleReadWithKeySharedUnblockingInterval();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
index 1f29cb73626..748b57af5c2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java
@@ -481,11 +481,13 @@ public class
PersistentStickyKeyDispatcherMultipleConsumersClassic
}
@Override
- public void markDeletePositionMoveForward() {
+ public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion)
{
+ super.afterAckMessages(exOfDeletion, ctxOfDeletion);
// Execute the notification in different thread to avoid a mutex chain
here
// from the delete operation that was completed
topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
synchronized
(PersistentStickyKeyDispatcherMultipleConsumersClassic.this) {
+ super.markDeletePositionMoveForward();
if (recentlyJoinedConsumers != null &&
!recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
// After we process acks, we need to check whether the
mark-delete position was advanced and we
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index a5cd832e99f..ce7795a917a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -553,10 +553,6 @@ public class PersistentSubscription extends
AbstractSubscription {
if (newMD.compareTo(oldPosition) != 0) {
updateLastMarkDeleteAdvancedTimestamp();
handleReplicatedSubscriptionsUpdate(newMD);
-
- if (dispatcher != null) {
- dispatcher.markDeletePositionMoveForward();
- }
}
}
@@ -778,6 +774,7 @@ public class PersistentSubscription extends
AbstractSubscription {
future.complete(null);
}
});
+ dispatcher.markDeletePositionMoveForward();
dispatcher.afterAckMessages(null, ctx);
} else {
future.complete(null);
@@ -815,6 +812,7 @@ public class PersistentSubscription extends
AbstractSubscription {
}
future.complete(null);
if (dispatcher != null) {
+ dispatcher.markDeletePositionMoveForward();
dispatcher.afterAckMessages(null, ctx);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 6741cb5c786..f1bbe6415db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1518,14 +1518,6 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId());
}
- MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
-
-
- // remove the message from the pendingAcks, in fact redeliver will
remove the messageId from the pendingAck
- getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
-
.get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
- .remove(messageId.ledgerId, messageId.entryId);
-
Transaction txn = getTxn();
consumer.acknowledgeAsync(messageIds.get(1), txn).get();