This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 2c85dd13384 [fix][broker] Fix incorrect unack msk count when dup ack a 
message (#20990) (#22223)
2c85dd13384 is described below

commit 2c85dd13384ea7622d9bb0715d04cb5e3dc4ed95
Author: Zixuan Liu <node...@gmail.com>
AuthorDate: Thu Mar 14 10:24:36 2024 +0800

    [fix][broker] Fix incorrect unack msk count when dup ack a message (#20990) 
(#22223)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  24 ++--
 .../pulsar/broker/service/BrokerServiceTest.java   |  26 ++++
 .../client/impl/KeySharedSubscriptionTest.java     | 136 +++++++++------------
 3 files changed, 98 insertions(+), 88 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 8924b750eb6..a7c06d0c85d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -444,6 +444,7 @@ public class Consumer {
     private CompletableFuture<Long> individualAckNormal(CommandAck ack, 
Map<String, Long> properties) {
         List<Position> positionsAcked = new ArrayList<>();
         long totalAckCount = 0;
+        boolean individualAck = false;
         for (int i = 0; i < ack.getMessageIdsCount(); i++) {
             MessageIdData msgId = ack.getMessageIdAt(i);
             PositionImpl position;
@@ -467,14 +468,18 @@ public class Consumer {
             } else {
                 position = PositionImpl.get(msgId.getLedgerId(), 
msgId.getEntryId());
                 ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, 
position, ackOwnerConsumer);
+                individualAck = true;
             }
 
-            addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
-
+            if (individualAck) {
+                if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
+                    addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+                }
+            } else {
+                addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
+            }
             positionsAcked.add(position);
 
-            checkCanRemovePendingAcksAndHandle(position, msgId);
-
             checkAckValidationError(ack, position);
 
             totalAckCount += ackedCount;
@@ -636,10 +641,11 @@ public class Consumer {
         }
     }
 
-    private void checkCanRemovePendingAcksAndHandle(PositionImpl position, 
MessageIdData msgId) {
+    private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, 
MessageIdData msgId) {
         if (Subscription.isIndividualAckMode(subType) && 
msgId.getAckSetsCount() == 0) {
-            removePendingAcks(position);
+            return removePendingAcks(position);
         }
+        return false;
     }
 
     private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
@@ -886,7 +892,7 @@ public class Consumer {
      *
      * @param position
      */
-    private void removePendingAcks(PositionImpl position) {
+    private boolean removePendingAcks(PositionImpl position) {
         Consumer ackOwnedConsumer = null;
         if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == 
null) {
             for (Consumer consumer : subscription.getConsumers()) {
@@ -907,7 +913,7 @@ public class Consumer {
         if (ackedPosition != null) {
             if 
(!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), 
position.getEntryId())) {
                 // Message was already removed by the other consumer
-                return;
+                return false;
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}-{}] consumer {} received ack {}", topicName, 
subscription, consumerId, position);
@@ -921,7 +927,9 @@ public class Consumer {
                 ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
                 flowConsumerBlockedPermits(ackOwnedConsumer);
             }
+            return true;
         }
+        return false;
     }
 
     public ConcurrentLongLongPairHashMap getPendingAcks() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 1135e2c8ff4..34d1d15764d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1435,4 +1435,30 @@ public class BrokerServiceTest extends BrokerTestBase {
             assertTrue(conf.isForceDeleteTenantAllowed());
         });
     }
+
+    @Test
+    public void testDuplicateAcknowledgement() throws Exception {
+        final String ns = "prop/ns-test";
+
+        admin.namespaces().createNamespace(ns, 2);
+        final String topicName = 
"persistent://prop/ns-test/duplicated-acknowledgement-test";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName("sub-1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        producer.send("1".getBytes(StandardCharsets.UTF_8));
+        Message<byte[]> message = consumer1.receive();
+        consumer1.acknowledge(message);
+        consumer1.acknowledge(message);
+        assertEquals(admin.topics().getStats(topicName).getSubscriptions()
+                .get("sub-1").getUnackedMessages(), 0);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 213296d2283..d2288f948b8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -18,32 +18,35 @@
  */
 package org.apache.pulsar.client.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import lombok.Cleanup;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 @Test(groups = "broker-impl")
 public class KeySharedSubscriptionTest extends ProducerConsumerBase {
@@ -70,91 +73,58 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
     @Test(dataProvider = "subType")
     public void 
testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType 
subscriptionType)
             throws PulsarClientException {
-        PulsarClient pulsarClient = PulsarClient.builder().
-                serviceUrl(lookupUrl.toString())
-                .build();
         final int totalMsg = 1000;
         String topic = "broker-close-test-" + 
RandomStringUtils.randomAlphabetic(5);
-        Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap();
+        Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>();
         Set<MessageId> pubMessages = Sets.newConcurrentHashSet();
         Set<MessageId> recMessages = Sets.newConcurrentHashSet();
         AtomicLong lastActiveTime = new AtomicLong();
         AtomicBoolean canAcknowledgement = new AtomicBoolean(false);
 
-        @Cleanup
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .consumerName("con-1")
-                .messageListener((cons1, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons1.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                })
-                .subscribe();
-        @Cleanup
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .messageListener((cons2, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons2.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
+        List<Consumer<?>> consumerList = new ArrayList<>();
+        // create 3 consumers
+        for (int i = 0; i < 3; i++) {
+            ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName("sub-1")
+                    .subscriptionType(subscriptionType)
+                    .messageListener((consumer, msg) -> {
+                        lastActiveTime.set(System.currentTimeMillis());
+                        nameToId.computeIfAbsent(consumer, (k) -> new 
ArrayList<>())
+                                .add(msg.getMessageId());
+                        recMessages.add(msg.getMessageId());
+                        if (canAcknowledgement.get()) {
+                            try {
+                                consumer.acknowledge(msg);
+                            } catch (PulsarClientException e) {
+                                throw new RuntimeException(e);
+                            }
                         }
-                    }
-                })
-                .consumerName("con-2")
-                .subscribe();
-        @Cleanup
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("sub-1")
-                .subscriptionType(subscriptionType)
-                .messageListener((cons3, msg) -> {
-                    lastActiveTime.set(System.currentTimeMillis());
-                    nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>())
-                            .add(msg.getMessageId());
-                    recMessages.add(msg.getMessageId());
-                    if (canAcknowledgement.get()) {
-                        try {
-                            cons3.acknowledge(msg);
-                        } catch (PulsarClientException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                })
-                .consumerName("con-3")
-                .subscribe();
+                    });
+
+            if (subscriptionType == SubscriptionType.Key_Shared) {
+                // ensure every consumer can be distributed messages
+                int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + 
i).getBytes())
+                        % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
+                
builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, 
hash)));
+            }
+
+            consumerList.add(builder.subscribe());
+        }
 
