This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 2c85dd13384 [fix][broker] Fix incorrect unack msk count when dup ack a message (#20990) (#22223) 2c85dd13384 is described below commit 2c85dd13384ea7622d9bb0715d04cb5e3dc4ed95 Author: Zixuan Liu <node...@gmail.com> AuthorDate: Thu Mar 14 10:24:36 2024 +0800 [fix][broker] Fix incorrect unack msk count when dup ack a message (#20990) (#22223) --- .../org/apache/pulsar/broker/service/Consumer.java | 24 ++-- .../pulsar/broker/service/BrokerServiceTest.java | 26 ++++ .../client/impl/KeySharedSubscriptionTest.java | 136 +++++++++------------ 3 files changed, 98 insertions(+), 88 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 8924b750eb6..a7c06d0c85d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -444,6 +444,7 @@ public class Consumer { private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) { List<Position> positionsAcked = new ArrayList<>(); long totalAckCount = 0; + boolean individualAck = false; for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); PositionImpl position; @@ -467,14 +468,18 @@ public class Consumer { } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); + individualAck = true; } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - + if (individualAck) { + if (checkCanRemovePendingAcksAndHandle(position, msgId)) { + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + } + } else { + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + } positionsAcked.add(position); - checkCanRemovePendingAcksAndHandle(position, msgId); - checkAckValidationError(ack, position); totalAckCount += ackedCount; @@ -636,10 +641,11 @@ public class Consumer { } } - private void checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) { + private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) { if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { - removePendingAcks(position); + return removePendingAcks(position); } + return false; } private Consumer getAckOwnerConsumer(long ledgerId, long entryId) { @@ -886,7 +892,7 @@ public class Consumer { * * @param position */ - private void removePendingAcks(PositionImpl position) { + private boolean removePendingAcks(PositionImpl position) { Consumer ackOwnedConsumer = null; if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) { for (Consumer consumer : subscription.getConsumers()) { @@ -907,7 +913,7 @@ public class Consumer { if (ackedPosition != null) { if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) { // Message was already removed by the other consumer - return; + return false; } if (log.isDebugEnabled()) { log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position); @@ -921,7 +927,9 @@ public class Consumer { ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; flowConsumerBlockedPermits(ackOwnedConsumer); } + return true; } + return false; } public ConcurrentLongLongPairHashMap getPendingAcks() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 1135e2c8ff4..34d1d15764d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1435,4 +1435,30 @@ public class BrokerServiceTest extends BrokerTestBase { assertTrue(conf.isForceDeleteTenantAllowed()); }); } + + @Test + public void testDuplicateAcknowledgement() throws Exception { + final String ns = "prop/ns-test"; + + admin.namespaces().createNamespace(ns, 2); + final String topicName = "persistent://prop/ns-test/duplicated-acknowledgement-test"; + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub-1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .isAckReceiptEnabled(true) + .subscribe(); + producer.send("1".getBytes(StandardCharsets.UTF_8)); + Message<byte[]> message = consumer1.receive(); + consumer1.acknowledge(message); + consumer1.acknowledge(message); + assertEquals(admin.topics().getStats(topicName).getSubscriptions() + .get("sub-1").getUnackedMessages(), 0); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java index 213296d2283..d2288f948b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java @@ -18,32 +18,35 @@ */ package org.apache.pulsar.client.impl; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import lombok.Cleanup; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; @Test(groups = "broker-impl") public class KeySharedSubscriptionTest extends ProducerConsumerBase { @@ -70,91 +73,58 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { @Test(dataProvider = "subType") public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType) throws PulsarClientException { - PulsarClient pulsarClient = PulsarClient.builder(). - serviceUrl(lookupUrl.toString()) - .build(); final int totalMsg = 1000; String topic = "broker-close-test-" + RandomStringUtils.randomAlphabetic(5); - Map<Consumer<?>, List<MessageId>> nameToId = Maps.newConcurrentMap(); + Map<Consumer<?>, List<MessageId>> nameToId = new ConcurrentHashMap<>(); Set<MessageId> pubMessages = Sets.newConcurrentHashSet(); Set<MessageId> recMessages = Sets.newConcurrentHashSet(); AtomicLong lastActiveTime = new AtomicLong(); AtomicBoolean canAcknowledgement = new AtomicBoolean(false); - @Cleanup - Consumer<byte[]> consumer1 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .consumerName("con-1") - .messageListener((cons1, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons1,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons1.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .subscribe(); - @Cleanup - Consumer<byte[]> consumer2 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons2, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons2,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons2.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); + List<Consumer<?>> consumerList = new ArrayList<>(); + // create 3 consumers + for (int i = 0; i < 3; i++) { + ConsumerBuilder<byte[]> builder = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub-1") + .subscriptionType(subscriptionType) + .messageListener((consumer, msg) -> { + lastActiveTime.set(System.currentTimeMillis()); + nameToId.computeIfAbsent(consumer, (k) -> new ArrayList<>()) + .add(msg.getMessageId()); + recMessages.add(msg.getMessageId()); + if (canAcknowledgement.get()) { + try { + consumer.acknowledge(msg); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } } - } - }) - .consumerName("con-2") - .subscribe(); - @Cleanup - Consumer<byte[]> consumer3 = pulsarClient.newConsumer() - .topic(topic) - .subscriptionName("sub-1") - .subscriptionType(subscriptionType) - .messageListener((cons3, msg) -> { - lastActiveTime.set(System.currentTimeMillis()); - nameToId.computeIfAbsent(cons3,(k) -> new ArrayList<>()) - .add(msg.getMessageId()); - recMessages.add(msg.getMessageId()); - if (canAcknowledgement.get()) { - try { - cons3.acknowledge(msg); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } - }) - .consumerName("con-3") - .subscribe(); + }); + + if (subscriptionType == SubscriptionType.Key_Shared) { + // ensure every consumer can be distributed messages + int hash = Murmur3_32Hash.getInstance().makeHash(("key-" + i).getBytes()) + % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE; + builder.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(hash, hash))); + } + + consumerList.add(builder.subscribe()); + } - @Cleanup Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) // We chose 9 because the maximum unacked message is 10 .batchingMaxMessages(9) + .batcherBuilder(BatcherBuilder.KEY_BASED) .create(); for (int i = 0; i < totalMsg; i++) { - producer.sendAsync(UUID.randomUUID().toString() - .getBytes(StandardCharsets.UTF_8)) - .thenAccept(pubMessages::add); + byte[] msg = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + producer.newMessage().key("key-" + (i % 3)).value(msg) + .sendAsync().thenAccept(pubMessages::add); } // Wait for all consumers can not read more messages. the consumers are stuck by max unacked messages. @@ -176,7 +146,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { // Wait for all consumers to continue receiving messages. Awaitility.await() - .atMost(30, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) .pollDelay(5, TimeUnit.SECONDS) .until(() -> (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5)); @@ -186,5 +156,11 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { Assert.assertEquals(pubMessages.size(), totalMsg); Assert.assertEquals(pubMessages.size(), recMessages.size()); Assert.assertTrue(recMessages.containsAll(pubMessages)); + + // cleanup + producer.close(); + for (Consumer<?> consumer : consumerList) { + consumer.close(); + } } }