This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97f707ba837 [improve][broker] Remove check ack.getMessageIdsCount() == 1 for cumulative ack (#18333)
97f707ba837 is described below
commit 97f707ba837c15f3ab70d128c4b436a5d4c050b2
Author: houxiaoyu <[email protected]>
AuthorDate: Mon Nov 7 10:33:09 2022 +0800
[improve][broker] Remove check ack.getMessageIdsCount() == 1 for cumulative ack (#18333)
---
.../org/apache/pulsar/broker/service/Consumer.java | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f43f16ef587..ac924725f7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -425,19 +425,19 @@ public class Consumer {
subscription, consumerId);
return CompletableFuture.completedFuture(null);
}
- PositionImpl position = PositionImpl.EARLIEST;
- if (ack.getMessageIdsCount() == 1) {
- MessageIdData msgId = ack.getMessageIdAt(0);
- if (msgId.getAckSetsCount() > 0) {
- long[] ackSets = new long[msgId.getAckSetsCount()];
- for (int j = 0; j < msgId.getAckSetsCount(); j++) {
- ackSets[j] = msgId.getAckSetAt(j);
- }
- position = PositionImpl.get(msgId.getLedgerId(),
msgId.getEntryId(), ackSets);
- } else {
- position = PositionImpl.get(msgId.getLedgerId(),
msgId.getEntryId());
+
+ PositionImpl position;
+ MessageIdData msgId = ack.getMessageIdAt(0);
+ if (msgId.getAckSetsCount() > 0) {
+ long[] ackSets = new long[msgId.getAckSetsCount()];
+ for (int j = 0; j < msgId.getAckSetsCount(); j++) {
+ ackSets[j] = msgId.getAckSetAt(j);
}
+ position = PositionImpl.get(msgId.getLedgerId(),
msgId.getEntryId(), ackSets);
+ } else {
+ position = PositionImpl.get(msgId.getLedgerId(),
msgId.getEntryId());
}
+
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
List<PositionImpl> positionsAcked =
Collections.singletonList(position);
future =
transactionCumulativeAcknowledge(ack.getTxnidMostBits(),