lizhanhui closed pull request #312: [ISSUE#311] Improve broker register
topicrouter info performance
URL: https://github.com/apache/rocketmq/pull/312
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 60f287af5..110d8ad23 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -741,7 +743,7 @@ public void run() {
log.error("registerBrokerAll Exception", e);
}
}
- }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
+ }, 1000 * 10, Math.max(10000,
Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
@@ -752,6 +754,24 @@ public void run() {
}
}
+ public synchronized void registerIncrementBrokerData(TopicConfig
topicConfig, DataVersion dataVersion) {
+ TopicConfig registerTopicConfig = topicConfig;
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ ||
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ registerTopicConfig =
+ new TopicConfig(topicConfig.getTopicName(),
topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
+ }
+
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<String, TopicConfig>();
+ topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setDataVersion(dataVersion);
+ topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+ doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+ }
+
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper =
this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
@@ -772,30 +792,35 @@ public synchronized void registerBrokerAll(final boolean
checkOrderConfig, boole
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
- List<RegisterBrokerResult> registerBrokerResultList =
this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills(),
- this.brokerConfig.isCompressedRegister());
-
- if (registerBrokerResultList.size() > 0) {
- RegisterBrokerResult registerBrokerResult =
registerBrokerResultList.get(0);
- if (registerBrokerResult != null) {
- if (this.updateMasterHAServerAddrPeriodically &&
registerBrokerResult.getHaServerAddr() != null) {
-
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
- }
+ doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+ }
+ }
+
+ private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+ TopicConfigSerializeWrapper topicConfigWrapper) {
+ List<RegisterBrokerResult> registerBrokerResultList =
this.brokerOuterAPI.registerBrokerAll(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills(),
+ this.brokerConfig.isCompressedRegister());
+
+ if (registerBrokerResultList.size() > 0) {
+ RegisterBrokerResult registerBrokerResult =
registerBrokerResultList.get(0);
+ if (registerBrokerResult != null) {
+ if (this.updateMasterHAServerAddrPeriodically &&
registerBrokerResult.getHaServerAddr() != null) {
+
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+ }
-
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
- if (checkOrderConfig) {
-
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
- }
+ if (checkOrderConfig) {
+
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a9e54aa3e..1a704a8c6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public boolean rejectRequest() {
return false;
}
- private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+ private synchronized RemotingCommand
updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
@@ -245,12 +245,13 @@ private RemotingCommand
updateAndCreateTopic(ChannelHandlerContext ctx,
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ?
0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerBrokerAll(false, true, true);
+
+
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
- private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
@@ -296,7 +297,7 @@ private RemotingCommand
getAllTopicConfig(ChannelHandlerContext ctx, RemotingCom
return response;
}
- private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx,
RemotingCommand request) {
+ private synchronized RemotingCommand
updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
log.info("updateBrokerConfig called by {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 4468b2d55..21ae522b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -136,6 +136,14 @@
private boolean forceRegister = false;
+ /**
+ *
+ * This configurable item defines interval of topics registration of
broker to name server. Allowing values are
+ * between 10, 000 and 60, 000 milliseconds.
+ *
+ */
+ private int registerNameServerPeriod = 1000 * 30;
+
public boolean isTraceOn() {
return traceOn;
}
@@ -617,4 +625,12 @@ public boolean isForceRegister() {
public void setForceRegister(boolean forceRegister) {
this.forceRegister = forceRegister;
}
+
+ public int getRegisterNameServerPeriod() {
+ return registerNameServerPeriod;
+ }
+
+ public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
+ this.registerNameServerPeriod = registerNameServerPeriod;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services