This is an automated email from the ASF dual-hosted git repository.
xingfudeshi pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push:
new 04447c71fc optimize: enhance close() logic of discovery module (#7375)
04447c71fc is described below
commit 04447c71fc3a48edf5d04b88fa823a640c2773e9
Author: YoWuwuuuw <[email protected]>
AuthorDate: Thu Jun 12 14:16:41 2025 +0800
optimize: enhance close() logic of discovery module (#7375)
---
changes/en-us/2.x.md | 2 +-
changes/zh-cn/2.x.md | 1 +
.../registry/consul/ConsulRegistryServiceImpl.java | 20 ++++-
...java => ConsulRegistryServiceImplMockTest.java} | 92 +++++++++++++++++-----
.../discovery/registry/RegistryHeartBeats.java | 6 +-
.../registry/etcd3/EtcdRegistryServiceImpl.java | 30 ++++++-
.../{etcd => etcd3}/EtcdRegistryProviderTest.java | 4 +-
.../EtcdRegistryServiceImplMockTest.java | 87 +++++++++++++-------
.../EtcdRegistryServiceImplTest.java | 4 +-
.../registry/nacos/NacosRegistryServiceImpl.java | 22 +++++-
.../nacos/NacosRegistryServiceImplTest.java | 23 ++++++
.../registry/redis/RedisRegistryServiceImpl.java | 42 ++++++++--
.../redis/RedisRegisterServiceImplTest.java | 42 +++++++++-
.../registry/sofa/SofaRegistryServiceImpl.java | 2 +-
14 files changed, 301 insertions(+), 76 deletions(-)
diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index d56ea842a1..745b49f7ba 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -48,6 +48,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7360](https://github.com/apache/incubator-seata/pull/7360)] Update
resource cleanup logic for channel disconnection
- [[#7363](https://github.com/apache/incubator-seata/pull/7363)] Upgrade npmjs
dependencies
- [[#7372](https://github.com/apache/incubator-seata/pull/7372)] optimize
license ignore
+- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] optimize
close() logic of discovery module
- [[#7388](https://github.com/apache/incubator-seata/pull/7388)] optimize
binary packaging directory structure
- [[#7412](https://github.com/apache/incubator-seata/pull/7412)] Helm template
adapted to the new version of seata
- [[#7414](https://github.com/apache/incubator-seata/pull/7414)] Remove the
unused defaultEventExecutorGroup from the NettyClientBootstrap
@@ -88,7 +89,6 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7379](https://github.com/apache/incubator-seata/issues/7379)] add UT for
TccAnnotationProcessor class
- [[#7422](https://github.com/apache/incubator-seata/pull/7422)] add UT for
seata-spring-boot-starter module
-
### refactor:
- [[#7315](https://github.com/apache/incubator-seata/pull/7315)] Refactor log
testing to use ListAppender for more accurate and efficient log capture
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index c7ef41db1b..a6c6d74f6d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -48,6 +48,7 @@
- [[#7360](https://github.com/apache/incubator-seata/pull/7360)]
更新通道断开连接时的资源清理逻辑
- [[#7363](https://github.com/apache/incubator-seata/pull/7363)] 升级 npmjs 依赖项
- [[#7372](https://github.com/apache/incubator-seata/pull/7372)] 改进忽略许可证标头检查
+- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] 优化 discovery
模块的 close 方法
- [[#7388](https://github.com/apache/incubator-seata/pull/7388)] 优化二进制打包目录结构
- [[#7412](https://github.com/apache/incubator-seata/pull/7412)] 适配新版本 Seata 的
Helm 模板
- [[#7414](https://github.com/apache/incubator-seata/pull/7414)] 移除
NettyClientBootstrap 中 defaultEventExecutorGroup
diff --git
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
index 0d27e4ef53..40664b727a 100644
---
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
+++
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
@@ -364,7 +364,23 @@ public class ConsulRegistryServiceImpl implements
RegistryService<ConsulListener
@Override
public void close() throws Exception {
- client = null;
- }
+ notifiers.values().forEach(ConsulNotifier::stop);
+ notifiers.clear();
+
+ // Shut down the ThreadPoolExecutor
+ if (notifierExecutor != null && !notifierExecutor.isShutdown()) {
+ notifierExecutor.shutdown();
+ try {
+ if (!notifierExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ notifierExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ notifierExecutor.shutdownNow();
+ } finally {
+ notifierExecutor = null;
+ }
+ }
+ RegistryHeartBeats.close(REGISTRY_TYPE);
+ }
}
diff --git
a/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
b/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
similarity index 68%
rename from
discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
rename to
discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
index 955c1865a2..689fcd76b8 100644
---
a/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
+++
b/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
@@ -16,18 +16,17 @@
*/
package org.apache.seata.discovery.registry.consul;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import com.ecwid.consul.transport.RawResponse;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.health.model.HealthService;
-import org.apache.seata.config.Configuration;
-import org.apache.seata.config.ConfigurationFactory;
-import org.apache.seata.config.exception.ConfigNotFoundException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
@@ -35,14 +34,22 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.config.exception.ConfigNotFoundException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
-public class ConsulRegistryServiceImplTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class ConsulRegistryServiceImplMockTest {
final String TEST_CLUSTER_NAME = "testCluster";
@@ -52,18 +59,22 @@ public class ConsulRegistryServiceImplTest {
@BeforeEach
public void init() throws Exception {
+ configuration = mock(Configuration.class);
service = (ConsulRegistryServiceImpl) new
ConsulRegistryProvider().provide();
client = mock(ConsulClient.class);
- this.setClient(service, client);
- configuration = mock(Configuration.class);
+ Field clientField =
ConsulRegistryServiceImpl.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ clientField.set(service, client);
}
+ @Order(1)
@Test
public void testGetInstance() {
Assertions.assertEquals(ConsulRegistryServiceImpl.getInstance(),
service);
}
+ @Order(2)
@Test
public void testRegister() throws Exception {
InetSocketAddress inetSocketAddress = new
InetSocketAddress("127.0.0.1", 8080);
@@ -77,6 +88,7 @@ public class ConsulRegistryServiceImplTest {
verify(client).agentServiceDeregister(any(), any());
}
+ @Order(3)
@Test
public void testSubscribeAndLookup() throws Exception {
ConsulListener consulListener = mock(ConsulListener.class);
@@ -110,14 +122,54 @@ public class ConsulRegistryServiceImplTest {
}
service.unsubscribe(TEST_CLUSTER_NAME, consulListener);
- Assertions.assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
+ assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
}
+ @Order(4)
+ @Test
+ public void testClose() throws Exception {
+ ExecutorService executorService1 = mockExecutorService(false, new
InterruptedException("Test interruption"));
+ service.close();
+ verifyCloseResults(executorService1, true);
- private void setClient(ConsulRegistryServiceImpl service, ConsulClient
client) throws Exception {
- Field clientField =
ConsulRegistryServiceImpl.class.getDeclaredField("client");
+ ExecutorService executorService = mockExecutorService(false, null);
+ service.close();
+
+ verifyCloseResults(executorService, true);
+ }
+
+ private ExecutorService mockExecutorService(boolean
awaitTerminationResult, InterruptedException exception) throws Exception {
+ ExecutorService executorService = mock(ExecutorService.class);
+ when(executorService.isShutdown()).thenReturn(false);
+
+ if (exception != null) {
+ when(executorService.awaitTermination(5,
TimeUnit.SECONDS)).thenThrow(exception);
+ } else {
+ when(executorService.awaitTermination(5,
TimeUnit.SECONDS)).thenReturn(awaitTerminationResult);
+ }
+
+ setExecutorService(executorService);
+ return executorService;
+ }
+
+ /**
+ * Verify the results of the close() method
+ */
+ private void verifyCloseResults(ExecutorService executorService, boolean
expectShutdownNow) throws Exception {
+ verify(executorService).shutdown();
+ verify(executorService).awaitTermination(5, TimeUnit.SECONDS);
+ if (expectShutdownNow) {
+ verify(executorService).shutdownNow();
+ }
+
+ Field clientField =
ConsulRegistryServiceImpl.class.getDeclaredField("notifiers");
clientField.setAccessible(true);
- clientField.set(service, client);
+ ConcurrentMap notifiers = (ConcurrentMap)clientField.get(service);
+ assertTrue(notifiers.isEmpty());
+
+ Field executorServiceField =
ConsulRegistryServiceImpl.class.getDeclaredField("notifierExecutor");
+ executorServiceField.setAccessible(true);
+ assertNull(executorServiceField.get(service));
}
private void setExecutorService(ExecutorService executorService) throws
Exception {
diff --git
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
index f9b539d375..9bc5277844 100644
---
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
+++
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
@@ -75,8 +75,10 @@ public class RegistryHeartBeats {
}, period, period, TimeUnit.MILLISECONDS);
}
- public static void close() {
- HEARTBEAT_SCHEDULED.shutdown();
+ public static void close(String registryType) {
+ if (getHeartbeatEnabled(registryType)) {
+ HEARTBEAT_SCHEDULED.shutdown();
+ }
}
private static long getHeartbeatPeriod(String registryType) {
diff --git
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
index 4c43212d69..70607c2284 100644
---
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
+++
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
@@ -225,12 +225,34 @@ public class EtcdRegistryServiceImpl implements
RegistryService<Watch.Listener>
@Override
public void close() throws Exception {
- if (lifeKeeper != null) {
- lifeKeeper.stop();
- if (lifeKeeperFuture != null) {
- lifeKeeperFuture.get(3, TimeUnit.SECONDS);
+ // Shut down the ThreadPoolExecutor
+ if (executorService != null && !executorService.isShutdown()) {
+ executorService.shutdown();
+
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("ExecutorService shutdown interrupted. Forcing
shutdown.");
+ executorService.shutdownNow();
+ } finally {
+ executorService = null;
+ }
+ }
+
+ // Close the Etcd client and release the underlying connection
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn("Failed to close Etcd client: {}", e.getMessage());
+ } finally {
+ client = null;
}
}
+
+ RegistryHeartBeats.close(REGISTRY_TYPE);
}
/**
diff --git
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
similarity index 85%
rename from
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
rename to
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
index a6c8b94df1..d968553f50 100644
---
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
+++
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
similarity index 86%
rename from
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
rename to
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
index b3c1dcc585..d6fb1e455c 100644
---
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
+++
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
@@ -14,10 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP;
+import static org.junit.Assert.assertNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -43,6 +44,7 @@ import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
+
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Arrays;
@@ -53,11 +55,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
import org.apache.seata.config.exception.ConfigNotFoundException;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -132,6 +133,7 @@ public class EtcdRegistryServiceImplMockTest {
System.setProperty(EtcdRegistryServiceImpl.TEST_ENDPONT, "");
}
+ @Order(1)
@Test
public void testRegister() throws Exception {
long leaseId = 1L;
@@ -170,6 +172,7 @@ public class EtcdRegistryServiceImplMockTest {
verify(executorService, times(1)).submit(any(Callable.class));
}
+ @Order(2)
@Test
public void testUnregister() throws Exception {
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
@@ -184,8 +187,8 @@ public class EtcdRegistryServiceImplMockTest {
verify(mockKVClient, times(1)).delete(any());
}
+ @Order(3)
@Test
- @Order(1)
public void testLookup() throws Exception {
List<String> services = Arrays.asList("127.0.0.1:8091",
"127.0.0.1:8092", "127.0.0.1:8093");
GetResponse mockGetResponse = createMockGetResponse(services);
@@ -213,30 +216,7 @@ public class EtcdRegistryServiceImplMockTest {
}
}
- private GetResponse createMockGetResponse(List<String> addresses) {
- // Create mock ResponseHeader
- ResponseHeader mockHeader =
- ResponseHeader.newBuilder().setRevision(12345L).build();
-
- // Create mock KeyValue list
- List<KeyValue> mockKeyValues = addresses.stream()
- .map(address -> {
- KeyValue mockKeyValue = mock(KeyValue.class);
-
when(mockKeyValue.getValue()).thenReturn(ByteSequence.from(address, UTF_8));
- return mockKeyValue;
- })
- .collect(Collectors.toList());
-
- // Create mock RangeResponse
- RangeResponse mockRangeResponse =
- RangeResponse.newBuilder().setHeader(mockHeader).build();
-
- // Create mock GetResponse
- GetResponse mockGetResponse = spy(new GetResponse(mockRangeResponse,
ByteSequence.EMPTY));
- when(mockGetResponse.getKvs()).thenReturn(mockKeyValues);
- return mockGetResponse;
- }
-
+ @Order(4)
@Test
public void testSubscribe() throws Exception {
Watch.Listener mockListener = mock(Watch.Listener.class);
@@ -246,6 +226,7 @@ public class EtcdRegistryServiceImplMockTest {
verify(executorService, times(1)).submit(any(Runnable.class));
}
+ @Order(5)
@Test
public void testUnsubscribe() throws Exception {
Watch.Listener mockListener = mock(Watch.Listener.class);
@@ -263,4 +244,54 @@ public class EtcdRegistryServiceImplMockTest {
registryService.unsubscribe(DEFAULT_TX_GROUP, mockListener);
assertEquals(0, latch.getCount(), "Latch should be 0");
}
+
+ @Order(6)
+ @Test
+ public void testClose() throws Exception {
+ // 1.condition: executorService shutdown with exception
+ when(executorService.isShutdown()).thenReturn(false);
+ when(executorService.awaitTermination(5,
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+ registryService.close();
+
+ verify(executorService).shutdown();
+ verify(executorService).shutdownNow();
+ verify(mockClient).close();
+
+ Mockito.reset(executorService);
+ Field executorServiceField =
EtcdRegistryServiceImpl.class.getDeclaredField("executorService");
+ executorServiceField.setAccessible(true);
+ executorServiceField.set(registryService, executorService);
+
+ // 2.condition: executorService does not terminate within the timeout
+ when(executorService.isShutdown()).thenReturn(false);
+ when(executorService.awaitTermination(5,
TimeUnit.SECONDS)).thenReturn(false);
+ registryService.close();
+
+ Field clientField =
EtcdRegistryServiceImpl.class.getDeclaredField("client");
+ clientField.setAccessible(true);
+ assertNull(clientField.get(null));
+ assertNull(executorServiceField.get(registryService));
+ }
+
+ private GetResponse createMockGetResponse(List<String> addresses) {
+ // Create mock ResponseHeader
+ ResponseHeader mockHeader =
ResponseHeader.newBuilder().setRevision(12345L).build();
+
+ // Create mock KeyValue list
+ List<KeyValue> mockKeyValues = addresses.stream()
+ .map(address -> {
+ KeyValue mockKeyValue = mock(KeyValue.class);
+
when(mockKeyValue.getValue()).thenReturn(ByteSequence.from(address, UTF_8));
+ return mockKeyValue;
+ })
+ .collect(Collectors.toList());
+
+ // Create mock RangeResponse
+ RangeResponse mockRangeResponse =
RangeResponse.newBuilder().setHeader(mockHeader).build();
+
+ // Create mock GetResponse
+ GetResponse mockGetResponse = spy(new GetResponse(mockRangeResponse,
ByteSequence.EMPTY));
+ when(mockGetResponse.getKvs()).thenReturn(mockKeyValues);
+ return mockGetResponse;
+ }
}
diff --git
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
similarity index 97%
rename from
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
rename to
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
index 90ae8e8869..dc68349f4b 100644
---
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
+++
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
@@ -23,8 +23,6 @@ import io.etcd.jetcd.launcher.junit4.EtcdClusterResource;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.watch.WatchResponse;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
import org.apache.seata.discovery.registry.RegistryService;
import org.junit.Rule;
import org.junit.jupiter.api.AfterAll;
diff --git
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
index 5edfaed70d..eb97534ac5 100644
---
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
+++
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
/**
* The type Nacos registry service.
- *
*/
public class NacosRegistryServiceImpl implements
RegistryService<EventListener> {
@@ -208,7 +207,25 @@ public class NacosRegistryServiceImpl implements
RegistryService<EventListener>
@Override
public void close() throws Exception {
+ if (naming != null) {
+ try {
+ naming.shutDown();
+ } catch (Exception e) {
+ LOGGER.warn("Error while shutting down Nacos NamingService",
e);
+ } finally {
+ naming = null;
+ }
+ }
+ if (useSLBWay && namingMaintain != null) {
+ try {
+ namingMaintain.shutDown();
+ } catch (Exception e) {
+ LOGGER.warn("Error while shutting down Nacos
NamingMaintainService", e);
+ } finally {
+ namingMaintain = null;
+ }
+ }
}
/**
@@ -272,8 +289,9 @@ public class NacosRegistryServiceImpl implements
RegistryService<EventListener>
/**
* init nacos auth properties
- *
+ * <p>
* username/password > ak/sk > ramRoleName
+ *
* @param sourceProperties the source properties
* @return auth properties
*/
diff --git
a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
index e00bcc0a9c..482ed7d871 100644
---
a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
+++
b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.seata.discovery.registry.nacos;
+import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Properties;
@@ -23,6 +24,7 @@ import org.apache.seata.common.util.ReflectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertNull;
/**
* The type Nacos registry serivce impl test
@@ -40,5 +42,26 @@ public class NacosRegistryServiceImplTest {
Assertions.assertThat(properties.getProperty("contextPath")).isEqualTo("/bar");
}
+ @Test
+ public void testClose() throws Exception {
+ NacosRegistryServiceImpl instance =
NacosRegistryServiceImpl.getInstance();
+ NacosRegistryServiceImpl.getNamingInstance();
+
+ Field useSLBWayField =
NacosRegistryServiceImpl.class.getDeclaredField("useSLBWay");
+ useSLBWayField.setAccessible(true);
+ useSLBWayField.set(instance, true);
+ NacosRegistryServiceImpl.getNamingMaintainInstance();
+
+ instance.close();
+
+ Field namingField =
NacosRegistryServiceImpl.class.getDeclaredField("naming");
+ namingField.setAccessible(true);
+ assertNull(namingField.get(null));
+
+ Field namingMaintainField =
NacosRegistryServiceImpl.class.getDeclaredField("namingMaintain");
+ namingMaintainField.setAccessible(true);
+ assertNull(namingMaintainField.get(null));
+ }
+
}
\ No newline at end of file
diff --git
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index 6c18a57e72..9a694aab62 100644
---
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -52,7 +52,6 @@ import redis.clients.jedis.ScanResult;
/**
* The type Redis registry service.
- *
*/
public class RedisRegistryServiceImpl implements
RegistryService<RedisListener> {
@@ -244,7 +243,7 @@ public class RedisRegistryServiceImpl implements
RegistryService<RedisListener>
switch (eventType) {
case RedisListener.REGISTER:
CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP,
clusterName, value -> ConcurrentHashMap.newKeySet(2))
- .add(NetUtil.toInetSocketAddress(serverAddr));
+ .add(NetUtil.toInetSocketAddress(serverAddr));
break;
case RedisListener.UN_REGISTER:
removeServerAddressByPushEmptyProtection(clusterName,
serverAddr);
@@ -258,13 +257,12 @@ public class RedisRegistryServiceImpl implements
RegistryService<RedisListener>
}
/**
- *
* if the serverAddr is unique in the address list and
* the callback cluster is current cluster, then enable push-empty
protection
* Otherwise, remove it.
*
* @param notifyCluserName notifyCluserName
- * @param serverAddr serverAddr
+ * @param serverAddr serverAddr
*/
private void removeServerAddressByPushEmptyProtection(String
notifyCluserName, String serverAddr) {
@@ -288,9 +286,39 @@ public class RedisRegistryServiceImpl implements
RegistryService<RedisListener>
@Override
public void close() {
- threadPoolExecutorForSubscribe.shutdown();
- threadPoolExecutorForUpdateMap.shutdown();
- RegistryHeartBeats.close();
+ // Shut down the ThreadPoolExecutors
+ if (threadPoolExecutorForSubscribe != null &&
!threadPoolExecutorForSubscribe.isShutdown()) {
+ threadPoolExecutorForSubscribe.shutdown();
+
+ try {
+ if (!threadPoolExecutorForSubscribe.awaitTermination(5,
TimeUnit.SECONDS)) {
+ threadPoolExecutorForSubscribe.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("ExecutorService threadPoolExecutorForSubscribe
shutdown interrupted. Forcing shutdown.");
+ threadPoolExecutorForSubscribe.shutdownNow();
+ } finally {
+ threadPoolExecutorForSubscribe = null;
+ }
+ }
+
+ if (threadPoolExecutorForUpdateMap != null &&
!threadPoolExecutorForUpdateMap.isShutdown()) {
+ threadPoolExecutorForUpdateMap.shutdown();
+
+ try {
+ if (!threadPoolExecutorForUpdateMap.awaitTermination(5,
TimeUnit.SECONDS)) {
+ threadPoolExecutorForUpdateMap.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("ExecutorService threadPoolExecutorForUpdateMap
shutdown interrupted. Forcing shutdown.");
+ threadPoolExecutorForUpdateMap.shutdownNow();
+ } finally {
+ threadPoolExecutorForUpdateMap = null;
+ }
+ }
+
+ RegistryHeartBeats.close(REGISTRY_TYPE);
+
jedisPool.destroy();
}
diff --git
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
index 36079b12e8..28c3f36a3d 100644
---
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
+++
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
@@ -19,8 +19,12 @@ package org.apache.seata.discovery.registry.redis;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.config.Configuration;
import org.apache.seata.config.ConfigurationFactory;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.mockito.MockedStatic;
import org.mockito.internal.util.collections.Sets;
@@ -32,14 +36,19 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class RedisRegisterServiceImplTest {
private static RedisRegistryServiceImpl redisRegistryService;
@@ -56,6 +65,7 @@ public class RedisRegisterServiceImplTest {
}
@Test
+ @Order(1)
public void testFlow() {
redisRegistryService.register(new
InetSocketAddress(NetUtil.getLocalIp(), 8091));
@@ -68,6 +78,7 @@ public class RedisRegisterServiceImplTest {
}
@Test
+ @Order(2)
public void testRemoveServerAddressByPushEmptyProtection()
throws NoSuchFieldException, NoSuchMethodException,
InvocationTargetException, IllegalAccessException {
@@ -80,7 +91,7 @@ public class RedisRegisterServiceImplTest {
Field field =
RedisRegistryServiceImpl.class.getDeclaredField("CLUSTER_ADDRESS_MAP");
field.setAccessible(true);
- ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP =
(ConcurrentMap<String, Set<InetSocketAddress>>)field.get(null);
+ ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP =
(ConcurrentMap<String, Set<InetSocketAddress>>) field.get(null);
CLUSTER_ADDRESS_MAP.put("cluster",
Sets.newSet(NetUtil.toInetSocketAddress("127.0.0.1:8091")));
Method method =
RedisRegistryServiceImpl.class.getDeclaredMethod("removeServerAddressByPushEmptyProtection",
String.class, String.class);
@@ -100,4 +111,29 @@ public class RedisRegisterServiceImplTest {
Assertions.assertEquals(0, CLUSTER_ADDRESS_MAP.get("cluster").size());
}
+ @Test
+ @Order(3)
+ public void testClose() throws Exception {
+ Field executorServiceField1 =
RedisRegistryServiceImpl.class.getDeclaredField("threadPoolExecutorForSubscribe");
+ executorServiceField1.setAccessible(true);
+ ScheduledExecutorService executorService1 =
mock(ScheduledExecutorService.class);
+ when(executorService1.isShutdown()).thenReturn(false);
+ when(executorService1.awaitTermination(5,
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+ executorServiceField1.set(redisRegistryService, executorService1);
+
+ Field executorServiceField2 =
RedisRegistryServiceImpl.class.getDeclaredField("threadPoolExecutorForUpdateMap");
+ executorServiceField2.setAccessible(true);
+ ScheduledExecutorService executorService2 =
mock(ScheduledExecutorService.class);
+ when(executorService2.isShutdown()).thenReturn(false);
+ when(executorService2.awaitTermination(5,
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+ executorServiceField2.set(redisRegistryService, executorService2);
+
+ redisRegistryService.close();
+
+ verify(executorService1).shutdownNow();
+ verify(executorService2).shutdownNow();
+ assertNull(executorServiceField1.get(redisRegistryService));
+ assertNull(executorServiceField2.get(redisRegistryService));
+ }
+
}
diff --git
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
index fa0b428dda..e16aeb6453 100644
---
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
+++
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
@@ -47,7 +47,6 @@ import static
org.apache.seata.config.ConfigurationKeys.FILE_ROOT_REGISTRY;
/**
* The type SOFARegistry registry service.
- *
*/
public class SofaRegistryServiceImpl implements
RegistryService<SubscriberDataObserver> {
@@ -206,6 +205,7 @@ public class SofaRegistryServiceImpl implements
RegistryService<SubscriberDataOb
@Override
public void close() throws Exception {
+
}
private static Properties getNamingProperties() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]