This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 4593c46b2a bugfix: raft split-brain causes incorrect cluster information (#7891)
4593c46b2a is described below
commit 4593c46b2a8ed33818e29386477b30ef9ed060d9
Author: funkye <[email protected]>
AuthorDate: Mon Dec 22 17:00:45 2025 +0800
bugfix: raft split-brain causes incorrect cluster information (#7891)
---
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
.../metadata/namingserver/NamingServerNode.java | 4 +-
.../namingserver/NamingServerNodeTest.java | 30 ++++
.../namingserver/entity/pojo/ClusterData.java | 2 +-
.../namingserver/filter/ConsoleRemotingFilter.java | 2 +-
.../seata/namingserver/manager/NamingManager.java | 19 ++-
.../seata/namingserver/NamingManagerTest.java | 8 +-
.../instance/RaftServerInstanceStrategy.java | 5 +-
.../instance/RaftServerInstanceStrategyTest.java | 188 +++++++++++++++++++++
10 files changed, 242 insertions(+), 18 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 8c8f7184a1..2bf4c7db59 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -66,6 +66,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7860](https://github.com/apache/incubator-seata/pull/7860)] Fix the issue where delayed messages in RocketMQ transactions were silently ignored, now explicitly throwing an exception
 - [[#7879](https://github.com/apache/incubator-seata/pull/7879)] fix:correct server port and naming server port
 - [[#7881](https://github.com/apache/incubator-seata/pull/7881)] the vgroup_table in the SQL files of all databases should use a three-column unique constraint
+- [[#7891](https://github.com/apache/incubator-seata/pull/7891)] raft split-brain causes incorrect cluster information
### optimize:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 37e62b42fa..ced6b7e49d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -66,6 +66,7 @@
 - [[#7860](https://github.com/apache/incubator-seata/pull/7860)] 修复 RocketMQ 事务中延迟消息静默失效的问题,改为显式抛出异常
 - [[#7879](https://github.com/apache/incubator-seata/pull/7879)] 修正服务器端口与命名服务器端口
 - [[#7881](https://github.com/apache/incubator-seata/pull/7881)] 修复了除mysql之外其他数据库的sql文件关于vgroup_table的表,使用三列唯一索引以保证容灾迁移高可用
+- [[#7891](https://github.com/apache/incubator-seata/pull/7891)] 修复raft重选举与心跳并发时可能导致namingserver侧的元数据存在多个leader
### optimize:
diff --git a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
index cdef618bb3..19cef34245 100644
--- a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
+++ b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
@@ -78,7 +78,9 @@ public class NamingServerNode extends Node {
NamingServerNode otherNode = (NamingServerNode) obj;
// other node is newer than me
- return otherNode.term > term ||
!StringUtils.equals(otherNode.getVersion(), this.getVersion());
+ return otherNode.term > term
+ || (otherNode.term >= term && !Objects.equals(this.getRole(),
otherNode.getRole()))
+ || !StringUtils.equals(otherNode.getVersion(),
this.getVersion());
}
public void setWeight(double weight) {
diff --git a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
index e0c417a064..09bab78353 100644
--- a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
+++ b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
@@ -18,6 +18,7 @@ package org.apache.seata.common.metadata.namingserver;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Node;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -112,4 +113,33 @@ class NamingServerNodeTest {
newerNode.setVersion("v1");
Assertions.assertTrue(currentNode.isChanged(newerNode));
}
+
+ @Test
+ void testRaftSplitBrainChanged() {
+ NamingServerNode currentNode = new NamingServerNode();
+ currentNode.setTerm(100L);
+ currentNode.setRole(ClusterRole.LEADER);
+ currentNode.setControl(new Node.Endpoint("1.1.1.1", 888));
+ currentNode.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+ // When heartbeat and cluster election occur concurrently, the term is updated, but the leader status has not
+ // yet been modified.
+ NamingServerNode newerNode = new NamingServerNode();
+ newerNode.setTerm(101L);
+ newerNode.setControl(new Node.Endpoint("1.1.1.1", 888));
+ newerNode.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+ newerNode.setRole(ClusterRole.LEADER);
+ Assertions.assertTrue(currentNode.isChanged(newerNode));
+ NamingServerNode newerNode2 = new NamingServerNode();
+ newerNode2.setTerm(101L);
+ newerNode2.setControl(new Node.Endpoint("1.1.1.1", 888));
+ newerNode2.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+ newerNode2.setRole(ClusterRole.FOLLOWER);
+ Assertions.assertTrue(newerNode.isChanged(newerNode2));
+ NamingServerNode newerNode3 = new NamingServerNode();
+ newerNode3.setTerm(101L);
+ newerNode3.setControl(new Node.Endpoint("1.1.1.1", 888));
+ newerNode3.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+ newerNode3.setRole(ClusterRole.FOLLOWER);
+ Assertions.assertFalse(newerNode2.isChanged(newerNode3));
+ }
}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
index 5612512277..695207b9ec 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
@@ -99,7 +99,7 @@ public class ClusterData {
}
@JsonIgnore
- public List<Node> getInstanceList() {
+ public List<NamingServerNode> getInstanceList() {
return unitData.values().stream()
.map(Unit::getNamingInstanceList)
.flatMap(List::stream)
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
index e96503c2c8..2a7357468d 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
@@ -79,7 +79,7 @@ public class ConsoleRemotingFilter implements Filter {
String vgroup = request.getParameter("vgroup");
if (StringUtils.isNotBlank(namespace)
&& (StringUtils.isNotBlank(cluster) ||
StringUtils.isNotBlank(vgroup))) {
- List<Node> list = null;
+ List<NamingServerNode> list = null;
if (StringUtils.isNotBlank(vgroup)) {
list = namingManager.getInstancesByVgroupAndNamespace(
namespace,
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
index a738d229a0..cfa50d0688 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
@@ -192,15 +192,20 @@ public class NamingManager {
return new Result<>("400", "vGroup " + vGroup + " already exists");
}
// add vGroup in new cluster
- List<Node> nodeList = getInstances(namespace, clusterName);
+ List<NamingServerNode> nodeList = getInstances(namespace, clusterName);
if (nodeList == null || nodeList.size() == 0) {
LOGGER.error("no instance in cluster {}", clusterName);
return new Result<>("301", "no instance in cluster:" +
clusterName);
} else {
- Node node = nodeList.stream()
+ List<NamingServerNode> filteredNodes = nodeList.stream()
.filter(n -> n.getRole() == ClusterRole.LEADER ||
n.getRole() == ClusterRole.MEMBER)
- .collect(Collectors.toList())
- .get(0);
+ .sorted((o1, o2) -> Long.compare(o2.getTerm(),
o1.getTerm()))
+ .collect(Collectors.toList());
+ if (filteredNodes.isEmpty()) {
+ LOGGER.error("no suitable instance (LEADER or MEMBER) in
cluster {}", clusterName);
+ return new Result<>("301", "no suitable instance in cluster:"
+ clusterName);
+ }
+ NamingServerNode node = filteredNodes.get(0);
String controlHost = node.getControl().getHost();
int controlPort = node.getControl().getPort();
String httpUrl = NamingServerConstants.HTTP_PREFIX
@@ -397,11 +402,11 @@ public class NamingManager {
return clusterList;
}
- public List<Node> getInstances(String namespace, String clusterName) {
+ public List<NamingServerNode> getInstances(String namespace, String
clusterName) {
return getInstances(namespace, clusterName, false);
}
- public List<Node> getInstances(String namespace, String clusterName,
boolean readOnly) {
+ public List<NamingServerNode> getInstances(String namespace, String
clusterName, boolean readOnly) {
Map<String, ClusterData> clusterDataHashMap =
namespaceClusterDataMap.get(namespace);
if (clusterDataHashMap == null) {
LOGGER.warn("no clusters in namespace: {}", namespace);
@@ -419,7 +424,7 @@ public class NamingManager {
.collect(Collectors.toList());
}
- public List<Node> getInstancesByVgroupAndNamespace(String namespace,
String vgroup, boolean readOnly) {
+ public List<NamingServerNode> getInstancesByVgroupAndNamespace(String
namespace, String vgroup, boolean readOnly) {
List<Cluster> clusters = getClusterListByVgroup(vgroup, namespace);
if (CollectionUtils.isEmpty(clusters)) {
return Collections.emptyList();
diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
index f50c84d638..a44fbac321 100644
--- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
+++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
@@ -118,7 +118,7 @@ class NamingManagerTest {
assertTrue(result);
- List<Node> instances = namingManager.getInstances(namespace,
clusterName);
+ List<NamingServerNode> instances =
namingManager.getInstances(namespace, clusterName);
assertEquals(1, instances.size());
assertEquals("127.0.0.1", instances.get(0).getTransaction().getHost());
assertEquals(8080, instances.get(0).getTransaction().getPort());
@@ -153,7 +153,7 @@ class NamingManagerTest {
node.getMetadata().put(CONSTANT_GROUP, vGroups);
namingManager.registerInstance(node, namespace, clusterName, unitName);
- List<Node> instances = namingManager.getInstances(namespace,
clusterName);
+ List<NamingServerNode> instances =
namingManager.getInstances(namespace, clusterName);
assertEquals(1, instances.size());
boolean result = namingManager.unregisterInstance(namespace,
clusterName, unitName, node);
@@ -199,7 +199,7 @@ class NamingManagerTest {
node.getMetadata().put(CONSTANT_GROUP, vGroups);
namingManager.registerInstance(node, namespace, clusterName, unitName);
- List<Node> instances = namingManager.getInstances(namespace,
clusterName);
+ List<NamingServerNode> instances =
namingManager.getInstances(namespace, clusterName);
assertEquals(1, instances.size());
ReflectionTestUtils.setField(namingManager, "heartbeatTimeThreshold",
10);
@@ -210,7 +210,7 @@ class NamingManagerTest {
}
namingManager.instanceHeartBeatCheck();
- List<Node> afterHeartBeat = namingManager.getInstances(namespace,
clusterName);
+ List<NamingServerNode> afterHeartBeat =
namingManager.getInstances(namespace, clusterName);
assertEquals(0, afterHeartBeat.size());
Mockito.verify(applicationContext,
Mockito.times(2)).publishEvent(any(ClusterChangeEvent.class));
}
diff --git a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
index 7c20a40800..230933baa4 100644
--- a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
+++ b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
@@ -66,10 +66,7 @@ public class RaftServerInstanceStrategy extends
AbstractSeataInstanceStrategy
String clusterType = String.valueOf(StoreConfig.getSessionMode());
instance.addMetadata("cluster-type",
"raft".equalsIgnoreCase(clusterType) ? clusterType : "default");
RaftStateMachine stateMachine =
RaftServerManager.getRaftServer(unit).getRaftStateMachine();
- long term = RaftServerManager.getRaftServer(unit)
- .getRaftStateMachine()
- .getCurrentTerm()
- .get();
+ long term = stateMachine.getCurrentTerm().get();
instance.setTerm(term);
instance.setRole(stateMachine.isLeader() ? ClusterRole.LEADER :
ClusterRole.FOLLOWER);
// load node Endpoint
diff --git a/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java b/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java
new file mode 100644
index 0000000000..d5d59d94fb
--- /dev/null
+++ b/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.instance;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.seata.common.XID;
+import org.apache.seata.common.holder.ObjectHolder;
+import org.apache.seata.common.metadata.ClusterRole;
+import org.apache.seata.common.metadata.Instance;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.store.SessionMode;
+import org.apache.seata.server.BaseSpringBootTest;
+import org.apache.seata.server.cluster.listener.ClusterChangeEvent;
+import org.apache.seata.server.cluster.raft.RaftServer;
+import org.apache.seata.server.cluster.raft.RaftServerManager;
+import org.apache.seata.server.cluster.raft.RaftStateMachine;
+import org.apache.seata.server.session.SessionHolder;
+import org.apache.seata.server.store.StoreConfig;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+import
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties;
+import
org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MapPropertySource;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.env.StandardEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class RaftServerInstanceStrategyTest extends BaseSpringBootTest {
+
+ private RaftServerInstanceStrategy strategy;
+
+ private ServerRaftProperties raftProperties;
+
+ private RegistryNamingServerProperties namingProps;
+
+ private ServerProperties serverProperties;
+
+ @BeforeEach
+ void setUp() {
+ strategy = new RaftServerInstanceStrategy();
+ raftProperties = new ServerRaftProperties();
+ namingProps = new RegistryNamingServerProperties();
+ serverProperties = new ServerProperties();
+ // set minimal required fields
+ raftProperties.setGroup("groupA");
+ namingProps.setNamespace("ns");
+ namingProps.setCluster("clusterA");
+ serverProperties.setPort(8088);
+ strategy.raftProperties = raftProperties;
+ strategy.registryNamingServerProperties = namingProps;
+ strategy.serverProperties = serverProperties;
+ }
+
+ @AfterEach
+ void tearDown() {
+ resetInstance();
+ // Do not put null into ObjectHolder to avoid ConcurrentHashMap NPE
+ }
+
+ @Test
+ void serverInstanceInit_shouldPopulateInstanceFromRaft() {
+ ConfigurableEnvironment environment = buildEnvironmentWithMeta();
+
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT,
environment);
+
+ RaftStateMachine stateMachine = mock(RaftStateMachine.class);
+ when(stateMachine.getCurrentTerm()).thenReturn(new AtomicLong(5L));
+ when(stateMachine.isLeader()).thenReturn(true);
+
+ PeerId peerId = new PeerId("127.0.0.1", 9090);
+ RaftServer raftServer = mock(RaftServer.class);
+ when(raftServer.getRaftStateMachine()).thenReturn(stateMachine);
+ when(raftServer.getServerId()).thenReturn(peerId);
+
+ try (MockedStatic<RaftServerManager> raftServerManagerMock =
Mockito.mockStatic(RaftServerManager.class);
+ MockedStatic<StoreConfig> storeConfigMock =
Mockito.mockStatic(StoreConfig.class);
+ MockedStatic<XID> xidMock = Mockito.mockStatic(XID.class)) {
+ raftServerManagerMock
+ .when(() -> RaftServerManager.getRaftServer("groupA"))
+ .thenReturn(raftServer);
+
storeConfigMock.when(StoreConfig::getSessionMode).thenReturn(SessionMode.RAFT);
+ xidMock.when(XID::getIpAddress).thenReturn("10.0.0.1");
+
+ Instance instance = strategy.serverInstanceInit();
+
+ assertEquals("ns", instance.getNamespace());
+ assertEquals("clusterA", instance.getClusterName());
+ assertEquals("groupA", instance.getUnit());
+ assertEquals(5L, instance.getTerm());
+ assertEquals(ClusterRole.LEADER, instance.getRole());
+ Node.Endpoint control = instance.getControl();
+ assertNotNull(control);
+ assertEquals("10.0.0.1", control.getHost());
+ assertEquals(8088, control.getPort());
+ Node.Endpoint internal = instance.getInternal();
+ assertNotNull(internal);
+ assertEquals("127.0.0.1", internal.getHost());
+ assertEquals(9090, internal.getPort());
+ assertEquals("RAFT", instance.getMetadata().get("cluster-type"));
+ }
+ }
+
+ @Test
+ void onChangeEvent_shouldUpdateTermRoleAndNotify() {
+ resetInstance();
+ Instance instance = Instance.getInstance();
+ instance.setRole(ClusterRole.FOLLOWER);
+ instance.setTerm(1L);
+
+ ClusterChangeEvent event = new ClusterChangeEvent(this, "groupA", 12L,
true);
+
+ try (MockedStatic<SessionHolder> sessionHolderMock =
Mockito.mockStatic(SessionHolder.class)) {
+ VGroupMappingStoreManager mappingManager =
mock(VGroupMappingStoreManager.class);
+
sessionHolderMock.when(SessionHolder::getRootVGroupMappingManager).thenReturn(mappingManager);
+
+ strategy.onChangeEvent(event);
+
+ assertEquals(12L, instance.getTerm());
+ assertEquals(ClusterRole.LEADER, instance.getRole());
+ verify(mappingManager, times(1)).notifyMapping();
+ }
+ }
+
+ @Test
+ void typeAndOrderShouldReturnExpectedValues() {
+ assertEquals(SeataInstanceStrategy.Type.RAFT, strategy.type());
+ assertEquals(Integer.MAX_VALUE - 1, strategy.getOrder());
+ }
+
+ private ConfigurableEnvironment buildEnvironmentWithMeta() {
+ Map<String, Object> map = new HashMap<>();
+ map.put("meta.demo", "v");
+ StandardEnvironment environment = new StandardEnvironment();
+ MutablePropertySources sources = environment.getPropertySources();
+ sources.addFirst(new MapPropertySource("testMeta", map));
+ return environment;
+ }
+
+ private void resetInstance() {
+ Instance instance = Instance.getInstance();
+ instance.setNamespace(null);
+ instance.setClusterName(null);
+ instance.setUnit(null);
+ instance.setControl(null);
+ instance.setTransaction(null);
+ instance.setInternal(null);
+ instance.setHealthy(true);
+ instance.setWeight(1.0);
+ instance.setTerm(0L);
+ instance.setTimestamp(0L);
+ instance.setMetadata(new HashMap<>());
+ instance.setRole(ClusterRole.MEMBER);
+ instance.setVersion(null);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]