This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 76cee72cc5 [ISSUE #9537] MQClientAPIFactory Implement 
NameServerUpdateCallback interface (#9538)
76cee72cc5 is described below

commit 76cee72cc5edae086cbe44ed2b00eb3753c120d5
Author: qianye <[email protected]>
AuthorDate: Fri Jul 11 14:31:15 2025 +0800

    [ISSUE #9537] MQClientAPIFactory Implement NameServerUpdateCallback 
interface (#9538)
---
 .../client/impl/mqclient/MQClientAPIFactory.java   |  19 ++-
 .../client/impl/mqclient/MQClientAPITest.java      | 146 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
index d85dcc70a5..28c1ad1930 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIFactory.java
@@ -17,19 +17,17 @@
 package org.apache.rocketmq.client.impl.mqclient;
 
 import com.google.common.base.Strings;
-
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.common.NameserverAccessConfig;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
 import org.apache.rocketmq.common.ObjectCreator;
+import org.apache.rocketmq.common.utils.AsyncShutdownHelper;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
@@ -133,7 +131,9 @@ public class MQClientAPIFactory implements StartAndShutdown 
{
             remotingClientCreator
         );
 
-        if (!mqClientAPIExt.updateNameServerAddressList()) {
+        if (StringUtils.isEmpty(nameserverAccessConfig.getNamesrvDomain())) {
+            
mqClientAPIExt.updateNameServerAddressList(nameserverAccessConfig.getNamesrvAddr());
+        } else {
             mqClientAPIExt.fetchNameServerAddr();
             this.scheduledExecutorService.scheduleAtFixedRate(
                 mqClientAPIExt::fetchNameServerAddr,
@@ -142,7 +142,18 @@ public class MQClientAPIFactory implements 
StartAndShutdown {
                 TimeUnit.MILLISECONDS
             );
         }
+
         mqClientAPIExt.start();
         return mqClientAPIExt;
     }
+
+    public void onNameServerAddressChange(String namesrvAddress) {
+        for (MQClientAPIExt client : clients) {
+            client.onNameServerAddressChange(namesrvAddress);
+        }
+    }
+
+    public MQClientAPIExt[] getClients() {
+        return clients;
+    }
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPITest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPITest.java
new file mode 100644
index 0000000000..965c5ae16b
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPITest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.mqclient;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.rocketmq.client.common.NameserverAccessConfig;
+import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class MQClientAPITest {
+
+    private NameserverAccessConfig nameserverAccessConfig;
+    private final ClientRemotingProcessor clientRemotingProcessor = new 
DoNothingClientRemotingProcessor(null);
+    private final RPCHook rpcHook = null;
+    private ScheduledExecutorService scheduledExecutorService;
+    private MQClientAPIFactory mqClientAPIFactory;
+
+    @BeforeEach
+    void setUp() {
+        scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor("TestScheduledExecutorService", 
true);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        scheduledExecutorService.shutdownNow();
+    }
+
+    @Test
+    void testInitWithNamesrvAddr() {
+        nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", 
"", "");
+
+        mqClientAPIFactory = new MQClientAPIFactory(
+            nameserverAccessConfig,
+            "TestPrefix",
+            2,
+            clientRemotingProcessor,
+            rpcHook,
+            scheduledExecutorService
+        );
+
+        assertEquals("127.0.0.1:9876", 
System.getProperty("rocketmq.namesrv.addr"));
+    }
+
+    @Test
+    void testInitWithNamesrvDomain() {
+        nameserverAccessConfig = new NameserverAccessConfig("", "test-domain", 
"");
+
+        mqClientAPIFactory = new MQClientAPIFactory(
+            nameserverAccessConfig,
+            "TestPrefix",
+            2,
+            clientRemotingProcessor,
+            rpcHook,
+            scheduledExecutorService
+        );
+
+        assertEquals("test-domain", 
System.getProperty("rocketmq.namesrv.domain"));
+    }
+
+    @Test
+    void testInitThrowsExceptionWhenBothEmpty() {
+        nameserverAccessConfig = new NameserverAccessConfig("", "", "");
+
+        RuntimeException exception = assertThrows(RuntimeException.class, () 
-> new MQClientAPIFactory(
+            nameserverAccessConfig,
+            "TestPrefix",
+            2,
+            clientRemotingProcessor,
+            rpcHook,
+            scheduledExecutorService
+        ));
+
+        assertEquals("The configuration item NamesrvAddr is not configured", 
exception.getMessage());
+    }
+
+    @Test
+    void testStartCreatesClients() throws Exception {
+        nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", 
"", "");
+
+        mqClientAPIFactory = new MQClientAPIFactory(
+            nameserverAccessConfig,
+            "TestPrefix",
+            2,
+            clientRemotingProcessor,
+            rpcHook,
+            scheduledExecutorService
+        );
+
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:123");
+
+        mqClientAPIFactory.start();
+
+        // Assert
+        MQClientAPIExt client = mqClientAPIFactory.getClient();
+        List<String> nameServerAddressList = client.getNameServerAddressList();
+        assertEquals(1, nameServerAddressList.size());
+        assertEquals("127.0.0.1:9876", nameServerAddressList.get(0));
+    }
+
+    @Test
+    void testOnNameServerAddressChangeUpdatesAllClients() throws Exception {
+        nameserverAccessConfig = new NameserverAccessConfig("127.0.0.1:9876", 
"", "");
+
+        mqClientAPIFactory = new MQClientAPIFactory(
+            nameserverAccessConfig,
+            "TestPrefix",
+            2,
+            clientRemotingProcessor,
+            rpcHook,
+            scheduledExecutorService
+        );
+        mqClientAPIFactory.start();
+
+        // Act
+        
mqClientAPIFactory.onNameServerAddressChange("new-address0;new-address1");
+
+        MQClientAPIExt client = mqClientAPIFactory.getClient();
+        List<String> nameServerAddressList = client.getNameServerAddressList();
+        assertEquals(2, nameServerAddressList.size());
+        assertEquals("new-address0", nameServerAddressList.get(0));
+    }
+}

Reply via email to