http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java ---------------------------------------------------------------------- diff --git a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java index 22eeb86..bfbddf4 100644 --- a/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java +++ b/filter/src/test/java/org/apache/rocketmq/filter/FilterSpiTest.java @@ -48,7 +48,6 @@ public class FilterSpiTest { } } - @Test public void testRegister() { FilterFactory.INSTANCE.register(new NothingFilter());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java index ff8450e..e459b1a 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -83,7 +83,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return false; } - private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RegisterMessageFilterClassRequestHeader requestHeader = (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); @@ -108,7 +109,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception { + private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, + final RemotingCommand request) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = @@ -215,7 +217,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return null; } - private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response, + private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, + final RemotingCommand response, final List<MessageExt> msgList) { if (null != msgList) { ByteBuffer[] msgBufferList = new ByteBuffer[msgList.size()]; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java index 53521d4..d2adac5 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java @@ -52,7 +52,6 @@ public class ProducerInstance { return nameServerAddress + "_" + group; } - public MQProducer getInstance(String nameServerAddress, String group) throws MQClientException { if (StringUtils.isBlank(group)) { group = DEFAULT_GROUP; @@ -75,7 +74,6 @@ public class ProducerInstance { return defaultMQProducer; } - public void removeAndClose(String nameServerAddress, String group) { if (group == null) { group = DEFAULT_GROUP; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java index 3fd8d4c..646e924 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j/RocketmqLog4jAppender.java @@ -59,7 +59,6 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { public RocketmqLog4jAppender() { } - public void activateOptions() { LogLog.debug("Getting initial context."); if (!checkEntryConditions()) { @@ -72,11 +71,8 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { } } - /** * Info,error,warn,callback method implementation - * - * @param event */ public void append(LoggingEvent event) { if (null == producer) { @@ -95,7 +91,7 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { } catch (Exception e) { String msg = new String(data); errorHandler.error("Could not send message in RocketmqLog4jAppender [" + name + "].Message is :" + msg, e, - ErrorCode.GENERIC_FAILURE); + ErrorCode.GENERIC_FAILURE); } } @@ -145,7 +141,6 @@ public class RocketmqLog4jAppender extends AppenderSkeleton { return topic; } - public void setTopic(String topic) { this.topic = topic; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java index 5a6362e..9543f1c 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/log4j2/RocketmqLog4j2Appender.java @@ -70,10 +70,9 @@ public class RocketmqLog4j2Appender extends AbstractAppender { */ private String topic; - protected RocketmqLog4j2Appender(String name, Filter filter, Layout<? extends Serializable> layout, - boolean ignoreExceptions, String nameServerAddress, String producerGroup, - String topic, String tag) { + boolean ignoreExceptions, String nameServerAddress, String producerGroup, + String topic, String tag) { super(name, filter, layout, ignoreExceptions); this.producer = producer; this.topic = topic; @@ -86,15 +85,13 @@ public class RocketmqLog4j2Appender extends AbstractAppender { ErrorHandler handler = this.getHandler(); if (handler != null) { handler.error("Starting RocketmqLog4j2Appender [" + this.getName() - + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } } } /** * Info,error,warn,callback method implementation - * - * @param event */ public void append(LogEvent event) { if (null == producer) { @@ -119,10 +116,6 @@ public class RocketmqLog4j2Appender extends AbstractAppender { /** * When system exit,this method will be called to close resources - * - * @param timeout - * @param timeUnit - * @return */ public boolean stop(long timeout, TimeUnit timeUnit) { this.setStopping(); @@ -132,7 +125,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { ErrorHandler handler = this.getHandler(); if (handler != null) { handler.error("Closeing RocketmqLog4j2Appender [" + this.getName() - + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } } @@ -227,7 +220,7 @@ public class RocketmqLog4j2Appender extends AbstractAppender { public RocketmqLog4j2Appender build() { return new RocketmqLog4j2Appender(name, filter, layout, ignoreExceptions, - nameServerAddress, producerGroup, topic, tag); + nameServerAddress, producerGroup, topic, tag); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java ---------------------------------------------------------------------- diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java index 50ba564..4018cd4 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/logback/RocketmqLogbackAppender.java @@ -62,8 +62,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { /** * Info,error,warn,callback method implementation - * - * @param event */ @Override protected void append(ILoggingEvent event) { @@ -100,7 +98,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { producer = ProducerInstance.getProducerInstance().getInstance(nameServerAddress, producerGroup); } catch (Exception e) { addError("Starting RocketmqLogbackAppender [" + this.getName() - + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } if (producer != null) { super.start(); @@ -122,7 +120,7 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { ProducerInstance.getProducerInstance().removeAndClose(this.nameServerAddress, this.producerGroup); } catch (Exception e) { addError("Closeing RocketmqLogbackAppender [" + this.getName() - + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); + + "] nameServerAddress:" + nameServerAddress + " group:" + producerGroup + " " + e.getMessage()); } // Help garbage collection @@ -144,7 +142,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { } } - public Layout getLayout() { return this.layout; } @@ -160,7 +157,6 @@ public class RocketmqLogbackAppender extends AppenderBase<ILoggingEvent> { return tag; } - public void setTag(String tag) { this.tag = tag; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java index 9faebb9..38904c0 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java @@ -30,7 +30,6 @@ import java.lang.reflect.Field; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; - /** * Basic test rocketmq broker and name server init */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java index 6306ec5..c139283 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/Log4jTest.java @@ -22,8 +22,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; - -public abstract class Log4jTest extends AbstractTestCase{ +public abstract class Log4jTest extends AbstractTestCase { @Before public abstract void init(); @@ -37,8 +36,8 @@ public abstract class Log4jTest extends AbstractTestCase{ for (int i = 0; i < 10; i++) { logger.info("log4j " + this.getType() + " simple test message " + i); } - int received = consumeMessages(10, "log4j",10); - Assert.assertTrue(received>5); + int received = consumeMessages(10, "log4j", 10); + Assert.assertTrue(received > 5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java index 4f9d3e5..d7ec184 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/LogbackTest.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.File; -public class LogbackTest extends AbstractTestCase{ +public class LogbackTest extends AbstractTestCase { @Before public void init() throws JoranException { @@ -41,7 +41,6 @@ public class LogbackTest extends AbstractTestCase{ StatusPrinter.printInCaseOfErrorsOrWarnings(lc); } - @Test public void testLogback() throws InterruptedException, MQClientException { clear(); @@ -49,7 +48,7 @@ public class LogbackTest extends AbstractTestCase{ for (int i = 0; i < 10; i++) { logger.info("logback test message " + i); } - int received = consumeMessages(10, "logback",10); - Assert.assertTrue(received>=5); + int received = consumeMessages(10, "logback", 10); + Assert.assertTrue(received >= 5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java ---------------------------------------------------------------------- diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java index 4089644..6f6af60 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/log4j2Test.java @@ -24,14 +24,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class log4j2Test extends AbstractTestCase{ +public class log4j2Test extends AbstractTestCase { @Before public void init() { Configurator.initialize("log4j2", "src/test/resources/log4j2-example.xml"); } - @Test public void testLog4j2() throws InterruptedException, MQClientException { clear(); @@ -39,7 +38,7 @@ public class log4j2Test extends AbstractTestCase{ for (int i = 0; i < 10; i++) { logger.info("log4j2 log message " + i); } - int received = consumeMessages(10, "log4j2",10); - Assert.assertTrue(received>5); + int received = consumeMessages(10, "log4j2", 10); + Assert.assertTrue(received > 5); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j-example.properties ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j-example.properties b/logappender/src/test/resources/log4j-example.properties index 7fdebbb..63b2a98 100644 --- a/logappender/src/test/resources/log4j-example.properties +++ b/logappender/src/test/resources/log4j-example.properties @@ -12,23 +12,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - log4j.rootLogger=INFO,stdout - log4j.logger.testLogger=INFO,mq - log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %-4r [%t] (%F:%L) %-5p - %m%n - log4j.appender.store=org.apache.log4j.DailyRollingFileAppender log4j.appender.store.File=${user.home}/logs/rocketmqlogs/appender.log log4j.appender.store.Append=true -log4j.appender.store.DatePattern ='_'yyyy-MM-dd'.log' +log4j.appender.store.DatePattern='_'yyyy-MM-dd'.log' log4j.appender.store.layout=org.apache.log4j.PatternLayout log4j.appender.store.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n - log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender log4j.appender.mq.Tag=log log4j.appender.mq.Topic=TopicTest http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j-example.xml ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j-example.xml b/logappender/src/test/resources/log4j-example.xml index b0dc776..6bddde9 100644 --- a/logappender/src/test/resources/log4j-example.xml +++ b/logappender/src/test/resources/log4j-example.xml @@ -19,37 +19,37 @@ limitations under the License. <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> <appender name="consoleAppender" class="org.apache.log4j.ConsoleAppender"> - <param name="Encoding" value="UTF-8" /> - <param name="Target" value="System.out" /> + <param name="Encoding" value="UTF-8"/> + <param name="Target" value="System.out"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss},%d %-4r [%t] (%F:%L) %-5p - %m%n" /> + <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss},%d %-4r [%t] (%F:%L) %-5p - %m%n"/> </layout> </appender> <appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"> - <param name="Tag" value="log1" /> - <param name="Topic" value="TopicTest" /> - <param name="ProducerGroup" value="loggerAppender" /> + <param name="Tag" value="log1"/> + <param name="Topic" value="TopicTest"/> + <param name="ProducerGroup" value="loggerAppender"/> <param name="NameServerAddress" value="127.0.0.1:9876"/> <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" /> + <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n"/> </layout> </appender> <logger name="testLogger" additivity="false"> - <level value="INFO" /> - <appender-ref ref="mqAppender1" /> - <appender-ref ref="consoleAppender" /> + <level value="INFO"/> + <appender-ref ref="mqAppender1"/> + <appender-ref ref="consoleAppender"/> </logger> <logger name="consoleLogger" additivity="false"> - <level value="INFO" /> - <appender-ref ref="consoleAppender" /> + <level value="INFO"/> + <appender-ref ref="consoleAppender"/> </logger> <root> - <level value="INFO" /> + <level value="INFO"/> <appender-ref ref="consoleAppender"/> </root> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/logappender/src/test/resources/log4j2-example.xml ---------------------------------------------------------------------- diff --git a/logappender/src/test/resources/log4j2-example.xml b/logappender/src/test/resources/log4j2-example.xml index 3ee8a01..c310855 100644 --- a/logappender/src/test/resources/log4j2-example.xml +++ b/logappender/src/test/resources/log4j2-example.xml @@ -17,25 +17,25 @@ --> <Configuration status="warn" name="Rocketmq"> -<Appenders> - <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876" - topic="TopicTest" tag="log"> - <PatternLayout pattern="%d [%p] hahahah %c %m%n"/> - </RocketMQ> + <Appenders> + <RocketMQ name="rocketmqAppender" producerGroup="loggerAppender" nameServerAddress="127.0.0.1:9876" + topic="TopicTest" tag="log"> + <PatternLayout pattern="%d [%p] hahahah %c %m%n"/> + </RocketMQ> - <Console name="Console" target="SYSTEM_OUT"> - <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> - </Console> -</Appenders> -<Loggers> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> - <Logger name="rocketmqLogger" level="info"> - <AppenderRef ref="rocketmqAppender"/> - </Logger> + <Logger name="rocketmqLogger" level="info"> + <AppenderRef ref="rocketmqAppender"/> + </Logger> - <Root level="debug"> - <AppenderRef ref="Console"/> - <AppenderRef ref="rocketmqAppender"/> - </Root> -</Loggers> + <Root level="debug"> + <AppenderRef ref="Console"/> + <AppenderRef ref="rocketmqAppender"/> + </Root> + </Loggers> </Configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java index 04cf870..f6611b6 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -50,7 +50,8 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor { } @Override - public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 9647684..ed5b20b 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -63,7 +63,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { } @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand processRequest(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { if (log.isDebugEnabled()) { log.debug("receive request, {} {} {}", request.getCode(), @@ -124,7 +125,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return false; } - public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand putKVConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final PutKVConfigRequestHeader requestHeader = (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class); @@ -140,7 +142,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand getKVConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class); final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader(); final GetKVConfigRequestHeader requestHeader = @@ -163,7 +166,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteKVConfigRequestHeader requestHeader = (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class); @@ -215,7 +219,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand registerBroker(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = @@ -251,7 +256,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final UnRegisterBrokerRequestHeader requestHeader = (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class); @@ -267,7 +273,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); @@ -306,7 +313,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class); final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader(); final WipeWritePermOfBrokerRequestHeader requestHeader = @@ -336,7 +344,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteTopicInNamesrvRequestHeader requestHeader = (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class); @@ -348,7 +357,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetKVListByNamespaceRequestHeader requestHeader = (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class); @@ -367,7 +377,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicsByClusterRequestHeader requestHeader = (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class); @@ -380,7 +391,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList(); @@ -391,7 +403,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics(); @@ -402,7 +415,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { return response; } - private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index 35790c9..d78ec95 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -276,8 +276,8 @@ public class RouteInfoManager { this.lock.writeLock().lockInterruptibly(); BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr); log.info("unregisterBroker, remove from brokerLiveTable {}, {}", - brokerLiveInfo != null ? "OK" : "Failed", - brokerAddr + brokerLiveInfo != null ? "OK" : "Failed", + brokerAddr ); this.filterServerTable.remove(brokerAddr); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java ---------------------------------------------------------------------- diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java index 5a98cd7..a0e8137 100644 --- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java +++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessorTest.java @@ -99,7 +99,8 @@ public class ClusterTestRequestProcessorTest { @Test public void testGetRouteInfoByTopic() throws RemotingCommandException { RemotingCommand request = RemotingCommand.createRequestCommand(12, new CommandCustomHeader() { - @Override public void checkFields() throws RemotingCommandException { + @Override + public void checkFields() throws RemotingCommandException { } }); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index c863ccf..3e4bd26 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -77,7 +77,7 @@ public class DefaultPromise<V> implements Promise<V> { } else { long waitTime = timeout - (System.currentTimeMillis() - createTime); if (waitTime > 0) { - for (;; ) { + for (; ; ) { try { lock.wait(waitTime); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java ---------------------------------------------------------------------- diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java index ae4d3ed..851c283 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java @@ -75,7 +75,7 @@ public class LocalMessageCacheTest { @Test public void testSubmitConsumeRequest() throws Exception { - byte [] body = new byte[]{'1', '2', '3'}; + byte[] body = new byte[] {'1', '2', '3'}; MessageExt consumedMsg = new MessageExt(); consumedMsg.setMsgId("NewMsgId"); consumedMsg.setBody(body); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java index 8436189..323c089 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java @@ -63,7 +63,7 @@ public abstract class ServiceThread implements Runnable { this.thread.join(this.getJointime()); long eclipseTime = System.currentTimeMillis() - beginTime; log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " - + this.getJointime()); + + this.getJointime()); } catch (InterruptedException e) { log.error("Interrupted", e); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index b66e7de..6143462 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -90,6 +90,7 @@ public abstract class NettyRemotingAbstract { /** * Constructor, specifying capacity of one-way and asynchronous semaphores. + * * @param permitsOneway Number of permits for one-way requests. * @param permitsAsync Number of permits for asynchronous requests. */ @@ -100,12 +101,14 @@ public abstract class NettyRemotingAbstract { /** * Custom channel event listener. + * * @return custom channel event listener if defined; null otherwise. */ public abstract ChannelEventListener getChannelEventListener(); /** * Put a netty event to the executor. + * * @param event Netty event instance. */ public void putNettyEvent(final NettyEvent event) { @@ -116,13 +119,14 @@ public abstract class NettyRemotingAbstract { * Entry of incoming command processing. * * <p> - * <strong>Note:</strong> - * The incoming remoting command may be - * <ul> - * <li>An inquiry request from a remote peer component;</li> - * <li>A response to a previous request issued by this very participant.</li> - * </ul> + * <strong>Note:</strong> + * The incoming remoting command may be + * <ul> + * <li>An inquiry request from a remote peer component;</li> + * <li>A response to a previous request issued by this very participant.</li> + * </ul> * </p> + * * @param ctx Channel handler context. * @param msg incoming remoting command. * @throws Exception if there were any error while processing the incoming command. @@ -145,6 +149,7 @@ public abstract class NettyRemotingAbstract { /** * Process incoming request command issued by remote peer. + * * @param ctx channel handler context. * @param cmd request command. */ @@ -235,6 +240,7 @@ public abstract class NettyRemotingAbstract { /** * Process response from remote peer to the previous issued requests. + * * @param ctx channel handler context. * @param cmd response command instance. */ @@ -261,7 +267,6 @@ public abstract class NettyRemotingAbstract { /** * Execute callback in callback executor. If callback executor is null, run directly in current thread - * @param responseFuture */ private void executeInvokeCallback(final ResponseFuture responseFuture) { boolean runInThisThread = false; @@ -297,12 +302,14 @@ public abstract class NettyRemotingAbstract { /** * Custom RPC hook. + * * @return RPC hook if specified; null otherwise. */ public abstract RPCHook getRPCHook(); /** * This method specifies thread pool to use while invoking callback methods. + * * @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the * netty event-loop thread. */ @@ -310,7 +317,7 @@ public abstract class NettyRemotingAbstract { /** * <p> - * This method is periodically invoked to scan and expire deprecated request. + * This method is periodically invoked to scan and expire deprecated request. * </p> */ public void scanResponseTable() { @@ -337,7 +344,8 @@ public abstract class NettyRemotingAbstract { } } - public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) + public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, + final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index ecf9ab2..34f560f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -322,7 +322,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti if (update) { Collections.shuffle(addrs); - log.info("name server address updated. NEW : {} , OLD: {}",addrs,old); + log.info("name server address updated. NEW : {} , OLD: {}", addrs, old); this.namesrvAddrList.set(addrs); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index b2041b2..7cf82c9 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -78,7 +78,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti this(nettyServerConfig, null); } - public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { + public NettyRemotingServer(final NettyServerConfig nettyServerConfig, + final ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index 0570c84..a5e2a23 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.remoting.netty; - public class NettyServerConfig implements Cloneable { private int listenPort = 8888; private int serverWorkerThreads = 8; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java index 0f5da6e..e11915b 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java @@ -29,7 +29,7 @@ public class RemotingCommandTest { int source = 261; SerializeType type = SerializeType.JSON; byte[] result = RemotingCommand.markProtocolType(source, type); - assertThat(result).isEqualTo(new byte[]{0, 0, 1, 5}); + assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5}); } @Test @@ -37,7 +37,7 @@ public class RemotingCommandTest { int source = 16777215; SerializeType type = SerializeType.ROCKETMQ; byte[] result = RemotingCommand.markProtocolType(source, type); - assertThat(result).isEqualTo(new byte[]{1, -1, -1, -1}); + assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1}); } @Test @@ -58,7 +58,7 @@ public class RemotingCommandTest { int code = RemotingSysResponseCode.SUCCESS; String remark = "Sample remark"; - RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark, SampleCommandCustomHeader.class); + RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark, SampleCommandCustomHeader.class); assertThat(cmd.getCode()).isEqualTo(code); assertThat(cmd.getVersion()).isEqualTo(2333); assertThat(cmd.getRemark()).isEqualTo(remark); @@ -71,7 +71,7 @@ public class RemotingCommandTest { int code = RemotingSysResponseCode.SUCCESS; String remark = "Sample remark"; - RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark); + RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark); assertThat(cmd.getCode()).isEqualTo(code); assertThat(cmd.getVersion()).isEqualTo(2333); assertThat(cmd.getRemark()).isEqualTo(remark); @@ -84,7 +84,7 @@ public class RemotingCommandTest { int code = RemotingSysResponseCode.SUCCESS; String remark = "Sample remark"; - RemotingCommand cmd = RemotingCommand.createResponseCommand(code ,remark, CommandCustomHeader.class); + RemotingCommand cmd = RemotingCommand.createResponseCommand(code, remark, CommandCustomHeader.class); assertThat(cmd).isNull(); } @@ -128,7 +128,7 @@ public class RemotingCommandTest { int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER CommandCustomHeader header = new SampleCommandCustomHeader(); RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header); - cmd.setBody(new byte[] { 0, 1, 2, 3, 4}); + cmd.setBody(new byte[] {0, 1, 2, 3, 4}); ByteBuffer buffer = cmd.encode(); @@ -141,7 +141,7 @@ public class RemotingCommandTest { RemotingCommand decodedCommand = RemotingCommand.decode(buffer); assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON); - assertThat(decodedCommand.getBody()).isEqualTo(new byte[]{ 0, 1, 2, 3, 4}); + assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4}); } @Test http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java index 38548cd..3e8b7a9 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java @@ -86,7 +86,7 @@ class Sample { private String stringValue = "string"; private int intValue = 2333; private Integer integerValue = 666; - private double[] doubleArray = new double[]{0.618, 1.618}; + private double[] doubleArray = new double[] {0.618, 1.618}; private List<String> stringList = Arrays.asList("a", "o", "e", "i", "u", "v"); public String getStringValue() { @@ -136,7 +136,7 @@ class Sample { if (o == null || getClass() != o.getClass()) return false; - Sample sample = (Sample)o; + Sample sample = (Sample) o; if (intValue != sample.intValue) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index 16a62fa..d337638 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -27,23 +27,17 @@ public interface AppendMessageCallback { /** * After message serialization, write MapedByteBuffer * - * @param byteBuffer - * @param maxBlank - * @param msg * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg); /** * After batched message serialization, write MapedByteBuffer * - * @param byteBuffer - * @param maxBlank * @param messageExtBatch, backed up by a byte array - * * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBatch messageExtBatch); + final int maxBlank, final MessageExtBatch messageExtBatch); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index a2cb629..edd68a5 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -62,6 +62,7 @@ public class CommitLog { private volatile long beginTimeInLock = 0; private final PutMessageLock putMessageLock; + public CommitLog(final DefaultMessageStore defaultMessageStore) { this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(), defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService()); @@ -77,11 +78,12 @@ public class CommitLog { this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() { - @Override protected MessageExtBatchEncoder initialValue() { + @Override + protected MessageExtBatchEncoder initialValue() { return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; - this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); + this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); } @@ -661,7 +663,7 @@ public class CommitLog { if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); + GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = @@ -758,7 +760,6 @@ public class CommitLog { putMessageLock.unlock(); } - if (eclipseTimeInLock > 500) { log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, messageExtBatch.getBody().length, result); } @@ -773,7 +774,6 @@ public class CommitLog { storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); - handleDiskFlush(result, putMessageResult, messageExtBatch); handleHA(result, putMessageResult, messageExtBatch); @@ -885,8 +885,6 @@ public class CommitLog { return diff; } - - abstract class FlushCommitLogService extends ServiceThread { protected static final int RETRY_TIMES_OVER = 10; } @@ -1030,23 +1028,19 @@ public class CommitLog { private final CountDownLatch countDownLatch = new CountDownLatch(1); private volatile boolean flushOK = false; - public GroupCommitRequest(long nextOffset) { this.nextOffset = nextOffset; } - public long getNextOffset() { return nextOffset; } - public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); } - public boolean waitForFlush(long timeout) { try { this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); @@ -1430,7 +1424,6 @@ public class CommitLog { this.maxMessageSize = size; } - public ByteBuffer encode(final MessageExtBatch messageExtBatch) { msgBatchMemory.clear(); //not thread-safe int totalMsgLen = 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 275334c..379162d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -350,7 +350,7 @@ public class ConsumeQueue { if (offsetPy >= phyMinOffset) { this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i; log.info("Compute logical min offset: {}, topic: {}, queueId: {}", - this.getMinOffsetInQueue(), this.topic, this.queueId); + this.getMinOffsetInQueue(), this.topic, this.queueId); // This maybe not take effect, when not every consume queue has extend file. if (isExtAddr(tagsCode)) { minExtAddr = tagsCode; @@ -567,9 +567,6 @@ public class ConsumeQueue { /** * Check {@code tagsCode} is address of extend file or tags code. - * - * @param tagsCode - * @return */ public boolean isExtAddr(long tagsCode) { return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java index 1a177e9..a118cde 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -58,17 +58,17 @@ public class ConsumeQueueExt { /** * Constructor. * - * @param topic topic - * @param queueId id of queue - * @param storePath root dir of files to store. + * @param topic topic + * @param queueId id of queue + * @param storePath root dir of files to store. * @param mappedFileSize file size - * @param bitMapLength bit map length. + * @param bitMapLength bit map length. */ public ConsumeQueueExt(final String topic, - final int queueId, - final String storePath, - final int mappedFileSize, - final int bitMapLength) { + final int queueId, + final String storePath, + final int mappedFileSize, + final int bitMapLength) { this.storePath = storePath; this.mappedFileSize = mappedFileSize; @@ -94,9 +94,6 @@ public class ConsumeQueueExt { * <p> * Just test {@code address} is less than 0. * </p> - * - * @param address - * @return */ public boolean isExtAddr(final long address) { return address <= MAX_ADDR; @@ -108,9 +105,6 @@ public class ConsumeQueueExt { * if {@code address} is less than 0, return {@code address} - {@link java.lang.Long#MIN_VALUE}; * else, just return {@code address} * </p> - * - * @param address - * @return */ public long unDecorate(final long address) { if (isExtAddr(address)) { @@ -126,7 +120,6 @@ public class ConsumeQueueExt { * else, just return {@code offset} * </p> * - * @param offset * @return ext address(value is less than 0) */ public long decorate(final long offset) { @@ -140,7 +133,6 @@ public class ConsumeQueueExt { * Get data from buffer. * * @param address less than 0 - * @return */ public CqExtUnit get(final long address) { CqExtUnit cqExtUnit = new CqExtUnit(); @@ -154,9 +146,7 @@ public class ConsumeQueueExt { /** * Get data from buffer, and set to {@code cqExtUnit} * - * @param address less than 0 - * @param cqExtUnit - * @return + * @param address less than 0 */ public boolean get(final long address, final CqExtUnit cqExtUnit) { if (!isExtAddr(address)) { @@ -194,7 +184,6 @@ public class ConsumeQueueExt { * Be careful, this method is not thread safe. * </p> * - * @param cqExtUnit * @return success: < 0: fail: >=0 */ public long put(final CqExtUnit cqExtUnit) { @@ -259,8 +248,6 @@ public class ConsumeQueueExt { /** * Load data from file when startup. - * - * @return */ public boolean load() { boolean result = this.mappedFileQueue.load(); @@ -379,9 +366,6 @@ public class ConsumeQueueExt { /** * flush buffer to file. - * - * @param flushLeastPages - * @return */ public boolean flush(final int flushLeastPages) { return this.mappedFileQueue.flush(flushLeastPages); @@ -400,8 +384,6 @@ public class ConsumeQueueExt { * <p> * Be careful: it's an address just when invoking this method. * </p> - * - * @return */ public long getMaxAddress() { MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); @@ -413,8 +395,6 @@ public class ConsumeQueueExt { /** * Minus address saved in file. - * - * @return */ public long getMinAddress() { MappedFile firstFile = this.mappedFileQueue.getFirstMappedFile(); @@ -435,7 +415,8 @@ public class ConsumeQueueExt { public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE; - public CqExtUnit() {} + public CqExtUnit() { + } public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) { this.tagsCode = tagsCode == null ? 0 : tagsCode; @@ -468,9 +449,6 @@ public class ConsumeQueueExt { /** * build unit from buffer from current position. - * - * @param buffer - * @return */ private boolean read(final ByteBuffer buffer) { if (buffer.position() + 2 > buffer.limit()) { @@ -507,8 +485,6 @@ public class ConsumeQueueExt { * <p> * if size <= 0, nothing to do. * </p> - * - * @param buffer */ private void readBySkip(final ByteBuffer buffer) { ByteBuffer temp = buffer.slice(); @@ -527,9 +503,6 @@ public class ConsumeQueueExt { * <li>1. @{code container} can be null, it will be created if null.</li> * <li>2. if capacity of @{code container} is less than unit size, it will be created also.</li> * <li>3. Pls be sure that size of unit is not greater than {@link #MAX_EXT_UNIT_SIZE}</li> - * - * @param container - * @return */ private byte[] write(final ByteBuffer container) { this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length); @@ -557,8 +530,6 @@ public class ConsumeQueueExt { /** * Calculate unit size by current data. - * - * @return */ private int calcUnitSize() { int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length); @@ -600,16 +571,23 @@ public class ConsumeQueueExt { @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof CqExtUnit)) return false; + if (this == o) + return true; + if (!(o instanceof CqExtUnit)) + return false; CqExtUnit cqExtUnit = (CqExtUnit) o; - if (bitMapSize != cqExtUnit.bitMapSize) return false; - if (msgStoreTime != cqExtUnit.msgStoreTime) return false; - if (size != cqExtUnit.size) return false; - if (tagsCode != cqExtUnit.tagsCode) return false; - if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) return false; + if (bitMapSize != cqExtUnit.bitMapSize) + return false; + if (msgStoreTime != cqExtUnit.msgStoreTime) + return false; + if (size != cqExtUnit.size) + return false; + if (tagsCode != cqExtUnit.tagsCode) + return false; + if (!Arrays.equals(filterBitMap, cqExtUnit.filterBitMap)) + return false; return true; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 36c15d4..95a017a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -218,7 +218,6 @@ public class DefaultMessageStore implements MessageStore { this.shutdown = false; } - public void shutdown() { if (!this.shutdown) { this.shutdown = true; @@ -577,7 +576,6 @@ public class DefaultMessageStore implements MessageStore { return getResult; } - public long getMaxOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { @@ -588,7 +586,6 @@ public class DefaultMessageStore implements MessageStore { return 0; } - public long getMinOffsetInQueue(String topic, int queueId) { ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 81cf0f7..492ac5f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -73,7 +73,8 @@ public class MappedFile extends ReferenceResource { init(fileName, fileSize); } - public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { + public MappedFile(final String fileName, final int fileSize, + final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize, transientStorePool); } @@ -142,7 +143,8 @@ public class MappedFile extends ReferenceResource { return TOTAL_MAPPED_VIRTUAL_MEMORY.get(); } - public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool) throws IOException { + public void init(final String fileName, final int fileSize, + final TransientStorePool transientStorePool) throws IOException { init(fileName, fileSize); this.writeBuffer = transientStorePool.borrowBuffer(); this.transientStorePool = transientStorePool; @@ -209,7 +211,7 @@ public class MappedFile extends ReferenceResource { if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch)messageExt); + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } @@ -217,16 +219,14 @@ public class MappedFile extends ReferenceResource { this.storeTimestamp = result.getStoreTimestamp(); return result; } - log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); + log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } - public long getFileFromOffset() { return this.fileFromOffset; } - public boolean appendMessage(final byte[] data) { int currentPos = this.wrotePosition.get(); @@ -247,10 +247,8 @@ public class MappedFile extends ReferenceResource { /** * Content of data from offset to offset + length will be wrote to file. * - * @param data * @param offset The offset of the subarray to be used. * @param length The length of the subarray to be used. - * @return */ public boolean appendMessage(final byte[] data, final int offset, final int length) { int currentPos = this.wrotePosition.get(); @@ -270,7 +268,6 @@ public class MappedFile extends ReferenceResource { } /** - * @param flushLeastPages * @return The current flushed position */ public int flush(final int flushLeastPages) { @@ -404,7 +401,6 @@ public class MappedFile extends ReferenceResource { return null; } - public SelectMappedBufferResult selectMappedBuffer(int pos) { int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java index dee1bc7..bae7a16 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java @@ -21,5 +21,5 @@ import java.util.Map; public interface MessageArrivingListener { void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties); + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java index 6b34758..3dd0fee 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java @@ -24,22 +24,20 @@ public interface MessageFilter { * match by tags code or filter bit map which is calculated when message received * and stored in consume queue ext. * - * @param tagsCode tagsCode - * @param cqExtUnit extend unit of consume queue - * @return + * @param tagsCode tagsCode + * @param cqExtUnit extend unit of consume queue */ boolean isMatchedByConsumeQueue(final Long tagsCode, - final ConsumeQueueExt.CqExtUnit cqExtUnit); + final ConsumeQueueExt.CqExtUnit cqExtUnit); /** * match by message content which are stored in commit log. * <br>{@code msgBuffer} and {@code properties} are not all null.If invoked in store, * {@code properties} is null;If invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null. * - * @param msgBuffer message buffer in commit log, may be null if not invoked in store. - * @param properties message properties, should decode from buffer if null by yourself. - * @return + * @param msgBuffer message buffer in commit log, may be null if not invoked in store. + * @param properties message properties, should decode from buffer if null by yourself. */ boolean isMatchedByCommitLog(final ByteBuffer msgBuffer, - final Map<String, String> properties); + final Map<String, String> properties); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/MessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java index 55572ce..907dfe2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java @@ -29,12 +29,14 @@ public interface MessageStore { /** * Load previously stored messages. + * * @return true if success; false otherwise. */ boolean load(); /** * Launch this message store. + * * @throws Exception if there is any error. */ void start() throws Exception; @@ -51,6 +53,7 @@ public interface MessageStore { /** * Store a message into store. + * * @param msg Message instance to store * @return result of store operation. */ @@ -58,6 +61,7 @@ public interface MessageStore { /** * Store a batch of messages. + * * @param messageExtBatch Message batch. * @return result of storing batch messages. */ @@ -80,6 +84,7 @@ public interface MessageStore { /** * Get maximum offset of the topic queue. + * * @param topic Topic name. * @param queueId Queue ID. * @return Maximum offset at present. @@ -88,6 +93,7 @@ public interface MessageStore { /** * Get the minimum offset of the topic queue. + * * @param topic Topic name. * @param queueId Queue ID. * @return Minimum offset at present. @@ -96,6 +102,7 @@ public interface MessageStore { /** * Get the offset of the message in the commit log, which is also known as physical offset. + * * @param topic Topic of the message to lookup. * @param queueId Queue ID. * @param consumeQueueOffset offset of consume queue. @@ -105,6 +112,7 @@ public interface MessageStore { /** * Look up the physical offset of the message whose store timestamp is as specified. + * * @param topic Topic of the message. * @param queueId Queue ID. * @param timestamp Timestamp to look up. @@ -114,6 +122,7 @@ public interface MessageStore { /** * Look up the message by given commit log offset. + * * @param commitLogOffset physical offset. * @return Message whose physical offset is as specified. */ @@ -121,6 +130,7 @@ public interface MessageStore { /** * Get one message from the specified commit log offset. + * * @param commitLogOffset commit log offset. * @return wrapped result of the message. */ @@ -128,6 +138,7 @@ public interface MessageStore { /** * Get one message from the specified commit log offset. + * * @param commitLogOffset commit log offset. * @param msgSize message size. * @return wrapped result of the message. @@ -136,30 +147,35 @@ public interface MessageStore { /** * Get the running information of this store. + * * @return message store running info. */ String getRunningDataInfo(); /** * Message store runtime information, which should generally contains various statistical information. + * * @return runtime information of the message store in format of key-value pairs. */ HashMap<String, String> getRuntimeInfo(); /** * Get the maximum commit log offset. + * * @return maximum commit log offset. */ long getMaxPhyOffset(); /** * Get the minimum commit log offset. + * * @return minimum commit log offset. */ long getMinPhyOffset(); /** * Get the store time of the earliest message in the given queue. + * * @param topic Topic of the messages to query. * @param queueId Queue ID to find. * @return store time of the earliest message. @@ -168,12 +184,14 @@ public interface MessageStore { /** * Get the store time of the earliest message in this store. + * * @return timestamp of the earliest message in this store. */ long getEarliestMessageTime(); /** * Get the store time of the message specified. + * * @param topic message topic. * @param queueId queue ID. * @param consumeQueueOffset consume queue offset. @@ -183,6 +201,7 @@ public interface MessageStore { /** * Get the total number of the messages in the specified queue. + * * @param topic Topic * @param queueId Queue ID. * @return total number. @@ -191,6 +210,7 @@ public interface MessageStore { /** * Get the raw commit log data starting from the given offset, which should used for replication purpose. + * * @param offset starting offset. * @return commit log data. */ @@ -198,6 +218,7 @@ public interface MessageStore { /** * Append data to commit log. + * * @param startOffset starting offset. * @param data data to append. * @return true if success; false otherwise. @@ -211,36 +232,40 @@ public interface MessageStore { /** * Query messages by given key. + * * @param topic topic of the message. * @param key message key. * @param maxNum maximum number of the messages possible. * @param begin begin timestamp. * @param end end timestamp. - * @return */ QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end); /** * Update HA master address. + * * @param newAddr new address. */ void updateHaMasterAddress(final String newAddr); /** * Return how much the slave falls behind. + * * @return number of bytes that slave falls behind. */ long slaveFallBehindMuch(); /** * Return the current timestamp of the store. + * * @return current time in milliseconds since 1970-01-01. */ long now(); /** * Clean unused topics. + * * @param topics all valid topics. * @return number of the topics deleted. */ @@ -253,6 +278,7 @@ public interface MessageStore { /** * Check if the given message has been swapped out of the memory. + * * @param topic topic. * @param queueId queue ID. * @param consumeOffset consume queue offset. @@ -262,18 +288,21 @@ public interface MessageStore { /** * Get number of the bytes that have been stored in commit log and not yet dispatched to consume queue. + * * @return number of the bytes to dispatch. */ long dispatchBehindBytes(); /** * Flush the message store to persist all data. + * * @return maximum offset flushed to persistent storage device. */ long flush(); /** * Reset written offset. + * * @param phyOffset new offset. * @return true if success; false otherwise. */ @@ -281,42 +310,49 @@ public interface MessageStore { /** * Get confirm offset. + * * @return confirm offset. */ long getConfirmOffset(); /** * Set confirm offset. + * * @param phyOffset confirm offset to set. */ void setConfirmOffset(long phyOffset); /** * Check if the operation system page cache is busy or not. + * * @return true if the OS page cache is busy; false otherwise. */ boolean isOSPageCacheBusy(); /** * Get lock time in milliseconds of the store by far. + * * @return lock time in milliseconds. */ long lockTimeMills(); /** * Check if the transient store pool is deficient. + * * @return true if the transient store pool is running out; false otherwise. */ boolean isTransientStorePoolDeficient(); /** * Get the dispatcher list. + * * @return list of the dispatcher. */ LinkedList<CommitLogDispatcher> getDispatcherList(); /** * Get consume queue of the topic/queue. + * * @param topic Topic. * @param queueId Queue ID. * @return Consume queue. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java index a03e41a..758f437 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java @@ -21,5 +21,6 @@ package org.apache.rocketmq.store; */ public interface PutMessageLock { void lock(); + void unlock(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java index 9198f1c..9aa80d8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.store; - import java.util.concurrent.locks.ReentrantLock; /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java index baa809d..39a32cc 100644 --- a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java +++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java @@ -16,12 +16,10 @@ */ package org.apache.rocketmq.store; - import java.util.concurrent.atomic.AtomicBoolean; /** * Spin lock Implementation to put message, suggest using this witb low race conditions - * */ public class PutMessageSpinLock implements PutMessageLock { //true: Can lock, false : in lock. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java index 3dcd861..7ff11a2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java +++ b/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java @@ -28,7 +28,6 @@ public class RunningFlags { private static final int DISK_FULL_BIT = 1 << 4; - private volatile int flagBits = 0; public RunningFlags() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 19ed211..02aa84a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -143,7 +143,6 @@ public class MessageStoreConfig { private int transientStorePoolSize = 5; private boolean fastFailIfNoBufferInStorePool = false; - public boolean isDebugLockEnable() { return debugLockEnable; } @@ -605,7 +604,8 @@ public class MessageStoreConfig { } /** - * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is ASYNC_FLUSH + * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is + * ASYNC_FLUSH * * @return <tt>true</tt> or <tt>false</tt> */ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java index e0c51a1..8b97504 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java @@ -78,7 +78,6 @@ public class HAConnection { return socketChannel; } - class ReadSocketService extends ServiceThread { private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024; private final Selector selector; @@ -191,7 +190,6 @@ public class HAConnection { } } - class WriteSocketService extends ServiceThread { private final Selector selector; private final SocketChannel socketChannel; @@ -327,7 +325,6 @@ public class HAConnection { HAConnection.log.info(this.getServiceName() + " service end"); } - private boolean transferData() throws Exception { int writeSizeZeroTimes = 0; // Write Header