This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4330c2f16eca9e18c50d5e5a956d03e3613f6b6b Author: Zixuan Liu <[email protected]> AuthorDate: Wed Apr 29 23:18:31 2026 +0800 [fix][broker] Move pending acks cleanup to selected mark-delete callbacks (#25592) (cherry-pick from commit 138595f6256c301956f9d77fde8534699e992536) --- .../org/apache/pulsar/broker/service/Consumer.java | 132 ++++++++++++--------- .../apache/pulsar/broker/service/Dispatcher.java | 9 ++ .../pulsar/broker/service/PendingAcksMap.java | 35 +++++- .../PersistentDispatcherMultipleConsumers.java | 24 ++-- .../persistent/PersistentMessageExpiryMonitor.java | 10 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 1 + ...tickyKeyDispatcherMultipleConsumersClassic.java | 4 +- .../service/persistent/PersistentSubscription.java | 6 +- .../client/impl/TransactionEndToEndTest.java | 8 -- 9 files changed, 140 insertions(+), 89 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 0d9492afee1..1a49325ca80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -251,6 +251,7 @@ public class Consumer { OPEN_TELEMETRY_ATTRIBUTES_FIELD_UPDATER.set(this, null); } + @VisibleForTesting Consumer(String consumerName, int availablePermits) { this.subscription = null; @@ -376,9 +377,18 @@ public class Consumer { } else { stickyKeyHash = stickyKeyHashes.get(i); } - boolean sendingAllowed = - pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), batchSize, - stickyKeyHash); + boolean sendingAllowed; + long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i); + int remainingUnacked; + if (ackSet != null) { + remainingUnacked = BitSet.valueOf(ackSet).cardinality(); + unackedMessages -= (batchSize - remainingUnacked); + } else { + remainingUnacked = batchSize; + } + sendingAllowed = + pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), + remainingUnacked, stickyKeyHash); if (!sendingAllowed) { // sending isn't allowed when pending acks doesn't accept adding the entry // this happens when Key_Shared draining hashes contains the stickyKeyHash @@ -393,10 +403,6 @@ public class Consumer { consumerId); } } else { - long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i); - if (ackSet != null) { - unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality()); - } if (log.isDebugEnabled()) { log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" + " broker.service.Consumer for consumerId: {}", @@ -573,7 +579,7 @@ public class Consumer { ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()); Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - long ackedCount; + long ackedCount = 0; int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; @@ -589,11 +595,19 @@ public class Consumer { .syncBatchPositionBitSetForPendingAck(position); } } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + if (ackedCount > 0) { + boolean updated = ackOwnerConsumer.updateRemainingUnacked( + position.getLedgerId(), position.getEntryId(), (int) ackedCount); + if (updated) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + } + } } else { position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); - if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) { + IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + position.getLedgerId(), position.getEntryId()); + if (removed != null) { + ackedCount = removed.leftInt(); addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); } @@ -671,12 +685,22 @@ public class Consumer { } AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); + if (ackedCount > 0) { + boolean updated = ackOwnerConsumer.updateRemainingUnacked( + position.getLedgerId(), position.getEntryId(), (int) ackedCount); + if (updated) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + } + } + } else { + IntIntPair removed = ackOwnerConsumer.removePendingAckAndGet( + position.getLedgerId(), position.getEntryId()); + if (removed != null) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -removed.leftInt()); + updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); + } } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - - checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId); - checkAckValidationError(ack, position); totalAckCount.add(ackedCount); @@ -700,16 +724,6 @@ public class Consumer { return completableFuture.thenApply(__ -> totalAckCount.sum()); } - private long getAckedCountForMsgIdNoAckSets(int batchSize, Position position, Consumer consumer) { - if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer); - } - } - return batchSize; - } - private long getAckedCountForBatchIndexLevelEnabled(Position position, int batchSize, long[] ackSets, Consumer consumer) { long ackedCount = 0; @@ -739,19 +753,6 @@ public class Consumer { return ackedCount; } - private long getUnAckedCountForBatchIndexLevelEnabled(Position position, int batchSize) { - long unAckedCount = batchSize; - if (isAcknowledgmentAtBatchIndexLevelEnabled) { - long[] cursorAckSet = getCursorAckSet(position); - if (cursorAckSet != null) { - BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); - unAckedCount = cursorBitSet.cardinality(); - cursorBitSet.recycle(); - } - } - return unAckedCount; - } - private void checkAckValidationError(CommandAck ack, Position position) { if (ack.hasValidationError()) { log.warn("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, @@ -759,14 +760,6 @@ public class Consumer { } } - private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer, - Position position, MessageIdData msgId) { - if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { - return removePendingAcks(ackOwnedConsumer, position); - } - return false; - } - /** * Retrieves the acknowledgment owner consumer and batch size for the specified ledgerId and entryId. * @@ -1102,6 +1095,37 @@ public class Consumer { return pendingAcks; } + /** + * Atomically decrement the remaining unacked count for the specified position + * by the given acknowledged delta. + * + * <p>No-op if {@code pendingAcks} is not initialized. + * + * @return {@code true} if the update succeeds or pendingAcks is null; + * {@code false} otherwise + */ + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + if (pendingAcks != null) { + return pendingAcks.updateRemainingUnacked(ledgerId, entryId, ackedDelta); + } + return true; + } + + /** + * Atomically remove the pending ack entry and return its stored values. + * + * <p>No-op if {@code pendingAcks} is not initialized. + * + * @return the removed {@link IntIntPair#leftInt() remainingUnacked} and + * {@link IntIntPair#rightInt() stickyKeyHash}, or {@code null} if not found + */ + public IntIntPair removePendingAckAndGet(long ledgerId, long entryId) { + if (pendingAcks != null) { + return pendingAcks.removeAndGet(ledgerId, entryId); + } + return null; + } + /** * Remove all pending acks up to the given mark-delete position and decrement the consumer's unacked message * counter by the remaining unacked count for each removed entry. @@ -1120,9 +1144,8 @@ public class Consumer { MutableInt mutableTotalUnacked = new MutableInt(0); pendingAcks.removeAllUpTo(markDeleteLedgerId, markDeleteEntryId, - (ledgerId, entryId, batchSize, stickyKeyHash) -> { - mutableTotalUnacked.add((int) getUnAckedCountForBatchIndexLevelEnabled( - PositionFactory.create(ledgerId, entryId), batchSize)); + (ledgerId, entryId, remainingUnacked, stickyKeyHash) -> { + mutableTotalUnacked.add(remainingUnacked); }); int totalUnacked = mutableTotalUnacked.intValue(); if (totalUnacked > 0) { @@ -1144,11 +1167,8 @@ public class Consumer { if (pendingAcks != null) { List<Position> pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); - pendingAcks.forEachAndClear((ledgerId, entryId, batchSize, stickyKeyHash) -> { - int unAckedCount = - (int) getUnAckedCountForBatchIndexLevelEnabled(PositionFactory.create(ledgerId, entryId), - batchSize); - totalRedeliveryMessages.add(unAckedCount); + pendingAcks.forEachAndClear((ledgerId, entryId, remainingUnacked, stickyKeyHash) -> { + totalRedeliveryMessages.add(remainingUnacked); pendingPositions.add(PositionFactory.create(ledgerId, entryId)); }); @@ -1177,9 +1197,7 @@ public class Consumer { Position position = PositionFactory.create(msg.getLedgerId(), msg.getEntryId()); IntIntPair pendingAck = pendingAcks.removeAndGet(position.getLedgerId(), position.getEntryId()); if (pendingAck != null) { - int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, pendingAck.leftInt()); - pendingAcks.remove(position.getLedgerId(), position.getEntryId()); - totalRedeliveryMessages += unAckedCount; + totalRedeliveryMessages += pendingAck.leftInt(); pendingPositions.add(position); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index e19deb34e31..9f59d4cd175 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -131,6 +131,15 @@ public interface Dispatcher { //No-op } + /** + * This hook is invoked after cursor mark-delete operations triggered by + * message removal flows such as expiry, skip, or clear backlog, but not for + * regular ack-driven mark-delete operations due to their higher frequency. + * + * <p>Since the cursor ack set may no longer be available after mark-delete, + * the cleanup logic relies on the remaining unacked count stored in + * {@code PendingAcksMap} entries. + */ default void markDeletePositionMoveForward() { // No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 549aef36cec..a0e48e5a2df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -124,11 +124,12 @@ public class PendingAcksMap { * * @param ledgerId the ledger ID * @param entryId the entry ID - * @param batchSize the batch size + * @param remainingUnacked the number of remaining unacked messages in this entry + * (for batch entries with some indexes already acked, this may be less than batchSize) * @param stickyKeyHash the sticky key hash * @return true if the pending ack was added, and it's allowed to send a message, false otherwise */ - public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int batchSize, int stickyKeyHash) { + public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int remainingUnacked, int stickyKeyHash) { try { writeLock.lock(); // prevent adding sticky hash to pending acks if the PendingAcksMap has already been closed @@ -145,7 +146,7 @@ public class PendingAcksMap { } Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks = pendingAcks.computeIfAbsent(ledgerId, k -> new Long2ObjectRBTreeMap<>()); - ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize, stickyKeyHash)); + ledgerPendingAcks.put(entryId, IntIntPair.of(remainingUnacked, stickyKeyHash)); return true; } finally { writeLock.unlock(); @@ -312,6 +313,34 @@ public class PendingAcksMap { } } + /** + * Atomically update the remaining unacked count for a pending ack entry by subtracting the given delta. + * Called from the ack handler after computing the number of batch indexes acknowledged in a partial ack. + * + * @param ledgerId the ledger ID + * @param entryId the entry ID + * @param ackedDelta the number of batch indexes that were just acknowledged + * @return true if the entry was found and updated, false otherwise + */ + public boolean updateRemainingUnacked(long ledgerId, long entryId, int ackedDelta) { + try { + writeLock.lock(); + Long2ObjectSortedMap<IntIntPair> ledgerMap = pendingAcks.get(ledgerId); + if (ledgerMap == null) { + return false; + } + IntIntPair current = ledgerMap.get(entryId); + if (current == null) { + return false; + } + int newRemaining = current.leftInt() - ackedDelta; + ledgerMap.put(entryId, IntIntPair.of(newRemaining, current.rightInt())); + return true; + } finally { + writeLock.unlock(); + } + } + /** * Remove the pending ack for the given ledger ID and entry ID. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 59fd48a8079..da6fc52b916 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -142,7 +142,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis protected enum ReadType { Normal, Replay } - private Position lastMarkDeletePositionBeforeReadMoreEntries; private volatile long readMoreEntriesCallCount; public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, @@ -358,17 +357,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis // increment the counter for readMoreEntries calls, to track the number of times readMoreEntries is called readMoreEntriesCallCount++; - // remove possible expired messages from redelivery tracker and pending acks - Position markDeletePosition = cursor.getMarkDeletedPosition(); - if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { - redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - for (Consumer consumer : consumerList) { - consumer.removePendingAcksUpToPositionAndDecrementUnacked( - markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - } - lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition; - } - // totalAvailablePermits may be updated by other threads int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); @@ -605,6 +593,18 @@ public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDis return consumerList; } + @Override + public void markDeletePositionMoveForward() { + Position markDeletePosition = cursor.getMarkDeletedPosition(); + if (markDeletePosition != null) { + redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); + for (Consumer consumer : consumerList) { + consumer.removePendingAcksUpToPositionAndDecrementUnacked( + markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); + } + } + } + @Override public synchronized boolean canUnsubscribe(Consumer consumer) { return consumerList.size() == 1 && consumerSet.contains(consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 6fbb52047bb..c63df11f0b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -36,9 +36,9 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.client.impl.MessageImpl; -import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.jspecify.annotations.Nullable; @@ -211,9 +211,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false); msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */); totalMsgExpired.add(numMessagesExpired); - // If the subscription is a Key_Shared subscription, we should to trigger message dispatch. - if (subscription != null && subscription.getType() == SubType.Key_Shared) { - subscription.getDispatcher().markDeletePositionMoveForward(); + if (subscription != null) { + Dispatcher dispatcher = subscription.getDispatcher(); + if (dispatcher != null) { + dispatcher.markDeletePositionMoveForward(); + } } expirationCheckInProgress = FALSE; if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d803bc7d963..8a54d07f230 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -620,6 +620,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi @Override public void markDeletePositionMoveForward() { + super.markDeletePositionMoveForward(); // reschedule a read with a backoff after moving the mark-delete position forward since there might have // been consumers that were blocked by hash and couldn't make progress reScheduleReadWithKeySharedUnblockingInterval(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index c3b246fe9ba..500f6a9472c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -481,11 +481,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersClassic } @Override - public void markDeletePositionMoveForward() { + public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) { + super.afterAckMessages(exOfDeletion, ctxOfDeletion); // Execute the notification in different thread to avoid a mutex chain here // from the delete operation that was completed topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumersClassic.this) { + super.markDeletePositionMoveForward(); if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() && removeConsumersFromRecentJoinedConsumers()) { // After we process acks, we need to check whether the mark-delete position was advanced and we diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d23db646963..d24a3e9dec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -540,10 +540,6 @@ public class PersistentSubscription extends AbstractSubscription { if (newMD.compareTo(oldPosition) != 0) { updateLastMarkDeleteAdvancedTimestamp(); handleReplicatedSubscriptionsUpdate(newMD); - - if (dispatcher != null) { - dispatcher.markDeletePositionMoveForward(); - } } } @@ -760,6 +756,7 @@ public class PersistentSubscription extends AbstractSubscription { future.complete(null); } }); + dispatcher.markDeletePositionMoveForward(); dispatcher.afterAckMessages(null, ctx); } else { future.complete(null); @@ -797,6 +794,7 @@ public class PersistentSubscription extends AbstractSubscription { } future.complete(null); if (dispatcher != null) { + dispatcher.markDeletePositionMoveForward(); dispatcher.afterAckMessages(null, ctx); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 5d2b633ceb1..def9bc5129a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -1528,14 +1528,6 @@ public class TransactionEndToEndTest extends TransactionTestBase { messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS).getMessageId()); } - MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0); - - - // remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck - getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false) - .get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks() - .remove(messageId.ledgerId, messageId.entryId); - Transaction txn = getTxn(); consumer.acknowledgeAsync(messageIds.get(1), txn).get();
