This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-beta-dledger-controller
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta-dledger-controller
by this push:
new ebce38705 Add notifyBrokerRoleChanged configuration
ebce38705 is described below
commit ebce387057ffa40e060c8ff401622ea128e2844e
Author: RongtongJin <[email protected]>
AuthorDate: Tue Jun 7 17:40:40 2022 +0800
Add notifyBrokerRoleChanged configuration
---
.../rocketmq/common/{namesrv => }/ControllerConfig.java | 13 +++++++++++--
.../org/apache/rocketmq/common/namesrv/NamesrvConfig.java | 2 +-
.../org/apache/rocketmq/controller/ControllerManager.java | 9 ++++++---
.../org/apache/rocketmq/controller/ControllerStartup.java | 2 +-
.../apache/rocketmq/controller/impl/DLedgerController.java | 6 +++++-
.../controller/impl/DefaultBrokerHeartbeatManager.java | 2 +-
.../controller/impl/manager/ReplicasInfoManager.java | 2 +-
.../controller/impl/controller/ControllerManagerTest.java | 2 +-
.../impl/controller/impl/DLedgerControllerTest.java | 2 +-
.../controller/impl/DefaultBrokerHeartbeatManagerTest.java | 2 +-
.../controller/impl/manager/ReplicasInfoManagerTest.java | 2 +-
.../java/org/apache/rocketmq/namesrv/NamesrvController.java | 2 +-
.../java/org/apache/rocketmq/namesrv/NamesrvStartup.java | 2 +-
.../apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java | 13 ++++++++-----
.../store/ha/autoswitch/AutoSwitchHAConnection.java | 2 +-
.../rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java | 2 +-
.../test/autoswitchrole/AutoSwitchRoleIntegrationTest.java | 2 +-
17 files changed, 43 insertions(+), 24 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
similarity index 93%
rename from
common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
rename to common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index ca2a423eb..3af7aac20 100644
---
a/common/src/main/java/org/apache/rocketmq/common/namesrv/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -14,10 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.common.namesrv;
+package org.apache.rocketmq.common;
import java.io.File;
-import org.apache.rocketmq.common.MixAll;
public class ControllerConfig {
@@ -60,6 +59,8 @@ public class ControllerConfig {
*/
private boolean isProcessReadEvent = false;
+ private volatile boolean notifyBrokerRoleChanged = true;
+
public String getRocketmqHome() {
return rocketmqHome;
}
@@ -163,4 +164,12 @@ public class ControllerConfig {
public void setProcessReadEvent(boolean processReadEvent) {
isProcessReadEvent = processReadEvent;
}
+
+ public boolean isNotifyBrokerRoleChanged() {
+ return notifyBrokerRoleChanged;
+ }
+
+ public void setNotifyBrokerRoleChanged(boolean notifyBrokerRoleChanged) {
+ this.notifyBrokerRoleChanged = notifyBrokerRoleChanged;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 698a0582b..5f5221a21 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -71,7 +71,7 @@ public class NamesrvConfig {
private volatile boolean enableTopicList = true;
- private volatile boolean notifyMinBrokerIdChanged = true;
+ private volatile boolean notifyMinBrokerIdChanged = false;
public boolean isOrderMessageEnable() {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 5f800a5a6..5e070ddd2 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import
org.apache.rocketmq.common.protocol.header.NotifyBrokerRoleChangedRequestHeader;
@@ -109,7 +109,10 @@ public class ControllerManager {
if
(StringUtils.isNotEmpty(responseHeader.getNewMasterAddress())) {
heartbeatManager.changeBrokerMetadata(clusterName,
responseHeader.getNewMasterAddress(), MixAll.MASTER_ID);
}
- notifyBrokerMasterChanged(responseHeader,
clusterName);
+
+ if
(controllerConfig.isNotifyBrokerRoleChanged()) {
+ notifyBrokerRoleChanged(responseHeader,
clusterName);
+ }
}
} catch (Exception ignored) {
}
@@ -126,7 +129,7 @@ public class ControllerManager {
/**
* Notify master and all slaves for a broker that the master role changed.
*/
- public void notifyBrokerMasterChanged(final ElectMasterResponseHeader
electMasterResult, final String clusterName) {
+ public void notifyBrokerRoleChanged(final ElectMasterResponseHeader
electMasterResult, final String clusterName) {
final BrokerMemberGroup memberGroup =
electMasterResult.getBrokerMemberGroup();
if (memberGroup != null) {
// First, inform the master
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
index 3820e64e6..8c1ee89e0 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/ControllerStartup.java
@@ -31,7 +31,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 3823b9425..c8c213714 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -38,7 +38,7 @@ import java.util.function.Supplier;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
@@ -140,6 +140,10 @@ public class DLedgerController implements Controller {
return this.roleHandler.isLeaderState();
}
+ public ControllerConfig getControllerConfig() {
+ return controllerConfig;
+ }
+
@Override
public CompletableFuture<RemotingCommand>
alterSyncStateSet(AlterSyncStateSetRequestHeader request,
final SyncStateSet syncStateSet) {
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
index 050f0aa6f..3e58f3b6c 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DefaultBrokerHeartbeatManager.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.BrokerAddrInfo;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index eefb7feeb..2590bd205 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -27,7 +27,7 @@ import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerMemberGroup;
import org.apache.rocketmq.common.protocol.body.InSyncStateData;
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
index 80f3a409a..95e6db4d3 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/ControllerManagerTest.java
@@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import
org.apache.rocketmq.common.protocol.header.namesrv.BrokerHeartbeatRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
index 7a4e04897..56730f40d 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DLedgerControllerTest.java
@@ -25,7 +25,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.BrokerRegisterRequestHeader;
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
index 8ec99a1e0..5565e59bd 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/DefaultBrokerHeartbeatManagerTest.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.controller.impl.controller.impl;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.impl.DefaultBrokerHeartbeatManager;
import org.junit.Before;
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
index fc7d420f5..8bf594076 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -19,7 +19,7 @@ package
org.apache.rocketmq.controller.impl.controller.impl.manager;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.AlterSyncStateSetResponseHeader;
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index 08da5d0a4..cd3b1e5c2 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.controller.Controller;
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index c580a5718..017768cf6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -32,7 +32,7 @@ import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 4872bb87e..7e99e73d3 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -61,6 +61,7 @@ import
org.apache.rocketmq.common.statictopic.TopicQueueMappingInfo;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.controller.Controller;
+import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
@@ -614,7 +615,9 @@ public class RouteInfoManager {
final ElectMasterResponseHeader
responseHeader = (ElectMasterResponseHeader) response.readCustomHeader();
if (responseHeader != null) {
log.info("Broker {}'s master {}
shutdown, elect a new master done, result:{}", brokerName, responseHeader);
-
notifyBrokerMasterChanged(responseHeader, clusterName);
+ if (controller instanceof
DLedgerController && ((DLedgerController)
controller).getControllerConfig().isNotifyBrokerRoleChanged()) {
+
notifyBrokerMasterChanged(responseHeader, clusterName);
+ }
}
} catch (Exception ignored) {
}
@@ -931,7 +934,8 @@ public class RouteInfoManager {
/**
* Notify master and all slaves for a broker that the master role changed.
*/
- private void notifyBrokerMasterChanged(final ElectMasterResponseHeader
electMasterResult, final String clusterName) {
+ private void notifyBrokerMasterChanged(final ElectMasterResponseHeader
electMasterResult,
+ final String clusterName) {
final BrokerMemberGroup memberGroup =
electMasterResult.getBrokerMemberGroup();
if (memberGroup != null) {
// First, inform the master
@@ -951,7 +955,8 @@ public class RouteInfoManager {
}
}
- private void doNotifyBrokerRoleChanged(final String brokerAddr, final Long
brokerId, final ElectMasterResponseHeader responseHeader) {
+ private void doNotifyBrokerRoleChanged(final String brokerAddr, final Long
brokerId,
+ final ElectMasterResponseHeader responseHeader) {
if (StringUtils.isNoneEmpty(brokerAddr)) {
log.info("Try notify broker {} with id {} that role changed,
responseHeader:{}", brokerAddr, brokerId, responseHeader);
final NotifyBrokerRoleChangedRequestHeader requestHeader = new
NotifyBrokerRoleChangedRequestHeader(responseHeader.getNewMasterAddress(),
@@ -1250,8 +1255,6 @@ class BrokerLiveInfo {
this.haServerAddr = haServerAddr;
}
-
-
@Override
public String toString() {
return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp +
", dataVersion=" + dataVersion
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 0c8629b57..8ee6c985e 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -656,7 +656,7 @@ public class AutoSwitchHAConnection implements HAConnection
{
// Setup initial transferEpoch
EpochEntry epochEntry =
AutoSwitchHAConnection.this.epochCache.findEpochEntryByOffset(this.nextTransferFromWhere);
if (epochEntry == null) {
- LOGGER.error("Failed to find an epochEntry
to match slaveRequestOffset {}", this.nextTransferFromWhere);
+ LOGGER.error("Failed to find an epochEntry
to match nextTransferFromWhere {}", this.nextTransferFromWhere);
sendHeartbeatIfNeeded();
waitForRunning(500);
break;
diff --git
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
index d9968b610..f085488b2 100644
---
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
+++
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleBase.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.store.GetMessageResult;
diff --git
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
index 5ecb3988d..5b0f1947d 100644
---
a/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
+++
b/test/src/test/java/org/apache/rocketmq/test/autoswitchrole/AutoSwitchRoleIntegrationTest.java
@@ -21,7 +21,7 @@ import java.io.File;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.hacontroller.ReplicasManager;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.namesrv.ControllerConfig;
+import org.apache.rocketmq.common.ControllerConfig;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.common.protocol.body.SyncStateSet;
import org.apache.rocketmq.namesrv.NamesrvController;