https://issues.apache.org/jira/browse/AMQ-5337
Switch to LinkedHashMap with R/W locking for concurrent add / remove protection Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4349e77e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4349e77e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4349e77e Branch: refs/heads/activemq-5.10.x Commit: 4349e77eefa560f0dff2eee31cb6a9881fee3559 Parents: 41311df Author: Timothy Bish <[email protected]> Authored: Fri Aug 29 15:52:23 2014 -0400 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Dec 17 19:50:55 2014 -0500 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 45 +++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4349e77e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 2583a23..58947ad 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.advisory; +import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; @@ -70,7 +72,8 @@ public class AdvisoryBroker extends BrokerFilter { protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); - protected final Queue<ConsumerInfo> consumers = new ConcurrentLinkedQueue<ConsumerInfo>(); + private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); + protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); @@ -103,7 +106,12 @@ public class AdvisoryBroker extends BrokerFilter { // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); - consumers.offer(info); + consumersLock.writeLock().lock(); + try { + consumers.put(info.getConsumerId(), info); + } finally { + consumersLock.writeLock().unlock(); + } fireConsumerAdvisory(context, info.getDestination(), topic, info); } else { // We need to replay all the previously collected state objects @@ -148,10 +156,15 @@ public class AdvisoryBroker extends BrokerFilter { // Replay the consumers. if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { - for (Iterator<ConsumerInfo> iter = consumers.iterator(); iter.hasNext(); ) { - ConsumerInfo value = iter.next(); - ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); - fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); + consumersLock.readLock().lock(); + try { + for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { + ConsumerInfo value = iter.next(); + ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination()); + fireConsumerAdvisory(context, value.getDestination(), topic, value, info.getConsumerId()); + } + } finally { + consumersLock.readLock().unlock(); } } @@ -266,7 +279,12 @@ public class AdvisoryBroker extends BrokerFilter { ActiveMQDestination dest = info.getDestination(); if (!AdvisorySupport.isAdvisoryTopic(dest)) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest); - consumers.remove(info); + consumersLock.writeLock().lock(); + try { + consumers.remove(info.getConsumerId()); + } finally { + consumersLock.writeLock().unlock(); + } if (!dest.isTemporary() || destinations.containsKey(dest)) { fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); } @@ -623,8 +641,13 @@ public class AdvisoryBroker extends BrokerFilter { return connections; } - public Queue<ConsumerInfo> getAdvisoryConsumers() { - return consumers; + public Collection<ConsumerInfo> getAdvisoryConsumers() { + consumersLock.readLock().lock(); + try { + return new ArrayList<ConsumerInfo>(consumers.values()); + } finally { + consumersLock.readLock().unlock(); + } } public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
