This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8fc57f1468 [ISSUE #9717] Fix
RaftBrokerHeartBeatManager#scanNotActiveBroker was not actually executed
8fc57f1468 is described below
commit 8fc57f1468d182eea11c01e089c0250d647dc5f3
Author: Liu Shengzhong <[email protected]>
AuthorDate: Fri Sep 19 17:25:40 2025 +0800
[ISSUE #9717] Fix RaftBrokerHeartBeatManager#scanNotActiveBroker was not
actually executed
---
.../impl/heartbeat/RaftBrokerHeartBeatManager.java | 5 +-
.../impl/RaftBrokerHeartBeatManagerTest.java | 88 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 2 deletions(-)
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java
index d981ff430c..c7d5d26fd0 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/RaftBrokerHeartBeatManager.java
@@ -63,7 +63,7 @@ public class RaftBrokerHeartBeatManager implements
BrokerHeartbeatManager {
// resolve the scene
// when controller all down and startup again, we wait for some time to
avoid electing a new leader,which is not necessary
- private long firstReceivedHeartbeatTime = -1;
+ private volatile long firstReceivedHeartbeatTime = -1;
public RaftBrokerHeartBeatManager(ControllerConfig controllerConfig) {
this.scheduledService = Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("RaftBrokerHeartbeatManager_scheduledService_"));
@@ -188,7 +188,8 @@ public class RaftBrokerHeartBeatManager implements
BrokerHeartbeatManager {
}
// if has not received any heartbeat from broker, we do not need to
scan
- if (this.firstReceivedHeartbeatTime +
controllerConfig.getJraftConfig().getjRaftScanWaitTimeoutMs() <
System.currentTimeMillis()) {
+ if (this.firstReceivedHeartbeatTime == -1 ||
+ this.firstReceivedHeartbeatTime +
controllerConfig.getJraftConfig().getjRaftScanWaitTimeoutMs() >
System.currentTimeMillis()) {
log.info("has not received any heartbeat from broker, skip scan
not active broker");
return;
}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/RaftBrokerHeartBeatManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/RaftBrokerHeartBeatManagerTest.java
new file mode 100644
index 0000000000..3c28158982
--- /dev/null
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/RaftBrokerHeartBeatManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.controller.impl;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.Channel;
+import io.netty.channel.DefaultChannelPromise;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
+import
org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RaftBrokerHeartBeatManagerTest {
+ @Mock
+ private JRaftController jRaftController;
+ @Mock
+ private Channel brokerChannel;
+ private RaftBrokerHeartBeatManager heartbeatManager;
+ private final ControllerConfig config = new ControllerConfig();
+
+ @Before
+ public void init() {
+ when(jRaftController.isLeaderState()).thenReturn(true);
+ config.setScanNotActiveBrokerInterval(1000);
+ this.heartbeatManager = new RaftBrokerHeartBeatManager(config);
+ this.heartbeatManager.setController(jRaftController);
+ this.heartbeatManager.initialize();
+ this.heartbeatManager.start();
+ }
+
+ @Test
+ public void testDetectBrokerAlive() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ this.heartbeatManager.registerBrokerLifecycleListener((clusterName,
brokerName, brokerId) -> {
+ latch.countDown(); // onBrokerInactive
+ });
+ String clusterName = "cluster-1";
+ String brokerName = "broker-1";
+ String brokerAddr = "127.0.0.1:10911";
+ long brokerId = 1L;
+ RemotingCommand onBrokerHeartbeat =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
+ RemotingCommand checkNotActiveResp1 =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
+ checkNotActiveResp1.setBody(JSON.toJSONBytes(Collections.emptyList()));
+ RemotingCommand checkNotActiveResp2 =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "");
+
checkNotActiveResp2.setBody(JSON.toJSONBytes(Collections.singletonList(new
BrokerIdentityInfo(clusterName, brokerName, brokerId))));
+ when(jRaftController.onBrokerHeartBeat(any()))
+ .thenReturn(CompletableFuture.completedFuture(onBrokerHeartbeat));
+ when(jRaftController.checkNotActiveBroker(any()))
+ .thenReturn(CompletableFuture.completedFuture(checkNotActiveResp1))
+
.thenReturn(CompletableFuture.completedFuture(checkNotActiveResp2));
+ DefaultChannelPromise channelPromise = new
DefaultChannelPromise(brokerChannel);
+ channelPromise.setSuccess();
+ when(brokerChannel.close()).thenReturn(channelPromise);
+ this.heartbeatManager.onBrokerHeartbeat(clusterName, brokerName,
brokerAddr, brokerId, 3000L, brokerChannel,
+ 1, 1L, -1L, 0);
+ assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
+ this.heartbeatManager.shutdown();
+ }
+}