This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8357cff10 [ISSUE #5002]Fix code style in any net modules. (#4989)
8357cff10 is described below
commit 8357cff10156860ffbedc276d79f90609aa3bb85
Author: CoedPig <[email protected]>
AuthorDate: Mon Sep 5 13:38:26 2022 +0800
[ISSUE #5002]Fix code style in any net modules. (#4989)
* optimize the memory usage in DefaultMappedFile:
use AtomicIntegerFieldUpdater instead of AtomicInteger
* fix code bug:
if the initial cursor of the listIterator equals zero,
the previous element will always be null.
* modify constant name
* fix excessive nesting
* Update
client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
* update maven-surefire-plugin.version from 2.19.1 to 2.22.1
* Revert "update maven-surefire-plugin.version from 2.19.1 to 2.22.1"
This reverts commit 37fabe6e7805826895e224b0da22b57750c7e643.
* Code Style: fix warnings reported by the code detector.
* fix checkstyle: do not use '*' (wildcard) imports.
* Fix code style in any net modules.
* change method name from topicRouteDataIsChange to topicRouteDataChanged
in TopicRouteData.java
* revert executeInvokeCallback method
* Fix grammar issues
Co-authored-by: shanpengyu <[email protected]>
Co-authored-by: Aaron Ai <[email protected]>
Co-authored-by: Zhanhui Li <[email protected]>
---
.../broker/topic/TopicRouteInfoManager.java | 2 +-
.../client/impl/factory/MQClientInstance.java | 2 +-
.../rocketmq/common/protocol/route/BrokerData.java | 25 +-
.../rocketmq/common/protocol/route/QueueData.java | 8 +-
.../common/protocol/route/TopicRouteData.java | 2 +-
.../apache/rocketmq/common/rpc/ClientMetadata.java | 2 +-
.../apache/rocketmq/namesrv/NamesrvController.java | 169 ++++---
.../apache/rocketmq/namesrv/NamesrvStartup.java | 26 +-
.../namesrv/routeinfo/RouteInfoManager.java | 509 ++++++++++-----------
.../remoting/netty/NettyRemotingAbstract.java | 256 +++++------
.../remoting/netty/NettyRemotingServer.java | 220 ++++-----
11 files changed, 581 insertions(+), 640 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
index 0f891eded..4a51c7dc2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicRouteInfoManager.java
@@ -134,7 +134,7 @@ public class TopicRouteInfoManager {
private boolean updateTopicRouteTable(String topic, TopicRouteData
topicRouteData) {
TopicRouteData old = this.topicRouteTable.get(topic);
- boolean changed = topicRouteData.topicRouteDataIsChange(old);
+ boolean changed = topicRouteData.topicRouteDataChanged(old);
if (!changed) {
if (!this.isNeedUpdateTopicRouteInfo(topic)) {
return false;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e88b70b74..820faf2f2 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -611,7 +611,7 @@ public class MQClientInstance {
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
- boolean changed =
topicRouteData.topicRouteDataIsChange(old);
+ boolean changed =
topicRouteData.topicRouteDataChanged(old);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index f13491350..47c53f8c3 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -20,16 +20,24 @@ package org.apache.rocketmq.common.protocol.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
+/**
+ * The class describes that a typical broker cluster's (in replication)
details: the cluster (in sharding) name
+ * that it belongs to, and all the single instance information for this
cluster.
+ */
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
- private HashMap<Long/* brokerId */, String/* broker address */>
brokerAddrs;
+
+ /**
+ * The container that store the all single instances for the current
broker replication cluster.
+ * The key is the brokerId, and the value is the address of the single
broker instance.
+ */
+ private HashMap<Long, String> brokerAddrs;
private String zoneName;
private final Random random = new Random();
@@ -46,10 +54,7 @@ public class BrokerData implements Comparable<BrokerData> {
this.cluster = brokerData.cluster;
this.brokerName = brokerData.brokerName;
if (brokerData.brokerAddrs != null) {
- this.brokerAddrs = new HashMap<Long, String>();
- for (final Map.Entry<Long, String> brokerEntry :
brokerData.brokerAddrs.entrySet()) {
- this.brokerAddrs.put(brokerEntry.getKey(),
brokerEntry.getValue());
- }
+ this.brokerAddrs = new HashMap<>(brokerData.brokerAddrs);
}
this.enableActingMaster = brokerData.enableActingMaster;
}
@@ -82,14 +87,14 @@ public class BrokerData implements Comparable<BrokerData> {
* @return Broker address.
*/
public String selectBrokerAddr() {
- String addr = this.brokerAddrs.get(MixAll.MASTER_ID);
+ String masterAddress = this.brokerAddrs.get(MixAll.MASTER_ID);
- if (addr == null) {
- List<String> addrs = new ArrayList<String>(brokerAddrs.values());
+ if (masterAddress == null) {
+ List<String> addrs = new ArrayList<>(brokerAddrs.values());
return addrs.get(random.nextInt(addrs.size()));
}
- return addr;
+ return masterAddress;
}
public HashMap<Long, String> getBrokerAddrs() {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
index 6e9e653f1..fb55e22de 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java
@@ -15,8 +15,8 @@
* limitations under the License.
*/
-/**
- * $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $
+/*
+ $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $
*/
package org.apache.rocketmq.common.protocol.route;
@@ -104,9 +104,7 @@ public class QueueData implements Comparable<QueueData> {
return false;
if (writeQueueNums != other.writeQueueNums)
return false;
- if (topicSysFlag != other.topicSysFlag)
- return false;
- return true;
+ return topicSysFlag == other.topicSysFlag;
}
@Override
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index 6d8e375dd..fa382296b 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -118,7 +118,7 @@ public class TopicRouteData extends RemotingSerializable {
return topicRouteData;
}
- public boolean topicRouteDataIsChange(TopicRouteData oldData) {
+ public boolean topicRouteDataChanged(TopicRouteData oldData) {
if (oldData == null)
return true;
TopicRouteData old = new TopicRouteData(oldData);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 9c8bbf7da..67ceed520 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -52,7 +52,7 @@ public class ClientMetadata {
return;
}
TopicRouteData old = this.topicRouteTable.get(topic);
- if (!topicRouteData.topicRouteDataIsChange(old)) {
+ if (!topicRouteData.topicRouteDataChanged(old)) {
return ;
}
{
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 04abae285..cec567a5c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -63,22 +63,18 @@ public class NamesrvController {
private final NettyClientConfig nettyClientConfig;
private final ScheduledExecutorService scheduledExecutorService = new
ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder()
- .namingPattern("NSScheduledThread")
- .daemon(true)
- .build());
+ new
BasicThreadFactory.Builder().namingPattern("NSScheduledThread").daemon(true).build());
+
private final ScheduledExecutorService scanExecutorService = new
ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder()
- .namingPattern("NSScanScheduledThread")
- .daemon(true)
- .build());
+ new
BasicThreadFactory.Builder().namingPattern("NSScanScheduledThread").daemon(true).build());
+
private final KVConfigManager kvConfigManager;
private final RouteInfoManager routeInfoManager;
private RemotingClient remotingClient;
private RemotingServer remotingServer;
- private BrokerHousekeepingService brokerHousekeepingService;
+ private final BrokerHousekeepingService brokerHousekeepingService;
private ExecutorService defaultExecutor;
private ExecutorService clientRequestExecutor;
@@ -86,7 +82,7 @@ public class NamesrvController {
private BlockingQueue<Runnable> defaultThreadPoolQueue;
private BlockingQueue<Runnable> clientRequestThreadPoolQueue;
- private Configuration configuration;
+ private final Configuration configuration;
private FileWatchService fileWatchService;
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig
nettyServerConfig) {
@@ -100,113 +96,103 @@ public class NamesrvController {
this.kvConfigManager = new KVConfigManager(this);
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.routeInfoManager = new RouteInfoManager(namesrvConfig, this);
- this.configuration = new Configuration(
- LOGGER,
- this.namesrvConfig, this.nettyServerConfig
- );
+ this.configuration = new Configuration(LOGGER, this.namesrvConfig,
this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig,
"configStorePath");
}
public boolean initialize() {
+ loadConfig();
+ initiateNetworkComponents();
+ initiateThreadExecutors();
+ registerProcessor();
+ startScheduleService();
+ initiateSslContext();
+ initiateRpcHooks();
+ return true;
+ }
+ private void loadConfig() {
this.kvConfigManager.load();
+ }
+
+ private void startScheduleService() {
+
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
+ 5, this.namesrvConfig.getScanNotActiveBrokerInterval(),
TimeUnit.MILLISECONDS);
+
+
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
+ 1, 10, TimeUnit.MINUTES);
+ this.scheduledExecutorService.scheduleAtFixedRate(() -> {
+ try {
+ NamesrvController.this.printWaterMark();
+ } catch (Throwable e) {
+ LOGGER.error("printWaterMark error.", e);
+ }
+ }, 10, 1, TimeUnit.SECONDS);
+ }
+
+ private void initiateNetworkComponents() {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.brokerHousekeepingService);
+ this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+ }
+ private void initiateThreadExecutors() {
this.defaultThreadPoolQueue = new
LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());
- this.defaultExecutor = new ThreadPoolExecutor(
- this.namesrvConfig.getDefaultThreadPoolNums(),
- this.namesrvConfig.getDefaultThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.defaultThreadPoolQueue,
- new ThreadFactoryImpl("RemotingExecutorThread_")) {
+ this.defaultExecutor = new
ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(),
this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new
ThreadFactoryImpl("RemotingExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
- return new FutureTaskExt<T>(runnable, value);
+ return new FutureTaskExt<>(runnable, value);
}
};
this.clientRequestThreadPoolQueue = new
LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());
- this.clientRequestExecutor = new ThreadPoolExecutor(
- this.namesrvConfig.getClientRequestThreadPoolNums(),
- this.namesrvConfig.getClientRequestThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.clientRequestThreadPoolQueue,
- new ThreadFactoryImpl("ClientRequestExecutorThread_")) {
+ this.clientRequestExecutor = new
ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(),
this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60,
TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new
ThreadFactoryImpl("ClientRequestExecutorThread_")) {
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable
runnable, final T value) {
- return new FutureTaskExt<T>(runnable, value);
+ return new FutureTaskExt<>(runnable, value);
}
};
- this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);
+ }
- this.registerProcessor();
+ private void initiateSslContext() {
+ if (TlsSystemConfig.tlsMode == TlsMode.DISABLED) {
+ return;
+ }
-
this.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,
- 5, this.namesrvConfig.getScanNotActiveBrokerInterval(),
TimeUnit.MILLISECONDS);
+ String[] watchFiles = {TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath};
-
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,
- 1, 10, TimeUnit.MINUTES);
+ FileWatchService.Listener listener = new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
- this.scheduledExecutorService.scheduleAtFixedRate(() -> {
- try {
- NamesrvController.this.printWaterMark();
- } catch (Throwable e) {
- LOGGER.error("printWaterMark error.", e);
+ @Override
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ LOGGER.info("The trust certificate changed, reload the ssl
context");
+ ((NettyRemotingServer) remotingServer).loadSslContext();
+ }
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ LOGGER.info("The certificate and private key changed,
reload the ssl context");
+ certChanged = keyChanged = false;
+ ((NettyRemotingServer) remotingServer).loadSslContext();
+ }
}
- }, 10, 1, TimeUnit.SECONDS);
+ };
- if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
- // Register a listener to reload SslContext
- try {
- fileWatchService = new FileWatchService(
- new String[] {
- TlsSystemConfig.tlsServerCertPath,
- TlsSystemConfig.tlsServerKeyPath,
- TlsSystemConfig.tlsServerTrustCertPath
- },
- new FileWatchService.Listener() {
- boolean certChanged, keyChanged = false;
-
- @Override
- public void onChanged(String path) {
- if
(path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
- LOGGER.info("The trust certificate changed,
reload the ssl context");
- reloadServerSslContext();
- }
- if
(path.equals(TlsSystemConfig.tlsServerCertPath)) {
- certChanged = true;
- }
- if (path.equals(TlsSystemConfig.tlsServerKeyPath))
{
- keyChanged = true;
- }
- if (certChanged && keyChanged) {
- LOGGER.info("The certificate and private key
changed, reload the ssl context");
- certChanged = keyChanged = false;
- reloadServerSslContext();
- }
- }
-
- private void reloadServerSslContext() {
- ((NettyRemotingServer)
remotingServer).loadSslContext();
- }
- });
- } catch (Exception e) {
- LOGGER.warn("FileWatchService created error, can't load the
certificate dynamically");
- }
+ try {
+ fileWatchService = new FileWatchService(watchFiles, listener);
+ } catch (Exception e) {
+ LOGGER.warn("FileWatchService created error, can't load the
certificate dynamically");
}
-
- initialRpcHooks();
- return true;
}
private void printWaterMark() {
- WATER_MARK_LOG.info("[WATERMARK] ClientQueueSize:{}
ClientQueueSlowTime:{} " +
- "DefaultQueueSize:{} DefaultQueueSlowTime:{}",
- this.clientRequestThreadPoolQueue.size(),
headSlowTimeMills(this.clientRequestThreadPoolQueue),
- this.defaultThreadPoolQueue.size(),
headSlowTimeMills(this.defaultThreadPoolQueue));
+ WATER_MARK_LOG.info("[WATERMARK] ClientQueueSize:{}
ClientQueueSlowTime:{} " + "DefaultQueueSize:{} DefaultQueueSlowTime:{}",
this.clientRequestThreadPoolQueue.size(),
headSlowTimeMills(this.clientRequestThreadPoolQueue),
this.defaultThreadPoolQueue.size(),
headSlowTimeMills(this.defaultThreadPoolQueue));
}
private long headSlowTimeMills(BlockingQueue<Runnable> q) {
@@ -214,7 +200,7 @@ public class NamesrvController {
final Runnable firstRunnable = q.peek();
if (firstRunnable instanceof FutureTaskExt) {
- final Runnable inner = ((FutureTaskExt)
firstRunnable).getRunnable();
+ final Runnable inner = ((FutureTaskExt<?>)
firstRunnable).getRunnable();
if (inner instanceof RequestTask) {
slowTimeMills = System.currentTimeMillis() - ((RequestTask)
inner).getCreateTimestamp();
}
@@ -230,8 +216,7 @@ public class NamesrvController {
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
- this.remotingServer.registerDefaultProcessor(new
ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
- this.defaultExecutor);
+ this.remotingServer.registerDefaultProcessor(new
ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.defaultExecutor);
} else {
// Support get route info only temporarily
ClientRequestProcessor clientRequestProcessor = new
ClientRequestProcessor(this);
@@ -241,10 +226,10 @@ public class NamesrvController {
}
}
- private void initialRpcHooks() {
+ private void initiateRpcHooks() {
this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());
}
-
+
public void start() throws Exception {
this.remotingServer.start();
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 54a6320bf..d5f5d9762 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -19,8 +19,9 @@ package org.apache.rocketmq.namesrv;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import java.io.BufferedInputStream;
-import java.io.FileInputStream;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.commons.cli.CommandLine;
@@ -46,7 +47,6 @@ public class NamesrvStartup {
private static InternalLogger log;
private static Properties properties = null;
- private static CommandLine commandLine = null;
private static NamesrvConfig namesrvConfig = null;
private static NettyServerConfig nettyServerConfig = null;
private static NettyClientConfig nettyClientConfig = null;
@@ -57,29 +57,26 @@ public class NamesrvStartup {
controllerManagerMain();
}
- public static NamesrvController main0(String[] args) {
+ public static void main0(String[] args) {
try {
parseCommandlineAndConfigFile(args);
- NamesrvController controller = createAndStartNamesrvController();
- return controller;
+ createAndStartNamesrvController();
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
- return null;
}
- public static ControllerManager controllerManagerMain() {
+ public static void controllerManagerMain() {
try {
if (namesrvConfig.isEnableControllerInNamesrv()) {
- return createAndStartControllerManager();
+ createAndStartControllerManager();
}
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
- return null;
}
public static void parseCommandlineAndConfigFile(String[] args) throws
Exception {
@@ -87,7 +84,7 @@ public class NamesrvStartup {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- commandLine = ServerUtil.parseCmdLine("mqnamesrv", args,
buildCommandlineOptions(options), new PosixParser());
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args,
buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return;
@@ -101,7 +98,7 @@ public class NamesrvStartup {
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
- InputStream in = new BufferedInputStream(new
FileInputStream(file));
+ InputStream in = new
BufferedInputStream(Files.newInputStream(Paths.get(file)));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
@@ -144,14 +141,12 @@ public class NamesrvStartup {
}
- public static NamesrvController createAndStartNamesrvController() throws
Exception {
-
+ public static void createAndStartNamesrvController() throws Exception {
NamesrvController controller = createNamesrvController();
start(controller);
String tip = "The Name Server boot success. serializeType=" +
RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
- return controller;
}
public static NamesrvController createNamesrvController() {
@@ -184,13 +179,12 @@ public class NamesrvStartup {
return controller;
}
- public static ControllerManager createAndStartControllerManager() throws
Exception {
+ public static void createAndStartControllerManager() throws Exception {
ControllerManager controllerManager = createControllerManager();
start(controllerManager);
String tip = "The ControllerManager boot success. serializeType=" +
RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
- return controllerManager;
}
public static ControllerManager createControllerManager() throws Exception
{
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 05cff6595..3facb3cf8 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -122,86 +122,79 @@ public class RouteInfoManager {
return;
}
try {
- try {
- this.lock.writeLock().lockInterruptibly();
- if (this.topicQueueTable.containsKey(topic)) {
- log.info("Topic route already exist.{}, {}", topic,
this.topicQueueTable.get(topic));
- } else {
- // check and construct queue data map
- Map<String, QueueData> queueDataMap = new HashMap<>();
- for (QueueData queueData : queueDatas) {
- if
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
- log.warn("Register topic contains illegal broker,
{}, {}", topic, queueData);
- return;
- }
- queueDataMap.put(queueData.getBrokerName(), queueData);
+ this.lock.writeLock().lockInterruptibly();
+ if (this.topicQueueTable.containsKey(topic)) {
+ log.info("Topic route already exist.{}, {}", topic,
this.topicQueueTable.get(topic));
+ } else {
+ // check and construct queue data map
+ Map<String, QueueData> queueDataMap = new HashMap<>();
+ for (QueueData queueData : queueDatas) {
+ if
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
+ log.warn("Register topic contains illegal broker, {},
{}", topic, queueData);
+ return;
}
-
- this.topicQueueTable.put(topic, queueDataMap);
- log.info("Register topic route:{}, {}", topic, queueDatas);
+ queueDataMap.put(queueData.getBrokerName(), queueData);
}
- } finally {
- this.lock.writeLock().unlock();
+
+ this.topicQueueTable.put(topic, queueDataMap);
+ log.info("Register topic route:{}, {}", topic, queueDatas);
}
} catch (Exception e) {
log.error("registerTopic Exception", e);
+ } finally {
+ this.lock.writeLock().unlock();
}
}
public void deleteTopic(final String topic) {
try {
- try {
- this.lock.writeLock().lockInterruptibly();
- this.topicQueueTable.remove(topic);
- } finally {
- this.lock.writeLock().unlock();
- }
+ this.lock.writeLock().lockInterruptibly();
+ this.topicQueueTable.remove(topic);
} catch (Exception e) {
log.error("deleteTopic Exception", e);
+ } finally {
+ this.lock.writeLock().unlock();
}
}
public void deleteTopic(final String topic, final String clusterName) {
try {
- try {
- this.lock.writeLock().lockInterruptibly();
- Set<String> brokerNames =
this.clusterAddrTable.get(clusterName);
- if (brokerNames != null
- && !brokerNames.isEmpty()) {
- Map<String, QueueData> queueDataMap =
this.topicQueueTable.get(topic);
- if (queueDataMap != null) {
- for (String brokerName : brokerNames) {
- final QueueData removedQD =
queueDataMap.remove(brokerName);
- if (removedQD != null) {
- log.info("deleteTopic, remove one broker's
topic {} {} {}", brokerName, topic,
- removedQD);
- }
- }
- if (queueDataMap.isEmpty()) {
- log.info("deleteTopic, remove the topic all queue
{} {}", clusterName, topic);
- this.topicQueueTable.remove(topic);
- }
+ this.lock.writeLock().lockInterruptibly();
+ //get all the brokerNames fot the specified cluster
+ Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
+ if (brokerNames == null || brokerNames.isEmpty()) {
+ return;
+ }
+ //get the store information for single topic
+ Map<String, QueueData> queueDataMap =
this.topicQueueTable.get(topic);
+ if (queueDataMap != null) {
+ for (String brokerName : brokerNames) {
+ final QueueData removedQD =
queueDataMap.remove(brokerName);
+ if (removedQD != null) {
+ log.info("deleteTopic, remove one broker's topic {} {}
{}", brokerName, topic, removedQD);
}
}
- } finally {
- this.lock.writeLock().unlock();
+ if (queueDataMap.isEmpty()) {
+ log.info("deleteTopic, remove the topic all queue {} {}",
clusterName, topic);
+ this.topicQueueTable.remove(topic);
+ }
}
} catch (Exception e) {
log.error("deleteTopic Exception", e);
+ } finally {
+ this.lock.writeLock().unlock();
}
}
public TopicList getAllTopicList() {
TopicList topicList = new TopicList();
try {
- try {
- this.lock.readLock().lockInterruptibly();
- topicList.getTopicList().addAll(this.topicQueueTable.keySet());
- } finally {
- this.lock.readLock().unlock();
- }
+ this.lock.readLock().lockInterruptibly();
+ topicList.getTopicList().addAll(this.topicQueueTable.keySet());
} catch (Exception e) {
log.error("getAllTopicList Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
return topicList;
@@ -235,148 +228,147 @@ public class RouteInfoManager {
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
- try {
- this.lock.writeLock().lockInterruptibly();
+ this.lock.writeLock().lockInterruptibly();
- Set<String> brokerNames =
this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
- brokerNames.add(brokerName);
+ //init or update the cluster info
+ Set<String> brokerNames =
this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
+ brokerNames.add(brokerName);
- boolean registerFirst = false;
+ boolean registerFirst = false;
- BrokerData brokerData = this.brokerAddrTable.get(brokerName);
- if (null == brokerData) {
- registerFirst = true;
- brokerData = new BrokerData(clusterName, brokerName, new
HashMap<>());
- this.brokerAddrTable.put(brokerName, brokerData);
- }
+ BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+ if (null == brokerData) {
+ registerFirst = true;
+ brokerData = new BrokerData(clusterName, brokerName, new
HashMap<>());
+ this.brokerAddrTable.put(brokerName, brokerData);
+ }
- boolean isOldVersionBroker = enableActingMaster == null;
- brokerData.setEnableActingMaster(isOldVersionBroker ? false :
enableActingMaster);
- brokerData.setZoneName(zoneName);
-
- Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
+ boolean isOldVersionBroker = enableActingMaster == null;
+ brokerData.setEnableActingMaster(!isOldVersionBroker &&
enableActingMaster);
+ brokerData.setZoneName(zoneName);
- boolean isMinBrokerIdChanged = false;
- long prevMinBrokerId = 0;
- if (!brokerAddrsMap.isEmpty()) {
- prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
- }
+ Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
- if (brokerId < prevMinBrokerId) {
- isMinBrokerIdChanged = true;
- }
+ boolean isMinBrokerIdChanged = false;
+ long prevMinBrokerId = 0;
+ if (!brokerAddrsMap.isEmpty()) {
+ prevMinBrokerId = Collections.min(brokerAddrsMap.keySet());
+ }
+
+ if (brokerId < prevMinBrokerId) {
+ isMinBrokerIdChanged = true;
+ }
- //Switch slave to master: first remove <1, IP:PORT> in
namesrv, then add <0, IP:PORT>
- //The same IP:PORT must only have one record in brokerAddrTable
- brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr
&& brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
-
- //If Local brokerId stateVersion bigger than the registering
one,
- String oldBrokerAddr = brokerAddrsMap.get(brokerId);
- if (null != oldBrokerAddr &&
!oldBrokerAddr.equals(brokerAddr)) {
- BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new
BrokerAddrInfo(clusterName, oldBrokerAddr));
-
- if (null != oldBrokerInfo) {
- long oldStateVersion =
oldBrokerInfo.getDataVersion().getStateVersion();
- long newStateVersion =
topicConfigWrapper.getDataVersion().getStateVersion();
- if (oldStateVersion > newStateVersion) {
- log.warn("Registered Broker conflicts with the
existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
- "Old BrokerAddr:{}, Old Version:{}, New
BrokerAddr:{}, New Version:{}.",
+ //Switch slave to master: first remove <1, IP:PORT> in namesrv,
then add <0, IP:PORT>
+ //The same IP:PORT must only have one record in brokerAddrTable
+ brokerAddrsMap.entrySet().removeIf(item -> null != brokerAddr &&
brokerAddr.equals(item.getValue()) && brokerId != item.getKey());
+
+ //If Local brokerId stateVersion bigger than the registering one,
+ String oldBrokerAddr = brokerAddrsMap.get(brokerId);
+ if (null != oldBrokerAddr && !oldBrokerAddr.equals(brokerAddr)) {
+ BrokerLiveInfo oldBrokerInfo = brokerLiveTable.get(new
BrokerAddrInfo(clusterName, oldBrokerAddr));
+
+ if (null != oldBrokerInfo) {
+ long oldStateVersion =
oldBrokerInfo.getDataVersion().getStateVersion();
+ long newStateVersion =
topicConfigWrapper.getDataVersion().getStateVersion();
+ if (oldStateVersion > newStateVersion) {
+ log.warn("Registered Broker conflicts with the existed
one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
+ "Old BrokerAddr:{}, Old Version:{},
New BrokerAddr:{}, New Version:{}.",
clusterName, brokerName, brokerId,
oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
- //Remove the rejected brokerAddr from
brokerLiveTable.
- brokerLiveTable.remove(new
BrokerAddrInfo(clusterName, brokerAddr));
- return result;
- }
+ //Remove the rejected brokerAddr from brokerLiveTable.
+ brokerLiveTable.remove(new BrokerAddrInfo(clusterName,
brokerAddr));
+ return result;
}
}
+ }
- if (!brokerAddrsMap.containsKey(brokerId) &&
topicConfigWrapper.getTopicConfigTable().size() == 1) {
- log.warn("Can't register topicConfigWrapper={} because
broker[{}]={} has not registered.",
- topicConfigWrapper.getTopicConfigTable(),
brokerId, brokerAddr);
- return null;
- }
+ if (!brokerAddrsMap.containsKey(brokerId) &&
topicConfigWrapper.getTopicConfigTable().size() == 1) {
+ log.warn("Can't register topicConfigWrapper={} because
broker[{}]={} has not registered.",
+ topicConfigWrapper.getTopicConfigTable(), brokerId,
brokerAddr);
+ return null;
+ }
- String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
- registerFirst = registerFirst ||
(StringUtils.isEmpty(oldAddr));
+ String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr);
+ registerFirst = registerFirst || (StringUtils.isEmpty(oldAddr));
- boolean isMaster = MixAll.MASTER_ID == brokerId;
- boolean isPrimeSlave = !isOldVersionBroker && !isMaster
+ boolean isMaster = MixAll.MASTER_ID == brokerId;
+ boolean isPrimeSlave = !isOldVersionBroker && !isMaster
&& brokerId == Collections.min(brokerAddrsMap.keySet());
- if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
+ if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
- ConcurrentMap<String, TopicConfig> tcTable =
+ ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
- if (tcTable != null) {
- for (Map.Entry<String, TopicConfig> entry :
tcTable.entrySet()) {
- if (registerFirst ||
this.isTopicConfigChanged(clusterName, brokerAddr,
+ if (tcTable != null) {
+ for (Map.Entry<String, TopicConfig> entry :
tcTable.entrySet()) {
+ if (registerFirst ||
this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(),
brokerName,
entry.getValue().getTopicName())) {
- final TopicConfig topicConfig =
entry.getValue();
- if (isPrimeSlave) {
- // Wipe write perm for prime slave
- topicConfig.setPerm(topicConfig.getPerm()
& (~PermName.PERM_WRITE));
- }
- this.createAndUpdateQueueData(brokerName,
topicConfig);
+ final TopicConfig topicConfig = entry.getValue();
+ if (isPrimeSlave) {
+ // Wipe write perm for prime slave
+ topicConfig.setPerm(topicConfig.getPerm() &
(~PermName.PERM_WRITE));
}
+ this.createAndUpdateQueueData(brokerName,
topicConfig);
}
}
+ }
- if (this.isBrokerTopicConfigChanged(clusterName,
brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
- TopicConfigAndMappingSerializeWrapper
mappingSerializeWrapper =
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
- Map<String, TopicQueueMappingInfo>
topicQueueMappingInfoMap =
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
- //the topicQueueMappingInfoMap should never be null,
but can be empty
- for (Map.Entry<String, TopicQueueMappingInfo> entry :
topicQueueMappingInfoMap.entrySet()) {
- if
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
- topicQueueMappingInfoTable.put(entry.getKey(),
new HashMap<String, TopicQueueMappingInfo>());
- }
- //Note asset brokerName equal
entry.getValue().getBname()
- //here use the mappingDetail.bname
-
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(),
entry.getValue());
+ if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion()) || registerFirst) {
+ TopicConfigAndMappingSerializeWrapper
mappingSerializeWrapper =
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+ Map<String, TopicQueueMappingInfo>
topicQueueMappingInfoMap =
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+ //the topicQueueMappingInfoMap should never be null, but
can be empty
+ for (Map.Entry<String, TopicQueueMappingInfo> entry :
topicQueueMappingInfoMap.entrySet()) {
+ if
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+ topicQueueMappingInfoTable.put(entry.getKey(), new
HashMap<>());
}
+ //Note asset brokerName equal
entry.getValue().getBname()
+ //here use the mappingDetail.bname
+
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(),
entry.getValue());
}
}
+ }
- BrokerAddrInfo brokerAddrInfo = new
BrokerAddrInfo(clusterName, brokerAddr);
- BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddrInfo,
+ BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName,
brokerAddr);
+ BrokerLiveInfo prevBrokerLiveInfo =
this.brokerLiveTable.put(brokerAddrInfo,
new BrokerLiveInfo(
- System.currentTimeMillis(),
- timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- topicConfigWrapper == null ? new DataVersion() :
topicConfigWrapper.getDataVersion(),
- channel,
- haServerAddr));
- if (null == prevBrokerLiveInfo) {
- log.info("new broker registered, {} HAService: {}",
brokerAddrInfo, haServerAddr);
- }
+ System.currentTimeMillis(),
+ timeoutMillis == null ?
DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+ topicConfigWrapper == null ? new DataVersion() :
topicConfigWrapper.getDataVersion(),
+ channel,
+ haServerAddr));
+ if (null == prevBrokerLiveInfo) {
+ log.info("new broker registered, {} HAService: {}",
brokerAddrInfo, haServerAddr);
+ }
- if (filterServerList != null) {
- if (filterServerList.isEmpty()) {
- this.filterServerTable.remove(brokerAddrInfo);
- } else {
- this.filterServerTable.put(brokerAddrInfo,
filterServerList);
- }
+ if (filterServerList != null) {
+ if (filterServerList.isEmpty()) {
+ this.filterServerTable.remove(brokerAddrInfo);
+ } else {
+ this.filterServerTable.put(brokerAddrInfo,
filterServerList);
}
+ }
- if (MixAll.MASTER_ID != brokerId) {
- String masterAddr =
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
- if (masterAddr != null) {
- BrokerAddrInfo masterAddrInfo = new
BrokerAddrInfo(clusterName, masterAddr);
- BrokerLiveInfo masterLiveInfo =
this.brokerLiveTable.get(masterAddrInfo);
- if (masterLiveInfo != null) {
-
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
- result.setMasterAddr(masterAddr);
- }
+ if (MixAll.MASTER_ID != brokerId) {
+ String masterAddr =
brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (masterAddr != null) {
+ BrokerAddrInfo masterAddrInfo = new
BrokerAddrInfo(clusterName, masterAddr);
+ BrokerLiveInfo masterLiveInfo =
this.brokerLiveTable.get(masterAddrInfo);
+ if (masterLiveInfo != null) {
+
result.setHaServerAddr(masterLiveInfo.getHaServerAddr());
+ result.setMasterAddr(masterAddr);
}
}
+ }
- if (isMinBrokerIdChanged &&
namesrvConfig.isNotifyMinBrokerIdChanged()) {
- notifyMinBrokerIdChanged(brokerAddrsMap, null,
+ if (isMinBrokerIdChanged &&
namesrvConfig.isNotifyMinBrokerIdChanged()) {
+ notifyMinBrokerIdChanged(brokerAddrsMap, null,
this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
- }
- } finally {
- this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
+ } finally {
+ this.lock.writeLock().unlock();
}
return result;
@@ -417,12 +409,8 @@ public class RouteInfoManager {
return true;
}
- if (queueDataMap.containsKey(brokerName)) {
- // The topicQueueTable already contains the broker
- return false;
- }
-
- return true;
+ // The topicQueueTable already contains the broker
+ return !queueDataMap.containsKey(brokerName);
}
public DataVersion queryBrokerTopicConfig(final String clusterName, final
String brokerAddr) {
@@ -500,25 +488,24 @@ public class RouteInfoManager {
private int operateWritePermOfBroker(final String brokerName, final int
requestCode) {
int topicCnt = 0;
- Iterator<Entry<String, Map<String, QueueData>>> itTopic =
this.topicQueueTable.entrySet().iterator();
- while (itTopic.hasNext()) {
- Entry<String, Map<String, QueueData>> entry = itTopic.next();
+ for (Entry<String, Map<String, QueueData>> entry :
this.topicQueueTable.entrySet()) {
Map<String, QueueData> qdMap = entry.getValue();
final QueueData qd = qdMap.get(brokerName);
- if (qd != null) {
- int perm = qd.getPerm();
- switch (requestCode) {
- case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
- perm &= ~PermName.PERM_WRITE;
- break;
- case RequestCode.ADD_WRITE_PERM_OF_BROKER:
- perm = PermName.PERM_READ | PermName.PERM_WRITE;
- break;
- }
- qd.setPerm(perm);
- topicCnt++;
+ if (qd == null) {
+ continue;
+ }
+ int perm = qd.getPerm();
+ switch (requestCode) {
+ case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
+ perm &= ~PermName.PERM_WRITE;
+ break;
+ case RequestCode.ADD_WRITE_PERM_OF_BROKER:
+ perm = PermName.PERM_READ | PermName.PERM_WRITE;
+ break;
}
+ qd.setPerm(perm);
+ topicCnt++;
}
return topicCnt;
}
@@ -671,7 +658,6 @@ public class RouteInfoManager {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
- Set<String> brokerNameSet = new HashSet<>();
List<BrokerData> brokerDataList = new LinkedList<>();
topicRouteData.setBrokerDatas(brokerDataList);
@@ -679,40 +665,41 @@ public class RouteInfoManager {
topicRouteData.setFilterServerTable(filterServerMap);
try {
- try {
- this.lock.readLock().lockInterruptibly();
- Map<String, QueueData> queueDataMap =
this.topicQueueTable.get(topic);
- if (queueDataMap != null) {
- topicRouteData.setQueueDatas(new
ArrayList<>(queueDataMap.values()));
- foundQueueData = true;
-
- brokerNameSet.addAll(queueDataMap.keySet());
-
- for (String brokerName : brokerNameSet) {
- BrokerData brokerData =
this.brokerAddrTable.get(brokerName);
- if (null != brokerData) {
- BrokerData brokerDataClone = new
BrokerData(brokerData.getCluster(),
- brokerData.getBrokerName(),
- (HashMap<Long, String>)
brokerData.getBrokerAddrs().clone(),
- brokerData.isEnableActingMaster(),
brokerData.getZoneName());
-
- brokerDataList.add(brokerDataClone);
- foundBrokerData = true;
- if (!filterServerTable.isEmpty()) {
- for (final String brokerAddr :
brokerDataClone.getBrokerAddrs().values()) {
- BrokerAddrInfo brokerAddrInfo = new
BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
- List<String> filterServerList =
this.filterServerTable.get(brokerAddrInfo);
- filterServerMap.put(brokerAddr,
filterServerList);
- }
- }
- }
+ this.lock.readLock().lockInterruptibly();
+ Map<String, QueueData> queueDataMap =
this.topicQueueTable.get(topic);
+ if (queueDataMap != null) {
+ topicRouteData.setQueueDatas(new
ArrayList<>(queueDataMap.values()));
+ foundQueueData = true;
+
+ Set<String> brokerNameSet = new
HashSet<>(queueDataMap.keySet());
+
+ for (String brokerName : brokerNameSet) {
+ BrokerData brokerData =
this.brokerAddrTable.get(brokerName);
+ if (null == brokerData) {
+ continue;
+ }
+ BrokerData brokerDataClone = new
BrokerData(brokerData.getCluster(),
+ brokerData.getBrokerName(),
+ (HashMap<Long, String>)
brokerData.getBrokerAddrs().clone(),
+ brokerData.isEnableActingMaster(),
brokerData.getZoneName());
+
+ brokerDataList.add(brokerDataClone);
+ foundBrokerData = true;
+ if (filterServerTable.isEmpty()) {
+ continue;
}
+ for (final String brokerAddr :
brokerDataClone.getBrokerAddrs().values()) {
+ BrokerAddrInfo brokerAddrInfo = new
BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
+ List<String> filterServerList =
this.filterServerTable.get(brokerAddrInfo);
+ filterServerMap.put(brokerAddr, filterServerList);
+ }
+
}
- } finally {
- this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
@@ -937,36 +924,28 @@ public class RouteInfoManager {
log.info("--------------------------------------------------------");
{
log.info("topicQueueTable SIZE: {}",
this.topicQueueTable.size());
- Iterator<Entry<String, Map<String, QueueData>>> it =
this.topicQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, Map<String, QueueData>> next = it.next();
+ for (Entry<String, Map<String, QueueData>> next :
this.topicQueueTable.entrySet()) {
log.info("topicQueueTable Topic: {} {}",
next.getKey(), next.getValue());
}
}
{
log.info("brokerAddrTable SIZE: {}",
this.brokerAddrTable.size());
- Iterator<Entry<String, BrokerData>> it =
this.brokerAddrTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, BrokerData> next = it.next();
+ for (Entry<String, BrokerData> next :
this.brokerAddrTable.entrySet()) {
log.info("brokerAddrTable brokerName: {} {}",
next.getKey(), next.getValue());
}
}
{
log.info("brokerLiveTable SIZE: {}",
this.brokerLiveTable.size());
- Iterator<Entry<BrokerAddrInfo, BrokerLiveInfo>> it =
this.brokerLiveTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<BrokerAddrInfo, BrokerLiveInfo> next = it.next();
+ for (Entry<BrokerAddrInfo, BrokerLiveInfo> next :
this.brokerLiveTable.entrySet()) {
log.info("brokerLiveTable brokerAddr: {} {}",
next.getKey(), next.getValue());
}
}
{
log.info("clusterAddrTable SIZE: {}",
this.clusterAddrTable.size());
- Iterator<Entry<String, Set<String>>> it =
this.clusterAddrTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, Set<String>> next = it.next();
+ for (Entry<String, Set<String>> next :
this.clusterAddrTable.entrySet()) {
log.info("clusterAddrTable clusterName: {} {}",
next.getKey(), next.getValue());
}
}
@@ -981,30 +960,27 @@ public class RouteInfoManager {
public TopicList getSystemTopicList() {
TopicList topicList = new TopicList();
try {
- try {
- this.lock.readLock().lockInterruptibly();
- for (Map.Entry<String, Set<String>> entry :
clusterAddrTable.entrySet()) {
- topicList.getTopicList().add(entry.getKey());
- topicList.getTopicList().addAll(entry.getValue());
- }
+ this.lock.readLock().lockInterruptibly();
+ for (Map.Entry<String, Set<String>> entry :
clusterAddrTable.entrySet()) {
+ topicList.getTopicList().add(entry.getKey());
+ topicList.getTopicList().addAll(entry.getValue());
+ }
- if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {
- Iterator<String> it = brokerAddrTable.keySet().iterator();
- while (it.hasNext()) {
- BrokerData bd = brokerAddrTable.get(it.next());
- HashMap<Long, String> brokerAddrs =
bd.getBrokerAddrs();
- if (brokerAddrs != null && !brokerAddrs.isEmpty()) {
- Iterator<Long> it2 =
brokerAddrs.keySet().iterator();
-
topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
- break;
- }
+ if (!brokerAddrTable.isEmpty()) {
+ for (String s : brokerAddrTable.keySet()) {
+ BrokerData bd = brokerAddrTable.get(s);
+ HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();
+ if (brokerAddrs != null && !brokerAddrs.isEmpty()) {
+ Iterator<Long> it2 = brokerAddrs.keySet().iterator();
+ topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
+ break;
}
}
- } finally {
- this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("getSystemTopicList Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
return topicList;
@@ -1042,24 +1018,19 @@ public class RouteInfoManager {
public TopicList getUnitTopics() {
TopicList topicList = new TopicList();
try {
- try {
- this.lock.readLock().lockInterruptibly();
- Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
- while (topicTableIt.hasNext()) {
- Entry<String, Map<String, QueueData>> topicEntry =
topicTableIt.next();
- String topic = topicEntry.getKey();
- Map<String, QueueData> queueDatas = topicEntry.getValue();
- if (queueDatas != null && queueDatas.size() > 0
+ this.lock.readLock().lockInterruptibly();
+ for (Entry<String, Map<String, QueueData>> topicEntry :
this.topicQueueTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ Map<String, QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
&&
TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
- topicList.getTopicList().add(topic);
- }
+ topicList.getTopicList().add(topic);
}
- } finally {
- this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("getUnitTopics Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
return topicList;
@@ -1068,24 +1039,19 @@ public class RouteInfoManager {
public TopicList getHasUnitSubTopicList() {
TopicList topicList = new TopicList();
try {
- try {
- this.lock.readLock().lockInterruptibly();
- Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
- while (topicTableIt.hasNext()) {
- Entry<String, Map<String, QueueData>> topicEntry =
topicTableIt.next();
- String topic = topicEntry.getKey();
- Map<String, QueueData> queueDatas = topicEntry.getValue();
- if (queueDatas != null && queueDatas.size() > 0
+ this.lock.readLock().lockInterruptibly();
+ for (Entry<String, Map<String, QueueData>> topicEntry :
this.topicQueueTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ Map<String, QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
&&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
- topicList.getTopicList().add(topic);
- }
+ topicList.getTopicList().add(topic);
}
- } finally {
- this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("getHasUnitSubTopicList Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
return topicList;
@@ -1094,25 +1060,20 @@ public class RouteInfoManager {
public TopicList getHasUnitSubUnUnitTopicList() {
TopicList topicList = new TopicList();
try {
- try {
- this.lock.readLock().lockInterruptibly();
- Iterator<Entry<String, Map<String, QueueData>>> topicTableIt =
- this.topicQueueTable.entrySet().iterator();
- while (topicTableIt.hasNext()) {
- Entry<String, Map<String, QueueData>> topicEntry =
topicTableIt.next();
- String topic = topicEntry.getKey();
- Map<String, QueueData> queueDatas = topicEntry.getValue();
- if (queueDatas != null && queueDatas.size() > 0
+ this.lock.readLock().lockInterruptibly();
+ for (Entry<String, Map<String, QueueData>> topicEntry :
this.topicQueueTable.entrySet()) {
+ String topic = topicEntry.getKey();
+ Map<String, QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
&&
!TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
&&
TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag()))
{
- topicList.getTopicList().add(topic);
- }
+ topicList.getTopicList().add(topic);
}
- } finally {
- this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("getHasUnitSubUnUnitTopicList Exception", e);
+ } finally {
+ this.lock.readLock().unlock();
}
return topicList;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 7a164ab08..b5569bb97 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ssl.SslContext;
@@ -73,14 +72,14 @@ public abstract class NettyRemotingAbstract {
* This map caches all on-going requests.
*/
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture>
responseTable =
- new ConcurrentHashMap<Integer, ResponseFuture>(256);
+ new ConcurrentHashMap<>(256);
/**
* This container holds all processors per request code, aka, for each
incoming request, we may look up the
* responding processor in this map to handle the request.
*/
protected final HashMap<Integer/* request code */,
Pair<NettyRequestProcessor, ExecutorService>> processorTable =
- new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+ new HashMap<>(64);
/**
* Executor to feed netty events to user defined {@link
ChannelEventListener}.
@@ -90,7 +89,7 @@ public abstract class NettyRemotingAbstract {
/**
* The default request processor to use in case there is no exact match in
{@link #processorTable} per request code.
*/
- protected Pair<NettyRequestProcessor, ExecutorService>
defaultRequestProcessor;
+ protected Pair<NettyRequestProcessor, ExecutorService>
defaultRequestProcessorPair;
/**
* SSL context via which to create {@link SslHandler}.
@@ -100,7 +99,7 @@ public abstract class NettyRemotingAbstract {
/**
* custom rpc hooks
*/
- protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
+ protected List<RPCHook> rpcHooks = new ArrayList<>();
static {
NettyLogger.initNettyLogger();
@@ -147,17 +146,15 @@ public abstract class NettyRemotingAbstract {
*
* @param ctx Channel handler context.
* @param msg incoming remoting command.
- * @throws Exception if there were any error while processing the incoming
command.
*/
- public void processMessageReceived(ChannelHandlerContext ctx,
RemotingCommand msg) throws Exception {
- final RemotingCommand cmd = msg;
- if (cmd != null) {
- switch (cmd.getType()) {
+ public void processMessageReceived(ChannelHandlerContext ctx,
RemotingCommand msg) {
+ if (msg != null) {
+ switch (msg.getType()) {
case REQUEST_COMMAND:
- processRequestCommand(ctx, cmd);
+ processRequestCommand(ctx, msg);
break;
case RESPONSE_COMMAND:
- processResponseCommand(ctx, cmd);
+ processResponseCommand(ctx, msg);
break;
default:
break;
@@ -189,103 +186,104 @@ public abstract class NettyRemotingAbstract {
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final
RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched =
this.processorTable.get(cmd.getCode());
- final Pair<NettyRequestProcessor, ExecutorService> pair = null ==
matched ? this.defaultRequestProcessor : matched;
+ final Pair<NettyRequestProcessor, ExecutorService> pair = null ==
matched ? this.defaultRequestProcessorPair : matched;
final int opaque = cmd.getOpaque();
- if (pair != null) {
- Runnable run = new Runnable() {
- @Override
- public void run() {
- Exception exception = null;
- RemotingCommand response;
-
- try {
- String remoteAddr =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- try {
- doBeforeRpcHooks(remoteAddr, cmd);
- } catch (Exception e) {
- exception = e;
- }
-
- if (exception == null) {
- response = pair.getObject1().processRequest(ctx,
cmd);
- } else {
- response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
null);
- }
-
- try {
- doAfterRpcHooks(remoteAddr, cmd, response);
- } catch (Exception e) {
- exception = e;
- }
+ if (pair == null) {
+ String error = " request type " + cmd.getCode() + " not supported";
+ final RemotingCommand response =
+
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) +
error);
+ return;
+ }
- if (exception != null) {
- throw exception;
- }
+ Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);
- if (!cmd.isOnewayRPC()) {
- if (response != null) {
- response.setOpaque(opaque);
- response.markResponseType();
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("process request over, but
response failed", e);
- log.error(cmd.toString());
- log.error(response.toString());
- }
- } else {
-
- }
- }
- } catch (Throwable e) {
- log.error("process request exception", e);
- log.error(cmd.toString());
+ if (pair.getObject1().rejectRequest()) {
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+ "[REJECTREQUEST]system busy, start flow control for a
while");
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ return;
+ }
- if (!cmd.isOnewayRPC()) {
- response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
- RemotingHelper.exceptionSimpleDesc(e));
- response.setOpaque(opaque);
- ctx.writeAndFlush(response);
- }
- }
- }
- };
+ try {
+ final RequestTask requestTask = new RequestTask(run,
ctx.channel(), cmd);
+ //submit the task for asynchronous execution; the current thread returns immediately
+ pair.getObject2().submit(requestTask);
+ } catch (RejectedExecutionException e) {
+ if ((System.currentTimeMillis() % 10000) == 0) {
+ log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ + ", too many requests and system thread pool busy,
RejectedExecutionException "
+ + pair.getObject2().toString()
+ + " request code: " + cmd.getCode());
+ }
- if (pair.getObject1().rejectRequest()) {
+ if (!cmd.isOnewayRPC()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[REJECTREQUEST]system busy, start flow control for a
while");
+ "[OVERLOAD]system busy, start flow control for a
while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
- return;
}
+ }
+ }
+
+ private Runnable buildProcessRequestHandler(ChannelHandlerContext ctx,
RemotingCommand cmd, Pair<NettyRequestProcessor, ExecutorService> pair, int
opaque) {
+ return () -> {
+ Exception exception = null;
+ RemotingCommand response;
try {
- final RequestTask requestTask = new RequestTask(run,
ctx.channel(), cmd);
- pair.getObject2().submit(requestTask);
- } catch (RejectedExecutionException e) {
- if ((System.currentTimeMillis() % 10000) == 0) {
-
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
- + ", too many requests and system thread pool busy,
RejectedExecutionException "
- + pair.getObject2().toString()
- + " request code: " + cmd.getCode());
+ String remoteAddr =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ try {
+ doBeforeRpcHooks(remoteAddr, cmd);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception == null) {
+ response = pair.getObject1().processRequest(ctx, cmd);
+ } else {
+ response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
null);
+ }
+
+ try {
+ doAfterRpcHooks(remoteAddr, cmd, response);
+ } catch (Exception e) {
+ exception = e;
+ }
+
+ if (exception != null) {
+ throw exception;
}
if (!cmd.isOnewayRPC()) {
- final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
- "[OVERLOAD]system busy, start flow control for a
while");
+ if (response != null) {
+ response.setOpaque(opaque);
+ response.markResponseType();
+ try {
+ ctx.writeAndFlush(response);
+ } catch (Throwable e) {
+ log.error("process request over, but response
failed", e);
+ log.error(cmd.toString());
+ log.error(response.toString());
+ }
+ }
+ }
+ } catch (Throwable e) {
+ log.error("process request exception", e);
+ log.error(cmd.toString());
+
+ if (!cmd.isOnewayRPC()) {
+ response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
+ RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
- } else {
- String error = " request type " + cmd.getCode() + " not supported";
- final RemotingCommand response =
-
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED,
error);
- response.setOpaque(opaque);
- ctx.writeAndFlush(response);
- log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) +
error);
- }
+ };
}
/**
@@ -322,16 +320,13 @@ public abstract class NettyRemotingAbstract {
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- responseFuture.executeInvokeCallback();
- } catch (Throwable e) {
- log.warn("execute callback in executor exception,
and callback throw", e);
- } finally {
- responseFuture.release();
- }
+ executor.submit(() -> {
+ try {
+ responseFuture.executeInvokeCallback();
+ } catch (Throwable e) {
+ log.warn("execute callback in executor exception, and
callback throw", e);
+ } finally {
+ responseFuture.release();
}
});
} catch (Exception e) {
@@ -386,7 +381,7 @@ public abstract class NettyRemotingAbstract {
* </p>
*/
public void scanResponseTable() {
- final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
+ final List<ResponseFuture> rfList = new LinkedList<>();
Iterator<Entry<Integer, ResponseFuture>> it =
this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
@@ -412,27 +407,24 @@ public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(final Channel channel, final
RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
+ //get the request id
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
- channel.writeAndFlush(request).addListener(new
ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws
Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- } else {
- responseFuture.setSendRequestOK(false);
- }
-
- responseTable.remove(opaque);
- responseFuture.setCause(f.cause());
- responseFuture.putResponse(null);
- log.warn("Failed to write a request command to {}, caused
by underlying I/O operation failure", addr);
+ channel.writeAndFlush(request).addListener((ChannelFutureListener)
f -> {
+ if (f.isSuccess()) {
+ responseFuture.setSendRequestOK(true);
+ return;
}
+
+ responseFuture.setSendRequestOK(false);
+ responseTable.remove(opaque);
+ responseFuture.setCause(f.cause());
+ responseFuture.putResponse(null);
+ log.warn("Failed to write a request command to {}, caused by
underlying I/O operation failure", addr);
});
RemotingCommand responseCommand =
responseFuture.waitResponse(timeoutMillis);
@@ -468,16 +460,13 @@ public abstract class NettyRemotingAbstract {
final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
- channel.writeAndFlush(request).addListener(new
ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws
Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- }
- requestFail(opaque);
- log.warn("send a request command to channel <{}>
failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
+ if (f.isSuccess()) {
+ responseFuture.setSendRequestOK(true);
+ return;
}
+ requestFail(opaque);
+ log.warn("send a request command to channel <{}> failed.",
RemotingHelper.parseChannelRemoteAddr(channel));
});
} catch (Exception e) {
responseFuture.release();
@@ -521,9 +510,7 @@ public abstract class NettyRemotingAbstract {
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
- Iterator<Entry<Integer, ResponseFuture>> it =
responseTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Integer, ResponseFuture> entry = it.next();
+ for (Entry<Integer, ResponseFuture> entry : responseTable.entrySet()) {
if (entry.getValue().getChannel() == channel) {
Integer opaque = entry.getKey();
if (opaque != null) {
@@ -540,13 +527,10 @@ public abstract class NettyRemotingAbstract {
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new
SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
- channel.writeAndFlush(request).addListener(new
ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws
Exception {
- once.release();
- if (!f.isSuccess()) {
- log.warn("send a request command to channel <" +
channel.remoteAddress() + "> failed.");
- }
+
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
+ once.release();
+ if (!f.isSuccess()) {
+ log.warn("send a request command to channel <" +
channel.remoteAddress() + "> failed.");
}
});
} catch (Exception e) {
@@ -571,11 +555,11 @@ public abstract class NettyRemotingAbstract {
}
class NettyEventExecutor extends ServiceThread {
- private final LinkedBlockingQueue<NettyEvent> eventQueue = new
LinkedBlockingQueue<NettyEvent>();
- private final int maxSize = 10000;
+ private final LinkedBlockingQueue<NettyEvent> eventQueue = new
LinkedBlockingQueue<>();
public void putNettyEvent(final NettyEvent event) {
int currentSize = this.eventQueue.size();
+ int maxSize = 10000;
if (currentSize <= maxSize) {
this.eventQueue.add(event);
} else {
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index c3deb8a48..06bbae120 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -39,6 +39,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
@@ -52,6 +53,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -66,6 +68,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+@SuppressWarnings("NullableProblems")
public class NettyRemotingServer extends NettyRemotingAbstract implements
RemotingServer {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private final ServerBootstrap serverBootstrap;
@@ -83,7 +86,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
* NettyRemotingServer may hold multiple SubRemotingServer, each server
will be stored in this container with a
* ListenPort key.
*/
- private ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract>
remotingServerTable = new ConcurrentHashMap<Integer, NettyRemotingAbstract>();
+ private final ConcurrentMap<Integer/*Port*/, NettyRemotingAbstract>
remotingServerTable = new ConcurrentHashMap<>();
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
@@ -99,68 +102,81 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
this(nettyServerConfig, null);
}
- public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
- final ChannelEventListener channelEventListener) {
+ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
- int publicThreadNums =
nettyServerConfig.getServerCallbackExecutorThreads();
- if (publicThreadNums <= 0) {
- publicThreadNums = 4;
- }
+ this.publicExecutor = buildPublicExecutor(nettyServerConfig);
- this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums,
new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ this.eventLoopGroupBoss = buildBossEventLoopGroup();
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerPublicExecutor_" +
this.threadIndex.incrementAndGet());
- }
- });
+ this.eventLoopGroupSelector = buildEventLoopGroupSelector();
+ loadSslContext();
+ }
+
+ private EventLoopGroup buildEventLoopGroupSelector() {
if (useEpoll()) {
- this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new
ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ return new
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new
ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+ private final int threadTotal =
nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyEPOLLBoss_%d",
this.threadIndex.incrementAndGet()));
+ return new Thread(r,
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal,
this.threadIndex.incrementAndGet()));
}
});
-
- this.eventLoopGroupSelector = new
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new
ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal =
nettyServerConfig.getServerSelectorThreads();
+ } else {
+ return new
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new
ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+ private final int threadTotal =
nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
- return new Thread(r,
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal,
this.threadIndex.incrementAndGet()));
+ return new Thread(r,
String.format("NettyServerNIOSelector_%d_%d", threadTotal,
this.threadIndex.incrementAndGet()));
}
});
- } else {
- this.eventLoopGroupBoss = new NioEventLoopGroup(1, new
ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ }
+ }
+
+ private EventLoopGroup buildBossEventLoopGroup() {
+ if (useEpoll()) {
+ return new EpollEventLoopGroup(1, new ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyNIOBoss_%d",
this.threadIndex.incrementAndGet()));
+ return new Thread(r, String.format("NettyEPOLLBoss_%d",
this.threadIndex.incrementAndGet()));
}
});
-
- this.eventLoopGroupSelector = new
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new
ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal =
nettyServerConfig.getServerSelectorThreads();
+ } else {
+ return new NioEventLoopGroup(1, new ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
- return new Thread(r,
String.format("NettyServerNIOSelector_%d_%d", threadTotal,
this.threadIndex.incrementAndGet()));
+ return new Thread(r, String.format("NettyNIOBoss_%d",
this.threadIndex.incrementAndGet()));
}
});
}
+ }
- loadSslContext();
+ private ExecutorService buildPublicExecutor(NettyServerConfig
nettyServerConfig) {
+ int publicThreadNums =
nettyServerConfig.getServerCallbackExecutorThreads();
+ if (publicThreadNums <= 0) {
+ publicThreadNums = 4;
+ }
+
+ return Executors.newFixedThreadPool(publicThreadNums, new
ThreadFactory() {
+ private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyServerPublicExecutor_" +
this.threadIndex.incrementAndGet());
+ }
+ });
}
public void loadSslContext() {
@@ -171,9 +187,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
try {
sslContext = TlsHelper.buildSslContext(false);
log.info("SSLContext created for server");
- } catch (CertificateException e) {
- log.error("Failed to create SSLContext for server", e);
- } catch (IOException e) {
+ } catch (CertificateException | IOException e) {
log.error("Failed to create SSLContext for server", e);
}
}
@@ -181,28 +195,27 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
private boolean useEpoll() {
return RemotingUtil.isLinuxPlatform()
- && nettyServerConfig.isUseEpollNativeSelector()
- && Epoll.isAvailable();
+ && nettyServerConfig.isUseEpollNativeSelector()
+ && Epoll.isAvailable();
}
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactory() {
+ nettyServerConfig.getServerWorkerThreads(),
+ new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
+ private final AtomicInteger threadIndex = new
AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerCodecThread_" +
this.threadIndex.incrementAndGet());
- }
- });
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyServerCodecThread_" +
this.threadIndex.incrementAndGet());
+ }
+ });
prepareSharableHandlers();
- ServerBootstrap childHandler =
- this.serverBootstrap.group(this.eventLoopGroupBoss,
this.eventLoopGroupSelector)
+ serverBootstrap.group(this.eventLoopGroupBoss,
this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class :
NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
@@ -211,39 +224,23 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
.localAddress(new
InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
- public void initChannel(SocketChannel ch) throws Exception
{
+ public void initChannel(SocketChannel ch) {
ch.pipeline()
- .addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
- .addLast(defaultEventExecutorGroup,
- encoder,
- new NettyDecoder(),
- new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- connectionManageHandler,
- serverHandler
- );
+ .addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
+ .addLast(defaultEventExecutorGroup,
+ encoder,
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ connectionManageHandler,
+ serverHandler
+ );
}
});
- if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
- log.info("server set SO_SNDBUF to {}",
nettyServerConfig.getServerSocketSndBufSize());
- childHandler.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize());
- }
- if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
- log.info("server set SO_RCVBUF to {}",
nettyServerConfig.getServerSocketRcvBufSize());
- childHandler.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize());
- }
- if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 &&
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
- log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
- nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark());
- childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(
- nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()));
- }
- if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
- childHandler.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
- }
+ addCustomConfig(serverBootstrap);
try {
- ChannelFuture sync =
this.serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
+ ChannelFuture sync =
serverBootstrap.bind(nettyServerConfig.getListenPort()).sync();
InetSocketAddress addr = (InetSocketAddress)
sync.channel().localAddress();
if (0 == nettyServerConfig.getListenPort()) {
this.nettyServerConfig.setListenPort(addr.getPort());
@@ -271,20 +268,37 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}, 1000 * 3, 1000);
}
+ private void addCustomConfig(ServerBootstrap childHandler) {
+ if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
+ log.info("server set SO_SNDBUF to {}",
nettyServerConfig.getServerSocketSndBufSize());
+ childHandler.childOption(ChannelOption.SO_SNDBUF,
nettyServerConfig.getServerSocketSndBufSize());
+ }
+ if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
+ log.info("server set SO_RCVBUF to {}",
nettyServerConfig.getServerSocketRcvBufSize());
+ childHandler.childOption(ChannelOption.SO_RCVBUF,
nettyServerConfig.getServerSocketRcvBufSize());
+ }
+ if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 &&
nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
+ log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark());
+ childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
new WriteBufferWaterMark(
+ nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()));
+ }
+
+ if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
+ childHandler.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT);
+ }
+ }
+
@Override
public void shutdown() {
try {
- if (this.timer != null) {
- this.timer.cancel();
- }
+ this.timer.cancel();
this.eventLoopGroupBoss.shutdownGracefully();
this.eventLoopGroupSelector.shutdownGracefully();
- if (this.nettyEventExecutor != null) {
- this.nettyEventExecutor.shutdown();
- }
+ this.nettyEventExecutor.shutdown();
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
@@ -309,13 +323,13 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
executorThis = this.publicExecutor;
}
- Pair<NettyRequestProcessor, ExecutorService> pair = new
Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+ Pair<NettyRequestProcessor, ExecutorService> pair = new
Pair<>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor,
ExecutorService executor) {
- this.defaultRequestProcessor = new Pair<NettyRequestProcessor,
ExecutorService>(processor, executor);
+ this.defaultRequestProcessorPair = new Pair<>(processor, executor);
}
@Override
@@ -330,14 +344,14 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public Pair<NettyRequestProcessor, ExecutorService>
getDefaultProcessorPair() {
- return defaultRequestProcessor;
+ return defaultRequestProcessorPair;
}
@Override
public RemotingServer newRemotingServer(final int port) {
SubRemotingServer remotingServer = new SubRemotingServer(port,
- this.nettyServerConfig.getServerOnewaySemaphoreValue(),
- this.nettyServerConfig.getServerAsyncSemaphoreValue());
+ this.nettyServerConfig.getServerOnewaySemaphoreValue(),
+ this.nettyServerConfig.getServerAsyncSemaphoreValue());
NettyRemotingAbstract existingServer =
this.remotingServerTable.putIfAbsent(port, remotingServer);
if (existingServer != null) {
throw new RuntimeException("The port " + port + " already in use
by another RemotingServer");
@@ -352,19 +366,19 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public RemotingCommand invokeSync(final Channel channel, final
RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
+ throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(Channel channel, RemotingCommand request, long
timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
+ throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
}
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long
timeoutMillis) throws InterruptedException,
- RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}
@@ -397,7 +411,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
@Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
// mark the current position so that we can peek the first byte to
determine if the content is starting with
// TLS handshake
@@ -415,8 +429,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
- .addAfter(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
- .addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ .addAfter(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline
to establish SSL connection");
} else {
ctx.close();
@@ -503,7 +517,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
@Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
@@ -512,7 +526,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
RemotingUtil.closeChannel(ctx.channel());
if (NettyRemotingServer.this.channelEventListener != null)
{
NettyRemotingServer.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE,
remoteAddress, ctx.channel()));
+ .putNettyEvent(new
NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
@@ -521,7 +535,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY SERVER PIPELINE: exceptionCaught {}",
remoteAddress);
log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.",
cause);
@@ -550,19 +564,19 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public void registerProcessor(final int requestCode, final
NettyRequestProcessor processor,
- final ExecutorService executor) {
+ final ExecutorService executor) {
ExecutorService executorThis = executor;
if (null == executor) {
executorThis = NettyRemotingServer.this.publicExecutor;
}
- Pair<NettyRequestProcessor, ExecutorService> pair = new
Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+ Pair<NettyRequestProcessor, ExecutorService> pair = new
Pair<>(processor, executorThis);
this.processorTable.put(requestCode, pair);
}
@Override
public void registerDefaultProcessor(final NettyRequestProcessor
processor, final ExecutorService executor) {
- this.defaultRequestProcessor = new Pair<NettyRequestProcessor,
ExecutorService>(processor, executor);
+ this.defaultRequestProcessorPair = new Pair<>(processor, executor);
}
@Override
@@ -577,36 +591,36 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
@Override
public Pair<NettyRequestProcessor, ExecutorService>
getDefaultProcessorPair() {
- return this.defaultRequestProcessor;
+ return this.defaultRequestProcessorPair;
}
@Override
public RemotingServer newRemotingServer(final int port) {
throw new UnsupportedOperationException("The SubRemotingServer of
NettyRemotingServer " +
- "doesn't support new nested RemotingServer");
+ "doesn't support new nested RemotingServer");
}
@Override
public void removeRemotingServer(final int port) {
throw new UnsupportedOperationException("The SubRemotingServer of
NettyRemotingServer " +
- "doesn't support remove nested RemotingServer");
+ "doesn't support remove nested RemotingServer");
}
@Override
public RemotingCommand invokeSync(final Channel channel, final
RemotingCommand request,
- final long timeoutMillis) throws InterruptedException,
RemotingSendRequestException, RemotingTimeoutException {
+ final long timeoutMillis) throws
InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
return this.invokeSyncImpl(channel, request, timeoutMillis);
}
@Override
public void invokeAsync(final Channel channel, final RemotingCommand
request, final long timeoutMillis,
- final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ final InvokeCallback invokeCallback) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeAsyncImpl(channel, request, timeoutMillis,
invokeCallback);
}
@Override
public void invokeOneway(final Channel channel, final RemotingCommand
request,
- final long timeoutMillis) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
+ final long timeoutMillis) throws
InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
this.invokeOnewayImpl(channel, request, timeoutMillis);
}