This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new 17358bbd7c bugfix: gracefully shut down the server (#6143) 17358bbd7c is described below commit 17358bbd7c3e978b13ccb08183d36d76b88c04ca Author: laywin <lay...@yeah.net> AuthorDate: Thu Jan 11 17:53:33 2024 +0800 bugfix: gracefully shut down the server (#6143) --- changes/en-us/2.x.md | 2 + changes/zh-cn/2.x.md | 2 + .../main/java/io/seata/common/DefaultValues.java | 2 +- .../core/rpc/netty/NettyRemotingServerTest.java | 83 ++++++++++++++++++++++ .../registry/consul/ConsulRegistryServiceImpl.java | 9 ++- .../seata/discovery/registry/RegistryService.java | 32 +++++++-- .../registry/etcd3/EtcdRegistryServiceImpl.java | 2 + .../registry/eureka/EurekaRegistryServiceImpl.java | 2 + .../registry/nacos/NacosRegistryServiceImpl.java | 2 + .../registry/redis/RedisRegistryServiceImpl.java | 4 +- .../registry/sofa/SofaRegistryServiceImpl.java | 4 +- .../registry/zk/ZookeeperRegisterServiceImpl.java | 2 + .../zk/ZookeeperRegisterServiceImplTest.java | 67 +++++++++++++++++ .../boot/autoconfigure/CorePropertiesTest.java | 2 +- 14 files changed, 205 insertions(+), 10 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index c6a1cd2f55..df52104bf8 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -15,9 +15,11 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6121](https://github.com/apache/incubator-seata/pull/6121)] fix the branch transaction order error when rolling back - [[#6182](https://github.com/apache/incubator-seata/pull/6182)] fix guava-32.0.0-jre.jar zip file is empty in ci - [[#6196](https://github.com/apache/incubator-seata/pull/6196)] fix asf config file format error +- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] gracefully shut down the server - [[#6204](https://github.com/apache/incubator-seata/pull/6204)] fix the problem that The incorrect configuration needs to be fixed - [[#6248](https://github.com/apache/incubator-seata/pull/6248)] fix JDBC resultSet, statement, connection closing order + ### optimize: - [[#6031](https://github.com/apache/incubator-seata/pull/6031)] add a check for the existence of the undolog table - [[#6089](https://github.com/apache/incubator-seata/pull/6089)] modify the semantics of RaftServerFactory and remove unnecessary singleton diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 0730187615..047473b569 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -15,9 +15,11 @@ - [[#6121](https://github.com/apache/incubator-seata/pull/6121)] 修复回滚分支事务时没有按照时间排序的问题 - [[#6182](https://github.com/apache/incubator-seata/pull/6182)] 修复在ci中guava-32.0.0-jre.jar zip文件为空的问题 - [[#6196](https://github.com/apache/incubator-seata/pull/6196)] 修复asf配置格式错误的问题 +- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] 修复优雅停机 - [[#6204](https://github.com/apache/incubator-seata/pull/6204)] 修复错误配置问题 - [[#6248](https://github.com/apache/incubator-seata/pull/6248)] 修复JDBC resultSet, statement, connection关闭顺序 + ### optimize: - [[#6031](https://github.com/apache/incubator-seata/pull/6031)] 添加undo_log表的存在性校验 - [[#6089](https://github.com/apache/incubator-seata/pull/6089)] 修改RaftServerFactory语义并删除不必要的单例构建 diff --git a/common/src/main/java/io/seata/common/DefaultValues.java b/common/src/main/java/io/seata/common/DefaultValues.java index 0db7db8b0d..1aa5352ab4 100644 --- a/common/src/main/java/io/seata/common/DefaultValues.java +++ b/common/src/main/java/io/seata/common/DefaultValues.java @@ -47,7 +47,7 @@ public interface DefaultValues { /** * Shutdown timeout default 3s */ - int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 3; + int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 13; int DEFAULT_SELECTOR_THREAD_SIZE = 1; int DEFAULT_BOSS_THREAD_SIZE = 1; diff --git a/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java new file mode 100644 index 0000000000..cad4252626 --- /dev/null +++ b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.core.rpc.netty; + +import io.netty.channel.Channel; +import io.seata.common.XID; +import io.seata.common.loader.EnhancedServiceLoader; +import io.seata.core.rpc.RegisterCheckAuthHandler; +import io.seata.discovery.registry.MultiRegistryFactory; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +public class NettyRemotingServerTest { + + private NettyRemotingServer nettyRemotingServer; + + @BeforeEach + public void init() { + nettyRemotingServer = new NettyRemotingServer(new ThreadPoolExecutor(1, 1, 0, + TimeUnit.SECONDS, new LinkedBlockingDeque<>())); + } + @Test + public void testInit() throws NoSuchFieldException, IllegalAccessException { + + MockedStatic<EnhancedServiceLoader> enhancedServiceLoaderMockedStatic = Mockito.mockStatic(EnhancedServiceLoader.class); + enhancedServiceLoaderMockedStatic.when(() -> EnhancedServiceLoader.load((RegisterCheckAuthHandler.class))).thenReturn(null); + + MockedStatic<MultiRegistryFactory> multiRegistryFactoryMockedStatic = Mockito.mockStatic(MultiRegistryFactory.class); + multiRegistryFactoryMockedStatic.when(MultiRegistryFactory::getInstances).thenReturn( + Collections.emptyList()); + + XID.setIpAddress("127.0.0.1"); + XID.setPort(8093); + + nettyRemotingServer.init(); + + multiRegistryFactoryMockedStatic.close(); + enhancedServiceLoaderMockedStatic.close(); + + Field field = NettyRemotingServer.class.getDeclaredField("initialized"); + field.setAccessible(true); + + Assertions.assertTrue(((AtomicBoolean)field.get(nettyRemotingServer)).get()); + } + + @Test + public void testDestroyChannel() { + Channel channel = Mockito.mock(Channel.class); + nettyRemotingServer.destroyChannel("127.0.0.1:8091", channel); + Mockito.verify(channel).close(); + } + + @Test + public void destory() { + nettyRemotingServer.destroy(); + Assertions.assertTrue(nettyRemotingServer != null); + } +} diff --git a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java index 7ddf7a6289..9a51e56e1c 100644 --- a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java +++ b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java @@ -304,10 +304,15 @@ public class ConsulRegistryServiceImpl implements RegistryService<ConsulListener if (cluster == null || services == null) { return; } - clusterAddressMap.put(cluster, services.stream() + + List<InetSocketAddress> addresses = services.stream() .map(HealthService::getService) .map(service -> new InetSocketAddress(service.getAddress(), service.getPort())) - .collect(Collectors.toList())); + .collect(Collectors.toList()); + + clusterAddressMap.put(cluster, addresses); + + removeOfflineAddressesIfNecessary(cluster, addresses); } /** diff --git a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java index 86486b1708..56f65f313c 100644 --- a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java +++ b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java @@ -17,12 +17,15 @@ package io.seata.discovery.registry; import java.net.InetSocketAddress; -import java.util.ArrayList; +import java.util.Set; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Collection; +import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import io.seata.config.ConfigurationCache; import io.seata.config.ConfigurationFactory; @@ -117,12 +120,33 @@ public interface RegistryService<T> { } default List<InetSocketAddress> aliveLookup(String transactionServiceGroup) { - return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k -> new ArrayList<>()); + return CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k -> new ArrayList<>()); } default List<InetSocketAddress> refreshAliveLookup(String transactionServiceGroup, List<InetSocketAddress> aliveAddress) { - return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress); + return CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress); + } + + + /** + * + * remove offline addresses if necessary. + * + * Intersection of the old and new addresses + * + * @param clusterName + * @param newAddressed + */ + default void removeOfflineAddressesIfNecessary(String clusterName, Collection<InetSocketAddress> newAddressed) { + + List<InetSocketAddress> currentAddresses = CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList()); + + List<InetSocketAddress> inetSocketAddresses = currentAddresses + .stream().filter(newAddressed::contains).collect( + Collectors.toList()); + + CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses); } } diff --git a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java index a08be77e79..3f8f956506 100644 --- a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java +++ b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java @@ -252,6 +252,8 @@ public class EtcdRegistryServiceImpl implements RegistryService<Watch.Listener> return new InetSocketAddress(instanceInfo[0], Integer.parseInt(instanceInfo[1])); }).collect(Collectors.toList()); clusterAddressMap.put(cluster, new Pair<>(getResponse.getHeader().getRevision(), instanceList)); + + removeOfflineAddressesIfNecessary(cluster, instanceList); } /** diff --git a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java index 3719349710..62e3251f1d 100644 --- a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java +++ b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java @@ -168,6 +168,8 @@ public class EurekaRegistryServiceImpl implements RegistryService<EurekaEventLis .map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } } diff --git a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java index 708b2b54bd..002b555bcb 100644 --- a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java +++ b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java @@ -192,6 +192,8 @@ public class NacosRegistryServiceImpl implements RegistryService<EventListener> .map(eachInstance -> new InetSocketAddress(eachInstance.getIp(), eachInstance.getPort())) .collect(Collectors.toList()); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } }); } diff --git a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java index e0775a404d..483460f90c 100644 --- a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java +++ b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java @@ -278,7 +278,9 @@ public class RedisRegistryServiceImpl implements RegistryService<RedisListener> } } } - CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress); + socketAddresses.remove(inetSocketAddress); + + removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses); } @Override diff --git a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java index 39bd39671f..957ba02c0b 100644 --- a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java +++ b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java @@ -17,9 +17,9 @@ package io.seata.discovery.registry.sofa; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -173,6 +173,8 @@ public class SofaRegistryServiceImpl implements RegistryService<SubscriberDataOb } else { List<InetSocketAddress> newAddressList = flatData(instances); CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } respondRegistries.countDown(); }); diff --git a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java index 0d26f7b686..d0c01fd8fb 100644 --- a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java +++ b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java @@ -308,6 +308,8 @@ public class ZookeeperRegisterServiceImpl implements RegistryService<IZkChildLis } } CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList); + + removeOfflineAddressesIfNecessary(clusterName, newAddressList); } private String getClusterName() { diff --git a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java index 9faaadf04d..9a21e61cf1 100644 --- a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java +++ b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java @@ -16,8 +16,10 @@ */ package io.seata.discovery.registry.zk; +import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -103,4 +105,69 @@ public class ZookeeperRegisterServiceImplTest { service.unsubscribe("default", listener2); } + @Test + public void testLookUp() throws Exception { + ZookeeperRegisterServiceImpl zookeeperRegisterService = ZookeeperRegisterServiceImpl.getInstance(); + + ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000); + client.createPersistent("/registry/zk/cluster"); + client.createEphemeral("/registry/zk/cluster/127.0.0.1:8091"); + + Field field = ZookeeperRegisterServiceImpl.class.getDeclaredField("zkClient"); + field.setAccessible(true); + field.set(zookeeperRegisterService, client); + + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + + List<InetSocketAddress> addressList = zookeeperRegisterService.lookup("default_tx_group"); + + Assertions.assertEquals(addressList, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + } + + @Test + public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() { + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + Assertions.assertEquals(1, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + } + + @Test + public void testRemoveOfflineAddressesIfNecessaryRemoveCase() { + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + service.removeOfflineAddressesIfNecessary("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(0, service.CURRENT_ADDRESS_MAP.get("cluster").size()); + } + + @Test + public void testAliveLookup() { + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + List<InetSocketAddress> result = service.aliveLookup("default_tx_group"); + + Assertions.assertEquals(result, Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + } + + + @Test + public void tesRefreshAliveLookup() { + + System.setProperty("txServiceGroup", "default_tx_group"); + System.setProperty("service.vgroupMapping.default_tx_group", "cluster"); + + service.CURRENT_ADDRESS_MAP.put("cluster", Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091))); + + service.refreshAliveLookup("default_tx_group", + Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + + Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"), + Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091))); + } } diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java index 829e6f83a2..4b007b8f88 100644 --- a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java +++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java @@ -75,7 +75,7 @@ public class CorePropertiesTest { @Test public void testShutdownProperties() { - assertEquals(3L, context.getBean(ShutdownProperties.class).getWait()); + assertEquals(13L, context.getBean(ShutdownProperties.class).getWait()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org