http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 9bf34be..7eda7c1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -297,10 +297,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); - DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(// - pullResult.getMsgFoundList(), // - processQueue, // - pullRequest.getMessageQueue(), // + DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( + pullResult.getMsgFoundList(), + processQueue, + pullRequest.getMessageQueue(), dispathToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { @@ -311,12 +311,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { } } - if (pullResult.getNextBeginOffset() < prevRequestOffset// + if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( - "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", // - pullResult.getNextBeginOffset(), // - firstMsgOffset, // + "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", + pullResult.getNextBeginOffset(), + firstMsgOffset, prevRequestOffset); } @@ -336,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: - log.warn("the pull request offset illegal, {} {}", // + log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); @@ -396,26 +396,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { classFilter = sd.isClassFilterMode(); } - int sysFlag = PullSysFlag.buildSysFlag(// + int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { - this.pullAPIWrapper.pullKernelImpl(// - pullRequest.getMessageQueue(), // 1 - subExpression, // 2 - subscriptionData.getExpressionType(), // 3 - subscriptionData.getSubVersion(), // 4 - pullRequest.getNextOffset(), // 5 - this.defaultMQPushConsumer.getPullBatchSize(), // 6 - sysFlag, // 7 - commitOffsetValue, // 8 - BROKER_SUSPEND_MAX_TIME_MILLIS, // 9 - CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10 - CommunicationMode.ASYNC, // 11 - pullCallback // 12 + this.pullAPIWrapper.pullKernelImpl( + pullRequest.getMessageQueue(), + subExpression, + subscriptionData.getExpressionType(), + subscriptionData.getSubVersion(), + pullRequest.getNextOffset(), + this.defaultMQPushConsumer.getPullBatchSize(), + sysFlag, + commitOffsetValue, + BROKER_SUSPEND_MAX_TIME_MILLIS, + CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, + CommunicationMode.ASYNC, + pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); @@ -425,8 +425,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The consumer service state not OK, "// - + this.serviceState// + throw new MQClientException("The consumer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -608,8 +608,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The PushConsumer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -764,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } @@ -779,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; @@ -811,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String subExpression) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) { @@ -824,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException { try { - SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), // + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, "*"); subscriptionData.setSubString(fullClassName); subscriptionData.setClassFilterMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 634e0f0..ef27ff8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -73,9 +73,9 @@ public abstract class RebalanceImpl { try { this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway); - log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", // - this.consumerGroup, // - this.mQClientFactory.getClientId(), // + log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", + this.consumerGroup, + this.mQClientFactory.getClientId(), mq); } catch (Exception e) { log.error("unlockBatchMQ exception, " + mq, e); @@ -245,10 +245,10 @@ public abstract class RebalanceImpl { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); - log.info("messageQueueChanged {} {} {} {}", // - consumerGroup, // - topic, // - mqSet, // + log.info("messageQueueChanged {} {} {} {}", + consumerGroup, + topic, + mqSet, mqSet); } } else { @@ -280,10 +280,10 @@ public abstract class RebalanceImpl { List<MessageQueue> allocateResult = null; try { - allocateResult = strategy.allocate(// - this.consumerGroup, // - this.mQClientFactory.getClientId(), // - mqAll, // + allocateResult = strategy.allocate( + this.consumerGroup, + this.mQClientFactory.getClientId(), + mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index 112bcee..2f4f745 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -74,8 +74,8 @@ public class RebalancePushImpl extends RebalanceImpl { pq.getLockConsume().unlock(); } } else { - log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", // - mq, // + log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", + mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index f146be9..6ef594b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -148,10 +148,10 @@ public class MQClientInstance { this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); - log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", // - this.instanceIndex, // - this.clientId, // - this.clientConfig, // + log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", + this.instanceIndex, + this.clientId, + this.clientConfig, MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer()); } @@ -727,13 +727,13 @@ public class MQClientInstance { classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET); classCRC = UtilAll.crc32(classBody); } catch (Exception e1) { - log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", // - fullClassName, // + log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", + fullClassName, RemotingHelper.exceptionSimpleDesc(e1)); } TopicRouteData topicRouteData = this.topicRouteTable.get(topic); - if (topicRouteData != null // + if (topicRouteData != null && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) { Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator(); while (it.hasNext()) { @@ -1006,10 +1006,10 @@ public class MQClientInstance { return null; } - public FindBrokerResult findBrokerAddressInSubscribe(// - final String brokerName, // - final long brokerId, // - final boolean onlyThisBroker// + public FindBrokerResult findBrokerAddressInSubscribe( + final String brokerName, + final long brokerId, + final boolean onlyThisBroker ) { String brokerAddr = null; boolean slave = false; @@ -1102,7 +1102,6 @@ public class MQClientInstance { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { - // } Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator(); @@ -1171,8 +1170,8 @@ public class MQClientInstance { return topicRouteTable; } - public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, // - final String consumerGroup, // + public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, + final String consumerGroup, final String brokerName) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); if (null != mqConsumerInner) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 12f8a36..602fedd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -116,11 +116,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void initTransactionEnv() { TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); - this.checkExecutor = new ThreadPoolExecutor(// - producer.getCheckThreadPoolMinSize(), // - producer.getCheckThreadPoolMaxSize(), // - 1000 * 60, // - TimeUnit.MILLISECONDS, // + this.checkExecutor = new ThreadPoolExecutor( + producer.getCheckThreadPoolMinSize(), + producer.getCheckThreadPoolMaxSize(), + 1000 * 60, + TimeUnit.MILLISECONDS, this.checkRequestQueue); } @@ -172,8 +172,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: - throw new MQClientException("The producer service state not OK, maybe started once, "// - + this.serviceState// + throw new MQClientException("The producer service state not OK, maybe started once, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: @@ -268,18 +268,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { exception = e; } - this.processTransactionState(// - localTransactionState, // - group, // + this.processTransactionState( + localTransactionState, + group, exception); } else { log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); } } - private void processTransactionState(// - final LocalTransactionState localTransactionState, // - final String producerGroup, // + private void processTransactionState( + final LocalTransactionState localTransactionState, + final String producerGroup, final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); @@ -354,8 +354,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { private void makeSureStateOK() throws MQClientException { if (this.serviceState != ServiceState.RUNNING) { - throw new MQClientException("The producer service state not OK, "// - + this.serviceState// + throw new MQClientException("The producer service state not OK, " + + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); } @@ -428,11 +428,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } - private SendResult sendDefaultImpl(// - Message msg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final long timeout// + private SendResult sendDefaultImpl( + Message msg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -579,11 +579,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { } } - private SendResult sendKernelImpl(final Message msg, // - final MessageQueue mq, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // + private SendResult sendKernelImpl(final Message msg, + final MessageQueue mq, + final CommunicationMode communicationMode, + final SendCallback sendCallback, + final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { @@ -674,18 +674,18 @@ public class DefaultMQProducerImpl implements MQProducerInner { SendResult sendResult = null; switch (communicationMode) { case ASYNC: - sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// - brokerAddr, // 1 - mq.getBrokerName(), // 2 - msg, // 3 - requestHeader, // 4 - timeout, // 5 - communicationMode, // 6 - sendCallback, // 7 - topicPublishInfo, // 8 - this.mQClientFactory, // 9 - this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 - context, // + sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout, + communicationMode, + sendCallback, + topicPublishInfo, + this.mQClientFactory, + this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), + context, this); break; case ONEWAY: @@ -887,12 +887,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); } - private SendResult sendSelectImpl(// - Message msg, // - MessageQueueSelector selector, // - Object arg, // - final CommunicationMode communicationMode, // - final SendCallback sendCallback, final long timeout// + private SendResult sendSelectImpl( + Message msg, + MessageQueueSelector selector, + Object arg, + final CommunicationMode communicationMode, + final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); @@ -1017,9 +1017,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { return send(msg, this.defaultMQProducer.getSendMsgTimeout()); } - public void endTransaction(// - final SendResult sendResult, // - final LocalTransactionState localTransactionState, // + public void endTransaction( + final SendResult sendResult, + final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index 5b2039e..dfd485d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -28,9 +28,9 @@ public interface MQProducerInner { TransactionCheckListener checkListener(); - void checkTransactionState(// - final String addr, // - final MessageExt msg, // + void checkTransactionState( + final String addr, + final MessageExt msg, final CheckTransactionStateRequestHeader checkRequestHeader); void updateTopicPublishInfo(final String topic, final TopicPublishInfo info); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index b85f6f5..4795cce 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -46,24 +46,14 @@ public class TopicConfig { public String encode() { StringBuilder sb = new StringBuilder(); - - // 1 sb.append(this.topicName); sb.append(SEPARATOR); - - // 2 sb.append(this.readQueueNums); sb.append(SEPARATOR); - - // 3 sb.append(this.writeQueueNums); sb.append(SEPARATOR); - - // 4 sb.append(this.perm); sb.append(SEPARATOR); - - // 5 sb.append(this.topicFilterType); return sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java index eea0da1..5d950be 100644 --- a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java +++ b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java @@ -18,44 +18,44 @@ package org.apache.rocketmq.common.help; public class FAQUrl { - public static final String APPLY_TOPIC_URL = // + public static final String APPLY_TOPIC_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = // + public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String GROUP_NAME_DUPLICATE_URL = // + public static final String GROUP_NAME_DUPLICATE_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String CLIENT_PARAMETER_CHECK_URL = // + public static final String CLIENT_PARAMETER_CHECK_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String SUBSCRIPTION_GROUP_NOT_EXIST = // + public static final String SUBSCRIPTION_GROUP_NOT_EXIST = "http://rocketmq.apache.org/docs/faq/"; - public static final String CLIENT_SERVICE_NOT_OK = // + public static final String CLIENT_SERVICE_NOT_OK = "http://rocketmq.apache.org/docs/faq/"; // FAQ: No route info of this topic, TopicABC - public static final String NO_TOPIC_ROUTE_INFO = // + public static final String NO_TOPIC_ROUTE_INFO = "http://rocketmq.apache.org/docs/faq/"; - public static final String LOAD_JSON_EXCEPTION = // + public static final String LOAD_JSON_EXCEPTION = "http://rocketmq.apache.org/docs/faq/"; - public static final String SAME_GROUP_DIFFERENT_TOPIC = // + public static final String SAME_GROUP_DIFFERENT_TOPIC = "http://rocketmq.apache.org/docs/faq/"; - public static final String MQLIST_NOT_EXIST = // + public static final String MQLIST_NOT_EXIST = "http://rocketmq.apache.org/docs/faq/"; - public static final String UNEXPECTED_EXCEPTION_URL = // + public static final String UNEXPECTED_EXCEPTION_URL = "http://rocketmq.apache.org/docs/faq/"; - public static final String SEND_MSG_FAILED = // + public static final String SEND_MSG_FAILED = "http://rocketmq.apache.org/docs/faq/"; - public static final String UNKNOWN_HOST_EXCEPTION = // + public static final String UNKNOWN_HOST_EXCEPTION = "http://rocketmq.apache.org/docs/faq/"; private static final String TIP_STRING_BEGIN = "\nSee "; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java index a1d3ede..d0b202e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java @@ -42,7 +42,7 @@ public class MessageClientIDSetter { tempBuffer.put(createFakeIP()); } tempBuffer.position(6); - tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4 + tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java index 41e76fc..d7942eb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -130,15 +130,15 @@ public class ConsumerRunningInfo extends RemotingSerializable { if (orderMsg) { if (!pq.isLocked()) { - sb.append(String.format("%s %s can't lock for a while, %dms%n", // - clientId, // - mq, // + sb.append(String.format("%s %s can't lock for a while, %dms%n", + clientId, + mq, System.currentTimeMillis() - pq.getLastLockTimestamp())); } else { if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) { - sb.append(String.format("%s %s unlock %d times, still failed%n", // - clientId, // - mq, // + sb.append(String.format("%s %s unlock %d times, still failed%n", + clientId, + mq, pq.getTryUnlockTimes())); } } @@ -147,9 +147,9 @@ public class ConsumerRunningInfo extends RemotingSerializable { long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp(); if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) { - sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", // - clientId, // - mq, // + sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", + clientId, + mq, diff)); } } @@ -211,10 +211,10 @@ public class ConsumerRunningInfo extends RemotingSerializable { int i = 0; while (it.hasNext()) { SubscriptionData next = it.next(); - String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", // - ++i, // - next.getTopic(), // - next.isClassFilterMode(), // + String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", + ++i, + next.getTopic(), + next.isClassFilterMode(), next.getSubString()); sb.append(item); @@ -223,20 +223,20 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer Offset#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s%n", // - "#Topic", // - "#Broker Name", // - "#QID", // - "#Consumer Offset"// + sb.append(String.format("%-32s %-32s %-4s %-20s%n", + "#Topic", + "#Broker Name", + "#QID", + "#Consumer Offset" )); Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueueInfo> next = it.next(); - String item = String.format("%-32s %-32s %-4d %-20d%n", // - next.getKey().getTopic(), // - next.getKey().getBrokerName(), // - next.getKey().getQueueId(), // + String item = String.format("%-32s %-32s %-4d %-20d%n", + next.getKey().getTopic(), + next.getKey().getBrokerName(), + next.getKey().getQueueId(), next.getValue().getCommitOffset()); sb.append(item); @@ -245,20 +245,20 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer MQ Detail#\n"); - sb.append(String.format("%-32s %-32s %-4s %-20s%n", // - "#Topic", // - "#Broker Name", // - "#QID", // - "#ProcessQueueInfo"// + sb.append(String.format("%-32s %-32s %-4s %-20s%n", + "#Topic", + "#Broker Name", + "#QID", + "#ProcessQueueInfo" )); Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueueInfo> next = it.next(); - String item = String.format("%-32s %-32s %-4d %s%n", // - next.getKey().getTopic(), // - next.getKey().getBrokerName(), // - next.getKey().getQueueId(), // + String item = String.format("%-32s %-32s %-4d %s%n", + next.getKey().getTopic(), + next.getKey().getBrokerName(), + next.getKey().getQueueId(), next.getValue().toString()); sb.append(item); @@ -267,27 +267,27 @@ public class ConsumerRunningInfo extends RemotingSerializable { { sb.append("\n\n#Consumer RT&TPS#\n"); - sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", // - "#Topic", // - "#Pull RT", // - "#Pull TPS", // - "#Consume RT", // - "#ConsumeOK TPS", // - "#ConsumeFailed TPS", // - "#ConsumeFailedMsgsInHour"// + sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", + "#Topic", + "#Pull RT", + "#Pull TPS", + "#Consume RT", + "#ConsumeOK TPS", + "#ConsumeFailed TPS", + "#ConsumeFailedMsgsInHour" )); Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumeStatus> next = it.next(); - String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", // - next.getKey(), // - next.getValue().getPullRT(), // - next.getValue().getPullTPS(), // - next.getValue().getConsumeRT(), // - next.getValue().getConsumeOKTPS(), // - next.getValue().getConsumeFailedTPS(), // - next.getValue().getConsumeFailedMsgs()// + String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", + next.getKey(), + next.getValue().getPullRT(), + next.getValue().getPullTPS(), + next.getValue().getConsumeRT(), + next.getValue().getConsumeOKTPS(), + next.getValue().getConsumeFailedTPS(), + next.getValue().getConsumeFailedMsgs() ); sb.append(item); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java index ba6b129..6ba069e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java @@ -27,8 +27,6 @@ public class GetConsumeStatsRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub - } public String getConsumerGroup() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java index 20990a6..ca26a86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java @@ -32,7 +32,6 @@ public class GetConsumerStatusRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub } public String getTopic() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java index 222382e..c64381f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java @@ -32,7 +32,6 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub } public String getTopic() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java index 6a998d9..93fa722 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java @@ -33,7 +33,7 @@ public class QueryCorrectionOffsetHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } public String getFilterGroups() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java index 113e46f..3685ef9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java @@ -34,7 +34,7 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java index 082329c..95e18d0 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java @@ -57,7 +57,7 @@ public class UnregisterClientRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java index 6ae6929..f61f0cd 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java @@ -24,7 +24,7 @@ public class UnregisterClientResponseHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java deleted file mode 100644 index 64081ea..0000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagew...@apache.org $ - */ -package org.apache.rocketmq.common.protocol.header.namesrv; - -import org.apache.rocketmq.remoting.CommandCustomHeader; -import org.apache.rocketmq.remoting.exception.RemotingCommandException; - -public class GetRouteInfoResponseHeader implements CommandCustomHeader { - - @Override - public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java index 93069fe..8307e20 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java @@ -32,7 +32,7 @@ public class RegisterOrderTopicRequestHeader implements CommandCustomHeader { @Override public void checkFields() throws RemotingCommandException { - // TODO Auto-generated method stub + } public String getTopic() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java index 9966a90..8fd8628 100644 --- a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java +++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java @@ -16,11 +16,7 @@ */ package org.apache.rocketmq.common.sysflag; -/** - * - * - */ public class TopicSysFlag { private final static int FLAG_UNIT = 0x1 << 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java index 28ead5c..e43ae41 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java @@ -114,9 +114,7 @@ public class IOTinyUtils { fileOrDir.delete(); } - /** - */ public static void cleanDirectory(File directory) throws IOException { if (!directory.exists()) { String message = directory + " does not exist"; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index c8252d0..9bd9ea1 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -35,9 +35,7 @@ public class PushConsumer { consumer.setConsumeTimestamp("20170422221800"); consumer.registerMessageListener(new MessageListenerConcurrently() { - /** - */ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/example/src/main/resources/MessageFilterImpl.java ---------------------------------------------------------------------- diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java index 23e4a79..6cb5d15 100644 --- a/example/src/main/resources/MessageFilterImpl.java +++ b/example/src/main/resources/MessageFilterImpl.java @@ -28,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter { String property = msg.getProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); - if (((id % 10) == 0) && // + if (((id % 10) == 0) && (id > 100)) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java index 74e5501..2948c10 100644 --- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java +++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java @@ -56,7 +56,6 @@ public class SelectorParser implements SelectorParserConstants { // convertStringExpressions = true; // sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length()); // } - // // if( convertStringExpressions ) { // ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true); // } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj ---------------------------------------------------------------------- diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj index 5d1a4a7..b533ac1 100644 --- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj +++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj @@ -82,7 +82,6 @@ public class SelectorParser { // convertStringExpressions = true; // sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length()); // } -// // if( convertStringExpressions ) { // ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true); // } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java index be13bd6..376a814 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java @@ -72,10 +72,10 @@ public class KVConfigManager { final String prev = kvTable.put(key, value); if (null != prev) { - log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", // + log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } else { - log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", // + log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { @@ -119,7 +119,7 @@ public class KVConfigManager { HashMap<String, String> kvTable = this.configTable.get(namespace); if (null != kvTable) { String value = kvTable.remove(key); - log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", // + log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", namespace, key, value); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 7479fcc..35790c9 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -131,9 +131,9 @@ public class RouteInfoManager { String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); - if (null != topicConfigWrapper // + if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { - if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// + if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 4ed156d..6e99b32 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); - private static final int FRAME_MAX_LENGTH = // + private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); public NettyDecoder() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index ba74b53..b66e7de 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -188,7 +188,7 @@ public abstract class NettyRemotingAbstract { log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { - final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, // + final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); @@ -210,9 +210,9 @@ public abstract class NettyRemotingAbstract { pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { - log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) // - + ", too many requests and system thread pool busy, RejectedExecutionException " // - + pair.getObject2().toString() // + log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + + ", too many requests and system thread pool busy, RejectedExecutionException " + + pair.getObject2().toString() + " request code: " + cmd.getCode()); } @@ -422,10 +422,10 @@ public abstract class NettyRemotingAbstract { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = - String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreAsync.getQueueLength(), // - this.semaphoreAsync.availablePermits()// + String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", + timeoutMillis, + this.semaphoreAsync.getQueueLength(), + this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); @@ -459,10 +459,10 @@ public abstract class NettyRemotingAbstract { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( - "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // - timeoutMillis, // - this.semaphoreOneway.getQueueLength(), // - this.semaphoreOneway.availablePermits()// + "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", + timeoutMillis, + this.semaphoreOneway.getQueueLength(), + this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index db6a7e4..ecf9ab2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -92,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti this(nettyClientConfig, null); } - public NettyRemotingClient(final NettyClientConfig nettyClientConfig, // + public NettyRemotingClient(final NettyClientConfig nettyClientConfig, final ChannelEventListener channelEventListener) { super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue()); this.nettyClientConfig = nettyClientConfig; @@ -130,8 +130,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void start() { - this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// - nettyClientConfig.getClientWorkerThreads(), // + this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( + nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } }); - Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index c4354e9..0570c84 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -16,11 +16,7 @@ */ package org.apache.rocketmq.remoting.netty; -/** - * - * - */ public class NettyServerConfig implements Cloneable { private int listenPort = 8888; private int serverWorkerThreads = 8; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 52556fc..2e0a81e 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -20,23 +20,23 @@ package org.apache.rocketmq.remoting.netty; public class NettySystemConfig { public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable"; - public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = // + public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = "com.rocketmq.remoting.socket.sndbuf.size"; - public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = // + public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = "com.rocketmq.remoting.socket.rcvbuf.size"; - public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = // + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientAsyncSemaphoreValue"; - public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = // + public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = "com.rocketmq.remoting.clientOnewaySemaphoreValue"; - public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // + public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = Boolean .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); - public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // + public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535")); - public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = // + public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535")); - public static int socketSndbufSize = // + public static int socketSndbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535")); - public static int socketRcvbufSize = // + public static int socketRcvbufSize = Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535")); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 0810d0c..a2cb629 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -125,11 +125,11 @@ public class CommitLog { return this.mappedFileQueue.remainHowManyDataToFlush(); } - public int deleteExpiredFile(// - final long expiredTime, // - final int deleteFilesInterval, // - final long intervalForcibly, // - final boolean cleanImmediately// + public int deleteExpiredFile( + final long expiredTime, + final int deleteFilesInterval, + final long intervalForcibly, + final boolean cleanImmediately ) { return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); } @@ -244,43 +244,30 @@ public class CommitLog { byte[] bytesContent = new byte[totalSize]; - // 3 BODYCRC int bodyCRC = byteBuffer.getInt(); - // 4 QUEUEID int queueId = byteBuffer.getInt(); - // 5 FLAG int flag = byteBuffer.getInt(); - // 6 QUEUEOFFSET long queueOffset = byteBuffer.getLong(); - // 7 PHYSICALOFFSET long physicOffset = byteBuffer.getLong(); - // 8 SYSFLAG int sysFlag = byteBuffer.getInt(); - // 9 BORNTIMESTAMP long bornTimeStamp = byteBuffer.getLong(); - // 10 ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8); - // 11 STORETIMESTAMP long storeTimestamp = byteBuffer.getLong(); - // 12 ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8); - // 13 RECONSUMETIMES int reconsumeTimes = byteBuffer.getInt(); - // 14 Prepared Transaction Offset long preparedTransactionOffset = byteBuffer.getLong(); - // 15 BODY int bodyLen = byteBuffer.getInt(); if (bodyLen > 0) { if (readBody) { @@ -298,7 +285,6 @@ public class CommitLog { } } - // 16 TOPIC byte topicLen = byteBuffer.get(); byteBuffer.get(bytesContent, 0, topicLen); String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8); @@ -307,7 +293,6 @@ public class CommitLog { String keys = ""; String uniqKey = null; - // 17 properties short propertiesLength = byteBuffer.getShort(); Map<String, String> propertiesMap = null; if (propertiesLength > 0) { @@ -355,19 +340,19 @@ public class CommitLog { return new DispatchRequest(totalSize, false/* success */); } - return new DispatchRequest(// - topic, // 1 - queueId, // 2 - physicOffset, // 3 - totalSize, // 4 - tagsCode, // 5 - storeTimestamp, // 6 - queueOffset, // 7 - keys, // 8 - uniqKey, //9 - sysFlag, // 10 - preparedTransactionOffset, // 11 - propertiesMap // 12 + return new DispatchRequest( + topic, + queueId, + physicOffset, + totalSize, + tagsCode, + storeTimestamp, + queueOffset, + keys, + uniqKey, + sysFlag, + preparedTransactionOffset, + propertiesMap ); } catch (Exception e) { } @@ -376,24 +361,23 @@ public class CommitLog { } private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { - final int msgLen = 4 // 1 TOTALSIZE - + 4 // 2 MAGICCODE - + 4 // 3 BODYCRC - + 4 // 4 QUEUEID - + 4 // 5 FLAG - + 8 // 6 QUEUEOFFSET - + 8 // 7 PHYSICALOFFSET - + 4 // 8 SYSFLAG - + 8 // 9 BORNTIMESTAMP - + 8 // 10 BORNHOST - + 8 // 11 STORETIMESTAMP - + 8 // 12 STOREHOSTADDRESS - + 4 // 13 RECONSUMETIMES - + 8 // 14 Prepared Transaction Offset - + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY - + 1 + topicLength // 15 TOPIC - + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16 - // propertiesLength + final int msgLen = 4 //TOTALSIZE + + 4 //MAGICCODE + + 4 //BODYCRC + + 4 //QUEUEID + + 4 //FLAG + + 8 //QUEUEOFFSET + + 8 //PHYSICALOFFSET + + 4 //SYSFLAG + + 8 //BORNTIMESTAMP + + 8 //BORNHOST + + 8 //STORETIMESTAMP + + 8 //STOREHOSTADDRESS + + 4 //RECONSUMETIMES + + 8 //Prepared Transaction Offset + + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY + + 1 + topicLength //TOPIC + + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength + 0; return msgLen; } @@ -500,18 +484,18 @@ public class CommitLog { return false; } - if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()// + if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) { - log.info("find check timestamp, {} {}", // - storeTimestamp, // + log.info("find check timestamp, {} {}", + storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } } else { if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) { - log.info("find check timestamp, {} {}", // - storeTimestamp, // + log.info("find check timestamp, {} {}", + storeTimestamp, UtilAll.timeMillisToHumanString(storeTimestamp)); return true; } @@ -547,7 +531,7 @@ public class CommitLog { int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE// + if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { @@ -1270,8 +1254,6 @@ public class CommitLog { // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value - // - // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); @@ -1391,7 +1373,6 @@ public class CommitLog { // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 3 The remaining space may be any value - // //ignore previous read messagesByteBuff.reset(); // Here the length of the specially set maxBlank http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 49a1eba..36c15d4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -218,9 +218,7 @@ public class DefaultMessageStore implements MessageStore { this.shutdown = false; } - /** - */ public void shutdown() { if (!this.shutdown) { this.shutdown = true; @@ -392,7 +390,7 @@ public class DefaultMessageStore implements MessageStore { long begin = this.getCommitLog().getBeginTimeInLock(); long diff = this.systemClock.now() - begin; - if (diff < 10000000 // + if (diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) { return true; } @@ -579,9 +577,7 @@ public class DefaultMessageStore implements MessageStore { return getResult; } - /** - */ public long getMaxOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { @@ -592,9 +588,7 @@ public class DefaultMessageStore implements MessageStore { return 0; } - /** - */ public long getMinOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { @@ -891,9 +885,9 @@ public class DefaultMessageStore implements MessageStore { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { cq.destroy(); - log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", // - cq.getTopic(), // - cq.getQueueId() // + log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", + cq.getTopic(), + cq.getQueueId() ); this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); @@ -922,17 +916,17 @@ public class DefaultMessageStore implements MessageStore { long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); if (maxCLOffsetInConsumeQueue == -1) { - log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", // - nextQT.getValue().getTopic(), // - nextQT.getValue().getQueueId(), // - nextQT.getValue().getMaxPhysicOffset(), // + log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", + nextQT.getValue().getTopic(), + nextQT.getValue().getQueueId(), + nextQT.getValue().getMaxPhysicOffset(), nextQT.getValue().getMinLogicOffset()); } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( - "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", // - topic, // - nextQT.getKey(), // - minCommitLogOffset, // + "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", + topic, + nextQT.getKey(), + minCommitLogOffset, maxCLOffsetInConsumeQueue); DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), @@ -1072,11 +1066,11 @@ public class DefaultMessageStore implements MessageStore { ConsumeQueue logic = map.get(queueId); if (null == logic) { - ConsumeQueue newLogic = new ConsumeQueue(// - topic, // - queueId, // - StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // - this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), // + ConsumeQueue newLogic = new ConsumeQueue( + topic, + queueId, + StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), + this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { @@ -1462,11 +1456,11 @@ public class DefaultMessageStore implements MessageStore { boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; - log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", // - fileReservedTime, // - timeup, // - spacefull, // - manualDeleteFileSeveralTimes, // + log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", + fileReservedTime, + timeup, + spacefull, + manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; @@ -1725,7 +1719,7 @@ public class DefaultMessageStore implements MessageStore { private void doReput() { for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { - if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() // + if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } @@ -1751,7 +1745,7 @@ public class DefaultMessageStore implements MessageStore { dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } - // FIXED BUG By shijia + this.reputFromOffset += size; readSize += size; if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java index 3d33eaf..819bb94 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java +++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java @@ -66,23 +66,14 @@ public class DispatchRequest { } public DispatchRequest(int size) { - // 1 this.topic = ""; - // 2 this.queueId = 0; - // 3 this.commitLogOffset = 0; - // 4 this.msgSize = size; - // 5 this.tagsCode = 0; - // 6 this.storeTimestamp = 0; - // 7 this.consumeQueueOffset = 0; - // 8 this.keys = ""; - //9 this.uniqKey = null; this.sysFlag = 0; this.preparedTransactionOffset = 0; @@ -91,23 +82,14 @@ public class DispatchRequest { } public DispatchRequest(int size, boolean success) { - // 1 this.topic = ""; - // 2 this.queueId = 0; - // 3 this.commitLogOffset = 0; - // 4 this.msgSize = size; - // 5 this.tagsCode = 0; - // 6 this.storeTimestamp = 0; - // 7 this.consumeQueueOffset = 0; - // 8 this.keys = ""; - // 9 this.uniqKey = null; this.sysFlag = 0; this.preparedTransactionOffset = 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 4250450..81cf0f7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -404,9 +404,7 @@ public class MappedFile extends ReferenceResource { return null; } - /** - */ public SelectMappedBufferResult selectMappedBuffer(int pos) { int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java index a8fa364..edf4c91 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java @@ -405,7 +405,6 @@ public class MappedFileQueue { break; } - // TODO: Externalize this hardcoded value if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index 3967b64..e0c51a1 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -78,10 +78,7 @@ public class HAConnection { return socketChannel; } - /** - * - */ class ReadSocketService extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024; private final Selector selector; @@ -194,10 +191,7 @@ public class HAConnection { } } - /** - * - */ class WriteSocketService extends ServiceThread { private final Selector selector; private final SocketChannel socketChannel; @@ -333,9 +327,7 @@ public class HAConnection { HAConnection.log.info(this.getServiceName() + " service end"); } - /** - */ private boolean transferData() throws Exception { int writeSizeZeroTimes = 0; // Write Header