This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fc1a8ee4c829f663091b3d0daf91b14661539cf5 Author: Masahiro Sakamoto <massa...@yahoo-corp.jp> AuthorDate: Tue Jul 7 01:25:07 2020 +0900 Consumer is registered on dispatcher even if hash range conflicts on Key_Shared subscription (#7444) (cherry picked from commit 97ee82ed2dc337f81d7059c5d8980191d16dbfe3) --- .../broker/service/AbstractDispatcherSingleActiveConsumer.java | 4 ++-- .../NonPersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++-- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 10 ++++++++-- .../apache/pulsar/client/api/KeySharedSubscriptionTest.java | 10 +++++++++- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 6c5f8a7..9948dcc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -155,8 +155,6 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas throw new ConsumerBusyException("Subscription reached max consumers limit"); } - consumers.add(consumer); - if (subscriptionType == SubType.Exclusive && consumer.getKeySharedMeta() != null && consumer.getKeySharedMeta().getHashRangesList() != null @@ -168,6 +166,8 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas isKeyHashRangeFiltered = false; } + consumers.add(consumer); + if (!pickAndScheduleActiveConsumer()) { // the active consumer is not changed Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java index 32cce87..37b29da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java @@ -47,7 +47,13 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { super.addConsumer(consumer); - selector.addConsumer(consumer); + try { + selector.addConsumer(consumer); + } catch (BrokerServiceException e) { + consumerSet.removeAll(consumer); + consumerList.remove(consumer); + throw e; + } } @Override @@ -99,4 +105,4 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages()); } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index a4a532f..420552c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -93,7 +93,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { super.addConsumer(consumer); - selector.addConsumer(consumer); + try { + selector.addConsumer(consumer); + } catch (BrokerServiceException e) { + consumerSet.removeAll(consumer); + consumerList.remove(consumer); + throw e; + } // If this was the 1st consumer, or if all the messages are already acked, then we // don't need to do anything special @@ -294,4 +300,4 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class); -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 610c4d1..02073ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -658,7 +658,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { @Test public void testHashRangeConflict() throws PulsarClientException { this.conf.setSubscriptionKeySharedEnable(true); - final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString(); + final String topic = "persistent://public/default/testHashRangeConflict-" + UUID.randomUUID().toString(); final String sub = "test"; Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535)); @@ -667,6 +667,10 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399)); Assert.assertTrue(consumer2.isConnected()); + PersistentStickyKeyDispatcherMultipleConsumers dispatcher = (PersistentStickyKeyDispatcherMultipleConsumers) pulsar + .getBrokerService().getTopicReference(topic).get().getSubscription(sub).getDispatcher(); + Assert.assertEquals(dispatcher.getConsumers().size(), 2); + try { createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535)); Assert.fail("Should failed with conflict range."); @@ -679,7 +683,9 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { } catch (PulsarClientException.ConsumerAssignException ignore) { } + Assert.assertEquals(dispatcher.getConsumers().size(), 2); consumer1.close(); + Assert.assertEquals(dispatcher.getConsumers().size(), 1); try { createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535)); @@ -705,9 +711,11 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99)); Assert.assertTrue(consumer4.isConnected()); + Assert.assertEquals(dispatcher.getConsumers().size(), 3); consumer2.close(); consumer3.close(); consumer4.close(); + Assert.assertFalse(dispatcher.isConsumerConnected()); } @Test