This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git


The following commit(s) were added to refs/heads/develop by this push:
     new 495d573  [ISSUE #274] Fix startup exception (#276)
495d573 is described below

commit 495d573d1964b395c0b6098b8cc498a2ab7f6fd4
Author: yx9o <yangx_s...@163.com>
AuthorDate: Thu May 23 10:26:17 2024 +0800

    [ISSUE #274] Fix startup exception (#276)
    
    * fix: startup exception
    
    * fix: wrong unit test
    
    * Adjust loading order
    
    * Adjust loading order
    
    * Cherry pick #265
---
 distribution/conf/spring.xml                       |  2 +-
 .../mqtt/ds/meta/MetaPersistManagerSample.java     |  1 -
 .../rocketmq/mqtt/ds/notify/NotifyManager.java     | 15 +++++++-----
 .../ds/test/meta/TestMetaPersistManagerSample.java | 28 ++++++++++++++++------
 .../mqtt/ds/test/notify/TestNotifyManager.java     |  2 +-
 5 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/distribution/conf/spring.xml b/distribution/conf/spring.xml
index 12a4bc6..cf90640 100644
--- a/distribution/conf/spring.xml
+++ b/distribution/conf/spring.xml
@@ -35,7 +35,7 @@
 
     <bean id="authManager" class="org.apache.rocketmq.mqtt.ds.auth.AuthManagerSample" init-method="init"/>
 
-    <bean id="metaPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample" init-method="init"/>
+    <bean id="metaPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample" init-method="init" depends-on="notifyManager"/>
 
     <bean id="RetainedPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.RetainedPersistManagerImpl" init-method="init"/>
     <bean id="willMsgPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.WillMsgPersistManagerImpl" init-method="init"/>
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
index 97b85e1..ff396c0 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.mqtt.ds.meta;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
index e027fa5..d94b0f6 100644
--- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
+++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java
@@ -46,9 +46,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -73,9 +73,7 @@ public class NotifyManager {
     private NettyRemotingClient remotingClient;
     private DefaultMQProducer defaultMQProducer;
 
-
-    @Resource
-    private ServiceConf serviceConf;
+    private final ServiceConf serviceConf;
 
     @Resource
     private MetaPersistManager metaPersistManager;
@@ -83,8 +81,13 @@ public class NotifyManager {
     @Resource
     private FirstTopicManager firstTopicManager;
 
-    @PostConstruct
-    public void init() throws MQClientException {
+    @Autowired
+    public NotifyManager(ServiceConf serviceConf) {
+        this.serviceConf = serviceConf;
+        init();
+    }
+
+    private void init() {
         defaultMQPushConsumer = MqFactory.buildDefaultMQPushConsumer(dispatcherConsumerGroup, serviceConf.getProperties(), new Dispatcher());
         defaultMQPushConsumer.setPullInterval(1);
         defaultMQPushConsumer.setConsumeMessageBatchMaxSize(64);
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
index 1c9eebf..5dad741 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java
@@ -19,41 +19,55 @@ package org.apache.rocketmq.mqtt.ds.test.meta;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.reflect.MethodUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.mqtt.common.util.TopicUtils;
 import org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.body.Connection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestMetaPersistManagerSample {
+
     private static final String RMQ_NAMESPACE = "LMQ";
+
     private static final String KEY_LMQ_ALL_FIRST_TOPICS = "ALL_FIRST_TOPICS";
-    private static final String KEY_LMQ_CONNECT_NODES = "LMQ_CONNECT_NODES";
 
     @Test
-    public void refreshMeta() throws IllegalAccessException, RemotingException, InterruptedException, MQClientException, InvocationTargetException, NoSuchMethodException {
+    public void refreshMeta() throws IllegalAccessException, RemotingException, InterruptedException, MQClientException,
+        InvocationTargetException, NoSuchMethodException, MQBrokerException {
         MetaPersistManagerSample metaPersistManagerSample = new MetaPersistManagerSample();
         DefaultMQAdminExt defaultMQAdminExt = mock(DefaultMQAdminExt.class);
         FieldUtils.writeDeclaredField(metaPersistManagerSample, "defaultMQAdminExt", defaultMQAdminExt, true);
         String firstTopic = "test";
         String wildcards = "test/2/#";
         String node = "localhost";
+        Connection connection = new Connection();
+        connection.setClientAddr(node);
+        HashSet<Connection> connectNodeSet = new HashSet<>();
+        connectNodeSet.add(connection);
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConnectionSet(connectNodeSet);
+
         when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_ALL_FIRST_TOPICS)).thenReturn(firstTopic);
         when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,firstTopic)).thenReturn(wildcards);
-        when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_CONNECT_NODES)).thenReturn(node);
+        when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(consumerConnection);
         MethodUtils.invokeMethod(metaPersistManagerSample, true, "refreshMeta");
-        Assert.assertTrue(firstTopic.equals(metaPersistManagerSample.getAllFirstTopics().iterator().next()));
-        Assert.assertTrue(TopicUtils.normalizeTopic(wildcards).equals(metaPersistManagerSample.getWildcards(firstTopic).iterator().next()));
-        Assert.assertTrue(node.equals(metaPersistManagerSample.getConnectNodeSet().iterator().next()));
+        assertEquals(firstTopic, metaPersistManagerSample.getAllFirstTopics().iterator().next());
+        assertEquals(TopicUtils.normalizeTopic(wildcards), metaPersistManagerSample.getWildcards(firstTopic).iterator().next());
+        assertEquals("localhost", metaPersistManagerSample.getConnectNodeSet().iterator().next());
     }
 }
diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
index 0e2d28a..fd17bd4 100644
--- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
+++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java
@@ -88,7 +88,7 @@ public class TestNotifyManager {
 
     @Before
     public void SetUp() throws IllegalAccessException {
-        notifyManager = new NotifyManager();
+        notifyManager = new NotifyManager(serviceConf);
         FieldUtils.writeDeclaredField(notifyManager, "metaPersistManager", metaPersistManager, true);
         FieldUtils.writeDeclaredField(notifyManager, "firstTopicManager", firstTopicManager, true);
         FieldUtils.writeDeclaredField(notifyManager, "defaultMQPushConsumer", defaultMQPushConsumer, true);

Reply via email to