This is an automated email from the ASF dual-hosted git repository.

xingfudeshi pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 04447c71fc optimize: enhance close() logic of discovery module (#7375)
04447c71fc is described below

commit 04447c71fc3a48edf5d04b88fa823a640c2773e9
Author: YoWuwuuuw <[email protected]>
AuthorDate: Thu Jun 12 14:16:41 2025 +0800

    optimize: enhance close() logic of discovery module (#7375)
---
 changes/en-us/2.x.md                               |  2 +-
 changes/zh-cn/2.x.md                               |  1 +
 .../registry/consul/ConsulRegistryServiceImpl.java | 20 ++++-
 ...java => ConsulRegistryServiceImplMockTest.java} | 92 +++++++++++++++++-----
 .../discovery/registry/RegistryHeartBeats.java     |  6 +-
 .../registry/etcd3/EtcdRegistryServiceImpl.java    | 30 ++++++-
 .../{etcd => etcd3}/EtcdRegistryProviderTest.java  |  4 +-
 .../EtcdRegistryServiceImplMockTest.java           | 87 +++++++++++++-------
 .../EtcdRegistryServiceImplTest.java               |  4 +-
 .../registry/nacos/NacosRegistryServiceImpl.java   | 22 +++++-
 .../nacos/NacosRegistryServiceImplTest.java        | 23 ++++++
 .../registry/redis/RedisRegistryServiceImpl.java   | 42 ++++++++--
 .../redis/RedisRegisterServiceImplTest.java        | 42 +++++++++-
 .../registry/sofa/SofaRegistryServiceImpl.java     |  2 +-
 14 files changed, 301 insertions(+), 76 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index d56ea842a1..745b49f7ba 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -48,6 +48,7 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7360](https://github.com/apache/incubator-seata/pull/7360)] Update 
resource cleanup logic for channel disconnection
 - [[#7363](https://github.com/apache/incubator-seata/pull/7363)] Upgrade npmjs 
dependencies
 - [[#7372](https://github.com/apache/incubator-seata/pull/7372)] optimize 
license ignore
+- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] optimize 
close() logic of discovery module
 - [[#7388](https://github.com/apache/incubator-seata/pull/7388)] optimize 
binary packaging directory structure
 - [[#7412](https://github.com/apache/incubator-seata/pull/7412)] Helm template 
adapted to the new version of seata
 - [[#7414](https://github.com/apache/incubator-seata/pull/7414)] Remove the 
unused defaultEventExecutorGroup from the NettyClientBootstrap
@@ -88,7 +89,6 @@ Add changes here for all PR submitted to the 2.x branch.
 - [[#7379](https://github.com/apache/incubator-seata/issues/7379)] add UT for 
TccAnnotationProcessor class
 - [[#7422](https://github.com/apache/incubator-seata/pull/7422)] add UT for 
seata-spring-boot-starter module
 
-
 ### refactor:
 
 - [[#7315](https://github.com/apache/incubator-seata/pull/7315)] Refactor log 
testing to use ListAppender for more accurate and efficient log capture
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index c7ef41db1b..a6c6d74f6d 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -48,6 +48,7 @@
 - [[#7360](https://github.com/apache/incubator-seata/pull/7360)] 
更新通道断开连接时的资源清理逻辑
 - [[#7363](https://github.com/apache/incubator-seata/pull/7363)] 升级 npmjs 依赖项
 - [[#7372](https://github.com/apache/incubator-seata/pull/7372)] 改进忽略许可证标头检查
+- [[#7375](https://github.com/apache/incubator-seata/pull/7375)] 优化 discovery 
模块的 close 方法
 - [[#7388](https://github.com/apache/incubator-seata/pull/7388)] 优化二进制打包目录结构
 - [[#7412](https://github.com/apache/incubator-seata/pull/7412)] 适配新版本 Seata 的 
Helm 模板
 - [[#7414](https://github.com/apache/incubator-seata/pull/7414)] 移除 
NettyClientBootstrap 中 defaultEventExecutorGroup
diff --git 
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
 
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
index 0d27e4ef53..40664b727a 100644
--- 
a/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-consul/src/main/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImpl.java
@@ -364,7 +364,23 @@ public class ConsulRegistryServiceImpl implements 
RegistryService<ConsulListener
 
     @Override
     public void close() throws Exception {
-        client = null;
-    }
+        notifiers.values().forEach(ConsulNotifier::stop);
+        notifiers.clear();
+
+        // Shut down the ThreadPoolExecutor
+        if (notifierExecutor != null && !notifierExecutor.isShutdown()) {
+            notifierExecutor.shutdown();
+            try {
+                if (!notifierExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                    notifierExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                notifierExecutor.shutdownNow();
+            } finally {
+                notifierExecutor = null;
+            }
+        }
 
+        RegistryHeartBeats.close(REGISTRY_TYPE);
+    }
 }
diff --git 
a/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
 
b/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
similarity index 68%
rename from 
discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
rename to 
discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
index 955c1865a2..689fcd76b8 100644
--- 
a/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplTest.java
+++ 
b/discovery/seata-discovery-consul/src/test/java/org/apache/seata/discovery/registry/consul/ConsulRegistryServiceImplMockTest.java
@@ -16,18 +16,17 @@
  */
 package org.apache.seata.discovery.registry.consul;
 
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.ecwid.consul.transport.RawResponse;
 import com.ecwid.consul.v1.ConsulClient;
 import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.health.model.HealthService;
-import org.apache.seata.config.Configuration;
-import org.apache.seata.config.ConfigurationFactory;
-import org.apache.seata.config.exception.ConfigNotFoundException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
 
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
@@ -35,14 +34,22 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
+import org.apache.seata.config.Configuration;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.config.exception.ConfigNotFoundException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
-public class ConsulRegistryServiceImplTest {
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+public class ConsulRegistryServiceImplMockTest {
 
     final String TEST_CLUSTER_NAME = "testCluster";
 
@@ -52,18 +59,22 @@ public class ConsulRegistryServiceImplTest {
 
     @BeforeEach
     public void init() throws Exception {
+        configuration = mock(Configuration.class);
         service = (ConsulRegistryServiceImpl) new 
ConsulRegistryProvider().provide();
         client = mock(ConsulClient.class);
-        this.setClient(service, client);
 
-        configuration = mock(Configuration.class);
+        Field clientField = 
ConsulRegistryServiceImpl.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        clientField.set(service, client);
     }
 
+    @Order(1)
     @Test
     public void testGetInstance() {
         Assertions.assertEquals(ConsulRegistryServiceImpl.getInstance(), 
service);
     }
 
+    @Order(2)
     @Test
     public void testRegister() throws Exception {
         InetSocketAddress inetSocketAddress = new 
InetSocketAddress("127.0.0.1", 8080);
@@ -77,6 +88,7 @@ public class ConsulRegistryServiceImplTest {
         verify(client).agentServiceDeregister(any(), any());
     }
 
+    @Order(3)
     @Test
     public void testSubscribeAndLookup() throws Exception {
         ConsulListener consulListener = mock(ConsulListener.class);
@@ -110,14 +122,54 @@ public class ConsulRegistryServiceImplTest {
         }
 
         service.unsubscribe(TEST_CLUSTER_NAME, consulListener);
-        Assertions.assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
+        assertNull(getMap("notifiers").get(TEST_CLUSTER_NAME));
     }
 
+    @Order(4)
+    @Test
+    public void testClose() throws Exception {
+        ExecutorService executorService1 = mockExecutorService(false, new 
InterruptedException("Test interruption"));
+        service.close();
+        verifyCloseResults(executorService1, true);
 
-    private void setClient(ConsulRegistryServiceImpl service, ConsulClient 
client) throws Exception {
-        Field clientField = 
ConsulRegistryServiceImpl.class.getDeclaredField("client");
+        ExecutorService executorService = mockExecutorService(false, null);
+        service.close();
+
+        verifyCloseResults(executorService, true);
+    }
+
+    private ExecutorService mockExecutorService(boolean 
awaitTerminationResult, InterruptedException exception) throws Exception {
+        ExecutorService executorService = mock(ExecutorService.class);
+        when(executorService.isShutdown()).thenReturn(false);
+
+        if (exception != null) {
+            when(executorService.awaitTermination(5, 
TimeUnit.SECONDS)).thenThrow(exception);
+        } else {
+            when(executorService.awaitTermination(5, 
TimeUnit.SECONDS)).thenReturn(awaitTerminationResult);
+        }
+
+        setExecutorService(executorService);
+        return executorService;
+    }
+
+    /**
+     * Verify the results of the close() method
+     */
+    private void verifyCloseResults(ExecutorService executorService, boolean 
expectShutdownNow) throws Exception {
+        verify(executorService).shutdown();
+        verify(executorService).awaitTermination(5, TimeUnit.SECONDS);
+        if (expectShutdownNow) {
+            verify(executorService).shutdownNow();
+        }
+
+        Field clientField = 
ConsulRegistryServiceImpl.class.getDeclaredField("notifiers");
         clientField.setAccessible(true);
-        clientField.set(service, client);
+        ConcurrentMap notifiers = (ConcurrentMap)clientField.get(service);
+        assertTrue(notifiers.isEmpty());
+
+        Field executorServiceField = 
ConsulRegistryServiceImpl.class.getDeclaredField("notifierExecutor");
+        executorServiceField.setAccessible(true);
+        assertNull(executorServiceField.get(service));
     }
 
     private void setExecutorService(ExecutorService executorService) throws 
Exception {
diff --git 
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
 
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
index f9b539d375..9bc5277844 100644
--- 
a/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
+++ 
b/discovery/seata-discovery-core/src/main/java/org/apache/seata/discovery/registry/RegistryHeartBeats.java
@@ -75,8 +75,10 @@ public class RegistryHeartBeats {
         }, period, period, TimeUnit.MILLISECONDS);
     }
 
-    public static void close() {
-        HEARTBEAT_SCHEDULED.shutdown();
+    public static void close(String registryType) {
+        if (getHeartbeatEnabled(registryType)) {
+            HEARTBEAT_SCHEDULED.shutdown();
+        }
     }
 
     private static long getHeartbeatPeriod(String registryType) {
diff --git 
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
 
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
index 4c43212d69..70607c2284 100644
--- 
a/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-etcd3/src/main/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImpl.java
@@ -225,12 +225,34 @@ public class EtcdRegistryServiceImpl implements 
RegistryService<Watch.Listener>
 
     @Override
     public void close() throws Exception {
-        if (lifeKeeper != null) {
-            lifeKeeper.stop();
-            if (lifeKeeperFuture != null) {
-                lifeKeeperFuture.get(3, TimeUnit.SECONDS);
+        // Shut down the ThreadPoolExecutor
+        if (executorService != null && !executorService.isShutdown()) {
+            executorService.shutdown();
+
+            try {
+                if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("ExecutorService shutdown interrupted. Forcing 
shutdown.");
+                executorService.shutdownNow();
+            } finally {
+                executorService = null;
+            }
+        }
+
+        // Close the Etcd client and release the underlying connection
+        if (client != null) {
+            try {
+                client.close();
+            } catch (Exception e) {
+                LOGGER.warn("Failed to close Etcd client: {}", e.getMessage());
+            } finally {
+                client = null;
             }
         }
+
+        RegistryHeartBeats.close(REGISTRY_TYPE);
     }
 
     /**
diff --git 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
similarity index 85%
rename from 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
rename to 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
index a6c8b94df1..d968553f50 100644
--- 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryProviderTest.java
+++ 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryProviderTest.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
 
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
similarity index 86%
rename from 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
rename to 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
index b3c1dcc585..d6fb1e455c 100644
--- 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplMockTest.java
+++ 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplMockTest.java
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.seata.common.DefaultValues.DEFAULT_TX_GROUP;
+import static org.junit.Assert.assertNull;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -43,6 +44,7 @@ import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
 import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.options.PutOption;
 import io.etcd.jetcd.options.WatchOption;
+
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
@@ -53,11 +55,10 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import org.apache.seata.config.Configuration;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.config.exception.ConfigNotFoundException;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -132,6 +133,7 @@ public class EtcdRegistryServiceImplMockTest {
         System.setProperty(EtcdRegistryServiceImpl.TEST_ENDPONT, "");
     }
 
+    @Order(1)
     @Test
     public void testRegister() throws Exception {
         long leaseId = 1L;
@@ -170,6 +172,7 @@ public class EtcdRegistryServiceImplMockTest {
         verify(executorService, times(1)).submit(any(Callable.class));
     }
 
+    @Order(2)
     @Test
     public void testUnregister() throws Exception {
         InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8091);
@@ -184,8 +187,8 @@ public class EtcdRegistryServiceImplMockTest {
         verify(mockKVClient, times(1)).delete(any());
     }
 
+    @Order(3)
     @Test
-    @Order(1)
     public void testLookup() throws Exception {
         List<String> services = Arrays.asList("127.0.0.1:8091", 
"127.0.0.1:8092", "127.0.0.1:8093");
         GetResponse mockGetResponse = createMockGetResponse(services);
@@ -213,30 +216,7 @@ public class EtcdRegistryServiceImplMockTest {
         }
     }
 
-    private GetResponse createMockGetResponse(List<String> addresses) {
-        // Create mock ResponseHeader
-        ResponseHeader mockHeader =
-                ResponseHeader.newBuilder().setRevision(12345L).build();
-
-        // Create mock KeyValue list
-        List<KeyValue> mockKeyValues = addresses.stream()
-                .map(address -> {
-                    KeyValue mockKeyValue = mock(KeyValue.class);
-                    
when(mockKeyValue.getValue()).thenReturn(ByteSequence.from(address, UTF_8));
-                    return mockKeyValue;
-                })
-                .collect(Collectors.toList());
-
-        // Create mock RangeResponse
-        RangeResponse mockRangeResponse =
-                RangeResponse.newBuilder().setHeader(mockHeader).build();
-
-        // Create mock GetResponse
-        GetResponse mockGetResponse = spy(new GetResponse(mockRangeResponse, 
ByteSequence.EMPTY));
-        when(mockGetResponse.getKvs()).thenReturn(mockKeyValues);
-        return mockGetResponse;
-    }
-
+    @Order(4)
     @Test
     public void testSubscribe() throws Exception {
         Watch.Listener mockListener = mock(Watch.Listener.class);
@@ -246,6 +226,7 @@ public class EtcdRegistryServiceImplMockTest {
         verify(executorService, times(1)).submit(any(Runnable.class));
     }
 
+    @Order(5)
     @Test
     public void testUnsubscribe() throws Exception {
         Watch.Listener mockListener = mock(Watch.Listener.class);
@@ -263,4 +244,54 @@ public class EtcdRegistryServiceImplMockTest {
         registryService.unsubscribe(DEFAULT_TX_GROUP, mockListener);
         assertEquals(0, latch.getCount(), "Latch should be 0");
     }
+
+    @Order(6)
+    @Test
+    public void testClose() throws Exception {
+        // 1.condition: executorService shutdown with exception
+        when(executorService.isShutdown()).thenReturn(false);
+        when(executorService.awaitTermination(5, 
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+        registryService.close();
+
+        verify(executorService).shutdown();
+        verify(executorService).shutdownNow();
+        verify(mockClient).close();
+
+        Mockito.reset(executorService);
+        Field executorServiceField = 
EtcdRegistryServiceImpl.class.getDeclaredField("executorService");
+        executorServiceField.setAccessible(true);
+        executorServiceField.set(registryService, executorService);
+
+        // 2.condition: executorService awaitTermination times out (returns false)
+        when(executorService.isShutdown()).thenReturn(false);
+        when(executorService.awaitTermination(5, 
TimeUnit.SECONDS)).thenReturn(false);
+        registryService.close();
+
+        Field clientField = 
EtcdRegistryServiceImpl.class.getDeclaredField("client");
+        clientField.setAccessible(true);
+        assertNull(clientField.get(null));
+        assertNull(executorServiceField.get(registryService));
+    }
+
+    private GetResponse createMockGetResponse(List<String> addresses) {
+        // Create mock ResponseHeader
+        ResponseHeader mockHeader = 
ResponseHeader.newBuilder().setRevision(12345L).build();
+
+        // Create mock KeyValue list
+        List<KeyValue> mockKeyValues = addresses.stream()
+                .map(address -> {
+                    KeyValue mockKeyValue = mock(KeyValue.class);
+                    
when(mockKeyValue.getValue()).thenReturn(ByteSequence.from(address, UTF_8));
+                    return mockKeyValue;
+                })
+                .collect(Collectors.toList());
+
+        // Create mock RangeResponse
+        RangeResponse mockRangeResponse = 
RangeResponse.newBuilder().setHeader(mockHeader).build();
+
+        // Create mock GetResponse
+        GetResponse mockGetResponse = spy(new GetResponse(mockRangeResponse, 
ByteSequence.EMPTY));
+        when(mockGetResponse.getKvs()).thenReturn(mockKeyValues);
+        return mockGetResponse;
+    }
 }
diff --git 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
similarity index 97%
rename from 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
rename to 
discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
index 90ae8e8869..dc68349f4b 100644
--- 
a/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd/EtcdRegistryServiceImplTest.java
+++ 
b/discovery/seata-discovery-etcd3/src/test/java/org/apache/seata/discovery/registry/etcd3/EtcdRegistryServiceImplTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seata.discovery.registry.etcd;
+package org.apache.seata.discovery.registry.etcd3;
 
 import io.etcd.jetcd.ByteSequence;
 import io.etcd.jetcd.Client;
@@ -23,8 +23,6 @@ import io.etcd.jetcd.launcher.junit4.EtcdClusterResource;
 import io.etcd.jetcd.options.DeleteOption;
 import io.etcd.jetcd.options.GetOption;
 import io.etcd.jetcd.watch.WatchResponse;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryProvider;
-import org.apache.seata.discovery.registry.etcd3.EtcdRegistryServiceImpl;
 import org.apache.seata.discovery.registry.RegistryService;
 import org.junit.Rule;
 import org.junit.jupiter.api.AfterAll;
diff --git 
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
 
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
index 5edfaed70d..eb97534ac5 100644
--- 
a/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-nacos/src/main/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImpl.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * The type Nacos registry service.
- *
  */
 public class NacosRegistryServiceImpl implements 
RegistryService<EventListener> {
 
@@ -208,7 +207,25 @@ public class NacosRegistryServiceImpl implements 
RegistryService<EventListener>
 
     @Override
     public void close() throws Exception {
+        if (naming != null) {
+            try {
+                naming.shutDown();
+            } catch (Exception e) {
+                LOGGER.warn("Error while shutting down Nacos NamingService", 
e);
+            } finally {
+                naming = null;
+            }
+        }
 
+        if (useSLBWay && namingMaintain != null) {
+            try {
+                namingMaintain.shutDown();
+            } catch (Exception e) {
+                LOGGER.warn("Error while shutting down Nacos 
NamingMaintainService", e);
+            } finally {
+                namingMaintain = null;
+            }
+        }
     }
 
     /**
@@ -272,8 +289,9 @@ public class NacosRegistryServiceImpl implements 
RegistryService<EventListener>
 
     /**
      * init nacos auth properties
-     *
+     * <p>
      * username/password > ak/sk > ramRoleName
+     *
      * @param sourceProperties the source properties
      * @return auth properties
      */
diff --git 
a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
 
b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
index e00bcc0a9c..482ed7d871 100644
--- 
a/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
+++ 
b/discovery/seata-discovery-nacos/src/test/java/org/apache/seata/discovery/registry/nacos/NacosRegistryServiceImplTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.seata.discovery.registry.nacos;
 
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.Properties;
 
@@ -23,6 +24,7 @@ import org.apache.seata.common.util.ReflectionUtil;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * The type Nacos registry serivce impl test
@@ -40,5 +42,26 @@ public class NacosRegistryServiceImplTest {
         
Assertions.assertThat(properties.getProperty("contextPath")).isEqualTo("/bar");
     }
 
+    @Test
+    public void testClose() throws Exception {
+        NacosRegistryServiceImpl instance = 
NacosRegistryServiceImpl.getInstance();
+        NacosRegistryServiceImpl.getNamingInstance();
+
+        Field useSLBWayField = 
NacosRegistryServiceImpl.class.getDeclaredField("useSLBWay");
+        useSLBWayField.setAccessible(true);
+        useSLBWayField.set(instance, true);
+        NacosRegistryServiceImpl.getNamingMaintainInstance();
+
+        instance.close();
+
+        Field namingField = 
NacosRegistryServiceImpl.class.getDeclaredField("naming");
+        namingField.setAccessible(true);
+        assertNull(namingField.get(null));
+
+        Field namingMaintainField = 
NacosRegistryServiceImpl.class.getDeclaredField("namingMaintain");
+        namingMaintainField.setAccessible(true);
+        assertNull(namingMaintainField.get(null));
+    }
+
 
 }
\ No newline at end of file
diff --git 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
index 6c18a57e72..9a694aab62 100644
--- 
a/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-redis/src/main/java/org/apache/seata/discovery/registry/redis/RedisRegistryServiceImpl.java
@@ -52,7 +52,6 @@ import redis.clients.jedis.ScanResult;
 
 /**
  * The type Redis registry service.
- *
  */
 public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener> {
 
@@ -244,7 +243,7 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
                 switch (eventType) {
                     case RedisListener.REGISTER:
                         CollectionUtils.computeIfAbsent(CLUSTER_ADDRESS_MAP, 
clusterName, value -> ConcurrentHashMap.newKeySet(2))
-                            .add(NetUtil.toInetSocketAddress(serverAddr));
+                                .add(NetUtil.toInetSocketAddress(serverAddr));
                         break;
                     case RedisListener.UN_REGISTER:
                         removeServerAddressByPushEmptyProtection(clusterName, 
serverAddr);
@@ -258,13 +257,12 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
     }
 
     /**
-     *
      * if the serverAddr is unique in the address list and
      * the callback cluster is current cluster, then enable push-empty 
protection
      * Otherwise, remove it.
      *
      * @param notifyCluserName notifyCluserName
-     * @param serverAddr serverAddr
+     * @param serverAddr       serverAddr
      */
     private void removeServerAddressByPushEmptyProtection(String 
notifyCluserName, String serverAddr) {
 
@@ -288,9 +286,39 @@ public class RedisRegistryServiceImpl implements 
RegistryService<RedisListener>
 
     @Override
     public void close() {
-        threadPoolExecutorForSubscribe.shutdown();
-        threadPoolExecutorForUpdateMap.shutdown();
-        RegistryHeartBeats.close();
+        // Shut down the ThreadPoolExecutors
+        if (threadPoolExecutorForSubscribe != null && 
!threadPoolExecutorForSubscribe.isShutdown()) {
+            threadPoolExecutorForSubscribe.shutdown();
+
+            try {
+                if (!threadPoolExecutorForSubscribe.awaitTermination(5, 
TimeUnit.SECONDS)) {
+                    threadPoolExecutorForSubscribe.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("ExecutorService threadPoolExecutorForSubscribe 
shutdown interrupted. Forcing shutdown.");
+                threadPoolExecutorForSubscribe.shutdownNow();
+            } finally {
+                threadPoolExecutorForSubscribe = null;
+            }
+        }
+
+        if (threadPoolExecutorForUpdateMap != null && 
!threadPoolExecutorForUpdateMap.isShutdown()) {
+            threadPoolExecutorForUpdateMap.shutdown();
+
+            try {
+                if (!threadPoolExecutorForUpdateMap.awaitTermination(5, 
TimeUnit.SECONDS)) {
+                    threadPoolExecutorForUpdateMap.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                LOGGER.warn("ExecutorService threadPoolExecutorForUpdateMap 
shutdown interrupted. Forcing shutdown.");
+                threadPoolExecutorForUpdateMap.shutdownNow();
+            } finally {
+                threadPoolExecutorForUpdateMap = null;
+            }
+        }
+
+        RegistryHeartBeats.close(REGISTRY_TYPE);
+
         jedisPool.destroy();
     }
 
diff --git 
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
 
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
index 36079b12e8..28c3f36a3d 100644
--- 
a/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
+++ 
b/discovery/seata-discovery-redis/src/test/java/org/apache/seata/discovery/registry/redis/RedisRegisterServiceImplTest.java
@@ -19,8 +19,12 @@ package org.apache.seata.discovery.registry.redis;
 import org.apache.seata.common.util.NetUtil;
 import org.apache.seata.config.Configuration;
 import org.apache.seata.config.ConfigurationFactory;
-import org.junit.jupiter.api.*;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.mockito.MockedStatic;
 import org.mockito.internal.util.collections.Sets;
@@ -32,14 +36,19 @@ import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
 @EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true")
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class RedisRegisterServiceImplTest {
 
     private static RedisRegistryServiceImpl redisRegistryService;
@@ -56,6 +65,7 @@ public class RedisRegisterServiceImplTest {
     }
 
     @Test
+    @Order(1)
     public void testFlow() {
 
         redisRegistryService.register(new 
InetSocketAddress(NetUtil.getLocalIp(), 8091));
@@ -68,6 +78,7 @@ public class RedisRegisterServiceImplTest {
     }
 
     @Test
+    @Order(2)
     public void testRemoveServerAddressByPushEmptyProtection()
             throws NoSuchFieldException, NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
 
@@ -80,7 +91,7 @@ public class RedisRegisterServiceImplTest {
         Field field = 
RedisRegistryServiceImpl.class.getDeclaredField("CLUSTER_ADDRESS_MAP");
         field.setAccessible(true);
 
-        ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP = 
(ConcurrentMap<String, Set<InetSocketAddress>>)field.get(null);
+        ConcurrentMap<String, Set<InetSocketAddress>> CLUSTER_ADDRESS_MAP = 
(ConcurrentMap<String, Set<InetSocketAddress>>) field.get(null);
         CLUSTER_ADDRESS_MAP.put("cluster", 
Sets.newSet(NetUtil.toInetSocketAddress("127.0.0.1:8091")));
 
         Method method = 
RedisRegistryServiceImpl.class.getDeclaredMethod("removeServerAddressByPushEmptyProtection",
 String.class, String.class);
@@ -100,4 +111,29 @@ public class RedisRegisterServiceImplTest {
         Assertions.assertEquals(0, CLUSTER_ADDRESS_MAP.get("cluster").size());
     }
 
+    @Test
+    @Order(3)
+    public void testClose() throws Exception {
+        Field executorServiceField1 = 
RedisRegistryServiceImpl.class.getDeclaredField("threadPoolExecutorForSubscribe");
+        executorServiceField1.setAccessible(true);
+        ScheduledExecutorService executorService1 = 
mock(ScheduledExecutorService.class);
+        when(executorService1.isShutdown()).thenReturn(false);
+        when(executorService1.awaitTermination(5, 
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+        executorServiceField1.set(redisRegistryService, executorService1);
+
+        Field executorServiceField2 = 
RedisRegistryServiceImpl.class.getDeclaredField("threadPoolExecutorForUpdateMap");
+        executorServiceField2.setAccessible(true);
+        ScheduledExecutorService executorService2 = 
mock(ScheduledExecutorService.class);
+        when(executorService2.isShutdown()).thenReturn(false);
+        when(executorService2.awaitTermination(5, 
TimeUnit.SECONDS)).thenThrow(new InterruptedException("Test interruption"));
+        executorServiceField2.set(redisRegistryService, executorService2);
+
+        redisRegistryService.close();
+
+        verify(executorService1).shutdownNow();
+        verify(executorService2).shutdownNow();
+        assertNull(executorServiceField1.get(redisRegistryService));
+        assertNull(executorServiceField2.get(redisRegistryService));
+    }
+
 }
diff --git 
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
 
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
index fa0b428dda..e16aeb6453 100644
--- 
a/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
+++ 
b/discovery/seata-discovery-sofa/src/main/java/org/apache/seata/discovery/registry/sofa/SofaRegistryServiceImpl.java
@@ -47,7 +47,6 @@ import static 
org.apache.seata.config.ConfigurationKeys.FILE_ROOT_REGISTRY;
 
 /**
  * The type SOFARegistry registry service.
- *
  */
 public class SofaRegistryServiceImpl implements 
RegistryService<SubscriberDataObserver> {
 
@@ -206,6 +205,7 @@ public class SofaRegistryServiceImpl implements 
RegistryService<SubscriberDataOb
 
     @Override
     public void close() throws Exception {
+
     }
 
     private static Properties getNamingProperties() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to