-        @Cleanup
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
                 // We chose 9 because the maximum unacked message is 10
                 .batchingMaxMessages(9)
+                .batcherBuilder(BatcherBuilder.KEY_BASED)
                 .create();
 
         for (int i = 0; i < totalMsg; i++) {
-            producer.sendAsync(UUID.randomUUID().toString()
-                            .getBytes(StandardCharsets.UTF_8))
-                    .thenAccept(pubMessages::add);
+            byte[] msg = 
UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);
+            producer.newMessage().key("key-" + (i % 3)).value(msg)
+                    .sendAsync().thenAccept(pubMessages::add);
         }
 
         // Wait for all consumers can not read more messages. the consumers 
are stuck by max unacked messages.
@@ -176,7 +146,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
 
         // Wait for all consumers to continue receiving messages.
         Awaitility.await()
-                .atMost(30, TimeUnit.SECONDS)
+                .atMost(15, TimeUnit.SECONDS)
                 .pollDelay(5, TimeUnit.SECONDS)
                 .until(() ->
                         (System.currentTimeMillis() - lastActiveTime.get()) > 
TimeUnit.SECONDS.toMillis(5));
@@ -186,5 +156,11 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         Assert.assertEquals(pubMessages.size(), totalMsg);
         Assert.assertEquals(pubMessages.size(), recMessages.size());
         Assert.assertTrue(recMessages.containsAll(pubMessages));
+
+        // cleanup
+        producer.close();
+        for (Consumer<?> consumer : consumerList) {
+            consumer.close();
+        }
     }
 }

Reply via email to