Remove unused class GetRouteInfoResponseHeader and meaningless comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/7f96008c Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/7f96008c Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/7f96008c Branch: refs/heads/master Commit: 7f96008c8b6f3ce5ac38cd168bd12252799973e3 Parents: ffad656 Author: yukon <yu...@apache.org> Authored: Fri Aug 11 20:28:13 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Fri Aug 11 20:28:13 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerController.java | 11 +- .../apache/rocketmq/broker/BrokerStartup.java | 8 +- .../client/rebalance/RebalanceLockManager.java | 44 ++-- .../broker/filtersrv/FilterServerManager.java | 2 - .../processor/AbstractSendMessageProcessor.java | 8 +- .../broker/processor/AdminBrokerProcessor.java | 49 ++-- .../broker/processor/PullMessageProcessor.java | 16 +- .../broker/processor/SendMessageProcessor.java | 28 +-- .../rocketmq/broker/BrokerControllerTest.java | 8 +- .../consumer/store/LocalFileOffsetStore.java | 10 +- .../consumer/store/RemoteBrokerOffsetStore.java | 4 +- .../rocketmq/client/impl/MQAdminImpl.java | 2 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 247 +++++++++---------- .../ConsumeMessageConcurrentlyService.java | 66 +++-- .../consumer/ConsumeMessageOrderlyService.java | 86 +++---- .../impl/consumer/ConsumeMessageService.java | 8 +- .../consumer/DefaultMQPullConsumerImpl.java | 104 ++++---- .../consumer/DefaultMQPushConsumerImpl.java | 62 ++--- .../client/impl/consumer/RebalanceImpl.java | 22 +- .../client/impl/consumer/RebalancePushImpl.java | 4 +- .../client/impl/factory/MQClientInstance.java | 27 +- .../impl/producer/DefaultMQProducerImpl.java | 92 +++---- .../client/impl/producer/MQProducerInner.java | 6 +- .../org/apache/rocketmq/common/TopicConfig.java | 10 - .../org/apache/rocketmq/common/help/FAQUrl.java | 26 +- .../common/message/MessageClientIDSetter.java | 2 +- .../protocol/body/ConsumerRunningInfo.java | 94 +++---- .../header/GetConsumeStatsRequestHeader.java | 2 - .../header/GetConsumerStatusRequestHeader.java | 1 - .../GetEarliestMsgStoretimeRequestHeader.java | 1 - .../header/QueryCorrectionOffsetHeader.java | 2 +- .../header/SearchOffsetRequestHeader.java | 2 +- .../header/UnregisterClientRequestHeader.java | 2 +- .../header/UnregisterClientResponseHeader.java | 2 +- .../namesrv/GetRouteInfoResponseHeader.java | 33 --- .../RegisterOrderTopicRequestHeader.java | 2 +- .../rocketmq/common/sysflag/TopicSysFlag.java | 4 - .../rocketmq/common/utils/IOTinyUtils.java | 2 - .../rocketmq/example/simple/PushConsumer.java | 2 - .../src/main/resources/MessageFilterImpl.java | 2 +- .../rocketmq/filter/parser/SelectorParser.java | 1 - .../rocketmq/filter/parser/SelectorParser.jj | 1 - .../namesrv/kvconfig/KVConfigManager.java | 6 +- .../namesrv/routeinfo/RouteInfoManager.java | 4 +- .../rocketmq/remoting/netty/NettyDecoder.java | 2 +- .../remoting/netty/NettyRemotingAbstract.java | 24 +- .../remoting/netty/NettyRemotingClient.java | 8 +- .../remoting/netty/NettyServerConfig.java | 4 - .../remoting/netty/NettySystemConfig.java | 18 +- .../org/apache/rocketmq/store/CommitLog.java | 101 +++----- .../rocketmq/store/DefaultMessageStore.java | 54 ++-- .../apache/rocketmq/store/DispatchRequest.java | 18 -- .../org/apache/rocketmq/store/MappedFile.java | 2 - .../apache/rocketmq/store/MappedFileQueue.java | 1 - .../apache/rocketmq/store/ha/HAConnection.java | 8 - .../org/apache/rocketmq/store/ha/HAService.java | 18 -- .../apache/rocketmq/store/index/IndexFile.java | 1 - .../rocketmq/store/index/IndexHeader.java | 4 - .../store/schedule/ScheduleMessageService.java | 3 - .../rocketmq/store/stats/BrokerStats.java | 2 - .../tools/admin/DefaultMQAdminExtImpl.java | 10 +- .../command/message/PrintMessageSubCommand.java | 5 +- 62 files changed, 617 insertions(+), 781 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c8624c4..cd68552 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -135,11 +135,11 @@ public class BrokerController { private BrokerFastFailure brokerFastFailure; private Configuration configuration; - public BrokerController(// - final BrokerConfig brokerConfig, // - final NettyServerConfig nettyServerConfig, // - final NettyClientConfig nettyClientConfig, // - final MessageStoreConfig messageStoreConfig // + public BrokerController( + final BrokerConfig brokerConfig, + final NettyServerConfig nettyServerConfig, + final NettyClientConfig nettyClientConfig, + final MessageStoreConfig messageStoreConfig ) { this.brokerConfig = brokerConfig; this.nettyServerConfig = nettyServerConfig; @@ -255,7 +255,6 @@ public class BrokerController { this.registerProcessor(); - // TODO remove in future final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index 85d2e3a..e0a3b69 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -190,10 +190,10 @@ public class BrokerStartup { MixAll.printObjectProperties(log, nettyClientConfig); MixAll.printObjectProperties(log, messageStoreConfig); - final BrokerController controller = new BrokerController(// - brokerConfig, // - nettyServerConfig, // - nettyClientConfig, // + final BrokerController controller = new BrokerController( + brokerConfig, + nettyServerConfig, + nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index ed5a875..519745e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -52,9 +52,9 @@ public class RebalanceLockManager { lockEntry = new LockEntry(); lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); - log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // + log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", + group, + clientId, mq); } @@ -69,19 +69,19 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); return true; } log.warn( - "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); return false; } finally { @@ -144,9 +144,9 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); groupValue.put(mq, lockEntry); log.info( - "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", // - group, // - clientId, // + "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", + group, + clientId, mq); } @@ -162,20 +162,20 @@ public class RebalanceLockManager { lockEntry.setClientId(clientId); lockEntry.setLastUpdateTimestamp(System.currentTimeMillis()); log.warn( - "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); lockedMqs.add(mq); continue; } log.warn( - "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", // - group, // - oldClientId, // - clientId, // + "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", + group, + oldClientId, + clientId, mq); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index 52cb919..ff63127 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -111,9 +111,7 @@ public class FilterServerManager { } } - /** - */ public void scanNotActiveChannel() { Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 3faa7ae..410192f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -189,10 +189,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// - requestHeader.getTopic(), // - requestHeader.getDefaultTopic(), // - RemotingHelper.parseChannelRemoteAddr(ctx.channel()), // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( + requestHeader.getTopic(), + requestHeader.getDefaultTopic(), + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 71fdda9..937f575 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -116,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -432,9 +433,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); - Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // + Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch( + requestBody.getConsumerGroup(), + requestBody.getMqSet(), requestBody.getClientId()); LockBatchResponseBody responseBody = new LockBatchResponseBody(); @@ -450,9 +451,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); - this.brokerController.getRebalanceLockManager().unlockBatch(// - requestBody.getConsumerGroup(), // - requestBody.getMqSet(), // + this.brokerController.getRebalanceLockManager().unlockBatch( + requestBody.getConsumerGroup(), + requestBody.getMqSet(), requestBody.getClientId()); response.setCode(ResponseCode.SUCCESS); @@ -657,14 +658,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - /** - */ { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); - if (null == findSubscriptionData // + if (null == findSubscriptionData && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic); continue; @@ -683,9 +682,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (brokerOffset < 0) brokerOffset = 0; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - requestHeader.getConsumerGroup(), // - topic, // + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + requestHeader.getConsumerGroup(), + topic, i); if (consumerOffset < 0) consumerOffset = 0; @@ -925,9 +924,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - /** - */ private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); @@ -1007,9 +1004,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - /** - */ if (!requestHeader.isOffline()) { SubscriptionData findSubscriptionData = @@ -1107,13 +1102,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (isOrder && !topicConfig.isOrder()) { continue; } - /** - */ { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic); - if (null == findSubscriptionData // + if (null == findSubscriptionData && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) { log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic); continue; @@ -1129,9 +1122,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); if (brokerOffset < 0) brokerOffset = 0; - long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(// - group, // - topic, // + long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset( + group, + topic, i); if (consumerOffset < 0) consumerOffset = 0; @@ -1215,10 +1208,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return runtimeInfo; } - private RemotingCommand callConsumer(// - final int requestCode, // - final RemotingCommand request, // - final String consumerGroup, // + private RemotingCommand callConsumer( + final int requestCode, + final RemotingCommand request, + final String consumerGroup, final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); @@ -1231,8 +1224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); - response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", // - clientId, // + response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", + clientId, MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index fb7ea20..fe2fcfe 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -160,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { assert consumerFilterData != null; } } catch (Exception e) { - log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); @@ -176,7 +176,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { return response; } - if (!subscriptionGroupConfig.isConsumeBroadcastEnable() // + if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way"); @@ -285,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", // - requestHeader.getQueueOffset(), // - getMessageResult.getNextBeginOffset(), // - requestHeader.getTopic(), // - requestHeader.getQueueId(), // - requestHeader.getConsumerGroup()// + log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", + requestHeader.getQueueOffset(), + getMessageResult.getNextBeginOffset(), + requestHeader.getTopic(), + requestHeader.getQueueId(), + requestHeader.getConsumerGroup() ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 5c716cc..cd60c44 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -139,9 +139,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(// - newTopic, // - subscriptionGroupConfig.getRetryQueueNums(), // + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( + newTopic, + subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); @@ -175,13 +175,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } - if (msgExt.getReconsumeTimes() >= maxReconsumeTimes// + if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, + DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); if (null == topicConfig) { @@ -268,8 +268,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; - topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, // - DLQ_NUMS_PER_GROUP, // + topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, + DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); @@ -289,9 +289,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return true; } - private RemotingCommand sendMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // + private RemotingCommand sendMessage(final ChannelHandlerContext ctx, + final RemotingCommand request, + final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); @@ -464,9 +464,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } return response; } - private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, // - final RemotingCommand request, // - final SendMessageContext sendMessageContext, // + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, + final RemotingCommand request, + final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index fe30d8f..d4edd9a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -38,10 +38,10 @@ public class BrokerControllerTest { @Test public void testBrokerRestart() throws Exception { for (int i = 0; i < 2; i++) { - BrokerController brokerController = new BrokerController(// - new BrokerConfig(), // - new NettyServerConfig(), // - new NettyClientConfig(), // + BrokerController brokerController = new BrokerController( + new BrokerConfig(), + new NettyServerConfig(), + new NettyClientConfig(), new MessageStoreConfig()); assertThat(brokerController.initialize()); brokerController.start(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java index d4b19b2..22ec674 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java @@ -52,9 +52,9 @@ public class LocalFileOffsetStore implements OffsetStore { public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) { this.mQClientFactory = mQClientFactory; this.groupName = groupName; - this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + // - this.mQClientFactory.getClientId() + File.separator + // - this.groupName + File.separator + // + this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + + this.mQClientFactory.getClientId() + File.separator + + this.groupName + File.separator + "offsets.json"; } @@ -217,8 +217,8 @@ public class LocalFileOffsetStore implements OffsetStore { OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class); } catch (Exception e) { log.warn("readLocalOffset Exception", e); - throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" // - + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), // + throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" + + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), e); } return offsetSerializeWrapper; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 5bd5749..b82e992 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -204,7 +204,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { MQBrokerException, InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } @@ -232,7 +232,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore { InterruptedException, MQClientException { FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); if (null == findBrokerResult) { - // TODO Here may be heavily overhead for Name Server,need tuning + this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index 983e515..92d8513 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -159,7 +159,7 @@ public class MQAdminImpl { } } catch (Exception e) { throw new MQClientException( - "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), // + "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index ae9ed6c..c5abc36 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -285,32 +285,32 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + public SendResult sendMessage( + final String addr, + final String brokerName, + final Message msg, + final SendMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } - public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendCallback sendCallback, // 7 - final TopicPublishInfo topicPublishInfo, // 8 - final MQClientInstance instance, // 9 - final int retryTimesWhenSendFailed, // 10 - final SendMessageContext context, // 11 - final DefaultMQProducerImpl producer // 12 + public SendResult sendMessage( + final String addr, + final String brokerName, + final Message msg, + final SendMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int retryTimesWhenSendFailed, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { @@ -341,31 +341,31 @@ public class MQClientAPIImpl { return null; } - private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + private SendResult sendMessageSync( + final String addr, + final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); } - private void sendMessageAsync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int retryTimesWhenSendFailed, // - final AtomicInteger times, // - final SendMessageContext context, // - final DefaultMQProducerImpl producer // + private void sendMessageAsync( + final String addr, + final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int retryTimesWhenSendFailed, + final AtomicInteger times, + final SendMessageContext context, + final DefaultMQProducerImpl producer ) throws InterruptedException, RemotingException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -380,7 +380,6 @@ public class MQClientAPIImpl { context.getProducer().executeSendMessageHookAfter(context); } } catch (Throwable e) { - // } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); @@ -428,19 +427,19 @@ public class MQClientAPIImpl { }); } - private void onExceptionImpl(final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int timesTotal, // - final AtomicInteger curTimes, // - final Exception e, // - final SendMessageContext context, // - final boolean needRetry, // - final DefaultMQProducerImpl producer // 12 + private void onExceptionImpl(final String brokerName, + final Message msg, + final long timeoutMillis, + final RemotingCommand request, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, + final MQClientInstance instance, + final int timesTotal, + final AtomicInteger curTimes, + final Exception e, + final SendMessageContext context, + final boolean needRetry, + final DefaultMQProducerImpl producer ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { @@ -485,16 +484,15 @@ public class MQClientAPIImpl { } } - private SendResult processSendResponse(// - final String brokerName, // - final Message msg, // - final RemotingCommand response// + private SendResult processSendResponse( + final String brokerName, + final Message msg, + final RemotingCommand response ) throws MQBrokerException, RemotingCommandException { switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: case ResponseCode.FLUSH_SLAVE_TIMEOUT: case ResponseCode.SLAVE_NOT_AVAILABLE: { - // TODO LOG } case ResponseCode.SUCCESS: { SendStatus sendStatus = SendStatus.SEND_OK; @@ -553,12 +551,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public PullResult pullMessage(// - final String addr, // - final PullMessageRequestHeader requestHeader, // - final long timeoutMillis, // - final CommunicationMode communicationMode, // - final PullCallback pullCallback// + public PullResult pullMessage( + final String addr, + final PullMessageRequestHeader requestHeader, + final long timeoutMillis, + final CommunicationMode communicationMode, + final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); @@ -579,11 +577,11 @@ public class MQClientAPIImpl { return null; } - private void pullMessageAsync(// - final String addr, // 1 - final RemotingCommand request, // - final long timeoutMillis, // - final PullCallback pullCallback// + private void pullMessageAsync( + final String addr, + final RemotingCommand request, + final long timeoutMillis, + final PullCallback pullCallback ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -611,10 +609,10 @@ public class MQClientAPIImpl { }); } - private PullResult pullMessageSync(// - final String addr, // 1 - final RemotingCommand request, // 2 - final long timeoutMillis// 3 + private PullResult pullMessageSync( + final String addr, + final RemotingCommand request, + final long timeoutMillis ) throws RemotingException, InterruptedException, MQBrokerException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -720,9 +718,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public List<String> getConsumerIdListByGroup(// - final String addr, // - final String consumerGroup, // + public List<String> getConsumerIdListByGroup( + final String addr, + final String consumerGroup, final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException { GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader(); @@ -796,10 +794,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long queryConsumerOffset(// - final String addr, // - final QueryConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public long queryConsumerOffset( + final String addr, + final QueryConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); @@ -820,10 +818,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffset(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public void updateConsumerOffset( + final String addr, + final UpdateConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); @@ -841,10 +839,10 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffsetOneway(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + public void updateConsumerOffsetOneway( + final String addr, + final UpdateConsumerOffsetRequestHeader requestHeader, + final long timeoutMillis ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); @@ -852,10 +850,10 @@ public class MQClientAPIImpl { this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); } - public int sendHearbeat(// - final String addr, // - final HeartbeatData heartbeatData, // - final long timeoutMillis// + public int sendHearbeat( + final String addr, + final HeartbeatData heartbeatData, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); @@ -873,12 +871,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unregisterClient(// - final String addr, // - final String clientID, // - final String producerGroup, // - final String consumerGroup, // - final long timeoutMillis// + public void unregisterClient( + final String addr, + final String clientID, + final String producerGroup, + final String consumerGroup, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); requestHeader.setClientID(clientID); @@ -899,11 +897,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void endTransactionOneway(// - final String addr, // - final EndTransactionRequestHeader requestHeader, // - final String remark, // - final long timeoutMillis// + public void endTransactionOneway( + final String addr, + final EndTransactionRequestHeader requestHeader, + final String remark, + final long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); @@ -965,9 +963,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public Set<MessageQueue> lockBatchMQ(// - final String addr, // - final LockBatchRequestBody requestBody, // + public Set<MessageQueue> lockBatchMQ( + final String addr, + final LockBatchRequestBody requestBody, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); @@ -987,11 +985,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unlockBatchMQ(// - final String addr, // - final UnlockBatchRequestBody requestBody, // - final long timeoutMillis, // - final boolean oneway// + public void unlockBatchMQ( + final String addr, + final UnlockBatchRequestBody requestBody, + final long timeoutMillis, + final boolean oneway ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); @@ -1213,7 +1211,7 @@ public class MQClientAPIImpl { if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) { log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic); } - // TODO :- Log when if condition is not satisfied + break; } case ResponseCode.SUCCESS: { @@ -1566,12 +1564,12 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void registerMessageFilterClass(final String addr, // - final String consumerGroup, // - final String topic, // - final String className, // - final int classCRC, // - final byte[] classBody, // + public void registerMessageFilterClass(final String addr, + final String consumerGroup, + final String topic, + final String className, + final int classCRC, + final byte[] classBody, final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); @@ -1706,10 +1704,10 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, // - String consumerGroup, // - String clientId, // - String msgId, // + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, + String consumerGroup, + String clientId, + String msgId, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); @@ -1912,7 +1910,6 @@ public class MQClientAPIImpl { public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { - // todo:jodie return Collections.EMPTY_SET; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java index f566ed0..961e062 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java @@ -69,12 +69,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); - this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); @@ -100,8 +100,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService @Override public void updateCorePoolSize(int corePoolSize) { - if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // + if (corePoolSize > 0 + && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } @@ -115,11 +115,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() // + 1); // } - // // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup: - // {}", // - // corePoolSize,// - // this.consumeExecutor.getCorePoolSize(),// + // {}", + // corePoolSize, + // this.consumeExecutor.getCorePoolSize(), // this.consumerGroup); } @@ -131,11 +130,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize() // - 1); // } - // // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup: - // {}", // - // corePoolSize,// - // this.consumeExecutor.getCorePoolSize(),// + // {}", + // corePoolSize, + // this.consumeExecutor.getCorePoolSize(), // this.consumerGroup); } @@ -185,10 +183,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageConcurrentlyService.this.consumerGroup, // - msgs, // + log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageConcurrentlyService.this.consumerGroup, + msgs, mq), e); } @@ -200,10 +198,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } @Override - public void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + public void submitConsumeRequest( + final List<MessageExt> msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); if (msgs.size() <= consumeBatchSize) { @@ -258,10 +256,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService } } - public void processConsumeResult(// - final ConsumeConcurrentlyStatus status, // - final ConsumeConcurrentlyContext context, // - final ConsumeRequest consumeRequest// + public void processConsumeResult( + final ConsumeConcurrentlyStatus status, + final ConsumeConcurrentlyContext context, + final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); @@ -338,10 +336,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService return false; } - private void submitConsumeRequestLater(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue// + private void submitConsumeRequestLater( + final List<MessageExt> msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue ) { this.scheduledExecutorService.schedule(new Runnable() { @@ -353,7 +351,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService }, 5000, TimeUnit.MILLISECONDS); } - private void submitConsumeRequestLater(final ConsumeRequest consumeRequest// + private void submitConsumeRequestLater(final ConsumeRequest consumeRequest ) { this.scheduledExecutorService.schedule(new Runnable() { @@ -419,7 +417,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", - RemotingHelper.exceptionSimpleDesc(e), // + RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java index 1fa474c..abdad79 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java @@ -70,12 +70,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); - this.consumeExecutor = new ThreadPoolExecutor(// - this.defaultMQPushConsumer.getConsumeThreadMin(), // - this.defaultMQPushConsumer.getConsumeThreadMax(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // - this.consumeRequestQueue, // + this.consumeExecutor = new ThreadPoolExecutor( + this.defaultMQPushConsumer.getConsumeThreadMin(), + this.defaultMQPushConsumer.getConsumeThreadMax(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); @@ -107,8 +107,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { @Override public void updateCorePoolSize(int corePoolSize) { - if (corePoolSize > 0 // - && corePoolSize <= Short.MAX_VALUE // + if (corePoolSize > 0 + && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } @@ -171,10 +171,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { result.setConsumeResult(CMResult.CR_THROW_EXCEPTION); result.setRemark(RemotingHelper.exceptionSimpleDesc(e)); - log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, mq), e); } @@ -187,10 +187,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { } @Override - public void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + public void submitConsumeRequest( + final List<MessageExt> msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); @@ -226,10 +226,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { return false; } - private void submitConsumeRequestLater(// - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // - final long suspendTimeMillis// + private void submitConsumeRequestLater( + final ProcessQueue processQueue, + final MessageQueue messageQueue, + final long suspendTimeMillis ) { long timeMillis = suspendTimeMillis; if (timeMillis == -1) { @@ -251,11 +251,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { }, timeMillis, TimeUnit.MILLISECONDS); } - public boolean processConsumeResult(// - final List<MessageExt> msgs, // - final ConsumeOrderlyStatus status, // - final ConsumeOrderlyContext context, // - final ConsumeRequest consumeRequest// + public boolean processConsumeResult( + final List<MessageExt> msgs, + final ConsumeOrderlyStatus status, + final ConsumeOrderlyContext context, + final ConsumeRequest consumeRequest ) { boolean continueConsume = true; long commitOffset = -1L; @@ -273,9 +273,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } else { @@ -295,9 +295,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { break; case ROLLBACK: consumeRequest.getProcessQueue().rollback(); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; break; @@ -305,9 +305,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size()); if (checkReconsumeTimes(msgs)) { consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs); - this.submitConsumeRequestLater(// - consumeRequest.getProcessQueue(), // - consumeRequest.getMessageQueue(), // + this.submitConsumeRequestLater( + consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), context.getSuspendCurrentQueueTimeMillis()); continueConsume = false; } @@ -468,22 +468,22 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService { status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { - log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", // - RemotingHelper.exceptionSimpleDesc(e), // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", + RemotingHelper.exceptionSimpleDesc(e), + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); } - if (null == status // - || ConsumeOrderlyStatus.ROLLBACK == status// + if (null == status + || ConsumeOrderlyStatus.ROLLBACK == status || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) { - log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", // - ConsumeMessageOrderlyService.this.consumerGroup, // - msgs, // + log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", + ConsumeMessageOrderlyService.this.consumerGroup, + msgs, messageQueue); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java index 8742191..0f6f3bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java @@ -36,9 +36,9 @@ public interface ConsumeMessageService { ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName); - void submitConsumeRequest(// - final List<MessageExt> msgs, // - final ProcessQueue processQueue, // - final MessageQueue messageQueue, // + void submitConsumeRequest( + final List<MessageExt> msgs, + final ProcessQueue processQueue, + final MessageQueue messageQueue, final boolean dispathToConsume); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 35ee16f..8640d2d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -97,8 +97,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// + throw new MQClientException("The consumer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -185,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { SubscriptionData subscriptionData; try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); @@ -193,18 +193,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.SYNC, // 10 - null// 11 + PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + 0L, + offset, + maxNums, + sysFlag, + 0, + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.SYNC, + null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { @@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void subscriptionAutomatically(final String topic) { if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); } catch (Exception ignore) { @@ -372,13 +372,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); } - private void pullAsyncImpl(// - final MessageQueue mq, // - final String subExpression, // - final long offset, // - final int maxNums, // - final PullCallback pullCallback, // - final boolean block, // + private void pullAsyncImpl( + final MessageQueue mq, + final String subExpression, + final long offset, + final int maxNums, + final PullCallback pullCallback, + final boolean block, final long timeout) throws MQClientException, RemotingException, InterruptedException { this.makeSureStateOK(); @@ -405,7 +405,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { final SubscriptionData subscriptionData; try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); @@ -413,17 +413,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; - this.pullAPIWrapper.pullKernelImpl(// - mq, // 1 - subscriptionData.getSubString(), // 2 - 0L, // 3 - offset, // 4 - maxNums, // 5 - sysFlag, // 6 - 0, // 7 - this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8 - timeoutMillis, // 9 - CommunicationMode.ASYNC, // 10 + this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + 0L, + offset, + maxNums, + sysFlag, + 0, + this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.ASYNC, new PullCallback() { @Override @@ -551,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); - this.pullAPIWrapper = new PullAPIWrapper(// - mQClientFactory, // + this.pullAPIWrapper = new PullAPIWrapper( + mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); @@ -589,8 +589,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The PullConsumer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -606,42 +606,42 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { // consumerGroup if (null == this.defaultMQPullConsumer.getConsumerGroup()) { throw new MQClientException( - "consumerGroup is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "consumerGroup is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // consumerGroup if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { throw new MQClientException( - "consumerGroup can not equal "// - + MixAll.DEFAULT_CONSUMER_GROUP // - + ", please specify another one."// - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // messageModel if (null == this.defaultMQPullConsumer.getMessageModel()) { throw new MQClientException( - "messageModel is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "messageModel is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // allocateMessageQueueStrategy if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) { throw new MQClientException( - "allocateMessageQueueStrategy is null" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } // allocateMessageQueueStrategy if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) { throw new MQClientException( - "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" // - + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), // + "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null); } } @@ -651,7 +651,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics(); if (registerTopics != null) { for (final String topic : registerTopics) { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); }