[ROCKETMQ-208] Incompatibility problem found in JDK 1.7 environment when running the client; closes apache/incubator-rocketmq#10
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/96cd2e4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/96cd2e4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/96cd2e4e Branch: refs/heads/release-4.1.0-incubating Commit: 96cd2e4ed03e2c47a7d79d7e10980d154d2acb93 Parents: 2c28baa Author: Jaskey <linjunjie1...@gmail.com> Authored: Tue Jun 6 16:06:46 2017 +0800 Committer: dongeforever <dongefore...@apache.org> Committed: Tue Jun 6 16:28:15 2017 +0800 ---------------------------------------------------------------------- .../broker/client/ConsumerGroupInfo.java | 9 ++-- .../rocketmq/broker/client/ConsumerManager.java | 7 +-- .../broker/client/net/Broker2Client.java | 6 +-- .../client/rebalance/RebalanceLockManager.java | 3 +- .../broker/filter/ConsumerFilterManager.java | 9 ++-- .../broker/filtersrv/FilterServerManager.java | 3 +- .../longpolling/PullRequestHoldService.java | 3 +- .../broker/offset/ConsumerOffsetManager.java | 31 +++++++------ .../broker/processor/AdminBrokerProcessor.java | 4 +- .../subscription/SubscriptionGroupManager.java | 5 +- .../broker/topic/TopicConfigManager.java | 5 +- .../consumer/MQPullConsumerScheduleService.java | 7 +-- .../consumer/store/LocalFileOffsetStore.java | 3 +- .../consumer/store/OffsetSerializeWrapper.java | 7 +-- .../consumer/store/RemoteBrokerOffsetStore.java | 3 +- .../rocketmq/client/impl/MQClientManager.java | 3 +- .../consumer/DefaultMQPullConsumerImpl.java | 4 +- .../consumer/DefaultMQPushConsumerImpl.java | 6 +-- .../client/impl/consumer/MessageQueueLock.java | 3 +- .../client/impl/consumer/PullAPIWrapper.java | 5 +- .../client/impl/consumer/RebalanceImpl.java | 13 +++--- .../client/impl/factory/MQClientInstance.java | 17 +++---- .../impl/producer/DefaultMQProducerImpl.java | 5 +- .../protocol/body/ConsumerConnection.java | 5 +- .../body/ConsumerOffsetSerializeWrapper.java | 9 ++-- 
.../protocol/body/SubscriptionGroupWrapper.java | 7 +-- .../body/TopicConfigSerializeWrapper.java | 7 +-- .../common/stats/MomentStatsItemSet.java | 5 +- .../rocketmq/common/stats/StatsItemSet.java | 3 +- .../filtersrv/filter/FilterClassManager.java | 3 +- .../namesrv/routeinfo/RouteInfoManager.java | 4 +- .../remoting/netty/NettyRemotingAbstract.java | 3 +- .../remoting/netty/NettyRemotingClient.java | 3 +- .../store/AllocateMappedFileService.java | 3 +- .../rocketmq/store/DefaultMessageStore.java | 49 ++++++++++---------- .../schedule/DelayOffsetSerializeWrapper.java | 7 +-- .../store/schedule/ScheduleMessageService.java | 5 +- 37 files changed, 153 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 6ce542a..91b6c81 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory; public class ConsumerGroupInfo { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final String groupName; - private final ConcurrentHashMap<String/* Topic */, SubscriptionData> 
subscriptionTable = + private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>(); - private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16); private volatile ConsumeType consumeType; private volatile MessageModel messageModel; @@ -63,11 +64,11 @@ public class ConsumerGroupInfo { return null; } - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { + public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() { return subscriptionTable; } - public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() { + public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() { return channelInfoTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index a5ddec8..4a262e5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; @@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory; public class ConsumerManager { private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120; - private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable = + private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable = new ConcurrentHashMap<String, ConsumerGroupInfo>(1024); private final ConsumerIdsChangeListener consumerIdsChangeListener; @@ -145,7 +146,7 @@ public class ConsumerManager { Entry<String, ConsumerGroupInfo> next = it.next(); String group = next.getKey(); ConsumerGroupInfo consumerGroupInfo = next.getValue(); - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable(); Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator(); @@ -176,7 +177,7 @@ public class ConsumerManager { Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); - ConcurrentHashMap<String, SubscriptionData> subscriptionTable = + ConcurrentMap<String, SubscriptionData> subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 863da62..65b444e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -25,7 +25,7 @@ import java.util.HashMap; import java.util.List; import 
java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -189,7 +189,7 @@ public class Broker2Client { this.brokerController.getConsumerManager().getConsumerGroupInfo(group); if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable(); for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); @@ -252,7 +252,7 @@ public class Broker2Client { Map<String, Map<MessageQueue, Long>> consumerStatusTable = new HashMap<String, Map<MessageQueue, Long>>(); - ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable = + ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); if (null == channelInfoTable || channelInfoTable.isEmpty()) { result.setCode(ResponseCode.SYSTEM_ERROR); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 98aceb6..ed5a875 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -19,6 +19,7 @@ package 
org.apache.rocketmq.broker.client.rebalance; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,7 +32,7 @@ public class RebalanceLockManager { private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty( "rocketmq.broker.rebalance.lockMaxLiveTime", "60000")); private final Lock lock = new ReentrantLock(); - private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = + private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable = new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024); public boolean tryLock(final String group, final MessageQueue mq, final String clientId) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java index 7f790af..f50db86 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.broker.filter; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager { private static final long MS_24_HOUR = 24 * 3600 * 1000; - private 
ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic> + private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic> filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256); private transient BrokerController brokerController; @@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager { } } - public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() { + public ConcurrentMap<String, FilterDataMapByTopic> getFilterDataByTopic() { return filterDataByTopic; } @@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager { public static class FilterDataMapByTopic { - private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData> + private ConcurrentMap<String/*consumer group*/, ConsumerFilterData> groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>(); private String topic; @@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager { return this.groupFilterData.get(consumerGroup); } - public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() { + public final ConcurrentMap<String, ConsumerFilterData> getGroupFilterData() { return this.groupFilterData; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index b935bc8..52cb919 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -38,7 +39,7 @@ public class FilterServerManager { public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000; private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable = + private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable = new ConcurrentHashMap<Channel, FilterServerInfo>(16); private final BrokerController brokerController; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 71f56a4..b1bd86f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.SystemClock; @@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread { private static final String TOPIC_QUEUEID_SEPARATOR = "@"; private final BrokerController brokerController; private final SystemClock systemClock = new SystemClock(); - private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable = + private ConcurrentMap<String/* topic@queueId */, 
ManyPullRequest> pullRequestTable = new ConcurrentHashMap<String, ManyPullRequest>(1024); public PullRequestHoldService(final BrokerController brokerController) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 769c4ad..57565a6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final String TOPIC_GROUP_SEPARATOR = "@"; - private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = - new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = + new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); private transient BrokerController brokerController; @@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager { } public void scanUnsubscribedTopic() { - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, 
Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) { + private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) { Iterator<Entry<Integer, Long>> it = table.entrySet().iterator(); boolean result = !table.isEmpty(); @@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set<String> whichTopicByConsumer(final String group) { Set<String> topics = new HashSet<String>(); - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager { public Set<String> whichGroupByTopic(final String topic) { Set<String> groups = new HashSet<String>(); - Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next(); + Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = 
topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { @@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager { } private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { - ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); + ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null == map) { map = new ConcurrentHashMap<Integer, Long>(32); map.put(queueId, offset); @@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager { public long queryOffset(final String group, final String topic, final int queueId) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; - ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key); + ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) @@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager { return RemotingSerializable.toJson(this, prettyFormat); } - public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { + public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { + public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) { this.offsetTable = offsetTable; } @@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager { } } - for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { + for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) { String topicGroup = offSetEntry.getKey(); String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR); if (topic.equals(topicGroupArr[0])) { @@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager { } 
public void cloneOffset(final String srcGroup, final String destGroup, final String topic) { - ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); + ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup); if (offsets != null) { this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index f59d295..71fdda9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; @@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetConsumeStatsInBrokerHeader requestHeader = (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class); boolean isOrder = requestHeader.isOrder(); - ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups = + ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable(); List<Map<String/* subscriptionGroupName */, 
List<ConsumeStats>>> brokerConsumeStatsList = http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index bdf2a01..bd4a26e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory; public class SubscriptionGroupManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); - private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = + private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); private final DataVersion dataVersion = new DataVersion(); private transient BrokerController brokerController; @@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager { } } - public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { + public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { return subscriptionGroupTable; } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 93a631a..3bcafc0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager { private static final long LOCK_TIMEOUT_MILLIS = 3000; private transient final Lock lockTopicConfigTable = new ReentrantLock(); - private final ConcurrentHashMap<String, TopicConfig> topicConfigTable = + private final ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(1024); private final DataVersion dataVersion = new DataVersion(); private final Set<String> systemTopicList = new HashSet<String>(); @@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager { return dataVersion; } - public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { + public ConcurrentMap<String, TopicConfig> getTopicConfigTable() { return topicConfigTable; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java ---------------------------------------------------------------------- diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java index 6bae85a..e0b546d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; @@ -35,11 +36,11 @@ import org.slf4j.Logger; public class MQPullConsumerScheduleService { private final Logger log = ClientLogger.getLog(); private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl(); - private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable = + private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable = new ConcurrentHashMap<MessageQueue, PullTaskImpl>(); private DefaultMQPullConsumer defaultMQPullConsumer; private int pullThreadNums = 20; - private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable = + private ConcurrentMap<String /* topic */, PullTaskCallback> callbackTable = new ConcurrentHashMap<String, PullTaskCallback>(); private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; @@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService { } } - public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() { + public ConcurrentMap<String, PullTaskCallback> getCallbackTable() { return callbackTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java 
---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index 6c81516..d4b19b2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore { private final MQClientInstance mQClientFactory; private final String groupName; private final String storePath; - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>(); public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java index 32bcc9f..7dfd97a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java @@ -17,6 +17,7 @@ package 
org.apache.rocketmq.client.consumer.store; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; @@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; * Wrapper class for offset serialization */ public class OffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>(); - public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() { + public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) { + public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) { this.offsetTable = offsetTable; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 60ad101..5bd5749 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import 
org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private final static Logger log = ClientLogger.getLog(); private final MQClientInstance mQClientFactory; private final String groupName; - private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable = + private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>(); public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index f596b83..25877d7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -28,7 +29,7 @@ public class MQClientManager { private final static Logger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); - private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = + private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String, MQClientInstance>(); private MQClientManager() { 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 7d43b37..35ee16f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { throw new IllegalArgumentException("topic is null"); } - ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable(); Set<MessageQueue> mqResult = new HashSet<MessageQueue>(); for (MessageQueue mq : mqTable.keySet()) { if (mq.getTopic().equals(topic)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 8767964..9bf34be 100644 --- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -26,7 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { + public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() { return this.rebalanceImpl.getSubscriptionInner(); } @@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private long computeAccumulationTotal() { long msgAccTotal = 0; - ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); + ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable(); Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java index c25e41b..a02f1b6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java +++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java @@ -17,13 +17,14 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.message.MessageQueue; /** * Message lock,strictly ensure the single queue only one thread at a time consuming */ public class MessageQueueLock { - private ConcurrentHashMap<MessageQueue, Object> mqLockTable = + private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>(); public Object fetchLockObject(final MessageQueue mq) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index 304a44a..bbdf27d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -53,7 +54,7 @@ public class PullAPIWrapper { private final MQClientInstance mQClientFactory; private final String consumerGroup; private final boolean unitMode; - private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = + private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable = new ConcurrentHashMap<MessageQueue, 
AtomicLong>(32); private volatile boolean connectBrokerByUser = false; private volatile long defaultBrokerId = MixAll.MASTER_ID; @@ -247,7 +248,7 @@ public class PullAPIWrapper { private String computPullFromWhichFilterServer(final String topic, final String brokerAddr) throws MQClientException { - ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); + ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable(); if (topicRouteTable != null) { TopicRouteData topicRouteData = topicRouteTable.get(topic); List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 6b12221..634e0f0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.impl.FindBrokerResult; import org.apache.rocketmq.client.impl.factory.MQClientInstance; @@ -44,10 +45,10 @@ import org.slf4j.Logger; */ public abstract class RebalanceImpl { protected static final Logger log = ClientLogger.getLog(); - protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); - protected final ConcurrentHashMap<String/* topic */, 
Set<MessageQueue>> topicSubscribeInfoTable = + protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); + protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>(); - protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner = + protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); protected String consumerGroup; protected MessageModel messageModel; @@ -232,7 +233,7 @@ public abstract class RebalanceImpl { this.truncateMessageQueueNotMyTopic(); } - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { + public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() { return subscriptionInner; } @@ -421,11 +422,11 @@ public abstract class RebalanceImpl { } } - public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() { + public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() { return processQueueTable; } - public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { + public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() { return topicSubscribeInfoTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 1b075ee..f146be9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -28,6 +28,7 @@ import 
java.util.Map.Entry; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -88,18 +89,18 @@ public class MQClientInstance { private final int instanceIndex; private final String clientId; private final long bootTimestamp = System.currentTimeMillis(); - private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); - private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); - private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>(); + private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>(); + private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>(); + private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>(); private final NettyClientConfig nettyClientConfig; private final MQClientAPIImpl mQClientAPIImpl; private final MQAdminImpl mQAdminImpl; - private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>(); + private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>(); private final Lock lockNamesrv = new ReentrantLock(); private final Lock lockHeartbeat = new ReentrantLock(); - private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = + private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId 
*/, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>(); - private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = + private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<String, HashMap<String, Integer>>(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -1088,7 +1089,7 @@ public class MQClientInstance { } consumer.suspend(); - ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); + ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable(); for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) { MessageQueue mq = entry.getKey(); if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) { @@ -1166,7 +1167,7 @@ public class MQClientInstance { return defaultMQProducer; } - public ConcurrentHashMap<String, TopicRouteData> getTopicRouteTable() { + public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() { return topicRouteTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index d828875..12f8a36 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -26,6 +26,7 @@ import java.util.Random; import java.util.Set; import 
java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final Logger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; - private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = + private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; @@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); } - public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() { + public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() { return topicPublishInfoTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java index 7478dd2..3a0356c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; @@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerConnection extends RemotingSerializable { private HashSet<Connection> connectionSet = new HashSet<Connection>(); - private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = + private ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>(); private ConsumeType consumeType; private MessageModel messageModel; @@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable { this.connectionSet = connectionSet; } - public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() { + public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() { return subscriptionTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java index 02bf811..5b08d78 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -18,17 +18,18 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class ConsumerOffsetSerializeWrapper 
extends RemotingSerializable { - private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = - new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512); + private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = + new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512); - public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() { + public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) { + public void setOffsetTable(ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable) { this.offsetTable = offsetTable; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java index 92c15eb..e05f759 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java @@ -18,21 +18,22 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class SubscriptionGroupWrapper extends RemotingSerializable { - private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable = + private ConcurrentMap<String, 
SubscriptionGroupConfig> subscriptionGroupTable = new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { + public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() { return subscriptionGroupTable; } public void setSubscriptionGroupTable( - ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { + ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) { this.subscriptionGroupTable = subscriptionGroupTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java index c471d1a..ce12302 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java @@ -18,20 +18,21 @@ package org.apache.rocketmq.common.protocol.body; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class TopicConfigSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap<String, TopicConfig> topicConfigTable = + private ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); private DataVersion dataVersion = new DataVersion(); - public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() { + 
public ConcurrentMap<String, TopicConfig> getTopicConfigTable() { return topicConfigTable; } - public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) { + public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) { this.topicConfigTable = topicConfigTable; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 5498d34..57dfc38 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class MomentStatsItemSet { - private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable = + private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable = new ConcurrentHashMap<String, MomentStatsItem>(128); private final String statsName; private final ScheduledExecutorService scheduledExecutorService; @@ -39,7 +40,7 @@ public class MomentStatsItemSet { this.init(); } - public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() { + public ConcurrentMap<String, MomentStatsItem> getStatsItemTable() { return statsItemTable; } 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 8633d68..17dbf0d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.slf4j.Logger; public class StatsItemSet { - private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable = + private final ConcurrentMap<String/* key */, StatsItem> statsItemTable = new ConcurrentHashMap<String, StatsItem>(128); private final String statsName; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java index 2c31538..490c582 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +41,7 @@ public class FilterClassManager { private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread")); - private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = + private ConcurrentMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable = new ConcurrentHashMap<String, FilterClassInfo>(128); private FilterClassFetchMethod filterClassFetchMethod; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 5a953a9..7479fcc 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.rocketmq.common.DataVersion; @@ -135,7 +135,7 @@ public class RouteInfoManager { && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// || registerFirst) { - ConcurrentHashMap<String, TopicConfig> tcTable = + ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != 
null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 15586cb..0ba714a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract { /** * This map caches all on-going requests. 
*/ - protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable = + protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256); /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 52ca47e..1c3da9a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -41,6 +41,7 @@ import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private final Bootstrap bootstrap = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final Lock lockChannelTables = new ReentrantLock(); - private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); + private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); private final Timer timer = new Timer("ClientHouseKeepingService", true); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java ---------------------------------------------------------------------- diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 0993a5f..abb8385 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; public class AllocateMappedFileService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static int waitTimeOut = 1000 * 5; - private ConcurrentHashMap<String, AllocateRequest> requestTable = + private ConcurrentMap<String, AllocateRequest> requestTable = new ConcurrentHashMap<String, AllocateRequest>(); private PriorityBlockingQueue<AllocateRequest> requestQueue = new PriorityBlockingQueue<AllocateRequest>(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 931edc7..4549f1e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore { // CommitLog private final CommitLog commitLog; - private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; + private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable; private final FlushConsumeQueueService flushConsumeQueueService; @@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore { } public void truncateDirtyLogicFiles(long phyOffset) { - ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { logic.truncateDirtyLogicFiles(phyOffset); } @@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore { } public void destroyLogics() { - for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.destroy(); } @@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore { @Override public int cleanUnusedTopic(Set<String> topics) { - Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + Entry<String, ConcurrentMap<Integer, 
ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); + ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // @@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore { public void cleanExpiredConsumerQueue() { long minCommitLogOffset = this.commitLog.getMinOffset(); - Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { - ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue(); + ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry<Integer, ConsumeQueue> nextQT = itQT.next(); @@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore { } public ConsumeQueue findConsumeQueue(String topic, int queueId) { - ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); + ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { - ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); - ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); + ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, 
ConsumeQueue>(128); + ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { @@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore { private void checkSelf() { this.commitLog.checkSelf(); - Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); + Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { - Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next(); + Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator(); while (itNext.hasNext()) { Entry<Integer, ConsumeQueue> cq = itNext.next(); @@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore { } private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) { - ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); + ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic); if (null == map) { map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>(); map.put(queueId, consumeQueue); @@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore { } private void recoverConsumeQueue() { - for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { logic.recover(); } @@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore { private void recoverTopicQueueTable() { HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); long minPhyOffset = this.commitLog.getMinOffset(); - for 
(ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); table.put(key, logic.getMaxOffsetInQueue()); @@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore { return runningFlags; } - public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() { + public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() { return consumeQueueTable; } @@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore { @Override public ConsumeQueue getConsumeQueue(String topic, int queueId) { - ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); + ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (map == null) { return null; } @@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore { if (minOffset > this.lastPhysicalMinOffset) { this.lastPhysicalMinOffset = minOffset; - ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { int deleteCount = logic.deleteExpiredFile(minOffset); @@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore { logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp(); } - ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; + ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> 
tables = DefaultMessageStore.this.consumeQueueTable; - for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) { + for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue cq : maps.values()) { boolean result = false; for (int i = 0; i < retryTimes && !result; i++) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java index efb6aa8..7021992 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java @@ -17,17 +17,18 @@ package org.apache.rocketmq.store.schedule; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class DelayOffsetSerializeWrapper extends RemotingSerializable { - private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = + private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32); - public ConcurrentHashMap<Integer, Long> getOffsetTable() { + public ConcurrentMap<Integer, Long> getOffsetTable() { return offsetTable; } - public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) { + public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) { this.offsetTable = offsetTable; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/96cd2e4e/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java ---------------------------------------------------------------------- 
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 501876e..172954d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -23,6 +23,7 @@ import java.util.Map.Entry; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; @@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager { private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; - private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = + private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32); - private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable = + private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32); private final Timer timer = new Timer("ScheduleMessageTimerThread", true);