Reformat code globally second time
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/97aa813e Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/97aa813e Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/97aa813e Branch: refs/heads/develop Commit: 97aa813eb89c14e0539a8847876d7a28a1efe638 Parents: 7f96008 Author: yukon <yu...@apache.org> Authored: Fri Aug 11 20:46:04 2017 +0800 Committer: yukon <yu...@apache.org> Committed: Fri Aug 11 20:50:22 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerPathConfigHelper.java | 1 - .../rocketmq/broker/client/ConsumerManager.java | 3 +- .../broker/filter/ConsumerFilterData.java | 3 - .../broker/filter/ConsumerFilterManager.java | 14 +-- .../filter/ExpressionForRetryMessageFilter.java | 4 +- .../broker/filter/ExpressionMessageFilter.java | 2 +- .../broker/filtersrv/FilterServerManager.java | 1 - .../broker/latency/BrokerFastFailure.java | 1 + .../latency/BrokerFixedThreadPoolExecutor.java | 15 ++- .../NotifyMessageArrivingListener.java | 2 +- .../longpolling/PullRequestHoldService.java | 2 +- .../broker/offset/ConsumerOffsetManager.java | 3 +- .../rocketmq/broker/out/BrokerOuterAPI.java | 12 +- .../broker/pagecache/ManyMessageTransfer.java | 4 +- .../broker/pagecache/OneMessageTransfer.java | 4 +- .../broker/pagecache/QueryMessageTransfer.java | 4 +- .../plugin/AbstractPluginMessageStore.java | 2 +- .../broker/processor/AdminBrokerProcessor.java | 82 ++++++++----- .../processor/EndTransactionProcessor.java | 3 +- .../broker/processor/PullMessageProcessor.java | 35 +++--- .../broker/processor/SendMessageProcessor.java | 31 +++-- .../subscription/SubscriptionGroupManager.java | 2 +- .../broker/topic/TopicConfigManager.java | 6 +- .../rocketmq/broker/BrokerControllerTest.java | 2 +- .../CommitLogDispatcherCalcBitMapTest.java | 1 - .../filter/MessageStoreWithFilterTest.java | 7 +- .../processor/PullMessageProcessorTest.java | 9 +- .../processor/SendMessageProcessorTest.java | 13 +- .../org/apache/rocketmq/client/MQAdmin.java | 21 +--- .../org/apache/rocketmq/client/MQHelper.java | 1 - .../org/apache/rocketmq/client/Validators.java | 14 --- .../client/consumer/DefaultMQPullConsumer.java | 9 +- .../client/consumer/DefaultMQPushConsumer.java | 73 ++++++----- .../rocketmq/client/consumer/MQConsumer.java | 16 --- .../client/consumer/MQPullConsumer.java | 74 +---------- .../client/consumer/MQPushConsumer.java | 18 +-- .../client/consumer/MessageSelector.java | 13 +- .../listener/MessageListenerConcurrently.java | 4 +- .../listener/MessageListenerOrderly.java | 7 +- .../AllocateMessageQueueConsistentHash.java | 8 +- .../client/consumer/store/OffsetStore.java | 15 --- .../client/impl/ClientRemotingProcessor.java | 25 ++-- .../rocketmq/client/impl/MQAdminImpl.java | 12 +- .../consumer/ConsumeMessageOrderlyService.java | 6 +- .../client/impl/consumer/RebalanceImpl.java | 9 +- .../client/impl/consumer/RebalancePullImpl.java | 3 +- .../client/impl/consumer/RebalancePushImpl.java | 3 +- .../client/impl/factory/MQClientInstance.java | 6 +- .../impl/producer/DefaultMQProducerImpl.java | 21 ++-- .../rocketmq/client/log/ClientLogger.java | 2 +- .../client/producer/DefaultMQProducer.java | 58 ++++++--- .../rocketmq/client/producer/SendResult.java | 6 +- .../consumer/DefaultMQPullConsumerTest.java | 20 +-- .../consumer/DefaultMQPushConsumerTest.java | 41 ++++--- .../AllocateMessageQueueConsitentHashTest.java | 45 ++++--- .../store/RemoteBrokerOffsetStoreTest.java | 5 +- .../client/impl/MQClientAPIImplTest.java | 15 ++- .../impl/factory/MQClientInstanceTest.java | 2 +- .../client/producer/DefaultMQProducerTest.java | 18 ++- .../apache/rocketmq/common/BrokerConfig.java | 3 +- .../apache/rocketmq/common/Configuration.java | 4 - .../apache/rocketmq/common/CountDownLatch2.java | 8 +- .../org/apache/rocketmq/common/DataVersion.java | 2 +- .../java/org/apache/rocketmq/common/MixAll.java | 8 +- .../apache/rocketmq/common/ServiceThread.java | 2 +- .../org/apache/rocketmq/common/UtilAll.java | 2 - .../consistenthash/ConsistentHashRouter.java | 19 ++- .../rocketmq/common/consistenthash/Node.java | 1 - .../common/consistenthash/VirtualNode.java | 1 - .../rocketmq/common/filter/FilterAPI.java | 2 +- .../rocketmq/common/filter/impl/PolishExpr.java | 1 - .../rocketmq/common/message/MessageDecoder.java | 9 +- .../common/message/MessageExtBatch.java | 16 ++- .../rocketmq/common/protocol/RequestCode.java | 1 - .../header/SearchOffsetRequestHeader.java | 1 - .../header/SendMessageRequestHeaderV2.java | 1 - .../header/UnregisterClientRequestHeader.java | 1 - .../header/UnregisterClientResponseHeader.java | 1 - .../common/protocol/heartbeat/MessageModel.java | 1 - .../rocketmq/common/sysflag/TopicSysFlag.java | 1 - .../rocketmq/common/utils/HttpTinyClient.java | 6 - .../rocketmq/common/utils/IOTinyUtils.java | 3 +- .../apache/rocketmq/common/MQVersionTest.java | 2 +- .../rocketmq/common/MessageBatchTest.java | 13 +- .../rocketmq/common/filter/FilterAPITest.java | 2 +- distribution/bin/mqadmin.xml | 36 +++--- distribution/bin/mqbroker.xml | 40 +++--- distribution/bin/mqfiltersrv.xml | 40 +++--- distribution/bin/mqnamesrv.xml | 40 +++--- .../conf/2m-2s-async/broker-a-s.properties | 1 - .../conf/2m-2s-async/broker-a.properties | 1 - .../conf/2m-2s-async/broker-b-s.properties | 1 - .../conf/2m-2s-async/broker-b.properties | 1 - .../conf/2m-2s-sync/broker-a-s.properties | 1 - .../conf/2m-2s-sync/broker-a.properties | 1 - .../conf/2m-2s-sync/broker-b-s.properties | 1 - .../conf/2m-2s-sync/broker-b.properties | 1 - .../conf/2m-noslave/broker-a.properties | 1 - .../conf/2m-noslave/broker-b.properties | 1 - distribution/conf/logback_broker.xml | 6 +- .../example/batch/SimpleBatchProducer.java | 3 +- .../example/batch/SplitBatchProducer.java | 17 ++- .../rocketmq/example/benchmark/Producer.java | 2 +- .../rocketmq/example/filter/SqlConsumer.java | 2 +- .../rocketmq/example/simple/AsyncProducer.java | 3 +- .../rocketmq/example/simple/PushConsumer.java | 1 - .../apache/rocketmq/filter/FilterFactory.java | 8 -- .../org/apache/rocketmq/filter/FilterSpi.java | 6 - .../filter/expression/BinaryExpression.java | 2 - .../filter/expression/BooleanExpression.java | 2 - .../filter/expression/ComparisonExpression.java | 4 - .../filter/expression/ConstantExpression.java | 2 - .../filter/expression/EvaluationContext.java | 5 - .../filter/expression/UnaryExpression.java | 4 +- .../filter/expression/UnaryInExpression.java | 2 +- .../rocketmq/filter/parser/ParseException.java | 8 +- .../rocketmq/filter/parser/SelectorParser.java | 8 +- .../parser/SelectorParserTokenManager.java | 19 +-- .../filter/parser/SimpleCharStream.java | 24 ++-- .../rocketmq/filter/parser/TokenMgrError.java | 4 +- .../rocketmq/filter/util/BloomFilter.java | 35 ------ .../rocketmq/filter/util/BloomFilterData.java | 12 +- .../apache/rocketmq/filter/BitsArrayTest.java | 4 +- .../apache/rocketmq/filter/BloomFilterTest.java | 8 +- .../apache/rocketmq/filter/FilterSpiTest.java | 1 - .../processor/DefaultRequestProcessor.java | 9 +- .../logappender/common/ProducerInstance.java | 2 - .../log4j/RocketmqLog4jAppender.java | 7 +- .../log4j2/RocketmqLog4j2Appender.java | 17 +-- .../logback/RocketmqLogbackAppender.java | 8 +- .../rocketmq/logappender/AbstractTestCase.java | 1 - .../apache/rocketmq/logappender/Log4jTest.java | 7 +- .../rocketmq/logappender/LogbackTest.java | 7 +- .../apache/rocketmq/logappender/log4j2Test.java | 7 +- .../src/test/resources/log4j-example.properties | 7 +- .../src/test/resources/log4j-example.xml | 26 ++-- .../src/test/resources/log4j2-example.xml | 36 +++--- .../processor/ClusterTestRequestProcessor.java | 3 +- .../processor/DefaultRequestProcessor.java | 42 ++++--- .../namesrv/routeinfo/RouteInfoManager.java | 4 +- .../ClusterTestRequestProcessorTest.java | 3 +- .../rocketmq/promise/DefaultPromise.java | 2 +- .../consumer/LocalMessageCacheTest.java | 2 +- .../rocketmq/remoting/common/ServiceThread.java | 2 +- .../remoting/netty/NettyRemotingAbstract.java | 26 ++-- .../remoting/netty/NettyRemotingClient.java | 2 +- .../remoting/netty/NettyRemotingServer.java | 3 +- .../remoting/netty/NettyServerConfig.java | 1 - .../remoting/protocol/RemotingCommandTest.java | 14 +-- .../protocol/RemotingSerializableTest.java | 4 +- .../rocketmq/store/AppendMessageCallback.java | 10 +- .../org/apache/rocketmq/store/CommitLog.java | 17 +-- .../org/apache/rocketmq/store/ConsumeQueue.java | 5 +- .../apache/rocketmq/store/ConsumeQueueExt.java | 72 ++++------- .../rocketmq/store/DefaultMessageStore.java | 3 - .../org/apache/rocketmq/store/MappedFile.java | 16 +-- .../rocketmq/store/MessageArrivingListener.java | 2 +- .../apache/rocketmq/store/MessageFilter.java | 14 +-- .../org/apache/rocketmq/store/MessageStore.java | 38 +++++- .../apache/rocketmq/store/PutMessageLock.java | 1 + .../rocketmq/store/PutMessageReentrantLock.java | 1 - .../rocketmq/store/PutMessageSpinLock.java | 2 - .../org/apache/rocketmq/store/RunningFlags.java | 1 - .../store/config/MessageStoreConfig.java | 4 +- .../apache/rocketmq/store/ha/HAConnection.java | 3 - .../org/apache/rocketmq/store/ha/HAService.java | 13 +- .../apache/rocketmq/store/index/IndexFile.java | 2 +- .../rocketmq/store/index/IndexHeader.java | 1 - .../rocketmq/store/index/IndexService.java | 4 +- .../rocketmq/store/stats/BrokerStats.java | 1 - .../store/stats/BrokerStatsManager.java | 10 +- .../rocketmq/store/AppendCallbackTest.java | 40 +++--- .../rocketmq/store/ConsumeQueueExtTest.java | 5 +- .../apache/rocketmq/store/ConsumeQueueTest.java | 5 +- .../rocketmq/store/DefaultMessageStoreTest.java | 3 +- .../test/client/rmq/RMQSqlConsumer.java | 1 + .../rocketmq/test/base/IntegrationTestBase.java | 13 +- .../broadcast/order/OrderMsgBroadCastIT.java | 1 - .../client/consumer/filter/SqlFilterIT.java | 2 +- .../test/client/producer/batch/BatchSendIT.java | 2 - .../rocketmq/tools/admin/DefaultMQAdminExt.java | 105 ++++++++++------ .../tools/admin/DefaultMQAdminExtImpl.java | 123 ++++++++++++------- .../apache/rocketmq/tools/admin/MQAdminExt.java | 84 ++++++------- .../command/broker/GetBrokerConfigCommand.java | 3 +- .../command/cluster/ClusterListSubCommand.java | 26 ++-- .../consumer/UpdateSubGroupSubCommand.java | 3 +- .../command/message/DecodeMessageIdCommond.java | 3 +- .../message/PrintMessageByQueueCommand.java | 12 +- .../command/message/QueryMsgByIdSubCommand.java | 3 +- .../message/QueryMsgByUniqueKeySubCommand.java | 3 +- .../namesrv/GetNamesrvConfigCommand.java | 3 +- .../namesrv/UpdateNamesrvConfigCommand.java | 3 +- .../command/queue/QueryConsumeQueueCommand.java | 2 +- .../command/topic/TopicClusterSubCommand.java | 3 +- .../command/topic/TopicListSubCommand.java | 3 +- .../command/topic/TopicRouteSubCommand.java | 3 +- .../command/topic/TopicStatusSubCommand.java | 3 +- .../command/topic/UpdateOrderConfCommand.java | 3 +- .../topic/UpdateTopicPermSubCommand.java | 3 +- .../command/topic/UpdateTopicSubCommand.java | 3 +- .../namesrv/GetNamesrvConfigCommandTest.java | 2 +- 201 files changed, 1104 insertions(+), 1133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java index 0a323ee..42c8da9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.broker; import java.io.File; - public class BrokerPathConfigHelper { private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" + File.separator + "config" + File.separator + "broker.properties"; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 4a262e5..32632fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -122,7 +122,8 @@ public class ConsumerManager { return r1 || r2; } - public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { + public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, + boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java index 4db02e2..ee16a61 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java @@ -54,9 +54,6 @@ public class ConsumerFilterData { /** * Check this filter data has been used to calculate bit map when msg was stored in server. - * - * @param msgStoreTime - * @return */ public boolean isMsgInLive(long msgStoreTime) { return msgStoreTime > getBornTime(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java index f50db86..482893f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java @@ -72,16 +72,11 @@ public class ConsumerFilterManager extends ConfigManager { /** * Build consumer filter data.Be care, bloom filter data is not included. * - * @param topic - * @param consumerGroup - * @param expression - * @param type - * @param clientVersion * @return maybe null */ public static ConsumerFilterData build(final String topic, final String consumerGroup, - final String expression, final String type, - final long clientVersion) { + final String expression, final String type, + final long clientVersion) { if (ExpressionType.isTagType(type)) { return null; } @@ -140,7 +135,7 @@ public class ConsumerFilterManager extends ConfigManager { } public boolean register(final String topic, final String consumerGroup, final String expression, - final String type, final long clientVersion) { + final String type, final long clientVersion) { if (ExpressionType.isTagType(type)) { return false; } @@ -357,7 +352,8 @@ public class ConsumerFilterManager extends ConfigManager { data.setDeadTime(now); } - public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) { + public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, + long clientVersion) { ConsumerFilterData old = this.groupFilterData.get(consumerGroup); if (old == null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java index 9518178..7f7da05 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.broker.filter; - import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.message.MessageConst; @@ -32,7 +31,8 @@ import java.util.Map; * <br>It will decode properties first in order to get real topic. */ public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter { - public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) { + public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, + ConsumerFilterManager consumerFilterManager) { super(subscriptionData, consumerFilterData, consumerFilterManager); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java index 893df0d..2f94de2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java @@ -41,7 +41,7 @@ public class ExpressionMessageFilter implements MessageFilter { protected final boolean bloomDataValid; public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, - ConsumerFilterManager consumerFilterManager) { + ConsumerFilterManager consumerFilterManager) { this.subscriptionData = subscriptionData; this.consumerFilterData = consumerFilterData; this.consumerFilterManager = consumerFilterManager; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index ff63127..f8f9943 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -111,7 +111,6 @@ public class FilterServerManager { } } - public void scanNotActiveChannel() { Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index a2a1aa0..2d4bedc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -103,6 +103,7 @@ public class BrokerFastFailure { } } } + public void shutdown() { this.scheduledExecutorService.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java index 238fe1c..8060fd0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java @@ -25,23 +25,28 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor { - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, + final TimeUnit unit, final BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, + final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, + final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } - public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, - final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { + public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, + final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index fd38c4f..ff09011 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -30,7 +30,7 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index b1bd86f..d0668cb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -114,7 +114,7 @@ public class PullRequestHoldService extends ServiceThread { } public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String key = this.buildKey(topic, queueId); ManyPullRequest mpr = this.pullRequestTable.get(key); if (mpr != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index 57565a6..0257f94 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -118,7 +118,8 @@ public class ConsumerOffsetManager extends ConfigManager { return groups; } - public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) { + public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, + final long offset) { // topic@group String key = topic + TOPIC_GROUP_SEPARATOR + group; this.commitOffset(clientHost, key, queueId, offset); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 6c2a987..cba70a0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -231,7 +231,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException, + public TopicConfigSerializeWrapper getAllTopicConfig( + final String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); @@ -248,7 +249,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException, + public ConsumerOffsetSerializeWrapper getAllConsumerOffset( + final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); @@ -264,7 +266,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, + public String getAllDelayOffset( + final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); @@ -280,7 +283,8 @@ public class BrokerOuterAPI { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException, + public SubscriptionGroupWrapper getAllSubscriptionGroupConfig( + final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java index 5b9df3e..968bcfb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java @@ -28,7 +28,9 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil private final ByteBuffer byteBufferHeader; private final GetMessageResult getMessageResult; - /** Bytes which were transferred already. */ + /** + * Bytes which were transferred already. + */ private long transferred; public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java index 254c2ab..b795d2d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java @@ -27,7 +27,9 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File private final ByteBuffer byteBufferHeader; private final SelectMappedBufferResult selectMappedBufferResult; - /** Bytes which were transferred already. */ + /** + * Bytes which were transferred already. + */ private long transferred; public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java index aaa1304..e8f3099 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java @@ -28,7 +28,9 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi private final ByteBuffer byteBufferHeader; private final QueryMessageResult queryMessageResult; - /** Bytes which were transferred already. */ + /** + * Bytes which were transferred already. + */ private long transferred; public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java index 690f70b..f6f8a80 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java @@ -87,7 +87,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore { @Override public GetMessageResult getMessage(String group, String topic, int queueId, long offset, - int maxMsgNums, final MessageFilter messageFilter) { + int maxMsgNums, final MessageFilter messageFilter) { return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 937f575..abea4ec 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -116,7 +116,6 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class AdminBrokerProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final BrokerController brokerController; @@ -126,7 +125,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request); @@ -212,7 +212,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return false; } - private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final CreateTopicRequestHeader requestHeader = (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class); @@ -249,7 +250,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return null; } - private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand deleteTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); @@ -355,7 +357,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class); final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader(); final SearchOffsetRequestHeader requestHeader = @@ -371,7 +374,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetRequestHeader requestHeader = @@ -386,7 +390,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getMinOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class); final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader(); final GetMinOffsetRequestHeader requestHeader = @@ -400,7 +405,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class); final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader(); final GetEarliestMsgStoretimeRequestHeader requestHeader = @@ -429,7 +435,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class); @@ -447,7 +454,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class); @@ -477,7 +485,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getSubscriptionGroupManager().encode(); if (content != null && content.length() > 0) { @@ -503,7 +512,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteSubscriptionGroupRequestHeader requestHeader = (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class); @@ -517,7 +527,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicStatsInfoRequestHeader requestHeader = (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class); @@ -565,7 +576,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumerConnectionListRequestHeader requestHeader = (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class); @@ -604,7 +616,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetProducerConnectionListRequestHeader requestHeader = (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class); @@ -637,7 +650,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetConsumeStatsRequestHeader requestHeader = (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class); @@ -658,7 +672,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - { SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic); @@ -770,7 +783,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand resetOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", @@ -787,7 +801,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { requestHeader.getTimestamp(), requestHeader.isForce(), isC); } - public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final GetConsumerStatusRequestHeader requestHeader = (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); @@ -798,7 +813,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { requestHeader.getClientAddr()); } - private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); @@ -820,7 +836,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class); final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader(); final RegisterFilterServerRequestHeader requestHeader = @@ -836,7 +853,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryConsumeTimeSpanRequestHeader requestHeader = (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class); @@ -924,8 +942,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - - private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); @@ -933,7 +951,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { requestHeader.getClientId()); } - private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryCorrectionOffsetHeader requestHeader = (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class); @@ -960,7 +979,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class); @@ -984,7 +1004,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { requestHeader.getClientId()); } - private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); CloneGroupOffsetRequestHeader requestHeader = (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class); @@ -1004,7 +1025,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { continue; } - if (!requestHeader.isOffline()) { SubscriptionData findSubscriptionData = @@ -1025,7 +1045,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final ViewBrokerStatsDataRequestHeader requestHeader = (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); @@ -1249,7 +1270,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } } - private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { QueryConsumeQueueRequestHeader requestHeader = (QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index 68e4167..fee1420 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -46,7 +46,8 @@ public class EndTransactionProcessor implements NettyRequestProcessor { } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index fe2fcfe..a46cbff 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -76,7 +76,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { } @Override - public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(final ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { return this.processRequest(ctx.channel(), request, true); } @@ -138,7 +139,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", - requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); @@ -286,11 +287,11 @@ public class PullMessageProcessor implements NettyRequestProcessor { // XXX: warn and notify me log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", - requestHeader.getQueueOffset(), - getMessageResult.getNextBeginOffset(), - requestHeader.getTopic(), - requestHeader.getQueueId(), - requestHeader.getConsumerGroup() + requestHeader.getQueueOffset(), + getMessageResult.getNextBeginOffset(), + requestHeader.getTopic(), + requestHeader.getQueueId(), + requestHeader.getConsumerGroup() ); } else { response.setCode(ResponseCode.PULL_NOT_FOUND); @@ -314,8 +315,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", - requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), - getMessageResult.getMinOffset(), channel.remoteAddress()); + requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), + getMessageResult.getMinOffset(), channel.remoteAddress()); break; default: assert false; @@ -437,15 +438,15 @@ public class PullMessageProcessor implements NettyRequestProcessor { event.setOffsetNew(getMessageResult.getNextBeginOffset()); this.generateOffsetMovedEvent(event); log.warn( - "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), - responseHeader.getSuggestWhichBrokerId()); + "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}", + requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(), + responseHeader.getSuggestWhichBrokerId()); } else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}", - requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), - responseHeader.getSuggestWhichBrokerId()); + requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(), + responseHeader.getSuggestWhichBrokerId()); } break; @@ -483,7 +484,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } - private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) { + private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, + final int queueId) { final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize()); long storeTimestamp = 0; @@ -528,7 +530,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } - public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + public void executeRequestWhenWakeup(final Channel channel, + final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index cd60c44..227a23e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -60,7 +60,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: @@ -245,8 +246,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - - private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, + private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, + RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { @@ -319,8 +320,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement final byte[] body = request.getBody(); - - int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -361,13 +360,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } - - private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, - SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { + private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, + RemotingCommand request, MessageExt msg, + SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, + int queueIdInt) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); - return response; + return response; } boolean sendOK = false; @@ -462,17 +462,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement sendMessageContext.setCommercialOwner(owner); } } - return response; + return response; } + private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, - final RemotingCommand request, - final SendMessageContext sendMessageContext, - final SendMessageRequestHeader requestHeader) throws RemotingCommandException { + final RemotingCommand request, + final SendMessageContext sendMessageContext, + final SendMessageRequestHeader requestHeader) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader(); - response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); @@ -493,7 +493,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return response; } - int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -509,7 +508,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { response.setCode(ResponseCode.MESSAGE_ILLEGAL); - response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); + response.setRemark("batch request does not support retry group " + requestHeader.getTopic()); return response; } MessageExtBatch messageExtBatch = new MessageExtBatch(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index bd4a26e..0cbb761 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -143,7 +143,7 @@ public class SubscriptionGroupManager extends ConfigManager { @Override public String configFilePath() { return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig() - .getStorePathRootDir()); + .getStorePathRootDir()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 0d10a16..cd30a08 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -272,7 +272,7 @@ public class TopicConfigManager extends ConfigManager { } log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -292,7 +292,7 @@ public class TopicConfigManager extends ConfigManager { } log.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag, - topicConfig.getTopicSysFlag()); + topicConfig.getTopicSysFlag()); this.topicConfigTable.put(topic, topicConfig); @@ -384,7 +384,7 @@ public class TopicConfigManager extends ConfigManager { @Override public String configFilePath() { return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig() - .getStorePathRootDir()); + .getStorePathRootDir()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java index d4edd9a..90ef210 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java @@ -50,7 +50,7 @@ public class BrokerControllerTest { } @After - public void destory(){ + public void destory() { UtilAll.deleteFile(new File(new MessageStoreConfig().getStorePathRootDir())); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java index 87f6256..8f28832 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java @@ -51,7 +51,6 @@ public class CommitLogDispatcherCalcBitMapTest { ConsumerFilterData nullBloomData = filterManager.get("topic0", "CID_1"); nullBloomData.setBloomFilterData(null); - CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig, filterManager); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 461932c..d3c973a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -97,7 +97,7 @@ public class MessageStoreWithFilterTest { } public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, - boolean enableCqExt, int cqExtFileSize) { + boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); @@ -127,7 +127,7 @@ public class MessageStoreWithFilterTest { new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { // System.out.println(String.format("Msg coming: %s, %d, %d, %d", // topic, queueId, logicOffset, tagsCode)); } @@ -154,7 +154,8 @@ public class MessageStoreWithFilterTest { return master; } - protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception { + protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, + int msgCountPerTopic) throws Exception { List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>(); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java index 941d4a7..c96f708 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java @@ -142,15 +142,18 @@ public class PullMessageProcessorTest { List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>(); final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1]; ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() { - @Override public String hookName() { + @Override + public String hookName() { return "TestHook"; } - @Override public void consumeMessageBefore(ConsumeMessageContext context) { + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { messageContext[0] = context; } - @Override public void consumeMessageAfter(ConsumeMessageContext context) { + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { } }; consumeMessageHookList.add(consumeMessageHook); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java index 02490a0..7828e7a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java @@ -94,15 +94,18 @@ public class SendMessageProcessorTest { List<SendMessageHook> sendMessageHookList = new ArrayList<>(); final SendMessageContext[] sendMessageContext = new SendMessageContext[1]; SendMessageHook sendMessageHook = new SendMessageHook() { - @Override public String hookName() { + @Override + public String hookName() { return null; } - @Override public void sendMessageBefore(SendMessageContext context) { + @Override + public void sendMessageBefore(SendMessageContext context) { sendMessageContext[0] = context; } - @Override public void sendMessageAfter(SendMessageContext context) { + @Override + public void sendMessageAfter(SendMessageContext context) { } }; @@ -115,7 +118,6 @@ public class SendMessageProcessorTest { assertThat(sendMessageContext[0].getProducerGroup()).isEqualTo(group); } - @Test public void testProcessRequest_FlushTimeOut() throws RemotingCommandException { when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR))); @@ -210,7 +212,8 @@ public class SendMessageProcessorTest { final RemotingCommand request = createSendMsgCommand(RequestCode.SEND_MESSAGE); final RemotingCommand[] response = new RemotingCommand[1]; doAnswer(new Answer() { - @Override public Object answer(InvocationOnMock invocation) throws Throwable { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { response[0] = invocation.getArgument(0); return null; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java index a0144a2..019414b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java +++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java @@ -32,7 +32,6 @@ public interface MQAdmin { * @param key accesskey * @param newTopic topic name * @param queueNum topic's queue number - * @throws MQClientException */ void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException; @@ -44,7 +43,6 @@ public interface MQAdmin { * @param newTopic topic name * @param queueNum topic's queue number * @param topicSysFlag topic system flag - * @throws MQClientException */ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException; @@ -56,7 +54,6 @@ public interface MQAdmin { * @param mq Instance of MessageQueue * @param timestamp from when in milliseconds. * @return offset - * @throws MQClientException */ long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException; @@ -65,7 +62,6 @@ public interface MQAdmin { * * @param mq Instance of MessageQueue * @return the max offset - * @throws MQClientException */ long maxOffset(final MessageQueue mq) throws MQClientException; @@ -74,7 +70,6 @@ public interface MQAdmin { * * @param mq Instance of MessageQueue * @return the minimum offset - * @throws MQClientException */ long minOffset(final MessageQueue mq) throws MQClientException; @@ -83,7 +78,6 @@ public interface MQAdmin { * * @param mq Instance of MessageQueue * @return the time in microseconds - * @throws MQClientException */ long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException; @@ -92,10 +86,6 @@ public interface MQAdmin { * * @param offsetMsgId message id * @return message - * @throws InterruptedException - * @throws MQBrokerException - * @throws RemotingException - * @throws MQClientException */ MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; @@ -109,21 +99,14 @@ public interface MQAdmin { * @param begin from when * @param end to when * @return Instance of QueryResult - * @throws MQClientException - * @throws InterruptedException */ QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException; /** - * @param topic - * @param msgId * @return The {@code MessageExt} of given msgId - * @throws RemotingException - * @throws MQBrokerException - * @throws InterruptedException - * @throws MQClientException */ - MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/MQHelper.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java index b78e1cb..7f0cef3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java +++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java @@ -41,7 +41,6 @@ public class MQHelper { * @param consumerGroup consumer group * @param topic topic * @param timestamp time - * @throws Exception */ public static void resetOffsetByTimestamp( final MessageModel messageModel, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/Validators.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index b49537f..5567e49 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -35,8 +35,6 @@ public class Validators { public static final int CHARACTER_MAX_LENGTH = 255; /** - * @param origin - * @param patternStr * @return The resulting {@code String} */ public static String getGroupWithRegularExpression(String origin, String patternStr) { @@ -50,9 +48,6 @@ public class Validators { /** * Validate group - * - * @param group - * @throws MQClientException */ public static void checkGroup(String group) throws MQClientException { if (UtilAll.isBlank(group)) { @@ -69,8 +64,6 @@ public class Validators { } /** - * @param origin - * @param pattern * @return <tt>true</tt> if, and only if, the entire origin sequence matches this matcher's pattern */ public static boolean regularExpressionMatcher(String origin, Pattern pattern) { @@ -83,10 +76,6 @@ public class Validators { /** * Validate message - * - * @param msg - * @param defaultMQProducer - * @throws MQClientException */ public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { @@ -113,9 +102,6 @@ public class Validators { /** * Validate topic - * - * @param topic - * @throws MQClientException */ public static void checkTopic(String topic) throws MQClientException { if (UtilAll.isBlank(topic)) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index f5016f8..cd70670 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -264,7 +264,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume } @Override - public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) + public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, + long timeout) throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); } @@ -276,7 +277,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume } @Override - public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) + public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, + PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback); } @@ -297,7 +299,8 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume } @Override - public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String topic, + String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { MessageDecoder.decodeMessageId(uniqKey); return this.viewMessage(uniqKey);