This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 4593c46b2a bugfix: raft split-brain causes incorrect cluster information (#7891)
4593c46b2a is described below

commit 4593c46b2a8ed33818e29386477b30ef9ed060d9
Author: funkye <[email protected]>
AuthorDate: Mon Dec 22 17:00:45 2025 +0800

    bugfix: raft split-brain causes incorrect cluster information (#7891)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 .../metadata/namingserver/NamingServerNode.java    |   4 +-
 .../namingserver/NamingServerNodeTest.java         |  30 ++++
 .../namingserver/entity/pojo/ClusterData.java      |   2 +-
 .../namingserver/filter/ConsoleRemotingFilter.java |   2 +-
 .../seata/namingserver/manager/NamingManager.java  |  19 ++-
 .../seata/namingserver/NamingManagerTest.java      |   8 +-
 .../instance/RaftServerInstanceStrategy.java       |   5 +-
 .../instance/RaftServerInstanceStrategyTest.java   | 188 +++++++++++++++++++++
 10 files changed, 242 insertions(+), 18 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 8c8f7184a1..2bf4c7db59 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -66,6 +66,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7860](https://github.com/apache/incubator-seata/pull/7860)] Fix the issue where delayed messages in RocketMQ transactions were silently ignored, now explicitly throwing an exception
 - [[#7879](https://github.com/apache/incubator-seata/pull/7879)] fix:correct server port and naming server port
 - [[#7881](https://github.com/apache/incubator-seata/pull/7881)] the vgroup_table in the SQL files of all databases should use a three-column unique constraint
+- [[#7891](https://github.com/apache/incubator-seata/pull/7891)] raft split-brain causes incorrect cluster information
 
 
 ### optimize:
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 37e62b42fa..ced6b7e49d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -66,6 +66,7 @@
 - [[#7860](https://github.com/apache/incubator-seata/pull/7860)] 修复 RocketMQ 事务中延迟消息静默失效的问题,改为显式抛出异常
 - [[#7879](https://github.com/apache/incubator-seata/pull/7879)] 修正服务器端口与命名服务器端口
 - [[#7881](https://github.com/apache/incubator-seata/pull/7881)] 修复了除mysql之外其他数据库的sql文件关于vgroup_table的表,使用三列唯一索引以保证容灾迁移高可用
+- [[#7891](https://github.com/apache/incubator-seata/pull/7891)] 修复raft重选举与心跳并发时可能导致namingserver侧的元数据存在多个leader
 
 
 ### optimize:
diff --git 
a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
 
b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
index cdef618bb3..19cef34245 100644
--- 
a/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
+++ 
b/common/src/main/java/org/apache/seata/common/metadata/namingserver/NamingServerNode.java
@@ -78,7 +78,9 @@ public class NamingServerNode extends Node {
         NamingServerNode otherNode = (NamingServerNode) obj;
 
         // other node is newer than me
-        return otherNode.term > term || 
!StringUtils.equals(otherNode.getVersion(), this.getVersion());
+        return otherNode.term > term
+                || (otherNode.term >= term && !Objects.equals(this.getRole(), 
otherNode.getRole()))
+                || !StringUtils.equals(otherNode.getVersion(), 
this.getVersion());
     }
 
     public void setWeight(double weight) {
diff --git 
a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
 
b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
index e0c417a064..09bab78353 100644
--- 
a/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
+++ 
b/common/src/test/java/org/apache/seata/common/metadata/namingserver/NamingServerNodeTest.java
@@ -18,6 +18,7 @@ package org.apache.seata.common.metadata.namingserver;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seata.common.metadata.ClusterRole;
 import org.apache.seata.common.metadata.Node;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -112,4 +113,33 @@ class NamingServerNodeTest {
         newerNode.setVersion("v1");
         Assertions.assertTrue(currentNode.isChanged(newerNode));
     }
+
+    @Test
+    void testRaftSplitBrainChanged() {
+        NamingServerNode currentNode = new NamingServerNode();
+        currentNode.setTerm(100L);
+        currentNode.setRole(ClusterRole.LEADER);
+        currentNode.setControl(new Node.Endpoint("1.1.1.1", 888));
+        currentNode.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+        // When heartbeat and cluster election occur concurrently, the term is 
updated, but the leader status has not
+        // yet been modified.
+        NamingServerNode newerNode = new NamingServerNode();
+        newerNode.setTerm(101L);
+        newerNode.setControl(new Node.Endpoint("1.1.1.1", 888));
+        newerNode.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+        newerNode.setRole(ClusterRole.LEADER);
+        Assertions.assertTrue(currentNode.isChanged(newerNode));
+        NamingServerNode newerNode2 = new NamingServerNode();
+        newerNode2.setTerm(101L);
+        newerNode2.setControl(new Node.Endpoint("1.1.1.1", 888));
+        newerNode2.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+        newerNode2.setRole(ClusterRole.FOLLOWER);
+        Assertions.assertTrue(newerNode.isChanged(newerNode2));
+        NamingServerNode newerNode3 = new NamingServerNode();
+        newerNode3.setTerm(101L);
+        newerNode3.setControl(new Node.Endpoint("1.1.1.1", 888));
+        newerNode3.setTransaction(new Node.Endpoint("2.2.2.2", 999));
+        newerNode3.setRole(ClusterRole.FOLLOWER);
+        Assertions.assertFalse(newerNode2.isChanged(newerNode3));
+    }
 }
diff --git 
a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
 
b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
index 5612512277..695207b9ec 100644
--- 
a/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
+++ 
b/namingserver/src/main/java/org/apache/seata/namingserver/entity/pojo/ClusterData.java
@@ -99,7 +99,7 @@ public class ClusterData {
     }
 
     @JsonIgnore
-    public List<Node> getInstanceList() {
+    public List<NamingServerNode> getInstanceList() {
         return unitData.values().stream()
                 .map(Unit::getNamingInstanceList)
                 .flatMap(List::stream)
diff --git 
a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
 
b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
index e96503c2c8..2a7357468d 100644
--- 
a/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
+++ 
b/namingserver/src/main/java/org/apache/seata/namingserver/filter/ConsoleRemotingFilter.java
@@ -79,7 +79,7 @@ public class ConsoleRemotingFilter implements Filter {
                 String vgroup = request.getParameter("vgroup");
                 if (StringUtils.isNotBlank(namespace)
                         && (StringUtils.isNotBlank(cluster) || 
StringUtils.isNotBlank(vgroup))) {
-                    List<Node> list = null;
+                    List<NamingServerNode> list = null;
                     if (StringUtils.isNotBlank(vgroup)) {
                         list = namingManager.getInstancesByVgroupAndNamespace(
                                 namespace,
diff --git 
a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
 
b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
index a738d229a0..cfa50d0688 100644
--- 
a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
+++ 
b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java
@@ -192,15 +192,20 @@ public class NamingManager {
             return new Result<>("400", "vGroup " + vGroup + " already exists");
         }
         // add vGroup in new cluster
-        List<Node> nodeList = getInstances(namespace, clusterName);
+        List<NamingServerNode> nodeList = getInstances(namespace, clusterName);
         if (nodeList == null || nodeList.size() == 0) {
             LOGGER.error("no instance in cluster {}", clusterName);
             return new Result<>("301", "no instance in cluster:" + 
clusterName);
         } else {
-            Node node = nodeList.stream()
+            List<NamingServerNode> filteredNodes = nodeList.stream()
                     .filter(n -> n.getRole() == ClusterRole.LEADER || 
n.getRole() == ClusterRole.MEMBER)
-                    .collect(Collectors.toList())
-                    .get(0);
+                    .sorted((o1, o2) -> Long.compare(o2.getTerm(), 
o1.getTerm()))
+                    .collect(Collectors.toList());
+            if (filteredNodes.isEmpty()) {
+                LOGGER.error("no suitable instance (LEADER or MEMBER) in 
cluster {}", clusterName);
+                return new Result<>("301", "no suitable instance in cluster:" 
+ clusterName);
+            }
+            NamingServerNode node = filteredNodes.get(0);
             String controlHost = node.getControl().getHost();
             int controlPort = node.getControl().getPort();
             String httpUrl = NamingServerConstants.HTTP_PREFIX
@@ -397,11 +402,11 @@ public class NamingManager {
         return clusterList;
     }
 
-    public List<Node> getInstances(String namespace, String clusterName) {
+    public List<NamingServerNode> getInstances(String namespace, String 
clusterName) {
         return getInstances(namespace, clusterName, false);
     }
 
-    public List<Node> getInstances(String namespace, String clusterName, 
boolean readOnly) {
+    public List<NamingServerNode> getInstances(String namespace, String 
clusterName, boolean readOnly) {
         Map<String, ClusterData> clusterDataHashMap = 
namespaceClusterDataMap.get(namespace);
         if (clusterDataHashMap == null) {
             LOGGER.warn("no clusters in namespace: {}", namespace);
@@ -419,7 +424,7 @@ public class NamingManager {
                         .collect(Collectors.toList());
     }
 
-    public List<Node> getInstancesByVgroupAndNamespace(String namespace, 
String vgroup, boolean readOnly) {
+    public List<NamingServerNode> getInstancesByVgroupAndNamespace(String 
namespace, String vgroup, boolean readOnly) {
         List<Cluster> clusters = getClusterListByVgroup(vgroup, namespace);
         if (CollectionUtils.isEmpty(clusters)) {
             return Collections.emptyList();
diff --git 
a/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
 
b/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
index f50c84d638..a44fbac321 100644
--- 
a/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
+++ 
b/namingserver/src/test/java/org/apache/seata/namingserver/NamingManagerTest.java
@@ -118,7 +118,7 @@ class NamingManagerTest {
 
         assertTrue(result);
 
-        List<Node> instances = namingManager.getInstances(namespace, 
clusterName);
+        List<NamingServerNode> instances = 
namingManager.getInstances(namespace, clusterName);
         assertEquals(1, instances.size());
         assertEquals("127.0.0.1", instances.get(0).getTransaction().getHost());
         assertEquals(8080, instances.get(0).getTransaction().getPort());
@@ -153,7 +153,7 @@ class NamingManagerTest {
         node.getMetadata().put(CONSTANT_GROUP, vGroups);
         namingManager.registerInstance(node, namespace, clusterName, unitName);
 
-        List<Node> instances = namingManager.getInstances(namespace, 
clusterName);
+        List<NamingServerNode> instances = 
namingManager.getInstances(namespace, clusterName);
         assertEquals(1, instances.size());
 
         boolean result = namingManager.unregisterInstance(namespace, 
clusterName, unitName, node);
@@ -199,7 +199,7 @@ class NamingManagerTest {
         node.getMetadata().put(CONSTANT_GROUP, vGroups);
         namingManager.registerInstance(node, namespace, clusterName, unitName);
 
-        List<Node> instances = namingManager.getInstances(namespace, 
clusterName);
+        List<NamingServerNode> instances = 
namingManager.getInstances(namespace, clusterName);
         assertEquals(1, instances.size());
 
         ReflectionTestUtils.setField(namingManager, "heartbeatTimeThreshold", 
10);
@@ -210,7 +210,7 @@ class NamingManagerTest {
         }
         namingManager.instanceHeartBeatCheck();
 
-        List<Node> afterHeartBeat = namingManager.getInstances(namespace, 
clusterName);
+        List<NamingServerNode> afterHeartBeat = 
namingManager.getInstances(namespace, clusterName);
         assertEquals(0, afterHeartBeat.size());
         Mockito.verify(applicationContext, 
Mockito.times(2)).publishEvent(any(ClusterChangeEvent.class));
     }
diff --git 
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
 
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
index 7c20a40800..230933baa4 100644
--- 
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
+++ 
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
@@ -66,10 +66,7 @@ public class RaftServerInstanceStrategy extends 
AbstractSeataInstanceStrategy
         String clusterType = String.valueOf(StoreConfig.getSessionMode());
         instance.addMetadata("cluster-type", 
"raft".equalsIgnoreCase(clusterType) ? clusterType : "default");
         RaftStateMachine stateMachine = 
RaftServerManager.getRaftServer(unit).getRaftStateMachine();
-        long term = RaftServerManager.getRaftServer(unit)
-                .getRaftStateMachine()
-                .getCurrentTerm()
-                .get();
+        long term = stateMachine.getCurrentTerm().get();
         instance.setTerm(term);
         instance.setRole(stateMachine.isLeader() ? ClusterRole.LEADER : 
ClusterRole.FOLLOWER);
         // load node Endpoint
diff --git 
a/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java
 
b/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java
new file mode 100644
index 0000000000..d5d59d94fb
--- /dev/null
+++ 
b/server/src/test/java/org/apache/seata/server/instance/RaftServerInstanceStrategyTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.instance;
+
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.seata.common.XID;
+import org.apache.seata.common.holder.ObjectHolder;
+import org.apache.seata.common.metadata.ClusterRole;
+import org.apache.seata.common.metadata.Instance;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.store.SessionMode;
+import org.apache.seata.server.BaseSpringBootTest;
+import org.apache.seata.server.cluster.listener.ClusterChangeEvent;
+import org.apache.seata.server.cluster.raft.RaftServer;
+import org.apache.seata.server.cluster.raft.RaftServerManager;
+import org.apache.seata.server.cluster.raft.RaftStateMachine;
+import org.apache.seata.server.session.SessionHolder;
+import org.apache.seata.server.store.StoreConfig;
+import org.apache.seata.server.store.VGroupMappingStoreManager;
+import 
org.apache.seata.spring.boot.autoconfigure.properties.registry.RegistryNamingServerProperties;
+import 
org.apache.seata.spring.boot.autoconfigure.properties.server.raft.ServerRaftProperties;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MapPropertySource;
+import org.springframework.core.env.MutablePropertySources;
+import org.springframework.core.env.StandardEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class RaftServerInstanceStrategyTest extends BaseSpringBootTest {
+
+    private RaftServerInstanceStrategy strategy;
+
+    private ServerRaftProperties raftProperties;
+
+    private RegistryNamingServerProperties namingProps;
+
+    private ServerProperties serverProperties;
+
+    @BeforeEach
+    void setUp() {
+        strategy = new RaftServerInstanceStrategy();
+        raftProperties = new ServerRaftProperties();
+        namingProps = new RegistryNamingServerProperties();
+        serverProperties = new ServerProperties();
+        // set minimal required fields
+        raftProperties.setGroup("groupA");
+        namingProps.setNamespace("ns");
+        namingProps.setCluster("clusterA");
+        serverProperties.setPort(8088);
+        strategy.raftProperties = raftProperties;
+        strategy.registryNamingServerProperties = namingProps;
+        strategy.serverProperties = serverProperties;
+    }
+
+    @AfterEach
+    void tearDown() {
+        resetInstance();
+        // Do not put null into ObjectHolder to avoid ConcurrentHashMap NPE
+    }
+
+    @Test
+    void serverInstanceInit_shouldPopulateInstanceFromRaft() {
+        ConfigurableEnvironment environment = buildEnvironmentWithMeta();
+        
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, 
environment);
+
+        RaftStateMachine stateMachine = mock(RaftStateMachine.class);
+        when(stateMachine.getCurrentTerm()).thenReturn(new AtomicLong(5L));
+        when(stateMachine.isLeader()).thenReturn(true);
+
+        PeerId peerId = new PeerId("127.0.0.1", 9090);
+        RaftServer raftServer = mock(RaftServer.class);
+        when(raftServer.getRaftStateMachine()).thenReturn(stateMachine);
+        when(raftServer.getServerId()).thenReturn(peerId);
+
+        try (MockedStatic<RaftServerManager> raftServerManagerMock = 
Mockito.mockStatic(RaftServerManager.class);
+                MockedStatic<StoreConfig> storeConfigMock = 
Mockito.mockStatic(StoreConfig.class);
+                MockedStatic<XID> xidMock = Mockito.mockStatic(XID.class)) {
+            raftServerManagerMock
+                    .when(() -> RaftServerManager.getRaftServer("groupA"))
+                    .thenReturn(raftServer);
+            
storeConfigMock.when(StoreConfig::getSessionMode).thenReturn(SessionMode.RAFT);
+            xidMock.when(XID::getIpAddress).thenReturn("10.0.0.1");
+
+            Instance instance = strategy.serverInstanceInit();
+
+            assertEquals("ns", instance.getNamespace());
+            assertEquals("clusterA", instance.getClusterName());
+            assertEquals("groupA", instance.getUnit());
+            assertEquals(5L, instance.getTerm());
+            assertEquals(ClusterRole.LEADER, instance.getRole());
+            Node.Endpoint control = instance.getControl();
+            assertNotNull(control);
+            assertEquals("10.0.0.1", control.getHost());
+            assertEquals(8088, control.getPort());
+            Node.Endpoint internal = instance.getInternal();
+            assertNotNull(internal);
+            assertEquals("127.0.0.1", internal.getHost());
+            assertEquals(9090, internal.getPort());
+            assertEquals("RAFT", instance.getMetadata().get("cluster-type"));
+        }
+    }
+
+    @Test
+    void onChangeEvent_shouldUpdateTermRoleAndNotify() {
+        resetInstance();
+        Instance instance = Instance.getInstance();
+        instance.setRole(ClusterRole.FOLLOWER);
+        instance.setTerm(1L);
+
+        ClusterChangeEvent event = new ClusterChangeEvent(this, "groupA", 12L, 
true);
+
+        try (MockedStatic<SessionHolder> sessionHolderMock = 
Mockito.mockStatic(SessionHolder.class)) {
+            VGroupMappingStoreManager mappingManager = 
mock(VGroupMappingStoreManager.class);
+            
sessionHolderMock.when(SessionHolder::getRootVGroupMappingManager).thenReturn(mappingManager);
+
+            strategy.onChangeEvent(event);
+
+            assertEquals(12L, instance.getTerm());
+            assertEquals(ClusterRole.LEADER, instance.getRole());
+            verify(mappingManager, times(1)).notifyMapping();
+        }
+    }
+
+    @Test
+    void typeAndOrderShouldReturnExpectedValues() {
+        assertEquals(SeataInstanceStrategy.Type.RAFT, strategy.type());
+        assertEquals(Integer.MAX_VALUE - 1, strategy.getOrder());
+    }
+
+    private ConfigurableEnvironment buildEnvironmentWithMeta() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("meta.demo", "v");
+        StandardEnvironment environment = new StandardEnvironment();
+        MutablePropertySources sources = environment.getPropertySources();
+        sources.addFirst(new MapPropertySource("testMeta", map));
+        return environment;
+    }
+
+    private void resetInstance() {
+        Instance instance = Instance.getInstance();
+        instance.setNamespace(null);
+        instance.setClusterName(null);
+        instance.setUnit(null);
+        instance.setControl(null);
+        instance.setTransaction(null);
+        instance.setInternal(null);
+        instance.setHealthy(true);
+        instance.setWeight(1.0);
+        instance.setTerm(0L);
+        instance.setTimestamp(0L);
+        instance.setMetadata(new HashMap<>());
+        instance.setRole(ClusterRole.MEMBER);
+        instance.setVersion(null);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to