This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 68460b8d05 optimize: discover the raft leader node from the naming
server (#7234)
68460b8d05 is described below
commit 68460b8d05bbdbbfba4969336b5a823fa9d7f5ff
Author: funkye <[email protected]>
AuthorDate: Sat Mar 22 19:59:36 2025 +0800
optimize: discover the raft leader node from the naming server (#7234)
---
.gitignore | 2 +
changes/en-us/2.x.md | 1 +
changes/zh-cn/2.x.md | 1 +
codecov.yml | 2 +-
.../NamingserverRegistryServiceImpl.java | 59 +++++++-----
.../NamingserverRegistryServiceImplTest.java | 81 ++++++++++++----
.../src/test/resources/registry.conf | 32 +++----
namingserver/src/main/resources/application.yml | 2 +-
.../raft/snapshot/vgroup/VGroupSnapshotFile.java | 1 +
.../instance/RaftServerInstanceStrategy.java | 7 +-
.../raft/store/RaftVGroupMappingStoreManager.java | 31 +++---
.../store/RaftVGroupMappingStoreManagerTest.java | 105 +++++++++++++++++++++
12 files changed, 251 insertions(+), 73 deletions(-)
diff --git a/.gitignore b/.gitignore
index 308540907c..b5d777838b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,8 @@ dependency-reduced-pom.xml
/server/sessionStore/
/server/db_store/
/sessionStore/
+/vgroupStore
+/vgroupStore/**
/test/sessionStore/
/distribution/sessionStore/
/distribution/*/sessionStore/
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index 2a07331898..c29109394a 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -56,6 +56,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7215](https://github.com/apache/incubator-seata/pull/7215)] intercept
non-leader write requests of the console trx operation
- [[#7222](https://github.com/apache/incubator-seata/pull/7222)] in raft mode
add the vgroup field to global lock
- [[#7229](https://github.com/apache/incubator-seata/pull/7229)] update Notice
+- [[#7234](https://github.com/apache/incubator-seata/pull/7234)] discover the
raft leader node from the naming server
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 78371a4422..abfc266986 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -56,6 +56,7 @@
- [[#7215](https://github.com/apache/incubator-seata/pull/7215)]
拦截控制台写操作的非leader的raft请求
- [[#7222](https://github.com/apache/incubator-seata/pull/7222)]
raft模式下控制台接口响应全局锁信息时增加vgroup字段
- [[#7229](https://github.com/apache/incubator-seata/pull/7229)] 更新 Notice
+- [[#7234](https://github.com/apache/incubator-seata/pull/7234)]
优化raft对接namingserver时的服务发现逻辑
### security:
diff --git a/codecov.yml b/codecov.yml
index 25e06a23ad..b6129ab691 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -24,7 +24,7 @@ coverage:
default:
threshold: 1%
if_not_found: success
- changes: yes
+ changes: no
precision: 2
range: "50...100"
ignore:
diff --git
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
index 55088501e0..c5aa98c9a1 100644
---
a/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
+++
b/discovery/seata-discovery-namingserver/src/main/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -52,9 +53,13 @@ import org.apache.http.util.EntityUtils;
import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.exception.AuthenticationFailedException;
import org.apache.seata.common.exception.RetryableException;
+import org.apache.seata.common.metadata.Cluster;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Instance;
+import org.apache.seata.common.metadata.Node;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
+import org.apache.seata.common.metadata.namingserver.NamingServerNode;
+import org.apache.seata.common.metadata.namingserver.Unit;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.HttpClientUtil;
@@ -111,8 +116,8 @@ public class NamingserverRegistryServiceImpl implements
RegistryService<NamingLi
private volatile boolean isSubscribed = false;
private static final Configuration FILE_CONFIG =
ConfigurationFactory.CURRENT_FILE_INSTANCE;
private String namingServerAddressCache;
- private static ConcurrentMap<String /* namingserver address */,
AtomicInteger /* Number of Health Check Continues Failures */>
AVAILABLE_NAMINGSERVER_MAP = new ConcurrentHashMap<>();
- private static final ConcurrentMap<String/* vgroup */,
List<InetSocketAddress>> VGROUP_ADDRESS_MAP = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String /* namingserver address */,
AtomicInteger /* Number of Health Check Continues Failures */>
AVAILABLE_NAMINGSERVER_MAP = new ConcurrentHashMap<>();
+ private static final ConcurrentMap<String/* vgroup */,
List<NamingServerNode>> VGROUP_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String/* vgroup */,
List<NamingListener>> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
protected static final ScheduledExecutorService
SCHEDULED_THREAD_POOL_EXECUTOR = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("seata-namingser-scheduled", THREAD_POOL_NUM, true));
@@ -399,7 +404,10 @@ public class NamingserverRegistryServiceImpl implements
RegistryService<NamingLi
}, key);
}
- return VGROUP_ADDRESS_MAP.get(key);
+ return
Optional.ofNullable(VGROUP_ADDRESS_MAP.get(key)).orElse(Collections.emptyList()).stream().map(node
-> {
+ Node.Endpoint endpoint = node.getTransaction();
+ return new InetSocketAddress(endpoint.getHost(),
endpoint.getPort());
+ }).collect(Collectors.toList());
}
@@ -426,27 +434,38 @@ public class NamingserverRegistryServiceImpl implements
RegistryService<NamingLi
// jsonResponse -> MetaResponse
MetaResponse metaResponse = OBJECT_MAPPER.readValue(jsonResponse,
new TypeReference<MetaResponse>() {
});
- // MetaResponse -> endpoint list
- List<InetSocketAddress> newAddressList =
metaResponse.getClusterList().stream()
- .flatMap(cluster -> cluster.getUnitData().stream())
- .flatMap(unit -> unit.getNamingInstanceList().stream()
- .filter(namingServerNode -> namingServerNode.getRole() ==
ClusterRole.LEADER
- || namingServerNode.getRole() == ClusterRole.MEMBER)
- .map(namingInstance -> new
InetSocketAddress(namingInstance.getTransaction().getHost(),
- namingInstance.getTransaction().getPort())))
- .collect(Collectors.toList());
- if (metaResponse.getTerm() > 0) {
- term = metaResponse.getTerm();
- }
- VGROUP_ADDRESS_MAP.put(vGroup, newAddressList);
- removeOfflineAddressesIfNecessary(vGroup,vGroup,newAddressList);
- return newAddressList;
+ return handleMetadata(metaResponse, vGroup);
} catch (IOException e) {
LOGGER.error(e.getMessage());
throw new RemoteException();
}
}
+ public List<InetSocketAddress> handleMetadata(MetaResponse metaResponse,
String vGroup) {
+ // MetaResponse -> endpoint list
+ List<NamingServerNode> newAddressList = new ArrayList<>();
+ if (metaResponse.getTerm() > 0) {
+ term = metaResponse.getTerm();
+ }
+ for (Cluster cluster : metaResponse.getClusterList()) {
+ for (Unit unitDatum : cluster.getUnitData()) {
+ // In raft mode, only the leader is cached, while in non-raft
cluster mode, all nodes are cached.
+
newAddressList.addAll(unitDatum.getNamingInstanceList().stream()
+ .filter(instance -> (instance.getRole() ==
ClusterRole.LEADER && instance.getTerm() >= term)
+ || instance.getRole() == ClusterRole.MEMBER)
+ .collect(Collectors.toList()));
+ }
+ }
+ List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
+ for (NamingServerNode node : newAddressList) {
+ Node.Endpoint endpoint = node.getTransaction();
+ inetSocketAddresses.add(new InetSocketAddress(endpoint.getHost(),
endpoint.getPort()));
+ }
+ removeOfflineAddressesIfNecessary(vGroup, vGroup, inetSocketAddresses);
+ VGROUP_ADDRESS_MAP.put(vGroup, newAddressList);
+ return inetSocketAddresses;
+ }
+
@Override
public void close() throws Exception {
@@ -487,8 +506,6 @@ public class NamingserverRegistryServiceImpl implements
RegistryService<NamingLi
List<InetSocketAddress>
aliveAddress) {
Map<String, List<InetSocketAddress>> clusterAddressMap =
CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup,
key -> new ConcurrentHashMap<>());
-
-
return clusterAddressMap.put(transactionServiceGroup, aliveAddress);
}
@@ -530,7 +547,7 @@ public class NamingserverRegistryServiceImpl implements
RegistryService<NamingLi
String namingAddrsKey = String.join(FILE_CONFIG_SPLIT_CHAR,
FILE_ROOT_REGISTRY, REGISTRY_TYPE, NAMING_SERVICE_URL_KEY);
String urlListStr = FILE_CONFIG.getConfig(namingAddrsKey);
- if (urlListStr.isEmpty()) {
+ if (StringUtils.isBlank(urlListStr)) {
throw new NamingRegistryException("Naming server url can not be
null!");
}
return
Arrays.stream(urlListStr.split(",")).collect(Collectors.toList());
diff --git
a/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
index 44c0af6570..b6f1dc273a 100644
---
a/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
+++
b/discovery/seata-discovery-namingserver/src/test/java/org/apache/seata/discovery/registry/namingserver/NamingserverRegistryServiceImplTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.seata.discovery.registry.namingserver;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.rmi.RemoteException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,11 +32,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.http.entity.ContentType;
import org.apache.http.protocol.HTTP;
import org.apache.seata.common.holder.ObjectHolder;
+import org.apache.seata.common.metadata.Cluster;
+import org.apache.seata.common.metadata.ClusterRole;
+import org.apache.seata.common.metadata.Node;
+import org.apache.seata.common.metadata.namingserver.MetaResponse;
+import org.apache.seata.common.metadata.namingserver.NamingServerNode;
+import org.apache.seata.common.metadata.namingserver.Unit;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.common.util.HttpClientUtil;
import org.apache.seata.discovery.registry.RegistryService;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.junit.After;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -47,16 +57,15 @@ import
org.springframework.core.env.PropertiesPropertySource;
import static
org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT;
import static org.junit.jupiter.api.Assertions.assertEquals;
-@Disabled
class NamingserverRegistryServiceImplTest {
private static final Configuration FILE_CONFIG =
ConfigurationFactory.CURRENT_FILE_INSTANCE;
@BeforeAll
public static void beforeClass() throws Exception {
- System.setProperty("registry.namingserver.namespace", "dev");
- System.setProperty("registry.namingserver.cluster", "cluster1");
- System.setProperty("registry.namingserver.serverAddr",
"127.0.0.1:8080");
+ System.setProperty("registry.seata.namespace", "dev");
+ System.setProperty("registry.seata.cluster", "cluster1");
+ System.setProperty("registry.seata.server-addr", "127.0.0.1:8080");
AnnotationConfigApplicationContext context = new
AnnotationConfigApplicationContext();
// 获取应用程序环境
@@ -68,9 +77,14 @@ class NamingserverRegistryServiceImplTest {
PropertiesPropertySource customPropertySource = new
PropertiesPropertySource("customSource", customProperties);
propertySources.addLast(customPropertySource);
ObjectHolder.INSTANCE.setObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT,
environment);
-
}
+ @AfterAll
+ public static void afterClass() {
+ System.clearProperty("registry.seata.namespace");
+ System.clearProperty("registry.seata.cluster");
+ System.clearProperty("registry.seata.server-addr");
+ }
@Test
public void unregister1() throws Exception {
@@ -82,6 +96,7 @@ class NamingserverRegistryServiceImplTest {
@Test
+ @Disabled
public void getNamingAddrsTest() {
NamingserverRegistryServiceImpl namingserverRegistryService =
NamingserverRegistryServiceImpl.getInstance();
List<String> list = namingserverRegistryService.getNamingAddrs();
@@ -90,6 +105,7 @@ class NamingserverRegistryServiceImplTest {
@Test
+ @Disabled
public void getNamingAddrTest() {
NamingserverRegistryServiceImpl namingserverRegistryService =
NamingserverRegistryServiceImpl.getInstance();
String addr = namingserverRegistryService.getNamingAddr();
@@ -98,14 +114,7 @@ class NamingserverRegistryServiceImplTest {
@Test
- public void convertTest() {
- InetSocketAddress inetSocketAddress = new
InetSocketAddress("127.0.0.1", 8088);
- assertEquals(inetSocketAddress.getAddress().getHostAddress(),
"127.0.0.1");
- assertEquals(inetSocketAddress.getPort(), 8088);
- }
-
-
- @Test
+ @Disabled
public void testRegister1() throws Exception {
RegistryService registryService = new
NamingserverRegistryProvider().provide();
@@ -117,7 +126,6 @@ class NamingserverRegistryServiceImplTest {
//2.create vGroup in cluster
createGroupInCluster("dev", "group1", "cluster1");
-
//3.get instances
List<InetSocketAddress> list = registryService.lookup("group1");
@@ -131,8 +139,46 @@ class NamingserverRegistryServiceImplTest {
}
+ @Test
+ public void testHandleMetadata() throws Exception {
+ NamingserverRegistryServiceImpl registryService =
NamingserverRegistryServiceImpl.getInstance();
+ // Use reflection to set the isSubscribed field to true
+ Field isSubscribedField =
NamingserverRegistryServiceImpl.class.getDeclaredField("isSubscribed");
+ isSubscribedField.setAccessible(true);
+ isSubscribedField.set(registryService, true);
+
+ // Create a mock MetaResponse
+ MetaResponse metaResponse = new MetaResponse();
+ metaResponse.setTerm(1);
+
+ Cluster cluster = new Cluster();
+ Unit unit = new Unit();
+ List<NamingServerNode> namingInstanceList = new ArrayList<>();
+ NamingServerNode node = new NamingServerNode();
+ node.setRole(ClusterRole.LEADER);
+ node.setTerm(1);
+ node.setTransaction(new Node.Endpoint("127.0.0.1", 8091));
+ namingInstanceList.add(node);
+ unit.setNamingInstanceList(namingInstanceList);
+ List<Unit> unitData = new ArrayList<>();
+ unitData.add(unit);
+ cluster.setUnitData(unitData);
+ List<Cluster> clusterList = new ArrayList<>();
+ clusterList.add(cluster);
+ metaResponse.setClusterList(clusterList);
+
+ // Call the method to test
+ List<InetSocketAddress> result =
registryService.handleMetadata(metaResponse, "testGroup");
+ registryService.lookup("testGroup");
+ // Verify the result
+ assertEquals(1, result.size());
+ assertEquals("127.0.0.1", result.get(0).getAddress().getHostAddress());
+ assertEquals(8091, result.get(0).getPort());
+ isSubscribedField.set(registryService, false);
+ }
@Test
+ @Disabled
public void testRegister2() throws Exception {
NamingserverRegistryServiceImpl registryService =
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
InetSocketAddress inetSocketAddress1 = new
InetSocketAddress("127.0.0.1", 8088);
@@ -157,6 +203,7 @@ class NamingserverRegistryServiceImplTest {
@Test
+ @Disabled
public void testRegister3() throws Exception {
NamingserverRegistryServiceImpl registryService =
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
InetSocketAddress inetSocketAddress1 = new
InetSocketAddress("127.0.0.1", 8088);
@@ -189,6 +236,7 @@ class NamingserverRegistryServiceImplTest {
@Test
+ @Disabled
public void testUnregister() throws Exception {
RegistryService registryService = new
NamingserverRegistryProvider().provide();
InetSocketAddress inetSocketAddress1 = new
InetSocketAddress("127.0.0.1", 8088);
@@ -214,7 +262,7 @@ class NamingserverRegistryServiceImplTest {
}
- // @Disabled
+ @Disabled
@Test
public void testWatch() throws Exception {
NamingserverRegistryServiceImpl registryService =
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
@@ -254,7 +302,7 @@ class NamingserverRegistryServiceImplTest {
}
- // @Disabled
+ @Disabled
@Test
public void testSubscribe() throws Exception {
NamingserverRegistryServiceImpl registryService =
NamingserverRegistryServiceImpl.getInstance();
@@ -282,6 +330,7 @@ class NamingserverRegistryServiceImplTest {
@Test
+ @Disabled
public void testUnsubscribe() throws Exception {
NamingserverRegistryServiceImpl registryService =
(NamingserverRegistryServiceImpl) new NamingserverRegistryProvider().provide();
diff --git
a/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
index 3190efa393..274688f4bb 100644
--- a/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
+++ b/discovery/seata-discovery-namingserver/src/test/resources/registry.conf
@@ -1,19 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom
diff --git a/namingserver/src/main/resources/application.yml
b/namingserver/src/main/resources/application.yml
index 7006f3eda6..4764227f0e 100644
--- a/namingserver/src/main/resources/application.yml
+++ b/namingserver/src/main/resources/application.yml
@@ -36,6 +36,6 @@ seata:
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
- csrf-ignore-urls: /naming/v1/**
+ csrf-ignore-urls: /naming/v1/**,/api/v1/naming/**
ignore:
urls:
/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/version.json,/naming/v1/health,/error
diff --git
a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
index 8f8c7c46ca..9dbcba26b4 100644
---
a/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
+++
b/server/src/main/java/org/apache/seata/server/cluster/raft/snapshot/vgroup/VGroupSnapshotFile.java
@@ -78,6 +78,7 @@ public class VGroupSnapshotFile implements Serializable,
StoreSnapshotFile {
Map<String/*vgroup*/, MappingDO> map = (Map<String/*vgroup*/,
MappingDO>)load(path);
RaftVGroupMappingStoreManager raftVGroupMappingStoreManager =
(RaftVGroupMappingStoreManager)SessionHolder.getRootVGroupMappingManager();
+ raftVGroupMappingStoreManager.clear(group);
raftVGroupMappingStoreManager.localAddVGroups(map, group);
return true;
} catch (final Exception e) {
diff --git
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
index ab8ccd651c..fe26844145 100644
---
a/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
+++
b/server/src/main/java/org/apache/seata/server/instance/RaftServerInstanceStrategy.java
@@ -63,7 +63,7 @@ public class RaftServerInstanceStrategy extends
AbstractSeataInstanceStrategy
instance.setUnit(unit);
// load cluster type
String clusterType = String.valueOf(StoreConfig.getSessionMode());
- instance.addMetadata("cluster-type", "raft".equals(clusterType) ?
clusterType : "default");
+ instance.addMetadata("cluster-type",
"raft".equalsIgnoreCase(clusterType) ? clusterType : "default");
RaftStateMachine stateMachine =
RaftServerManager.getRaftServer(unit).getRaftStateMachine();
long term =
RaftServerManager.getRaftServer(unit).getRaftStateMachine().getCurrentTerm().get();
instance.setTerm(term);
@@ -100,8 +100,9 @@ public class RaftServerInstanceStrategy extends
AbstractSeataInstanceStrategy
@EventListener
@Async
public void onChangeEvent(ClusterChangeEvent event) {
- Instance.getInstance().setTerm(event.getTerm());
- Instance.getInstance().setRole(event.isLeader() ? ClusterRole.LEADER :
ClusterRole.FOLLOWER);
+ Instance instance = Instance.getInstance();
+ instance.setTerm(event.getTerm());
+ instance.setRole(event.isLeader() ? ClusterRole.LEADER :
ClusterRole.FOLLOWER);
SessionHolder.getRootVGroupMappingManager().notifyMapping();
}
diff --git
a/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
b/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
index 26ca5e3ec5..44a2fd5ce0 100644
---
a/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
+++
b/server/src/main/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManager.java
@@ -16,13 +16,11 @@
*/
package org.apache.seata.server.storage.raft.store;
-import java.net.InetSocketAddress;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import com.alipay.sofa.jraft.Closure;
-import org.apache.seata.common.XID;
import org.apache.seata.common.loader.LoadLevel;
import org.apache.seata.common.metadata.ClusterRole;
import org.apache.seata.common.metadata.Instance;
@@ -39,12 +37,12 @@ import
org.apache.seata.server.store.VGroupMappingStoreManager;
public class RaftVGroupMappingStoreManager implements
VGroupMappingStoreManager {
private final static Map<String/*unit(raft group)*/, Map<String/*vgroup*/,
MappingDO>> VGROUP_MAPPING =
- new HashMap<>();
+ new ConcurrentHashMap<>();
public boolean localAddVGroup(MappingDO mappingDO) {
return VGROUP_MAPPING.computeIfAbsent(mappingDO.getUnit(), k -> new
HashMap<>()).put(mappingDO.getVGroup(),
- mappingDO) != null;
+ mappingDO) == null;
}
public void localAddVGroups(Map<String/*vgroup*/, MappingDO> vGroups,
String unit) {
@@ -114,7 +112,11 @@ public class RaftVGroupMappingStoreManager implements
VGroupMappingStoreManager
}
public Map<String/*vgroup*/, MappingDO> loadVGroupsByUnit(String unit) {
- return VGROUP_MAPPING.getOrDefault(unit, Collections.emptyMap());
+ return VGROUP_MAPPING.getOrDefault(unit, new HashMap<>());
+ }
+
+ public void clear(String unit) {
+ VGROUP_MAPPING.remove(unit);
}
@Override
@@ -123,19 +125,18 @@ public class RaftVGroupMappingStoreManager implements
VGroupMappingStoreManager
}
@Override
- public void notifyMapping() {
+ public void notifyMapping() {
Instance instance = Instance.getInstance();
Map<String, Object> map = this.readVGroups();
instance.addMetadata("vGroup", map);
- for (String group : RaftServerManager.groups()) {
- Instance node = instance.clone();
- node.setRole(RaftServerManager.isLeader(group) ?
ClusterRole.LEADER : ClusterRole.FOLLOWER);
- Instance.getInstances().add(node);
- }
try {
- InetSocketAddress address = new
InetSocketAddress(XID.getIpAddress(), XID.getPort());
- for (RegistryService<?> registryService :
MultiRegistryFactory.getInstances()) {
- registryService.register(address);
+ for (String group : RaftServerManager.groups()) {
+ Instance node = instance.clone();
+ node.setRole(RaftServerManager.isLeader(group) ?
ClusterRole.LEADER : ClusterRole.FOLLOWER);
+ Instance.getInstances().add(node);
+ for (RegistryService<?> registryService :
MultiRegistryFactory.getInstances()) {
+ registryService.register(node);
+ }
}
} catch (Exception e) {
throw new RuntimeException("vGroup mapping relationship notified
failed! ", e);
diff --git
a/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
b/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
new file mode 100644
index 0000000000..250062a4b7
--- /dev/null
+++
b/server/src/test/java/org/apache/seata/server/storage/raft/store/RaftVGroupMappingStoreManagerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.server.storage.raft.store;
+
+import org.apache.seata.core.store.MappingDO;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SpringBootTest
+public class RaftVGroupMappingStoreManagerTest {
+
+ private RaftVGroupMappingStoreManager raftVGroupMappingStoreManager;
+
+ @BeforeEach
+ public void setUp() {
+ raftVGroupMappingStoreManager = new
RaftVGroupMappingStoreManager();
+ raftVGroupMappingStoreManager.clear("unit1");
+ }
+
+ @Test
+ public void testLocalAddVGroup() {
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setUnit("unit1");
+ mappingDO.setVGroup("vgroup2");
+
+ boolean result =
raftVGroupMappingStoreManager.localAddVGroup(mappingDO);
+
+ assertTrue(result);
+ assertEquals(mappingDO,
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup2"));
+ }
+
+ @Test
+ public void testLocalAddVGroups() {
+ Map<String, MappingDO> vGroups = new HashMap<>();
+ MappingDO mappingDO1 = new MappingDO();
+ mappingDO1.setUnit("unit1");
+ mappingDO1.setVGroup("vgroup1");
+ vGroups.put("vgroup1", mappingDO1);
+
+ MappingDO mappingDO2 = new MappingDO();
+ mappingDO2.setUnit("unit1");
+ mappingDO2.setVGroup("vgroup2");
+ vGroups.put("vgroup2", mappingDO2);
+
+ raftVGroupMappingStoreManager.localAddVGroups(vGroups, "unit1");
+
+ assertEquals(mappingDO1,
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup1"));
+ assertEquals(mappingDO2,
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").get("vgroup2"));
+ }
+
+ @Test
+ public void testLocalRemoveVGroup() {
+ MappingDO mappingDO = new MappingDO();
+ mappingDO.setUnit("unit1");
+ mappingDO.setVGroup("vgroup1");
+
+ raftVGroupMappingStoreManager.localAddVGroup(mappingDO);
+ boolean result =
raftVGroupMappingStoreManager.localRemoveVGroup("vgroup1");
+
+ assertTrue(result);
+
assertTrue(raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1").isEmpty());
+ }
+
+ @Test
+ public void testLoadVGroupsByUnit() {
+ MappingDO mappingDO1 = new MappingDO();
+ mappingDO1.setUnit("unit1");
+ mappingDO1.setVGroup("vgroup1");
+
+ MappingDO mappingDO2 = new MappingDO();
+ mappingDO2.setUnit("unit1");
+ mappingDO2.setVGroup("vgroup2");
+
+ raftVGroupMappingStoreManager.localAddVGroup(mappingDO1);
+ raftVGroupMappingStoreManager.localAddVGroup(mappingDO2);
+
+ Map<String, MappingDO> result =
raftVGroupMappingStoreManager.loadVGroupsByUnit("unit1");
+
+ assertEquals(2, result.size());
+ assertEquals(mappingDO1, result.get("vgroup1"));
+ assertEquals(mappingDO2, result.get("vgroup2"));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]