This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 120242477e800924f537acd3c64070ae04a190de Author: Penghui Li <[email protected]> AuthorDate: Fri Jul 22 15:24:40 2022 +0800 [fix][broker] Fix consumer does not abide by the max unacks limitation for Key_Shared subscription (#16718) (cherry picked from commit fd9c418147503ec226f2c87e187a58af6f601b7d) --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 6 +++++- .../pulsar/client/api/SimpleProducerConsumerTest.java | 17 ++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 881d3db1a81..886be363c41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -228,7 +228,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi Consumer consumer = current.getKey(); List<Entry> entriesWithSameKey = current.getValue(); int entriesWithSameKeyCount = entriesWithSameKey.size(); - final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0); + int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0); + if (consumer != null && consumer.getMaxUnackedMessages() > 0) { + availablePermits = Math.min(availablePermits, + consumer.getMaxUnackedMessages() - consumer.getUnackedMessages()); + } int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType, consumerStickyKeyHashesMap.get(consumer)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 57f82136636..555191ae599 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -149,6 +149,16 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { return new Object[][] { { true }, { false } }; } + @DataProvider(name = "ackReceiptEnabledAndSubscriptionTypes") + public Object[][] ackReceiptEnabledAndSubscriptionTypes() { + return new Object[][] { + {true, SubscriptionType.Shared}, + {true, SubscriptionType.Key_Shared}, + {false, SubscriptionType.Shared}, + {false, SubscriptionType.Key_Shared}, + }; + } + @AfterMethod(alwaysRun = true) @Override protected void cleanup() throws Exception { @@ -1596,8 +1606,9 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } } - @Test(dataProvider = "ackReceiptEnabled") - public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled) throws PulsarClientException { + @Test(dataProvider = "ackReceiptEnabledAndSubscriptionTypes") + public void testMaxUnAckMessagesLowerThanPermits(boolean ackReceiptEnabled, SubscriptionType subType) + throws PulsarClientException { final int maxUnacks = 10; pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnacks); final String topic = "persistent://my-property/my-ns/testMaxUnAckMessagesLowerThanPermits"; @@ -1605,7 +1616,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { @Cleanup Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic).subscriptionName("sub") - .subscriptionType(SubscriptionType.Shared) + .subscriptionType(subType) .isAckReceiptEnabled(ackReceiptEnabled) .acknowledgmentGroupTime(0, TimeUnit.SECONDS) .subscribe();
