This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 522d46e035 feature:init Namingserver server (#6538) 522d46e035 is described below commit 522d46e035ede4f6b8f91afd9ab0bcaac377bb32 Author: ggbocoder <119659920+ggboco...@users.noreply.github.com> AuthorDate: Tue Jul 30 15:47:11 2024 +0800 feature:init Namingserver server (#6538) --- build/pom.xml | 1 + changes/en-us/2.x.md | 2 +- changes/zh-cn/2.x.md | 2 + .../org/apache/seata/common/ConfigurationKeys.java | 40 ++++++ .../seata/common}/NamingServerConstants.java | 8 +- .../apache/seata/common/exception/ErrorCode.java | 17 ++- .../org/apache/seata/common/metadata/Cluster.java | 1 + .../org/apache/seata/core/store/MappingDO.java | 48 +++---- .../seata/namingserver/manager/NamingManager.java | 4 +- .../seata/namingserver/NamingControllerTest.java | 2 +- script/server/db/dm.sql | 9 ++ script/server/db/mysql.sql | 12 +- script/server/db/oracle.sql | 7 + script/server/db/postgresql.sql | 8 ++ script/server/db/sqlserver.sql | 9 ++ server/pom.xml | 6 + .../main/java/org/apache/seata/server/Server.java | 79 +++++++++-- .../seata/server/controller/NamingController.java | 99 ++++++++++++++ .../apache/seata/server/session/SessionHolder.java | 42 +++++- .../store/DataBaseVGroupMappingStoreManager.java | 70 ++++++++++ .../storage/db/store/VGroupMappingDataBaseDAO.java | 145 +++++++++++++++++++++ .../file/store/FileVGroupMappingStoreManager.java | 128 ++++++++++++++++++ .../store/RedisVGroupMappingStoreManager.java | 76 +++++++++++ .../server/store/VGroupMappingStoreManager.java | 67 ++++++++++ ...he.seata.server.store.VGroupMappingStoreManager | 19 +++ server/src/main/resources/application.example.yml | 7 + .../server/controller/NamingControllerTest.java | 47 +++---- .../store/FileVGroupMappingStoreManagerTest.java | 129 ++++++++++++++++++ .../store/RedisVGroupMappingStoreManagerTest.java | 55 ++++++++ 29 files changed, 1062 insertions(+), 77 deletions(-) diff --git a/build/pom.xml b/build/pom.xml index 2f072af7de..71153b65b9 100644 --- a/build/pom.xml +++ b/build/pom.xml @@ -88,6 +88,7 @@ <kafka-appender.version>0.2.0-RC2</kafka-appender.version> <kafka-clients.version>3.6.1</kafka-clients.version> <snakeyaml.version>2.0</snakeyaml.version> + <jackson-mapper.version>1.9.13</jackson-mapper.version> <!-- For test --> <junit-jupiter.version>5.8.2</junit-jupiter.version> diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index e83eaa555b..838c1568e3 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -5,7 +5,7 @@ Add changes here for all PR submitted to the 2.x branch. ### feature: - [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support - [[#6537](https://github.com/apache/incubator-seata/pull/6537)] support Namingserver - +- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] Integration of naming server on the Seata server side ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager - [[#6624](https://github.com/apache/incubator-seata/pull/6624)] fix Alibaba Dubbo convert error diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 5a651a2acd..63f9a01fd9 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -5,6 +5,7 @@ ### feature: - [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容 - [[#6537](https://github.com/apache/incubator-seata/pull/6537)] 支持 Namingserver +- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] seata server端集成naming server ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题 @@ -49,6 +50,7 @@ + 非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。 <!-- 请确保您的 GitHub ID 在以下列表中 --> diff --git a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java index e1c0f11f39..f88a3cd2bf 100644 --- a/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java +++ b/common/src/main/java/org/apache/seata/common/ConfigurationKeys.java @@ -811,6 +811,11 @@ public interface ConfigurationKeys { */ String SERVER_ENABLE_CHECK_AUTH = SERVER_PREFIX + "enableCheckAuth"; + /** + * The constant NAMING_SERVER + */ + String NAMING_SERVER = "namingserver"; + /** * The constant APPLICATION_ID. */ @@ -1011,4 +1016,39 @@ public interface ConfigurationKeys { * The constant ROCKET_MQ_MSG_TIMEOUT */ String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout"; + + /** + * + */ + String NAMINGSERVER_REGISTRY_PREFIX = FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + NAMING_SERVER + FILE_CONFIG_SPLIT_CHAR; + + /** + * + */ + String SEATA_NAMINGSERVER_REGISTRY_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + NAMINGSERVER_REGISTRY_PREFIX; + + /** + * The constant REGISTRY_NAMINGSERVER_CLUSTER + */ + String REGISTRY_NAMINGSERVER_CLUSTER = NAMINGSERVER_REGISTRY_PREFIX + "cluster"; + + /** + * The constant MAPPING_TABLE_NAME + */ + String MAPPING_TABLE_NAME = STORE_DB_PREFIX + FILE_CONFIG_SPLIT_CHAR + "mapping-table"; + + /** + * The constant NAMESPACE_KEY + */ + String NAMESPACE_KEY = SEATA_NAMINGSERVER_REGISTRY_PREFIX + "namespace"; + + /** + * The constant CLUSTER_NAME_KEY + */ + String CLUSTER_NAME_KEY = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + REGISTRY_NAMINGSERVER_CLUSTER; + + /** + * The constant META_PREFIX + */ + String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata."; } diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java b/common/src/main/java/org/apache/seata/common/NamingServerConstants.java similarity index 90% copy from namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java copy to common/src/main/java/org/apache/seata/common/NamingServerConstants.java index 91c11a3623..975a42d3ee 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java +++ b/common/src/main/java/org/apache/seata/common/NamingServerConstants.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.namingserver.constants; +package org.apache.seata.common; public interface NamingServerConstants { /** @@ -46,4 +46,10 @@ public interface NamingServerConstants { * The constant IP_PORT_SPLIT_CHAR */ String IP_PORT_SPLIT_CHAR = ":"; + + /** + * The constant DEFAULT_VGROUP_MAPPING + */ + String DEFAULT_VGROUP_MAPPING = "vgroup_table"; + } diff --git a/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java b/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java index fb69cd328f..8406d7db0c 100644 --- a/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java +++ b/common/src/main/java/org/apache/seata/common/exception/ErrorCode.java @@ -29,11 +29,18 @@ public enum ErrorCode { /** * 0100 ~ 0199 Security related errors */ - ERR_DESERIALIZATION_SECURITY(ErrorType.Security, 0156); + ERR_DESERIALIZATION_SECURITY(ErrorType.Security, 0156), /** * The error code of the transaction exception. */ + + + /** + * The error code of the sql exception + */ + ERROR_SQL(ErrorType.Datasource, 0201); + private int code; private ErrorType type; @@ -82,10 +89,6 @@ public enum ErrorCode { * Network error type. */ Network, - /** - * Security related error type. - */ - Security, /** * Tm error type. */ @@ -102,6 +105,10 @@ public enum ErrorCode { * Datasource error type. */ Datasource, + /** + * Security error type. + */ + Security, /** * Other error type. */ diff --git a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java index 2ba769f58d..2dcfec0fd6 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java +++ b/common/src/main/java/org/apache/seata/common/metadata/Cluster.java @@ -26,6 +26,7 @@ public class Cluster { private String clusterType; private List<Unit> unitData = new ArrayList<>(); + public Cluster() { } diff --git a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java b/core/src/main/java/org/apache/seata/core/store/MappingDO.java similarity index 52% copy from common/src/main/java/org/apache/seata/common/metadata/Cluster.java copy to core/src/main/java/org/apache/seata/core/store/MappingDO.java index 2ba769f58d..57e0c42e45 100644 --- a/common/src/main/java/org/apache/seata/common/metadata/Cluster.java +++ b/core/src/main/java/org/apache/seata/core/store/MappingDO.java @@ -14,46 +14,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.common.metadata; +package org.apache.seata.core.store; -import org.apache.seata.common.metadata.namingserver.Unit; +public class MappingDO { + private String namespace; -import java.util.ArrayList; -import java.util.List; + private String cluster; -public class Cluster { - private String clusterName; - private String clusterType; - private List<Unit> unitData = new ArrayList<>(); + private String unit; - public Cluster() { + private String vGroup; + + + public String getNamespace() { + return namespace; } - public String getClusterName() { - return clusterName; + public void setNamespace(String namespace) { + this.namespace = namespace; } - public void setClusterName(String clusterName) { - this.clusterName = clusterName; + public String getCluster() { + return cluster; } - public String getClusterType() { - return clusterType; + public void setCluster(String cluster) { + this.cluster = cluster; } - public void setClusterType(String clusterType) { - this.clusterType = clusterType; + public String getUnit() { + return unit; } - public List<Unit> getUnitData() { - return unitData; + public void setUnit(String unit) { + this.unit = unit; } - public void setUnitData(List<Unit> unitData) { - this.unitData = unitData; + public String getVGroup() { + return vGroup; } + public void setVGroup(String vGroup) { + this.vGroup = vGroup; + } } - - diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java index 9d0cb9b832..07074b6db1 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java +++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/NamingManager.java @@ -26,7 +26,7 @@ import org.apache.seata.common.metadata.namingserver.NamingServerNode; import org.apache.seata.common.metadata.namingserver.Unit; import org.apache.seata.common.result.Result; import org.apache.seata.common.util.HttpClientUtil; -import org.apache.seata.namingserver.constants.NamingServerConstants; +import org.apache.seata.common.NamingServerConstants; import org.apache.seata.namingserver.listener.ClusterChangeEvent; import org.apache.seata.namingserver.entity.pojo.ClusterData; import org.apache.seata.namingserver.entity.vo.monitor.ClusterVO; @@ -55,7 +55,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import static org.apache.seata.namingserver.constants.NamingServerConstants.CONSTANT_GROUP; +import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP; @Component public class NamingManager { diff --git a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java index c9bf44edbf..6ea5b9552e 100644 --- a/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java +++ b/namingserver/src/test/java/org/apache/seata/namingserver/NamingControllerTest.java @@ -35,7 +35,7 @@ import java.util.Map; import java.util.UUID; -import static org.apache.seata.namingserver.constants.NamingServerConstants.CONSTANT_GROUP; +import static org.apache.seata.common.NamingServerConstants.CONSTANT_GROUP; import static org.junit.jupiter.api.Assertions.*; diff --git a/script/server/db/dm.sql b/script/server/db/dm.sql index fd9d067abd..ec3d1a27b8 100644 --- a/script/server/db/dm.sql +++ b/script/server/db/dm.sql @@ -91,3 +91,12 @@ INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALU INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('RetryCommitting', ' ', 0); INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('RetryRollbacking', ' ', 0); INSERT INTO "SEATA"."DISTRIBUTED_LOCK" ("LOCK_KEY", "LOCK_VALUE", "EXPIRE") VALUES ('TxTimeoutCheck', ' ', 0); + + +CREATE TABLE "SEATA"."VGROUP_TABLE" +( + `VGROUP` VARCHAR2(255), + `NAMESPACE` VARCHAR2(255), + `CLUSTER` VARCHAR2(255), + PRIMARY KEY (`VGROUP`) +); \ No newline at end of file diff --git a/script/server/db/mysql.sql b/script/server/db/mysql.sql index bc2e3926f8..fe0811dded 100644 --- a/script/server/db/mysql.sql +++ b/script/server/db/mysql.sql @@ -87,4 +87,14 @@ CREATE TABLE IF NOT EXISTS `distributed_lock` INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); -INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); \ No newline at end of file +INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); + + +CREATE TABLE IF NOT EXISTS `vgroup_table` +( + `vGroup` VARCHAR(255), + `namespace` VARCHAR(255), + `cluster` VARCHAR(255), + primary key (`vGroup`) +) ENGINE = InnoDB + DEFAULT CHARSET = utf8mb4; \ No newline at end of file diff --git a/script/server/db/oracle.sql b/script/server/db/oracle.sql index 143a7f22f7..797603b797 100644 --- a/script/server/db/oracle.sql +++ b/script/server/db/oracle.sql @@ -87,3 +87,10 @@ INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommit INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); + +CREATE TABLE vgroup_table +( + vGroup VARCHAR2(255) PRIMARY KEY, + namespace VARCHAR2(255), + cluster VARCHAR2(255) +); \ No newline at end of file diff --git a/script/server/db/postgresql.sql b/script/server/db/postgresql.sql index e29e5908bc..91fd2128a0 100644 --- a/script/server/db/postgresql.sql +++ b/script/server/db/postgresql.sql @@ -87,3 +87,11 @@ INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('AsyncCommit INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO distributed_lock (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); + +CREATE TABLE IF NOT EXISTS vgroup_table +( + vGroup VARCHAR(255), + namespace VARCHAR(255), + cluster VARCHAR(255), + PRIMARY KEY (vGroup) +); \ No newline at end of file diff --git a/script/server/db/sqlserver.sql b/script/server/db/sqlserver.sql index 729cc1178e..d39464f554 100644 --- a/script/server/db/sqlserver.sql +++ b/script/server/db/sqlserver.sql @@ -117,3 +117,12 @@ INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('RetryComm INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); INSERT INTO [distributed_lock] (lock_key, lock_value, expire) VALUES ('UndologDelete', ' ', 0); + +CREATE TABLE [vgroup_table] +( + [vGroup] nvarchar(255) NOT NULL, + [namespace] nvarchar(255) NOT NULL, + [cluster] nvarchar(255) NOT NULL, + PRIMARY KEY CLUSTERED ([vGroup]) + WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) +) \ No newline at end of file diff --git a/server/pom.xml b/server/pom.xml index ca1ad4a50e..341c90cc2a 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -282,6 +282,12 @@ <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-core</artifactId> </dependency> + + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>${jackson-mapper.version}</version> + </dependency> </dependencies> <build> diff --git a/server/src/main/java/org/apache/seata/server/Server.java b/server/src/main/java/org/apache/seata/server/Server.java index 1ebd10a5b8..40616914c5 100644 --- a/server/src/main/java/org/apache/seata/server/Server.java +++ b/server/src/main/java/org/apache/seata/server/Server.java @@ -16,11 +16,16 @@ */ package org.apache.seata.server; +import java.util.Objects; +import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + import org.apache.seata.common.XID; import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.common.metadata.Node; +import org.apache.seata.common.metadata.namingserver.Instance; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.common.util.NetUtil; import org.apache.seata.common.util.StringUtils; @@ -31,20 +36,75 @@ import org.apache.seata.server.coordinator.DefaultCoordinator; import org.apache.seata.server.lock.LockerManagerFactory; import org.apache.seata.server.metrics.MetricsManager; import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.StoreConfig; +import org.apache.seata.server.store.VGroupMappingStoreManager; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationListener; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.EnumerablePropertySource; +import org.springframework.core.env.PropertySource; import org.springframework.web.context.support.GenericWebApplicationContext; +import static org.apache.seata.common.ConfigurationKeys.CLUSTER_NAME_KEY; +import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR; +import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY; +import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE; +import static org.apache.seata.common.ConfigurationKeys.META_PREFIX; +import static org.apache.seata.common.ConfigurationKeys.NAMESPACE_KEY; +import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_APPLICATION_CONTEXT; +import static org.apache.seata.common.Constants.OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGEX_SPLIT_CHAR; import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.REGISTRY_PREFERED_NETWORKS; /** * The type Server. - * */ public class Server { + + public static void metadataInit() { + + ConfigurableEnvironment environment = (ConfigurableEnvironment) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_CONFIGURABLE_ENVIRONMENT); + + // load node properties + Instance instance = Instance.getInstance(); + // load namespace + String namespace = environment.getProperty(NAMESPACE_KEY, "public"); + instance.setNamespace(namespace); + // load cluster name + String clusterName = environment.getProperty(CLUSTER_NAME_KEY, "default"); + instance.setClusterName(clusterName); + + // load cluster type + String clusterType = String.valueOf(StoreConfig.getSessionMode()); + instance.addMetadata("cluster-type", "raft".equals(clusterType) ? clusterType : "default"); + + // load unit name + instance.setUnit(String.valueOf(UUID.randomUUID())); + + // load node Endpoint + instance.setControlEndpoint(new Node.Endpoint(NetUtil.getLocalIp(), Integer.parseInt(Objects.requireNonNull(environment.getProperty("server.port"))), "http")); + + // load metadata + for (PropertySource<?> propertySource : environment.getPropertySources()) { + if (propertySource instanceof EnumerablePropertySource) { + EnumerablePropertySource<?> enumerablePropertySource = (EnumerablePropertySource<?>) propertySource; + for (String propertyName : enumerablePropertySource.getPropertyNames()) { + if (propertyName.startsWith(META_PREFIX)) { + instance.addMetadata(propertyName.substring(META_PREFIX.length()), enumerablePropertySource.getProperty(propertyName)); + } + } + } + } + + // load vgroup mapping relationship + VGroupMappingStoreManager vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + instance.addMetadata("vGroup", vGroupMappingStoreManager.loadVGroups()); + vGroupMappingStoreManager.notifyMapping(); + } + + /** * The entry point of application. * @@ -60,9 +120,9 @@ public class Server { MetricsManager.get().init(); ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(), - NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, - new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), - new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); + NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, + new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), + new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { @@ -79,14 +139,14 @@ public class Server { XID.setPort(nettyRemotingServer.getListenPort()); UUIDGenerator.init(parameterParser.getServerNode()); ConfigurableListableBeanFactory beanFactory = - ((GenericWebApplicationContext)ObjectHolder.INSTANCE + ((GenericWebApplicationContext) ObjectHolder.INSTANCE .getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)).getBeanFactory(); DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer); if (coordinator instanceof ApplicationListener) { beanFactory.registerSingleton(NettyRemotingServer.class.getName(), nettyRemotingServer); beanFactory.registerSingleton(DefaultCoordinator.class.getName(), coordinator); - ((GenericWebApplicationContext)ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)) - .addApplicationListener((ApplicationListener<?>)coordinator); + ((GenericWebApplicationContext) ObjectHolder.INSTANCE.getObject(OBJECT_KEY_SPRING_APPLICATION_CONTEXT)) + .addApplicationListener((ApplicationListener<?>) coordinator); } //log store mode : file, db, redis SessionHolder.init(); @@ -96,7 +156,10 @@ public class Server { // let ServerRunner do destroy instead ShutdownHook, see https://github.com/seata/seata/issues/4028 ServerRunner.addDisposable(coordinator); - + if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY + + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) { + metadataInit(); + } nettyRemotingServer.init(); } } diff --git a/server/src/main/java/org/apache/seata/server/controller/NamingController.java b/server/src/main/java/org/apache/seata/server/controller/NamingController.java new file mode 100644 index 0000000000..cc521ca9f3 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/controller/NamingController.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.controller; + +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.result.Result; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.server.session.SessionHolder; +import org.apache.seata.server.store.VGroupMappingStoreManager; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.PostConstruct; + +import static org.apache.seata.common.ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR; +import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_REGISTRY; +import static org.apache.seata.common.ConfigurationKeys.FILE_ROOT_TYPE; +import static org.apache.seata.common.ConfigurationKeys.NAMING_SERVER; + +@RestController +@RequestMapping("/naming/v1") +public class NamingController { + + private VGroupMappingStoreManager vGroupMappingStoreManager; + + protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); + + @PostConstruct + private void init() { + if (StringUtils.equals(ConfigurationFactory.getInstance().getConfig(FILE_ROOT_REGISTRY + + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_TYPE), NAMING_SERVER)) { + vGroupMappingStoreManager = SessionHolder.getRootVGroupMappingManager(); + } + } + + /** + * add vGroup in cluster + * + * @param vGroup + * @return + */ + @GetMapping("/addVGroup") + public Result<?> addVGroup(@RequestParam String vGroup, @RequestParam String unit) { + Result<?> result = new Result<>(); + MappingDO mappingDO = new MappingDO(); + mappingDO.setNamespace(Instance.getInstance().getNamespace()); + mappingDO.setCluster(Instance.getInstance().getClusterName()); + mappingDO.setUnit(unit); + mappingDO.setVGroup(vGroup); + boolean rst = vGroupMappingStoreManager.addVGroup(mappingDO); + if (!rst) { + result.setCode("500"); + result.setMessage("add vGroup failed!"); + } + // push the newest mapping relationship + vGroupMappingStoreManager.notifyMapping(); + return result; + } + + /** + * remove vGroup in cluster + * + * @param vGroup + * @return + */ + @GetMapping("/removeVGroup") + public Result<?> removeVGroup(@RequestParam String vGroup) { + Result<?> result = new Result<>(); + boolean rst = vGroupMappingStoreManager.removeVGroup(vGroup); + if (!rst) { + result.setCode("500"); + result.setMessage("remove vGroup failed!"); + } + // push the newest mapping relationship + vGroupMappingStoreManager.notifyMapping(); + return result; + } + + +} diff --git a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java index f32839e78e..381be450f9 100644 --- a/server/src/main/java/org/apache/seata/server/session/SessionHolder.java +++ b/server/src/main/java/org/apache/seata/server/session/SessionHolder.java @@ -43,6 +43,7 @@ import org.apache.seata.server.cluster.raft.context.SeataClusterContext; import org.apache.seata.server.lock.distributed.DistributedLockerFactory; import org.apache.seata.server.store.StoreConfig; import org.apache.seata.server.store.StoreConfig.SessionMode; +import org.apache.seata.server.store.VGroupMappingStoreManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,6 @@ import static org.apache.seata.common.DefaultValues.DEFAULT_SESSION_STORE_FILE_D /** * The type Session holder. - * */ public class SessionHolder { @@ -75,6 +75,13 @@ public class SessionHolder { */ private static long DISTRIBUTED_LOCK_EXPIRE_TIME = CONFIG.getLong(ConfigurationKeys.DISTRIBUTED_LOCK_EXPIRE_TIME, DEFAULT_DISTRIBUTED_LOCK_EXPIRE_TIME); + /** + * The default vgroup mapping store dir + */ + public static final String DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR = System.getProperty("user.dir"); + + private static VGroupMappingStoreManager ROOT_VGROUP_MAPPING_MANAGER; + private static SessionManager ROOT_SESSION_MANAGER; private static volatile Map<String, SessionManager> SESSION_MANAGER_MAP; @@ -83,6 +90,7 @@ public class SessionHolder { public static void init() { init(null); } + /** * Init. * @@ -98,6 +106,8 @@ public class SessionHolder { if (SessionMode.DB.equals(sessionMode)) { ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.DB.getName()); reload(sessionMode); + + ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.DB.getName()); } else if (SessionMode.RAFT.equals(sessionMode) || SessionMode.FILE.equals(sessionMode)) { RaftServerManager.init(); if (CollectionUtils.isNotEmpty(RaftServerManager.getRaftServers())) { @@ -106,23 +116,31 @@ public class SessionHolder { if (SessionMode.RAFT.equals(sessionMode)) { String group = CONFIG.getConfig(ConfigurationKeys.SERVER_RAFT_GROUP, DEFAULT_SEATA_GROUP); ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.RAFT.getName(), - new Object[] {ROOT_SESSION_MANAGER_NAME}); + new Object[]{ROOT_SESSION_MANAGER_NAME}); SESSION_MANAGER_MAP = new HashMap<>(); SESSION_MANAGER_MAP.put(group, ROOT_SESSION_MANAGER); RaftServerManager.start(); } else { + String vGroupMappingStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, + DEFAULT_VGROUP_MAPPING_STORE_FILE_DIR); String sessionStorePath = CONFIG.getConfig(ConfigurationKeys.STORE_FILE_DIR, DEFAULT_SESSION_STORE_FILE_DIR) + separator + System.getProperty(SERVER_SERVICE_PORT_CAMEL); - if (StringUtils.isBlank(sessionStorePath)) { + if (StringUtils.isBlank(sessionStorePath) || StringUtils.isBlank(vGroupMappingStorePath)) { throw new StoreException("the {store.file.dir} is empty."); } + ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.FILE.getName(), + new Object[]{vGroupMappingStorePath}); + + ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.FILE.getName(), + new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath}); ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.FILE.getName(), - new Object[] {ROOT_SESSION_MANAGER_NAME, sessionStorePath}); + new Object[]{ROOT_SESSION_MANAGER_NAME, sessionStorePath}); reload(sessionMode); } } else if (SessionMode.REDIS.equals(sessionMode)) { ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, SessionMode.REDIS.getName()); + ROOT_VGROUP_MAPPING_MANAGER = EnhancedServiceLoader.load(VGroupMappingStoreManager.class, SessionMode.REDIS.getName()); reload(sessionMode); } else { // unknown store @@ -137,7 +155,7 @@ public class SessionHolder { */ protected static void reload(SessionMode sessionMode) { if (sessionMode == SessionMode.FILE) { - ((Reloadable)ROOT_SESSION_MANAGER).reload(); + ((Reloadable) ROOT_SESSION_MANAGER).reload(); reload(ROOT_SESSION_MANAGER.allSessions(), sessionMode); } else { reload(null, sessionMode); @@ -231,7 +249,7 @@ public class SessionHolder { // Redis, db and so on CompletableFuture.runAsync(() -> { SessionCondition searchCondition = new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Committed, - GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked, GlobalStatus.Finished); + GlobalStatus.Rollbacked, GlobalStatus.TimeoutRollbacked, GlobalStatus.Finished); searchCondition.setLazyLoadBranch(true); long now = System.currentTimeMillis(); @@ -277,6 +295,16 @@ public class SessionHolder { } + //region get group mapping manager + public static VGroupMappingStoreManager getRootVGroupMappingManager() { + if (ROOT_VGROUP_MAPPING_MANAGER == null) { + init(); + if (ROOT_VGROUP_MAPPING_MANAGER == null) { + throw new ShouldNeverHappenException("vGroupMappingManager is NOT init!"); + } + } + return ROOT_VGROUP_MAPPING_MANAGER; + } //endregion @@ -328,7 +356,7 @@ public class SessionHolder { * @return the value */ public static <T> T lockAndExecute(GlobalSession globalSession, GlobalSession.LockCallable<T> lockCallable) - throws TransactionException { + throws TransactionException { return getRootSessionManager().lockAndExecute(globalSession, lockCallable); } diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java new file mode 100644 index 0000000000..6389168dae --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/DataBaseVGroupMappingStoreManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.db.store; + +import org.apache.seata.common.loader.EnhancedServiceLoader; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.constants.ConfigurationKeys; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.core.store.db.DataSourceProvider; +import org.apache.seata.server.store.VGroupMappingStoreManager; + +import javax.sql.DataSource; +import java.util.HashMap; +import java.util.List; + +@LoadLevel(name = "db") +public class DataBaseVGroupMappingStoreManager implements VGroupMappingStoreManager { + protected VGroupMappingDataBaseDAO vGroupMappingDataBaseDAO; + + protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); + + public DataBaseVGroupMappingStoreManager() { + String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE); + //init dataSource + DataSource vGroupMappingDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide(); + vGroupMappingDataBaseDAO = new VGroupMappingDataBaseDAO(vGroupMappingDataSource); + } + + @Override + public boolean addVGroup(MappingDO mappingDO) { + return vGroupMappingDataBaseDAO.insertMappingDO(mappingDO); + } + + @Override + public boolean removeVGroup(String vGroup) { + return vGroupMappingDataBaseDAO.deleteMappingDOByVGroup(vGroup); + } + + @Override + public HashMap<String, Object> loadVGroups() { + List<MappingDO> mappingDOS = vGroupMappingDataBaseDAO.queryMappingDO(); + Instance instance = Instance.getInstance(); + HashMap<String, Object> mappings = new HashMap<>(); + for (MappingDO mappingDO : mappingDOS) { + if (mappingDO.getCluster() != null && mappingDO.getCluster().equals(instance.getClusterName())) { + mappings.put(mappingDO.getVGroup(), null); + } + } + return mappings; + } + + +} diff --git a/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java new file mode 100644 index 0000000000..81d227eb80 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/db/store/VGroupMappingDataBaseDAO.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.db.store; + +import org.apache.seata.common.exception.ErrorCode; +import org.apache.seata.common.exception.SeataRuntimeException; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.common.util.IOUtil; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.store.MappingDO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.seata.common.ConfigurationKeys.MAPPING_TABLE_NAME; +import static org.apache.seata.common.ConfigurationKeys.REGISTRY_NAMINGSERVER_CLUSTER; +import static org.apache.seata.common.NamingServerConstants.DEFAULT_VGROUP_MAPPING; + + +public class VGroupMappingDataBaseDAO { + private static final Logger LOGGER = LoggerFactory.getLogger(VGroupMappingDataBaseDAO.class); + + protected DataSource vGroupMappingDataSource = null; + + protected final String vMapping; + + protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); + + public VGroupMappingDataBaseDAO(DataSource vGroupMappingDataSource) { + this.vGroupMappingDataSource = vGroupMappingDataSource; + this.vMapping = CONFIG.getConfig(MAPPING_TABLE_NAME, DEFAULT_VGROUP_MAPPING); + } + + public boolean insertMappingDO(MappingDO mappingDO) { + clearMappingDOByVGroup(mappingDO.getVGroup()); + String sql = "INSERT INTO " + vMapping + " (vgroup,namespace, cluster) VALUES (?, ?, ?)"; + Connection conn = null; + PreparedStatement ps = null; + try { + int index = 1; + conn = vGroupMappingDataSource.getConnection(); + conn.setAutoCommit(true); + ps = conn.prepareStatement(sql); + ps.setString(index++, mappingDO.getVGroup()); + ps.setString(index++, mappingDO.getNamespace()); + ps.setString(index++, mappingDO.getCluster()); + + return ps.executeUpdate() > 0; + } catch (SQLException e) { + throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e); + } finally { + IOUtil.close(ps, conn); + } + } + + public boolean clearMappingDOByVGroup(String vGroup) { + String sql = "DELETE FROM " + vMapping + " WHERE vGroup = ?"; + Connection conn = null; + PreparedStatement ps = null; + try { + conn = vGroupMappingDataSource.getConnection(); + conn.setAutoCommit(true); + ps = conn.prepareStatement(sql); + ps.setString(1, vGroup); + return ps.executeUpdate() > 0; + } catch (SQLException e) { + throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e); + } finally { + IOUtil.close(ps, conn); + } + } + + public boolean deleteMappingDOByVGroup(String vGroup) { + String sql = "DELETE FROM " + vMapping + " WHERE vGroup = ? and cluster = ?"; + Instance instance = Instance.getInstance(); + Connection conn = null; + PreparedStatement ps = null; + try { + conn = vGroupMappingDataSource.getConnection(); + conn.setAutoCommit(true); + ps = conn.prepareStatement(sql); + ps.setString(1, vGroup); + ps.setString(2, instance.getClusterName()); + return ps.executeUpdate() > 0; + } catch (SQLException e) { + throw new SeataRuntimeException(ErrorCode.ERROR_SQL,e); + } finally { + IOUtil.close(ps, conn); + } + } + + public List<MappingDO> queryMappingDO() { + String sql = "SELECT vgroup,namespace, cluster FROM " + vMapping + + " WHERE cluster = ?"; + Connection conn = null; + PreparedStatement ps = null; + ResultSet rs = null; + List<MappingDO> result = new ArrayList<>(); + + try { + conn = vGroupMappingDataSource.getConnection(); + ps = conn.prepareStatement(sql); + ps.setString(1, CONFIG.getConfig(REGISTRY_NAMINGSERVER_CLUSTER)); + rs = ps.executeQuery(); + + while (rs.next()) { + MappingDO mappingDO = new MappingDO(); + mappingDO.setNamespace(rs.getString("namespace")); + mappingDO.setCluster(rs.getString("cluster")); + mappingDO.setVGroup(rs.getString("vGroup")); + result.add(mappingDO); + } + } catch (SQLException e) { + throw new SeataRuntimeException(ErrorCode.ERR_CONFIG, e); + } finally { + IOUtil.close(rs, ps, conn); + } + + return result; + } + + +} diff --git a/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java new file mode 100644 index 0000000000..4b4fd26e0d --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.file.store; + + + +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.config.Configuration; +import org.apache.seata.config.ConfigurationFactory; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.server.store.VGroupMappingStoreManager; +import org.apache.commons.io.FileUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.locks.ReentrantLock; + +@LoadLevel(name = "file") +public class FileVGroupMappingStoreManager implements VGroupMappingStoreManager { + private static final Logger LOGGER = LoggerFactory.getLogger(FileVGroupMappingStoreManager.class); + + public static final String ROOT_MAPPING_MANAGER_NAME = "vgroup_mapping.json"; + + private final ReentrantLock writeLock = new ReentrantLock(); + + private String storePath; + + protected static final Configuration CONFIG = ConfigurationFactory.getInstance(); + + + public FileVGroupMappingStoreManager() { + } + + public FileVGroupMappingStoreManager(String mappingStoreFilePath) { + storePath = mappingStoreFilePath + File.separator + ROOT_MAPPING_MANAGER_NAME; + } + + @Override + public boolean addVGroup(MappingDO mappingDO) { + HashMap<String, Object> vGroupMapping = loadVGroups(); + vGroupMapping.put(mappingDO.getVGroup(), mappingDO.getUnit()); + boolean isSaved = save(vGroupMapping); + if (!isSaved) { + LOGGER.error("add mapping relationship failed!"); + } + return isSaved; + } + + @Override + public boolean removeVGroup(String vGroup) { + HashMap<String, Object> vGroupMapping = loadVGroups(); + vGroupMapping.remove(vGroup); + boolean isSaved = save(vGroupMapping); + if (!isSaved) { + LOGGER.error("remove mapping relationship failed!"); + } + return isSaved; + } + + @Override + public HashMap<String, Object> loadVGroups() { + HashMap<String, Object> vGroupMapping = new HashMap<>(); + try { + File fileToLoad = new File(storePath); + if (!fileToLoad.exists()) { + try { + // create new file to record vgroup mapping relationship + boolean fileCreated = fileToLoad.createNewFile(); + if (fileCreated) { + LOGGER.info("New vgroup file created at path: " + storePath); + } else { + LOGGER.warn("Failed to create a new vgroup file at path: " + storePath); + } + } catch (IOException e) { + LOGGER.error("Error while creating a new file: " + e.getMessage()); + } + } + + String fileContent = FileUtils.readFileToString(fileToLoad, "UTF-8"); + + if (!fileContent.isEmpty()) { + ObjectMapper objectMapper = new ObjectMapper(); + vGroupMapping = objectMapper.readValue(fileContent, new TypeReference<HashMap<String, Object>>() { + }); + } + + + } catch (Exception e) { + LOGGER.error("mapping relationship load failed! " + e); + } + return vGroupMapping; + } + + + public boolean save(HashMap<String, Object> vGroupMapping) { + writeLock.lock(); + try { + ObjectMapper objectMapper = new ObjectMapper(); + String jsonMapping = objectMapper.writeValueAsString(vGroupMapping); + FileUtils.writeStringToFile(new File(storePath), jsonMapping, "UTF-8"); + return true; + } catch (IOException e) { + LOGGER.error("mapping relationship saved failed! ", e); + return false; + } finally { + writeLock.unlock(); + } + } +} diff --git a/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java new file mode 100644 index 0000000000..032b46b4b8 --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.redis.store; + +import org.apache.seata.common.exception.RedisException; +import org.apache.seata.common.loader.LoadLevel; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.server.storage.redis.JedisPooledFactory; +import org.apache.seata.server.store.VGroupMappingStoreManager; +import redis.clients.jedis.Jedis; + +import java.util.HashMap; +import java.util.Map; + +@LoadLevel(name = "redis") +public class RedisVGroupMappingStoreManager implements VGroupMappingStoreManager { + + private static final String REDIS_SPLIT_KEY = ":"; + + @Override + public boolean addVGroup(MappingDO mappingDO) { + String vGroup = mappingDO.getVGroup(); + String namespace = mappingDO.getNamespace(); + String clusterName = mappingDO.getCluster(); + try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { + jedis.hset(namespace, vGroup, clusterName); + return true; + } catch (Exception ex) { + throw new RedisException(ex); + } + } + + @Override + public boolean removeVGroup(String vGroup) { + Instance instance = Instance.getInstance(); + String namespace = instance.getNamespace(); + try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { + jedis.hdel(namespace, vGroup); + return true; + } catch (Exception ex) { + throw new RedisException(ex); + } + } + + @Override + public HashMap<String, Object> loadVGroups() { + Instance instance = Instance.getInstance(); + String namespace = instance.getNamespace(); + String clusterName = instance.getClusterName(); + try (Jedis jedis = JedisPooledFactory.getJedisInstance()) { + Map<String, String> mappingKeyMap = jedis.hgetAll(namespace); + HashMap<String, Object> result = new HashMap<>(); + for (Map.Entry<String, String> entry : mappingKeyMap.entrySet()) { + result.put(entry.getKey(), null); + } + return result; + } catch (Exception ex) { + throw new RedisException(ex); + } + } +} diff --git a/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java new file mode 100644 index 0000000000..6f42f9822b --- /dev/null +++ b/server/src/main/java/org/apache/seata/server/store/VGroupMappingStoreManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.store; + +import org.apache.seata.common.XID; +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.core.store.MappingDO; +import org.apache.seata.discovery.registry.MultiRegistryFactory; +import org.apache.seata.discovery.registry.RegistryService; + +import java.net.InetSocketAddress; +import java.util.HashMap; + +public interface VGroupMappingStoreManager { + /** + * add VGroup Mapping relationship in cluster + * + * @param mappingDO the relationship between vGroup and Cluster + */ + boolean addVGroup(MappingDO mappingDO); + + /** + * remove VGroup Mapping relationship in cluster + * + * @param vGroup + */ + boolean removeVGroup(String vGroup); + + /** + * get VGroup Mapping relationship in cluster + * + * @return Key:vGroup,Value:unit + */ + HashMap<String, Object> loadVGroups(); + + /** + * notify mapping relationship to all namingserver nodes + */ + default void notifyMapping() { + + Instance instance = Instance.getInstance(); + instance.addMetadata("vGroup", this.loadVGroups()); + try { + InetSocketAddress address = new InetSocketAddress(XID.getIpAddress(), XID.getPort()); + for (RegistryService registryService : MultiRegistryFactory.getInstances()) { + registryService.register(address); + } + } catch (Exception e) { + throw new RuntimeException("vGroup mapping relationship notified failed! ", e); + } + } + +} diff --git a/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager new file mode 100644 index 0000000000..25265e5099 --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.apache.seata.server.store.VGroupMappingStoreManager @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.seata.server.storage.db.store.DataBaseVGroupMappingStoreManager +org.apache.seata.server.storage.file.store.FileVGroupMappingStoreManager +org.apache.seata.server.storage.redis.store.RedisVGroupMappingStoreManager \ No newline at end of file diff --git a/server/src/main/resources/application.example.yml b/server/src/main/resources/application.example.yml index fb4cfad34a..b14414b6cd 100644 --- a/server/src/main/resources/application.example.yml +++ b/server/src/main/resources/application.example.yml @@ -76,6 +76,13 @@ seata: # support: nacos 、 eureka 、 redis 、 zk 、 consul 、 etcd3 、 sofa type: file preferred-networks: 30.240.* + metadata: + weight: 1 + namingserver: + server-addr: 127.0.0.1:8080 + cluster: default + namespace: public + heartbeat-period: 1000 nacos: application: seata-server server-addr: 127.0.0.1:8848 diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java b/server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java similarity index 53% rename from namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java rename to server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java index 91c11a3623..8ea8997aae 100644 --- a/namingserver/src/main/java/org/apache/seata/namingserver/constants/NamingServerConstants.java +++ b/server/src/test/java/org/apache/seata/server/controller/NamingControllerTest.java @@ -14,36 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.seata.namingserver.constants; +package org.apache.seata.server.controller; -public interface NamingServerConstants { - /** - * The constant HTTP_PREFIX - */ - String HTTP_PREFIX = "http://"; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; - /** - * The constant HTTP_ADD_GROUP_SUFFIX - */ - String HTTP_ADD_GROUP_SUFFIX = "/naming/v1/addVGroup?"; - /** - * The constant CONSTANT_UNIT - */ - String CONSTANT_UNIT = "unit"; +@Disabled +@SpringBootTest +class NamingControllerTest { + @Autowired + private NamingController namingController; - /** - * The constant CONSTANT_GROUP - */ - String CONSTANT_GROUP = "vGroup"; + @Test + void addVGroup() { + namingController.addVGroup("group1","unit1"); + } - /** - * The constant HTTP_REMOVE_GROUP_SUFFIX - */ - String HTTP_REMOVE_GROUP_SUFFIX = "/naming/v1/removeVGroup?"; - - /** - * The constant IP_PORT_SPLIT_CHAR - */ - String IP_PORT_SPLIT_CHAR = ":"; -} + @Test + void removeVGroup() { + namingController.removeVGroup("group1"); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java new file mode 100644 index 0000000000..e0343118a6 --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/storage/file/store/FileVGroupMappingStoreManagerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.file.store; + +import org.apache.commons.io.FileUtils; +import org.apache.seata.core.store.MappingDO; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.boot.test.context.SpringBootTest; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +@SpringBootTest +public class FileVGroupMappingStoreManagerTest { + + private FileVGroupMappingStoreManager fileVGroupMappingStoreManager; + private static final String STORE_PATH = "sessionStore/vgroup_mapping.json"; + private static final String VGROUP_NAME = "testVGroup"; + private static final String UNIT = "testUnit"; + + @BeforeEach + public void setUp() { + fileVGroupMappingStoreManager = new FileVGroupMappingStoreManager("sessionStore"); + File file = new File(STORE_PATH); + if (file.exists()) { + file.delete(); + } + } + + @Test + public void testAddVGroupSuccess() { + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup(VGROUP_NAME); + mappingDO.setUnit(UNIT); + + assertTrue(fileVGroupMappingStoreManager.addVGroup(mappingDO)); + + HashMap<String, Object> vGroups = fileVGroupMappingStoreManager.loadVGroups(); + assertEquals(UNIT, vGroups.get(VGROUP_NAME)); + } + + @Test + public void testRemoveVGroupSuccess() { + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup(VGROUP_NAME); + mappingDO.setUnit(UNIT); + + fileVGroupMappingStoreManager.addVGroup(mappingDO); + assertTrue(fileVGroupMappingStoreManager.removeVGroup(VGROUP_NAME)); + + HashMap<String, Object> vGroups = fileVGroupMappingStoreManager.loadVGroups(); + assertNull(vGroups.get(VGROUP_NAME)); + } + + @Test + public void testLoadVGroups() throws IOException { + HashMap<String, Object> expectedMapping = new HashMap<>(); + expectedMapping.put(VGROUP_NAME, UNIT); + File file = new File(STORE_PATH); + FileUtils.writeStringToFile(file, "{\"testVGroup\":\"testUnit\"}", StandardCharsets.UTF_8); + + HashMap<String, Object> actualMapping = fileVGroupMappingStoreManager.loadVGroups(); + assertEquals(expectedMapping, actualMapping); + } + + @Test + public void testSave() { + HashMap<String, Object> vGroupMapping = new HashMap<>(); + vGroupMapping.put(VGROUP_NAME, UNIT); + + assertTrue(fileVGroupMappingStoreManager.save(vGroupMapping)); + + File file = new File(STORE_PATH); + assertTrue(file.exists()); + + try { + String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8); + assertEquals("{\"testVGroup\":\"testUnit\"}", content); + } catch (IOException e) { + fail("Failed to read the file content"); + } + } + + @Test + public void testAddVGroupFailure() { + FileVGroupMappingStoreManager spyManager = spy(new FileVGroupMappingStoreManager( "src/test/resources")); + doReturn(false).when(spyManager).save(any(HashMap.class)); + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup(VGROUP_NAME); + mappingDO.setUnit(UNIT); + + assertFalse(spyManager.addVGroup(mappingDO)); + } + + @Test + public void testRemoveVGroupFailure() { + FileVGroupMappingStoreManager spyManager = spy(new FileVGroupMappingStoreManager("src/test/resources")); + doReturn(false).when(spyManager).save(any(HashMap.class)); + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup(VGROUP_NAME); + mappingDO.setUnit(UNIT); + + spyManager.addVGroup(mappingDO); + assertFalse(spyManager.removeVGroup(VGROUP_NAME)); + } +} diff --git a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java new file mode 100644 index 0000000000..fac04d66de --- /dev/null +++ b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.server.storage.redis.store; + +import org.apache.seata.common.metadata.namingserver.Instance; +import org.apache.seata.core.store.MappingDO; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.Map; + + +@SpringBootTest +public class RedisVGroupMappingStoreManagerTest { + private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager; + + @BeforeEach + public void setUp() { + redisVGroupMappingStoreManager = new RedisVGroupMappingStoreManager(); + } + + @Test + public void testLoadVGroups() { + Instance instance = Instance.getInstance(); + instance.setNamespace("public"); + instance.setClusterName("testCluster"); + instance.setUnit("123"); + MappingDO mappingDO = new MappingDO(); + mappingDO.setVGroup("testVGroup"); + mappingDO.setCluster("testCluster"); + mappingDO.setNamespace("public"); + redisVGroupMappingStoreManager.addVGroup(mappingDO); + Map<String,Object> map = redisVGroupMappingStoreManager.loadVGroups(); + Assertions.assertTrue(map.containsKey("testVGroup")); + redisVGroupMappingStoreManager.removeVGroup("testVGroup"); + map = redisVGroupMappingStoreManager.loadVGroups(); + Assertions.assertFalse(map.containsKey("testVGroup")); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org