This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 1e531e0  Use synchronized when accessing consumers identity map (#3540)
1e531e0 is described below

commit 1e531e0c5dd24db9f85f7e97bb49ed7ad93bf617
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Feb 7 14:22:28 2019 -0800

    Use synchronized when accessing consumers identity map (#3540)
---
 .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 26b7937..63b4071 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -685,11 +685,13 @@ public class PulsarClientImpl implements PulsarClient {
 
     @SuppressWarnings("unchecked")
     private <T> Optional<ConsumerBase<T>> subscriptionExist(ConsumerConfigurationData<?> conf) {
-        Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
-                .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
-                .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
-                .findFirst();
-        return subscriber.map(ConsumerBase.class::cast);
+        synchronized (consumers) {
+            Optional<ConsumerBase<?>> subscriber = consumers.keySet().stream()
+                    .filter(consumerBase -> consumerBase.getSubType().equals(PulsarApi.CommandSubscribe.SubType.Shared))
+                    .filter(c -> c.getSubscription().equals(conf.getSubscriptionName()))
+                    .findFirst();
+            return subscriber.map(ConsumerBase.class::cast);
+        }
     }
 
     private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {