This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 68460b8d05 optimize: discover the raft leader node from the naming server (#7234)
68460b8d05 is described below

commit 68460b8d05bbdbbfba4969336b5a823fa9d7f5ff
Author: funkye <[email protected]>
AuthorDate: Sat Mar 22 19:59:36 2025 +0800

    optimize: discover the raft leader node from the naming server (#7234)
---
 .gitignore                                         |   2 +
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   1 +
 codecov.yml                                        |   2 +-
 .../NamingserverRegistryServiceImpl.java           |  59 +++++++-----
 .../NamingserverRegistryServiceImplTest.java       |  81 ++++++++++++----
 .../src/test/resources/registry.conf               |  32 +++----
 namingserver/src/main/resources/application.yml    |   2 +-
 .../raft/snapshot/vgroup/VGroupSnapshotFile.java   |   1 +
 .../instance/RaftServerInstanceStrategy.java       |   7 +-
 .../raft/store/RaftVGroupMappingStoreManager.java  |  31 +++---
 .../store/RaftVGroupMappingStoreManagerTest.java   | 105 +++++++++++++++++++++
 12 files changed, 251 insertions(+), 73 deletions(-)

diff --git a/.gitignore b/.gitignore
index 308540907c..b5d777838b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,8 @@ dependency-reduced-pom.xml
 /server/sessionStore/
 /server/db_store/
 /sessionStore/
+/vgroupStore
+/vgroupStore/**
 /test/sessionStore/
 /distribution/sessionStore/
 /distribution/*/sessionStore/
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 2a07331898..c29109394a 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -56,6 +56,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7215](https://github.com/apache/incubator-seata/pull/7215)] intercept 
non-leader write requests of the console trx operation
 - [[#7222](https://github.com/apache/incubator-seata/pull/7222)] in raft mode 
add the vgroup field to global lock
 - [[#7229](https://github.com/apache/incubator-seata/pull/7229)] update Notice
+- [[#7234](https://github.com/apache/incubator-seata/pull/7234)] discover the 
raft leader node from the naming server
 
 
 
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 78371a4422..abfc266986 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -56,6 +56,7 @@
 - [[#7215](https://github.com/apache/incubator-seata/pull/7215)] 
拦截控制台写操作的非leader的raft请求
 - [[#7222](https://github.com/apache/incubator-seata/pull/7222)] 
raft模式下控制台接口响应全局锁信息时增加vgroup字段
 - [[#7229](https://github.com/apache/incubator-seata/pull/7229)] 更新 Notice
+- [[#7234](https://github.com/apache/incubator-seata/pull/7234)] 
优化raft对接namingserve时的服务发现逻辑
 
 
 ### security:
diff --git a/codecov.yml b/codecov.yml
index 25e06a23ad..b6129ab691 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -24,7 +24,7 @@ coverage:
       default:
         threshold: 1%
         if_not_found: success
-    changes: yes
+    changes: no
   precision: 2
   range: "50...100"
 ignore:
diff --git 
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
 
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
index 55088501e0..c5aa98c9a1 100644
--- 
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -52,9 +53,13 @@ import org.apache.http.util.EntityUtils;
 import org.apache.seata.common.ConfigurationKeys;
 import org.apache.seata.common.exception.AuthenticationFailedException;
 import org.apache.seata.common.exception.RetryableException;
+import org.apache.seata.common.metadata.Cluster;
 import org.apache.seata.common.metadata.ClusterRole;
 import org.apache.seata.common.metadata.Instance;
+import org.apache.seata.common.metadata.Node;
 import org.apache.seata.common.metadata.namingserver.MetaResponse;
+import org.apache.seata.common.metadata.namingserver.NamingServerNode;
+import org.apache.seata.common.metadata.namingserver.Unit;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.common.util.CollectionUtils;
 import org.apache.seata.common.util.HttpClientUtil;
@@ -111,8 +116,8 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
     private volatile boolean isSubscribed = false;
     private static final Configuration FILE_CONFIG = 
ConfigurationFactory.CURRENT_FILE_INSTANCE;
     private String namingServerAddressCache;
-    private static ConcurrentMap<String /* namingserver address */, 
AtomicInteger /* Number of Health Check Continues Failures */> 
AVAILABLE_NAMINGSERVER_MAP = new ConcurrentHashMap<>();
-    private static final ConcurrentMap<String/* vgroup */, 
List<InetSocketAddress>> VGROUP_ADDRESS_MAP = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String /* namingserver address */, 
AtomicInteger /* Number of Health Check Continues Failures */> 
AVAILABLE_NAMINGSERVER_MAP = new ConcurrentHashMap<>();
+    private static final ConcurrentMap<String/* vgroup */, 
List<NamingServerNode>> VGROUP_ADDRESS_MAP = new ConcurrentHashMap<>();
     private static final ConcurrentMap<String/* vgroup */, 
List<NamingListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
     protected static final ScheduledExecutorService
         SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1, 
new NamedThreadFactory("seata-namingser-scheduled", THREAD_POOL_NUM, true));
@@ -399,7 +404,10 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
             }, key);
         }
 
-        return VGROUP_ADDRESS_MAP.get(key);
+        return 
Optional.ofNullable(VGROUP_ADDRESS_MAP.get(key)).orElse(Collections.emptyList()).stream().map(node
 -> {
+            Node.Endpoint endpoint = node.getTransaction();
+            return new InetSocketAddress(endpoint.getHost(), 
endpoint.getPort());
+        }).collect(Collectors.toList());
     }
 
 
@@ -426,27 +434,38 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
             // jsonResponse -> MetaResponse
             MetaResponse metaResponse = OBJECT_MAPPER.readValue(jsonResponse, 
new TypeReference<MetaResponse>() {
             });
-            // MetaResponse -> endpoint list
-            List<InetSocketAddress> newAddressList = 
metaResponse.getClusterList().stream()
-                .flatMap(cluster -> cluster.getUnitData().stream())
-                .flatMap(unit -> unit.getNamingInstanceList().stream()
-                    .filter(namingServerNode -> namingServerNode.getRole() == 
ClusterRole.LEADER
-                        || namingServerNode.getRole() == ClusterRole.MEMBER)
-                    .map(namingInstance -> new 
InetSocketAddress(namingInstance.getTransaction().getHost(),
-                        namingInstance.getTransaction().getPort())))
-                .collect(Collectors.toList());
-            if (metaResponse.getTerm() > 0) {
-                term = metaResponse.getTerm();
-            }
-            VGROUP_ADDRESS_MAP.put(vGroup, newAddressList);
-            removeOfflineAddressesIfNecessary(vGroup,vGroup,newAddressList);
-            return newAddressList;
+            return handleMetadata(metaResponse, vGroup);
         } catch (IOException e) {
             LOGGER.error(e.getMessage());
             throw new RemoteException();
         }
     }
 
+    public List<InetSocketAddress> handleMetadata(MetaResponse metaResponse, 
String vGroup) {
+        // MetaResponse -> endpoint list
+        List<NamingServerNode> newAddressList = new ArrayList<>();
+        if (metaResponse.getTerm() > 0) {
+            term = metaResponse.getTerm();
+        }
+        for (Cluster cluster : metaResponse.getClusterList()) {
+            for (Unit unitDatum : cluster.getUnitData()) {
+                // In raft mode, only the leader is cached, while in non-raft 
cluster mode, all nodes are cached.
+                
newAddressList.addAll(unitDatum.getNamingInstanceList().stream()
+                    .filter(instance -> (instance.getRole() == 
ClusterRole.LEADER && instance.getTerm() >= term)
+                        || instance.getRole() == ClusterRole.MEMBER)
+                    .collect(Collectors.toList()));
+            }
+        }
+        List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
+        for (NamingServerNode node : newAddressList) {
+            Node.Endpoint endpoint = node.getTransaction();
+            inetSocketAddresses.add(new InetSocketAddress(endpoint.getHost(), 
endpoint.getPort()));
+        }
+        removeOfflineAddressesIfNecessary(vGroup, vGroup, inetSocketAddresses);
+        VGROUP_ADDRESS_MAP.put(vGroup, newAddressList);
+        return inetSocketAddresses;
+    }
+
     @Override
     public void close() throws Exception {
 
@@ -487,8 +506,6 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
                                                       List<InetSocketAddress> 
aliveAddress) {
         Map<String, List<InetSocketAddress>> clusterAddressMap = 
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
             key -> new ConcurrentHashMap<>());
-
-
         return clusterAddressMap.put(transactionServiceGroup, aliveAddress);
     }
 
@@ -530,7 +547,7 @@ public class NamingserverRegistryServiceImpl implements 
RegistryService<NamingLi
         String namingAddrsKey = String.join(FILE_CONFIG_SPLIT_CHAR, 
FILE_ROOT_REGISTRY, REGISTRY_TYPE, NAMING_SERVICE_URL_KEY);
 
         String urlListStr = FILE_CONFIG.getConfig(namingAddrsKey);
-        if (urlListStr.isEmpty()) {
+        if (StringUtils.isBlank(urlListStr)) {
             throw new NamingRegistryException("Naming server url can not be 
null!");
         }
         return 
Arrays.stream(urlListStr.split(",")).collect(Collectors.toList());
diff --git 
a/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
 
b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
index 44c0af6570..b6f1dc273a 100644
--- 
a/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
+++ 
b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.seata.discovery.registry.namingserver;
 
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.rmi.RemoteException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,11 +32,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.http.entity.ContentType;
 import org.apache.http.protocol.HTTP;
 import org.apache.seata.common.holder.ObjectHolder;
+import org.apache.seata.common.metadata.Cluster;
+import org.apache.seata.common.metadata.ClusterRole;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.metadata.namingserver.MetaResponse;
+import org.apache.seata.common.metadata.namingserver.NamingServerNode;
+import org.apache.seata.common.metadata.namingserver.Unit;
 import org.apache.seata.config.Configuration;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.common.util.HttpClientUtil;
 import org.apache.seata.discovery.registry.RegistryService;
 import org.apache.http.client.methods.CloseableHttpResponse;
+import org.junit.After;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -47,16 +57,15 @@ import 
org.springframework.core.env.PropertiesPropertySource;
 import static 
org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-@Disabled
 class NamingserverRegistryServiceImplTest {
 
     private static final Configuration FILE_CONFIG = 
ConfigurationFactory.CURRENT_FILE_INSTANCE;
 
     @BeforeAll
     public static void beforeClass() throws Exception {
-        System.setProperty("registry.namingserver.namespace", "dev");
-        System.setProperty("registry.namingserver.cluster", "cluster1");
-        System.setProperty("registry.namingserver.serverAddr", 
"127.0.0.1:8080");
+        System.setProperty("registry.seata.namespace", "dev");
+        System.setProperty("registry.seata.cluster", "cluster1");
+        System.setProperty("registry.seata.server-addr", "127.0.0.1:8080");
         AnnotationConfigApplicationContext context = new 
AnnotationConfigApplicationContext();
 
         // 获取应用程序环境
@@ -68,9 +77,14 @@ class NamingserverRegistryServiceImplTest {
         PropertiesPropertySource customPropertySource = new 
PropertiesPropertySource("customSource", customProperties);
         propertySources.addLast(customPropertySource);
         
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT, 
environment);
-
     }
 
+    @AfterAll
+    public static void afterClass() {
+        System.clearProperty("registry.seata.namespace");
+        System.clearProperty("registry.seata.cluster");
+        System.clearProperty("registry.seata.server-addr");
+    }
 
     @Test
     public void unregister1() throws Exception {
@@ -82,6 +96,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
+    @Disabled
     public void getNamingAddrsTest() {
         NamingserverRegistryServiceImpl namingserverRegistryService = 
NamingserverRegistryServiceImpl.getInstance();
         List<String> list = namingserverRegistryService.getNamingAddrs();
@@ -90,6 +105,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
+    @Disabled
     public void getNamingAddrTest() {
         NamingserverRegistryServiceImpl namingserverRegistryService = 
NamingserverRegistryServiceImpl.getInstance();
         String addr = namingserverRegistryService.getNamingAddr();
@@ -98,14 +114,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
-    public void convertTest() {
-        InetSocketAddress inetSocketAddress = new 
InetSocketAddress("127.0.0.1", 8088);
-        assertEquals(inetSocketAddress.getAddress().getHostAddress(), 
"127.0.0.1");
-        assertEquals(inetSocketAddress.getPort(), 8088);
-    }
-
-
-    @Test
+    @Disabled
     public void testRegister1() throws Exception {
 
         RegistryService registryService = new 
NamingserverRegistryProvider().provide();
@@ -117,7 +126,6 @@ class NamingserverRegistryServiceImplTest {
 
         //2.create vGroup in cluster
         createGroupInCluster("dev", "group1", "cluster1");
-
         //3.get instances
         List<InetSocketAddress> list = registryService.lookup("group1");
 
@@ -131,8 +139,46 @@ class NamingserverRegistryServiceImplTest {
 
     }
 
+    @Test
+    public void testHandleMetadata() throws Exception {
+        NamingserverRegistryServiceImpl registryService = 
NamingserverRegistryServiceImpl.getInstance();
+        // Use reflection to set the isSubscribed field to true
+        Field isSubscribedField = 
NamingserverRegistryServiceImpl.class.getDeclaredField("isSubscribed");
+        isSubscribedField.setAccessible(true);
+        isSubscribedField.set(registryService, true);
+
+        // Create a mock MetaResponse
+        MetaResponse metaResponse = new MetaResponse();
+        metaResponse.setTerm(1);
+
+        Cluster cluster = new Cluster();
+        Unit unit = new Unit();
+        List<NamingServerNode> namingInstanceList = new ArrayList<>();
+        NamingServerNode node = new NamingServerNode();
+        node.setRole(ClusterRole.LEADER);
+        node.setTerm(1);
+        node.setTransaction(new Node.Endpoint("127.0.0.1", 8091));
+        namingInstanceList.add(node);
+        unit.setNamingInstanceList(namingInstanceList);
+        List<Unit> unitData = new ArrayList<>();
+        unitData.add(unit);
+        cluster.setUnitData(unitData);
+        List<Cluster> clusterList = new ArrayList<>();
+        clusterList.add(cluster);
+        metaResponse.setClusterList(clusterList);
+
+        // Call the method to test
+        List<InetSocketAddress> result = 
registryService.handleMetadata(metaResponse, "testGroup");
+        registryService.lookup("testGroup");
+        // Verify the result
+        assertEquals(1, result.size());
+        assertEquals("127.0.0.1", result.get(0).getAddress().getHostAddress());
+        assertEquals(8091, result.get(0).getPort());
+        isSubscribedField.set(registryService, false);
+    }
 
     @Test
+    @Disabled
     public void testRegister2() throws Exception {
         NamingserverRegistryServiceImpl registryService = 
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
         InetSocketAddress inetSocketAddress1 = new 
InetSocketAddress("127.0.0.1", 8088);
@@ -157,6 +203,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
+    @Disabled
     public void testRegister3() throws Exception {
         NamingserverRegistryServiceImpl registryService = 
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
         InetSocketAddress inetSocketAddress1 = new 
InetSocketAddress("127.0.0.1", 8088);
@@ -189,6 +236,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
+    @Disabled
     public void testUnregister() throws Exception {
         RegistryService registryService = new 
NamingserverRegistryProvider().provide();
         InetSocketAddress inetSocketAddress1 = new 
InetSocketAddress("127.0.0.1", 8088);
@@ -214,7 +262,7 @@ class NamingserverRegistryServiceImplTest {
     }
 
 
-    //    @Disabled
+    @Disabled
     @Test
     public void testWatch() throws Exception {
         NamingserverRegistryServiceImpl registryService = 
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
@@ -254,7 +302,7 @@ class NamingserverRegistryServiceImplTest {
 
     }
 
-    //    @Disabled
+    @Disabled
     @Test
     public void testSubscribe() throws Exception {
         NamingserverRegistryServiceImpl registryService = 
NamingserverRegistryServiceImpl.getInstance();
@@ -282,6 +330,7 @@ class NamingserverRegistryServiceImplTest {
 
 
     @Test
+    @Disabled
     public void testUnsubscribe() throws Exception {
         NamingserverRegistryServiceImpl registryService = 
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
 
diff --git 
a/discovery/seata-discovery-namingserver/src/test/resources/registry.conf 
b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
index 3190efa393..274688f4bb 100644
--- a/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
+++ b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 
 registry {
   # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
diff --git a/namingserver/src/main/resources/application.yml 
b/namingserver/src/main/resources/application.yml
index 7006f3eda6..4764227f0e 100644
--- a/namingserver/src/main/resources/application.yml
+++ b/namingserver/src/main/resources/application.yml
@@ -36,6 +36,6 @@ seata:
   security:
     secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
     tokenValidityInMilliseconds: 1800000
-    csrf-ignore-urls: /naming/v1/**
+    csrf-ignore-urls: /naming/v1/**,/api/v1/naming/**
     ignore:
       urls: 
/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/naming/v1/health,/error
diff --git 
a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
 
b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
index 8f8c7c46ca..9dbcba26b4 100644
--- 
a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
+++ 
b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
@@ -78,6 +78,7 @@ public class VGroupSnapshotFile implements Serializable, 
StoreSnapshotFile {
             Map<String/*vgroup*/, MappingDO> map = (Map<String/*vgroup*/, 
MappingDO>)load(path);
             RaftVGroupMappingStoreManager raftVGroupMappingStoreManager =
                 
(RaftVGroupMappingStoreManager)SessionHolder.getRootVGroupMappingManager();
+            raftVGroupMappingStoreManager.clear(group);
             raftVGroupMappingStoreManager.localAddVGroups(map, group);
             return true;
         } catch (final Exception e) {
diff --git 
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
 
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
index ab8ccd651c..fe26844145 100644
--- 
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
+++ 
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
@@ -63,7 +63,7 @@ public class RaftServerInstanceStrategy extends 
AbstractSeataInstanceStrategy
         instance.setUnit(unit);
         // load cluster type
         String clusterType = String.valueOf(StoreConfig.getSessionMode());
-        instance.addMetadata("cluster-type", "raft".equals(clusterType) ? 
clusterType : "default");
+        instance.addMetadata("cluster-type", 
"raft".equalsIgnoreCase(clusterType) ? clusterType : "default");
         RaftStateMachine stateMachine = 
RaftServerManager.getRaftServer(unit).getRaftStateMachine();
         long term = 
RaftServerManager.getRaftServer(unit).getRaftStateMachine().getCurrentTerm().get();
         instance.setTerm(term);
@@ -100,8 +100,9 @@ public class RaftServerInstanceStrategy extends 
AbstractSeataInstanceStrategy
     @EventListener
     @Async
     public void onChangeEvent(ClusterChangeEvent event) {
-        Instance.getInstance().setTerm(event.getTerm());
-        Instance.getInstance().setRole(event.isLeader() ? ClusterRole.LEADER : 
ClusterRole.FOLLOWER);
+        Instance instance = Instance.getInstance();
+        instance.setTerm(event.getTerm());
+        instance.setRole(event.isLeader() ? ClusterRole.LEADER : 
ClusterRole.FOLLOWER);
         SessionHolder.getRootVGroupMappingManager().notifyMapping();
     }
 
diff --git 
a/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
 
b/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
index 26ca5e3ec5..44a2fd5ce0 100644
--- 
a/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
+++ 
b/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
@@ -16,13 +16,11 @@
  */
 package org.apache.seata.server.storage.raft.store;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import com.alipay.sofa.jraft.Closure;
-import org.apache.seata.common.XID;
 import org.apache.seata.common.loader.LoadLevel;
 import org.apache.seata.common.metadata.ClusterRole;
 import org.apache.seata.common.metadata.Instance;
@@ -39,12 +37,12 @@ import 
org.apache.seata.server.store.VGroupMappingStoreManager;
 public class RaftVGroupMappingStoreManager implements 
VGroupMappingStoreManager {
 
     private final static Map<String/*unit(raft group)*/, Map<String/*vgroup*/, 
MappingDO>> VGROUP_MAPPING =
-        new HashMap<>();
+        new ConcurrentHashMap<>();
 
 
     public boolean localAddVGroup(MappingDO mappingDO) {
         return VGROUP_MAPPING.computeIfAbsent(mappingDO.getUnit(), k -> new 
HashMap<>()).put(mappingDO.getVGroup(),
-            mappingDO) != null;
+            mappingDO) == null;
     }
 
     public void localAddVGroups(Map<String/*vgroup*/, MappingDO> vGroups, 
String unit) {
@@ -114,7 +112,11 @@ public class RaftVGroupMappingStoreManager implements 
VGroupMappingStoreManager
     }
 
     public Map<String/*vgroup*/, MappingDO> loadVGroupsByUnit(String unit) {
-        return VGROUP_MAPPING.getOrDefault(unit, Collections.emptyMap());
+        return VGROUP_MAPPING.getOrDefault(unit, new HashMap<>());
+    }
+
+    public void clear(String unit) {
+        VGROUP_MAPPING.remove(unit);
     }
 
     @Override
@@ -123,19 +125,18 @@ public class RaftVGroupMappingStoreManager implements 
VGroupMappingStoreManager
     }
 
     @Override
-   public void notifyMapping() {
+    public void notifyMapping() {
         Instance instance = Instance.getInstance();
         Map<String, Object> map = this.readVGroups();
         instance.addMetadata("vGroup", map);
-        for (String group : RaftServerManager.groups()) {
-            Instance node = instance.clone();
-            node.setRole(RaftServerManager.isLeader(group) ? 
ClusterRole.LEADER : ClusterRole.FOLLOWER);
-            Instance.getInstances().add(node);
-        }
         try {
-            InetSocketAddress address = new 
InetSocketAddress(XID.getIpAddress(), XID.getPort());
-            for (RegistryService<?> registryService : 
MultiRegistryFactory.getInstances()) {
-                registryService.register(address);
+            for (String group : RaftServerManager.groups()) {
+                Instance node = instance.clone();
+                node.setRole(RaftServerManager.isLeader(group) ? 
ClusterRole.LEADER : ClusterRole.FOLLOWER);
+                Instance.getInstances().add(node);
+                for (RegistryService<?> registryService : 
MultiRegistryFactory.getInstances()) {
+                    registryService.register(node);
+                }
             }
         } catch (Exception e) {
             throw new RuntimeException("vGroup mapping relationship notified 
failed! ", e);
diff --git 
a/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
 
b/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
new file mode 100644
index 0000000000..250062a4b7
--- /dev/null
+++ 
b/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.raft.store;
+
+import org.apache.seata.core.store.MappingDO;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SpringBootTest
+public class RaftVGroupMappingStoreManagerTest {
+
+       private RaftVGroupMappingStoreManager raftVGroupMappingStoreManager;
+
+       @BeforeEach
+       public void setUp() {
+               raftVGroupMappingStoreManager = new 
RaftVGroupMappingStoreManager();
+               raftVGroupMappingStoreManager.clear("unit1");
+       }
+
+       @Test
+       public void testLocalAddVGroup() {
+               MappingDO mappingDO = new MappingDO();
+               mappingDO.setUnit("unit1");
+               mappingDO.setVGroup("vgroup2");
+
+               boolean result = 
raftVGroupMappingStoreManager.localAddVGroup(mappingDO);
+
+               assertTrue(result);
+               assertEquals(mappingDO, 
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup2"));
+       }
+
+       @Test
+       public void testLocalAddVGroups() {
+               Map<String, MappingDO> vGroups = new HashMap<>();
+               MappingDO mappingDO1 = new MappingDO();
+               mappingDO1.setUnit("unit1");
+               mappingDO1.setVGroup("vgroup1");
+               vGroups.put("vgroup1", mappingDO1);
+
+               MappingDO mappingDO2 = new MappingDO();
+               mappingDO2.setUnit("unit1");
+               mappingDO2.setVGroup("vgroup2");
+               vGroups.put("vgroup2", mappingDO2);
+
+               raftVGroupMappingStoreManager.localAddVGroups(vGroups, "unit1");
+
+               assertEquals(mappingDO1, 
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup1"));
+               assertEquals(mappingDO2, 
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup2"));
+       }
+
+       @Test
+       public void testLocalRemoveVGroup() {
+               MappingDO mappingDO = new MappingDO();
+               mappingDO.setUnit("unit1");
+               mappingDO.setVGroup("vgroup1");
+
+               raftVGroupMappingStoreManager.localAddVGroup(mappingDO);
+               boolean result = 
raftVGroupMappingStoreManager.localRemoveVGroup("vgroup1");
+
+               assertTrue(result);
+               
assertTrue(raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").isEmpty());
+       }
+
+       @Test
+       public void testLoadVGroupsByUnit() {
+               MappingDO mappingDO1 = new MappingDO();
+               mappingDO1.setUnit("unit1");
+               mappingDO1.setVGroup("vgroup1");
+
+               MappingDO mappingDO2 = new MappingDO();
+               mappingDO2.setUnit("unit1");
+               mappingDO2.setVGroup("vgroup2");
+
+               raftVGroupMappingStoreManager.localAddVGroup(mappingDO1);
+               raftVGroupMappingStoreManager.localAddVGroup(mappingDO2);
+
+               Map<String, MappingDO> result = 
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1");
+
+               assertEquals(2, result.size());
+               assertEquals(mappingDO1, result.get("vgroup1"));
+               assertEquals(mappingDO2, result.get("vgroup2"));
+       }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to