This is an automated email from the ASF dual-hosted git repository.
jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 17358bbd7c bugfix: gracefully shut down the server (#6143)
17358bbd7c is described below
commit 17358bbd7c3e978b13ccb08183d36d76b88c04ca
Author: laywin <[email protected]>
AuthorDate: Thu Jan 11 17:53:33 2024 +0800
bugfix: gracefully shut down the server (#6143)
---
changes/en-us/2.x.md | 2 +
changes/zh-cn/2.x.md | 2 +
.../main/java/io/seata/common/DefaultValues.java | 2 +-
.../core/rpc/netty/NettyRemotingServerTest.java | 83 ++++++++++++++++++++++
.../registry/consul/ConsulRegistryServiceImpl.java | 9 ++-
.../seata/discovery/registry/RegistryService.java | 32 +++++++--
.../registry/etcd3/EtcdRegistryServiceImpl.java | 2 +
.../registry/eureka/EurekaRegistryServiceImpl.java | 2 +
.../registry/nacos/NacosRegistryServiceImpl.java | 2 +
.../registry/redis/RedisRegistryServiceImpl.java | 4 +-
.../registry/sofa/SofaRegistryServiceImpl.java | 4 +-
.../registry/zk/ZookeeperRegisterServiceImpl.java | 2 +
.../zk/ZookeeperRegisterServiceImplTest.java | 67 +++++++++++++++++
.../boot/autoconfigure/CorePropertiesTest.java | 2 +-
14 files changed, 205 insertions(+), 10 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index c6a1cd2f55..df52104bf8 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -15,9 +15,11 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6121](https://github.com/apache/incubator-seata/pull/6121)] fix the
branch transaction order error when rolling back
- [[#6182](https://github.com/apache/incubator-seata/pull/6182)] fix
guava-32.0.0-jre.jar zip file is empty in ci
- [[#6196](https://github.com/apache/incubator-seata/pull/6196)] fix asf
config file format error
+- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] gracefully
shut down the server
- [[#6204](https://github.com/apache/incubator-seata/pull/6204)] fix the
problem that The incorrect configuration needs to be fixed
- [[#6248](https://github.com/apache/incubator-seata/pull/6248)] fix JDBC
resultSet, statement, connection closing order
+
### optimize:
- [[#6031](https://github.com/apache/incubator-seata/pull/6031)] add a check
for the existence of the undolog table
- [[#6089](https://github.com/apache/incubator-seata/pull/6089)] modify the
semantics of RaftServerFactory and remove unnecessary singleton
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index 0730187615..047473b569 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -15,9 +15,11 @@
- [[#6121](https://github.com/apache/incubator-seata/pull/6121)]
修复回滚分支事务时没有按照时间排序的问题
- [[#6182](https://github.com/apache/incubator-seata/pull/6182)]
修复在ci中guava-32.0.0-jre.jar zip文件为空的问题
- [[#6196](https://github.com/apache/incubator-seata/pull/6196)] 修复asf配置格式错误的问题
+- [[#6143](https://github.com/apache/incubator-seata/pull/6143)] 修复优雅停机
- [[#6204](https://github.com/apache/incubator-seata/pull/6204)] 修复错误配置问题
- [[#6248](https://github.com/apache/incubator-seata/pull/6248)] 修复JDBC
resultSet, statement, connection关闭顺序
+
### optimize:
- [[#6031](https://github.com/apache/incubator-seata/pull/6031)]
添加undo_log表的存在性校验
- [[#6089](https://github.com/apache/incubator-seata/pull/6089)]
修改RaftServerFactory语义并删除不必要的单例构建
diff --git a/common/src/main/java/io/seata/common/DefaultValues.java
b/common/src/main/java/io/seata/common/DefaultValues.java
index 0db7db8b0d..1aa5352ab4 100644
--- a/common/src/main/java/io/seata/common/DefaultValues.java
+++ b/common/src/main/java/io/seata/common/DefaultValues.java
@@ -47,7 +47,7 @@ public interface DefaultValues {
/**
* Shutdown timeout default 3s
*/
- int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 3;
+ int DEFAULT_SHUTDOWN_TIMEOUT_SEC = 13;
int DEFAULT_SELECTOR_THREAD_SIZE = 1;
int DEFAULT_BOSS_THREAD_SIZE = 1;
diff --git
a/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java
b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java
new file mode 100644
index 0000000000..cad4252626
--- /dev/null
+++ b/core/src/test/java/io/seata/core/rpc/netty/NettyRemotingServerTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.seata.core.rpc.netty;
+
+import io.netty.channel.Channel;
+import io.seata.common.XID;
+import io.seata.common.loader.EnhancedServiceLoader;
+import io.seata.core.rpc.RegisterCheckAuthHandler;
+import io.seata.discovery.registry.MultiRegistryFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class NettyRemotingServerTest {
+
+ private NettyRemotingServer nettyRemotingServer;
+
+ @BeforeEach
+ public void init() {
+ nettyRemotingServer = new NettyRemotingServer(new
ThreadPoolExecutor(1, 1, 0,
+ TimeUnit.SECONDS, new LinkedBlockingDeque<>()));
+ }
+ @Test
+ public void testInit() throws NoSuchFieldException,
IllegalAccessException {
+
+ MockedStatic<EnhancedServiceLoader>
enhancedServiceLoaderMockedStatic =
Mockito.mockStatic(EnhancedServiceLoader.class);
+ enhancedServiceLoaderMockedStatic.when(() ->
EnhancedServiceLoader.load((RegisterCheckAuthHandler.class))).thenReturn(null);
+
+ MockedStatic<MultiRegistryFactory>
multiRegistryFactoryMockedStatic =
Mockito.mockStatic(MultiRegistryFactory.class);
+
multiRegistryFactoryMockedStatic.when(MultiRegistryFactory::getInstances).thenReturn(
+ Collections.emptyList());
+
+ XID.setIpAddress("127.0.0.1");
+ XID.setPort(8093);
+
+ nettyRemotingServer.init();
+
+ multiRegistryFactoryMockedStatic.close();
+ enhancedServiceLoaderMockedStatic.close();
+
+ Field field =
NettyRemotingServer.class.getDeclaredField("initialized");
+ field.setAccessible(true);
+
+
Assertions.assertTrue(((AtomicBoolean)field.get(nettyRemotingServer)).get());
+ }
+
+ @Test
+ public void testDestroyChannel() {
+ Channel channel = Mockito.mock(Channel.class);
+ nettyRemotingServer.destroyChannel("127.0.0.1:8091", channel);
+ Mockito.verify(channel).close();
+ }
+
+ @Test
+ public void destory() {
+ nettyRemotingServer.destroy();
+ Assertions.assertTrue(nettyRemotingServer != null);
+ }
+}
diff --git
a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
index 7ddf7a6289..9a51e56e1c 100644
---
a/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
+++
b/discovery/seata-discovery-consul/src/main/java/io/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
@@ -304,10 +304,15 @@ public class ConsulRegistryServiceImpl implements
RegistryService<ConsulListener
if (cluster == null || services == null) {
return;
}
- clusterAddressMap.put(cluster, services.stream()
+
+ List<InetSocketAddress> addresses = services.stream()
.map(HealthService::getService)
.map(service -> new InetSocketAddress(service.getAddress(),
service.getPort()))
- .collect(Collectors.toList()));
+ .collect(Collectors.toList());
+
+ clusterAddressMap.put(cluster, addresses);
+
+ removeOfflineAddressesIfNecessary(cluster, addresses);
}
/**
diff --git
a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java
b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java
index 86486b1708..56f65f313c 100644
---
a/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java
+++
b/discovery/seata-discovery-core/src/main/java/io/seata/discovery/registry/RegistryService.java
@@ -17,12 +17,15 @@
package io.seata.discovery.registry;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
+import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import io.seata.config.ConfigurationCache;
import io.seata.config.ConfigurationFactory;
@@ -117,12 +120,33 @@ public interface RegistryService<T> {
}
default List<InetSocketAddress> aliveLookup(String
transactionServiceGroup) {
- return CURRENT_ADDRESS_MAP.computeIfAbsent(transactionServiceGroup, k
-> new ArrayList<>());
+ return
CURRENT_ADDRESS_MAP.computeIfAbsent(getServiceGroup(transactionServiceGroup), k
-> new ArrayList<>());
}
default List<InetSocketAddress> refreshAliveLookup(String
transactionServiceGroup,
List<InetSocketAddress> aliveAddress) {
- return CURRENT_ADDRESS_MAP.put(transactionServiceGroup, aliveAddress);
+ return
CURRENT_ADDRESS_MAP.put(getServiceGroup(transactionServiceGroup), aliveAddress);
+ }
+
+
+ /**
+ *
+ * remove offline addresses if necessary.
+ *
+ * Intersection of the old and new addresses
+ *
+ * @param clusterName
+ * @param newAddressed
+ */
+ default void removeOfflineAddressesIfNecessary(String clusterName,
Collection<InetSocketAddress> newAddressed) {
+
+ List<InetSocketAddress> currentAddresses =
CURRENT_ADDRESS_MAP.getOrDefault(clusterName, Collections.emptyList());
+
+ List<InetSocketAddress> inetSocketAddresses = currentAddresses
+ .stream().filter(newAddressed::contains).collect(
+ Collectors.toList());
+
+ CURRENT_ADDRESS_MAP.put(clusterName, inetSocketAddresses);
}
}
diff --git
a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
index a08be77e79..3f8f956506 100644
---
a/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
+++
b/discovery/seata-discovery-etcd3/src/main/java/io/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
@@ -252,6 +252,8 @@ public class EtcdRegistryServiceImpl implements
RegistryService<Watch.Listener>
return new InetSocketAddress(instanceInfo[0],
Integer.parseInt(instanceInfo[1]));
}).collect(Collectors.toList());
clusterAddressMap.put(cluster, new
Pair<>(getResponse.getHeader().getRevision(), instanceList));
+
+ removeOfflineAddressesIfNecessary(cluster, instanceList);
}
/**
diff --git
a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
index 3719349710..62e3251f1d 100644
---
a/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
+++
b/discovery/seata-discovery-eureka/src/main/java/io/seata/discovery/registry/eureka/EurekaRegistryServiceImpl.java
@@ -168,6 +168,8 @@ public class EurekaRegistryServiceImpl implements
RegistryService<EurekaEventLis
.map(instance -> new
InetSocketAddress(instance.getIPAddr(), instance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
+
+ removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}
}
diff --git
a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
index 708b2b54bd..002b555bcb 100644
---
a/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
+++
b/discovery/seata-discovery-nacos/src/main/java/io/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
@@ -192,6 +192,8 @@ public class NacosRegistryServiceImpl implements
RegistryService<EventListener>
.map(eachInstance -> new
InetSocketAddress(eachInstance.getIp(), eachInstance.getPort()))
.collect(Collectors.toList());
CLUSTER_ADDRESS_MAP.put(clusterName,
newAddressList);
+
+ removeOfflineAddressesIfNecessary(clusterName,
newAddressList);
}
});
}
diff --git
a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index e0775a404d..483460f90c 100644
---
a/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++
b/discovery/seata-discovery-redis/src/main/java/io/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -278,7 +278,9 @@ public class RedisRegistryServiceImpl implements
RegistryService<RedisListener>
}
}
}
- CLUSTER_ADDRESS_MAP.get(notifyCluserName).remove(inetSocketAddress);
+ socketAddresses.remove(inetSocketAddress);
+
+ removeOfflineAddressesIfNecessary(notifyCluserName, socketAddresses);
}
@Override
diff --git
a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
index 39bd39671f..957ba02c0b 100644
---
a/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
+++
b/discovery/seata-discovery-sofa/src/main/java/io/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
@@ -17,9 +17,9 @@
package io.seata.discovery.registry.sofa;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -173,6 +173,8 @@ public class SofaRegistryServiceImpl implements
RegistryService<SubscriberDataOb
} else {
List<InetSocketAddress> newAddressList =
flatData(instances);
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
+
+ removeOfflineAddressesIfNecessary(clusterName,
newAddressList);
}
respondRegistries.countDown();
});
diff --git
a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
index 0d26f7b686..d0c01fd8fb 100644
---
a/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
+++
b/discovery/seata-discovery-zk/src/main/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImpl.java
@@ -308,6 +308,8 @@ public class ZookeeperRegisterServiceImpl implements
RegistryService<IZkChildLis
}
}
CLUSTER_ADDRESS_MAP.put(clusterName, newAddressList);
+
+ removeOfflineAddressesIfNecessary(clusterName, newAddressList);
}
private String getClusterName() {
diff --git
a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
index 9faaadf04d..9a21e61cf1 100644
---
a/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
+++
b/discovery/seata-discovery-zk/src/test/java/io/seata/discovery/registry/zk/ZookeeperRegisterServiceImplTest.java
@@ -16,8 +16,10 @@
*/
package io.seata.discovery.registry.zk;
+import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -103,4 +105,69 @@ public class ZookeeperRegisterServiceImplTest {
service.unsubscribe("default", listener2);
}
+ @Test
+ public void testLookUp() throws Exception {
+ ZookeeperRegisterServiceImpl zookeeperRegisterService =
ZookeeperRegisterServiceImpl.getInstance();
+
+ ZkClient client = service.buildZkClient("127.0.0.1:2181", 5000, 5000);
+ client.createPersistent("/registry/zk/cluster");
+ client.createEphemeral("/registry/zk/cluster/127.0.0.1:8091");
+
+ Field field =
ZookeeperRegisterServiceImpl.class.getDeclaredField("zkClient");
+ field.setAccessible(true);
+ field.set(zookeeperRegisterService, client);
+
+
+ System.setProperty("txServiceGroup", "default_tx_group");
+ System.setProperty("service.vgroupMapping.default_tx_group",
"cluster");
+
+
+ List<InetSocketAddress> addressList =
zookeeperRegisterService.lookup("default_tx_group");
+
+ Assertions.assertEquals(addressList, Collections.singletonList(new
InetSocketAddress("127.0.0.1", 8091)));
+ }
+
+ @Test
+ public void testRemoveOfflineAddressesIfNecessaryNoRemoveCase() {
+ service.CURRENT_ADDRESS_MAP.put("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+ service.removeOfflineAddressesIfNecessary("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+
+ Assertions.assertEquals(1,
service.CURRENT_ADDRESS_MAP.get("cluster").size());
+ }
+
+ @Test
+ public void testRemoveOfflineAddressesIfNecessaryRemoveCase() {
+ service.CURRENT_ADDRESS_MAP.put("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+ service.removeOfflineAddressesIfNecessary("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.2", 8091)));
+
+ Assertions.assertEquals(0,
service.CURRENT_ADDRESS_MAP.get("cluster").size());
+ }
+
+ @Test
+ public void testAliveLookup() {
+
+ System.setProperty("txServiceGroup", "default_tx_group");
+ System.setProperty("service.vgroupMapping.default_tx_group",
"cluster");
+
+ service.CURRENT_ADDRESS_MAP.put("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+ List<InetSocketAddress> result =
service.aliveLookup("default_tx_group");
+
+ Assertions.assertEquals(result, Collections.singletonList(new
InetSocketAddress("127.0.0.1", 8091)));
+ }
+
+
+ @Test
+ public void tesRefreshAliveLookup() {
+
+ System.setProperty("txServiceGroup", "default_tx_group");
+ System.setProperty("service.vgroupMapping.default_tx_group",
"cluster");
+
+ service.CURRENT_ADDRESS_MAP.put("cluster",
Collections.singletonList(new InetSocketAddress("127.0.0.1", 8091)));
+
+ service.refreshAliveLookup("default_tx_group",
+ Collections.singletonList(new InetSocketAddress("127.0.0.2",
8091)));
+
+ Assertions.assertEquals(service.CURRENT_ADDRESS_MAP.get("cluster"),
+ Collections.singletonList(new InetSocketAddress("127.0.0.2",
8091)));
+ }
}
diff --git
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java
index 829e6f83a2..4b007b8f88 100644
---
a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java
+++
b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/test/java/io/seata/spring/boot/autoconfigure/CorePropertiesTest.java
@@ -75,7 +75,7 @@ public class CorePropertiesTest {
@Test
public void testShutdownProperties() {
- assertEquals(3L, context.getBean(ShutdownProperties.class).getWait());
+ assertEquals(13L, context.getBean(ShutdownProperties.class).getWait());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]