Repository: activemq-artemis Updated Branches: refs/heads/master 1cb372bcd -> 0bd784df4
Declare ConcurrentMaps instead of ConcurrentHashMaps See PR #88 for discussion. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a647c17 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a647c17 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a647c17 Branch: refs/heads/master Commit: 2a647c176f5424a92642ca24ba7d6f9eaba0315e Parents: 1cb372b Author: Ville Skyttä <ville.sky...@iki.fi> Authored: Wed Jul 22 16:07:58 2015 +0300 Committer: Ville Skyttä <ville.sky...@iki.fi> Committed: Wed Jul 22 16:16:57 2015 +0300 ---------------------------------------------------------------------- .../org/apache/activemq/artemis/utils/FactoryFinder.java | 3 ++- .../artemis/core/protocol/mqtt/MQTTSessionState.java | 6 +++--- .../artemis/core/protocol/mqtt/MQTTSubscriptionManager.java | 5 +++-- .../core/protocol/openwire/OpenWireProtocolManager.java | 3 ++- .../core/protocol/openwire/amq/AMQConnectionContext.java | 8 ++++---- .../core/protocol/openwire/amq/AMQSecurityContext.java | 9 +++++---- .../java/org/proton/plug/test/minimalserver/DumbServer.java | 3 ++- .../activemq/artemis/rest/queue/ConsumersResource.java | 3 ++- .../activemq/artemis/rest/topic/SubscriptionsResource.java | 3 ++- .../src/test/java/org/apache/activemq/bugs/AMQ4062Test.java | 9 +++++---- .../org/apache/activemq/transport/tcp/SocketTstFactory.java | 3 ++- 11 files changed, 32 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java index 4af3203..b26a71e 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FactoryFinder.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class FactoryFinder { @@ -51,7 +52,7 @@ public class FactoryFinder */ protected static class StandaloneObjectFactory implements ObjectFactory { - final ConcurrentHashMap<String, Class> classMap = new ConcurrentHashMap<String, Class>(); + final ConcurrentMap<String, Class> classMap = new ConcurrentHashMap<String, Class>(); public Object create(final String path) throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 38fea75..d6bbd44 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -35,12 +35,12 @@ public class MQTTSessionState private ServerMessage willMessage; - private final ConcurrentHashMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>(); // Used to store Packet ID of Publish QoS1 and QoS2 message. See spec: 4.3.3 QoS 2: Exactly once delivery. Method B. private Map<Integer, MQTTMessageInfo> messageRefStore; - private ConcurrentHashMap<String, Map<Long, Integer>> addressMessageMap; + private ConcurrentMap<String, Map<Long, Integer>> addressMessageMap; private Set<Integer> pubRec; @@ -53,7 +53,7 @@ public class MQTTSessionState // Objects track the Outbound message references private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore; - private ConcurrentMap<String, ConcurrentHashMap<Long, Integer>> reverseOutboundReferenceStore; + private ConcurrentMap<String, ConcurrentMap<Long, Integer>> reverseOutboundReferenceStore; private final Object outboundLock = new Object(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index e7ac143..c523938 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -25,14 +25,15 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class MQTTSubscriptionManager { private MQTTSession session; - private ConcurrentHashMap<Long, Integer> consumerQoSLevels; + private ConcurrentMap<Long, Integer> consumerQoSLevels; - private ConcurrentHashMap<String, ServerConsumer> consumers; + private ConcurrentMap<String, ServerConsumer> consumers; private MQTTLogger log = MQTTLogger.LOGGER; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index a34168c..15cb9e2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -26,6 +26,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import io.netty.channel.ChannelPipeline; @@ -117,7 +118,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No private final CopyOnWriteArrayList<OpenWireConnection> connections = new CopyOnWriteArrayList<OpenWireConnection>(); - protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); + protected final ConcurrentMap<ConnectionId, ConnectionInfo> connectionInfos = new ConcurrentHashMap<ConnectionId, ConnectionInfo>(); private final Map<String, AMQConnectionContext> clientIdSet = new HashMap<String, AMQConnectionContext>(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java index def94b6..a219bb2 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConnectionContext.java @@ -17,7 +17,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.broker.region.MessageReference; @@ -38,7 +38,7 @@ public class AMQConnectionContext private OpenWireProtocolManager broker; //use protocol manager to represent the broker private boolean inRecoveryMode; private AMQTransaction transaction; - private ConcurrentHashMap<TransactionId, AMQTransaction> transactions; + private ConcurrentMap<TransactionId, AMQTransaction> transactions; private AMQSecurityContext securityContext; private ConnectionId connectionId; private String clientId; @@ -216,13 +216,13 @@ public class AMQConnectionContext this.inRecoveryMode = inRecoveryMode; } - public ConcurrentHashMap<TransactionId, AMQTransaction> getTransactions() + public ConcurrentMap<TransactionId, AMQTransaction> getTransactions() { return transactions; } public void setTransactions( - ConcurrentHashMap<TransactionId, AMQTransaction> transactions) + ConcurrentMap<TransactionId, AMQTransaction> transactions) { this.transactions = transactions; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSecurityContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSecurityContext.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSecurityContext.java index f51a636..8cc6238 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSecurityContext.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSecurityContext.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.activemq.command.ActiveMQDestination; @@ -45,8 +46,8 @@ public abstract class AMQSecurityContext final String userName; - final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); - final ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); + final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedReadDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); + final ConcurrentMap<ActiveMQDestination, ActiveMQDestination> authorizedWriteDests = new ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination>(); public AMQSecurityContext(String userName) { @@ -77,12 +78,12 @@ public abstract class AMQSecurityContext return userName; } - public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() + public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedReadDests() { return authorizedReadDests; } - public ConcurrentHashMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() + public ConcurrentMap<ActiveMQDestination, ActiveMQDestination> getAuthorizedWriteDests() { return authorizedWriteDests; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java index 452c949..e615a4c 100644 --- a/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java +++ b/artemis-protocols/artemis-proton-plug/src/test/java/org/proton/plug/test/minimalserver/DumbServer.java @@ -18,11 +18,12 @@ package org.proton.plug.test.minimalserver; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; public class DumbServer { - static ConcurrentHashMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>(); + static ConcurrentMap<String, BlockingDeque<Object>> maps = new ConcurrentHashMap<>(); public static BlockingDeque getQueue(String name) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/queue/ConsumersResource.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/queue/ConsumersResource.java b/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/queue/ConsumersResource.java index 4b4214d..9e73b02 100644 --- a/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/queue/ConsumersResource.java +++ b/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/queue/ConsumersResource.java @@ -30,6 +30,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -39,7 +40,7 @@ import org.apache.activemq.artemis.rest.util.TimeoutTask; public class ConsumersResource implements TimeoutTask.Callback { - protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>(); + protected ConcurrentMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>(); protected ClientSessionFactory sessionFactory; protected String destination; protected final String startup = Long.toString(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/topic/SubscriptionsResource.java ---------------------------------------------------------------------- diff --git a/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/topic/SubscriptionsResource.java b/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/topic/SubscriptionsResource.java index 55bb64a..444e8f4 100644 --- a/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/topic/SubscriptionsResource.java +++ b/artemis-rest/src/main/java/org/apache/activemq/artemis/rest/topic/SubscriptionsResource.java @@ -30,6 +30,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -45,7 +46,7 @@ import org.apache.activemq.artemis.rest.util.TimeoutTask; public class SubscriptionsResource implements TimeoutTask.Callback { - protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>(); + protected ConcurrentMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>(); protected ClientSessionFactory sessionFactory; protected String destination; protected final String startup = Long.toString(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java index 222efc7..2be0126 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import javax.jms.DeliveryMode; @@ -57,7 +58,7 @@ public class AMQ4062Test { private BrokerService service; private PolicyEntry policy; - private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions; + private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions; private static final int PREFETCH_SIZE_5=5; private String connectionUri; @@ -174,17 +175,17 @@ public class AMQ4062Test { } @SuppressWarnings("unchecked") - private ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException { + private ConcurrentMap<SubscriptionKey, DurableTopicSubscription> getDurableSubscriptions() throws NoSuchFieldException, IllegalAccessException { if(durableSubscriptions!=null) return durableSubscriptions; RegionBroker regionBroker=(RegionBroker)service.getRegionBroker(); TopicRegion region=(TopicRegion)regionBroker.getTopicRegion(); Field field=TopicRegion.class.getDeclaredField("durableSubscriptions"); field.setAccessible(true); - durableSubscriptions=(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>)field.get(region); + durableSubscriptions=(ConcurrentMap<SubscriptionKey, DurableTopicSubscription>)field.get(region); return durableSubscriptions; } - private ConsumerInfo getConsumerInfo(ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) { + private ConsumerInfo getConsumerInfo(ConcurrentMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions) { ConsumerInfo info=null; for(Iterator<DurableTopicSubscription> it=durableSubscriptions.values().iterator();it.hasNext();){ Subscription sub = it.next(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2a647c17/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java index 396f284..9b31a73 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java @@ -22,6 +22,7 @@ import java.net.Socket; import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import javax.net.SocketFactory; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; public class SocketTstFactory extends SocketFactory { private static final Logger LOG = LoggerFactory.getLogger(SocketTstFactory.class); - private static final ConcurrentHashMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>(); + private static final ConcurrentMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>(); private class SocketTst {