This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit df232a03c66ba88723a814170c60503d3aada75f Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Jan 27 17:54:10 2022 -0800 Fixed handling of consumers with equal names on on key shared selector with consistent hashing (#13991) Fixes #10750 When removing consumers from the key-shared selector based on the consistent hashing, we were removing by consumer name, although there can be duplicated consumer names (even though is not recommended). (cherry picked from commit 8360d45ed88ba551ecd16ce910ca82181cf6dc1b) --- ...ConsistentHashingStickyKeyConsumerSelector.java | 2 +- .../client/api/KeySharedSubscriptionTest.java | 72 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index 7b7a830..ac255b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -91,7 +91,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons if (v == null) { return null; } else { - v.removeIf(c -> c.consumerName().equals(consumer.consumerName())); + v.removeIf(c -> c.equals(consumer)); if (v.isEmpty()) { v = null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index d1f0075..8fc8980 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -40,7 +40,10 @@ import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -1069,6 +1072,75 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { consumer1.close(); } + @Test(timeOut = 30_000) + public void testCheckConsumersWithSameName() throws Exception { + final String topicName = "persistent://public/default/same-name-" + UUID.randomUUID(); + final String subName = "my-sub"; + final String consumerName = "name"; + + ConsumerBuilder<String> cb = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .consumerName(consumerName) + .subscriptionType(SubscriptionType.Key_Shared); + + // Create 3 consumers with same name + Consumer<String> c1 = cb.subscribe(); + + @Cleanup + Consumer<String> c2 = cb.subscribe(); + @Cleanup + Consumer<String> c3 = cb.subscribe(); + + Producer<String> p = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + for (int i = 0; i < 100; i++) { + p.newMessage() + .key(Integer.toString(i)) + .value("msg-" + i) + .send(); + } + + // C1 receives some messages and won't ack + for (int i = 0; i < 5; i++) { + c1.receive(); + } + + // Close C1, now all messages should go to c2 & c3 + c1.close(); + + CountDownLatch l = new CountDownLatch(100); + + @Cleanup("shutdownNow") + ExecutorService e = Executors.newCachedThreadPool(); + e.submit(() -> { + while (l.getCount() > 0) { + try { + Message<String> msg = c2.receive(1, TimeUnit.SECONDS); + c2.acknowledge(msg); + l.countDown(); + } catch (PulsarClientException ex) { + ex.printStackTrace(); + } + } + }); + + e.submit(() -> { + while (l.getCount() > 0) { + try { + Message<String> msg = c3.receive(1, TimeUnit.SECONDS); + c3.acknowledge(msg); + l.countDown(); + } catch (PulsarClientException ex) { + ex.printStackTrace(); + } + } + }); + + l.await(); + } + private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) { if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) { return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)