This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e531e0  Use synchronized when accessing consumers identity map (#3540)
1e531e0 is described below

commit 1e531e0c5dd24db9f85f7e97bb49ed7ad93bf617
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Feb 7 14:22:28 2019 -0800

    Use synchronized when accessing consumers identity map (#3540)
---
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 26b7937..63b4071 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -685,11 +685,13 @@ public class PulsarClientImpl implements PulsarClient {
 
     @SuppressWarnings("unchecked")
     private <T> Optional<ConsumerBase<T>> 
subscriptionExist(ConsumerConfigurationData<?> conf) {
-        Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                .filter(consumerBase -> 
consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
-                .filter(c -> 
c.getSubscription().equals(conf.getSubscriptionName()))
-                .findFirst();
-        return subscriber.map(ConsumerBase.class::cast);
+        synchronized (consumers) {
+            Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
+                    .filter(consumerBase -> 
consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                    .filter(c -> 
c.getSubscription().equals(conf.getSubscriptionName()))
+                    .findFirst();
+            return subscriber.map(ConsumerBase.class::cast);
+        }
     }
 
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData 
conf) {

Reply via email to