This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit df232a03c66ba88723a814170c60503d3aada75f
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Jan 27 17:54:10 2022 -0800

    Fixed handling of consumers with equal names on on key shared selector with 
consistent hashing (#13991)
    
    Fixes #10750
    
    When removing consumers from the key-shared selector based on the 
consistent hashing, we were removing by consumer name, although there can be 
duplicated consumer names (even though is not recommended).
    
    (cherry picked from commit 8360d45ed88ba551ecd16ce910ca82181cf6dc1b)
---
 ...ConsistentHashingStickyKeyConsumerSelector.java |  2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 72 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index 7b7a830..ac255b5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -91,7 +91,7 @@ public class ConsistentHashingStickyKeyConsumerSelector 
implements StickyKeyCons
                     if (v == null) {
                         return null;
                     } else {
-                        v.removeIf(c -> 
c.consumerName().equals(consumer.consumerName()));
+                        v.removeIf(c -> c.equals(consumer));
                         if (v.isEmpty()) {
                             v = null;
                         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index d1f0075..8fc8980 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -40,7 +40,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -1069,6 +1072,75 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
         consumer1.close();
     }
 
+    @Test(timeOut = 30_000)
+    public void testCheckConsumersWithSameName() throws Exception {
+        final String topicName = "persistent://public/default/same-name-" + 
UUID.randomUUID();
+        final String subName = "my-sub";
+        final String consumerName = "name";
+
+        ConsumerBuilder<String> cb = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .consumerName(consumerName)
+                .subscriptionType(SubscriptionType.Key_Shared);
+
+        // Create 3 consumers with same name
+        Consumer<String> c1 = cb.subscribe();
+
+        @Cleanup
+        Consumer<String> c2 = cb.subscribe();
+        @Cleanup
+        Consumer<String> c3 = cb.subscribe();
+
+        Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+        for (int i = 0; i < 100; i++) {
+            p.newMessage()
+                    .key(Integer.toString(i))
+                    .value("msg-" + i)
+                    .send();
+        }
+
+        // C1 receives some messages and won't ack
+        for (int i = 0; i < 5; i++) {
+            c1.receive();
+        }
+
+        // Close C1, now all messages should go to c2 & c3
+        c1.close();
+
+        CountDownLatch l = new CountDownLatch(100);
+
+        @Cleanup("shutdownNow")
+        ExecutorService e = Executors.newCachedThreadPool();
+        e.submit(() -> {
+            while (l.getCount() > 0) {
+                try {
+                    Message<String> msg = c2.receive(1, TimeUnit.SECONDS);
+                    c2.acknowledge(msg);
+                    l.countDown();
+                } catch (PulsarClientException ex) {
+                    ex.printStackTrace();
+                }
+            }
+        });
+
+        e.submit(() -> {
+            while (l.getCount() > 0) {
+                try {
+                    Message<String> msg = c3.receive(1, TimeUnit.SECONDS);
+                    c3.acknowledge(msg);
+                    l.countDown();
+                } catch (PulsarClientException ex) {
+                    ex.printStackTrace();
+                }
+            }
+        });
+
+        l.await();
+    }
+
     private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String 
subscription) {
         if 
(TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
             return ((PersistentStickyKeyDispatcherMultipleConsumers) 
topic.getSubscription(subscription)

Reply via email to