This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-mqtt.git
The following commit(s) were added to refs/heads/develop by this push: new 495d573 [ISSUE #274] Fix startup exception (#276) 495d573 is described below commit 495d573d1964b395c0b6098b8cc498a2ab7f6fd4 Author: yx9o <yangx_s...@163.com> AuthorDate: Thu May 23 10:26:17 2024 +0800 [ISSUE #274] Fix startup exception (#276) * fix: startup exception * fix: wrong unit test * Adjust loading order * Adjust loading order * Cherry pick #265 --- distribution/conf/spring.xml | 2 +- .../mqtt/ds/meta/MetaPersistManagerSample.java | 1 - .../rocketmq/mqtt/ds/notify/NotifyManager.java | 15 +++++++----- .../ds/test/meta/TestMetaPersistManagerSample.java | 28 ++++++++++++++++------ .../mqtt/ds/test/notify/TestNotifyManager.java | 2 +- 5 files changed, 32 insertions(+), 16 deletions(-) diff --git a/distribution/conf/spring.xml b/distribution/conf/spring.xml index 12a4bc6..cf90640 100644 --- a/distribution/conf/spring.xml +++ b/distribution/conf/spring.xml @@ -35,7 +35,7 @@ <bean id="authManager" class="org.apache.rocketmq.mqtt.ds.auth.AuthManagerSample" init-method="init"/> - <bean id="metaPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample" init-method="init"/> + <bean id="metaPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample" init-method="init" depends-on="notifyManager"/> <bean id="RetainedPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.RetainedPersistManagerImpl" init-method="init"/> <bean id="willMsgPersistManager" class="org.apache.rocketmq.mqtt.ds.meta.WillMsgPersistManagerImpl" init-method="init"/> diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java index 97b85e1..ff396c0 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/meta/MetaPersistManagerSample.java @@ -17,7 +17,6 @@ package 
org.apache.rocketmq.mqtt.ds.meta; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; diff --git a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java index e027fa5..d94b0f6 100644 --- a/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java +++ b/mqtt-ds/src/main/java/org/apache/rocketmq/mqtt/ds/notify/NotifyManager.java @@ -46,9 +46,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.HashSet; import java.util.Iterator; @@ -73,9 +73,7 @@ public class NotifyManager { private NettyRemotingClient remotingClient; private DefaultMQProducer defaultMQProducer; - - @Resource - private ServiceConf serviceConf; + private final ServiceConf serviceConf; @Resource private MetaPersistManager metaPersistManager; @@ -83,8 +81,13 @@ public class NotifyManager { @Resource private FirstTopicManager firstTopicManager; - @PostConstruct - public void init() throws MQClientException { + @Autowired + public NotifyManager(ServiceConf serviceConf) { + this.serviceConf = serviceConf; + init(); + } + + private void init() { defaultMQPushConsumer = MqFactory.buildDefaultMQPushConsumer(dispatcherConsumerGroup, serviceConf.getProperties(), new Dispatcher()); defaultMQPushConsumer.setPullInterval(1); defaultMQPushConsumer.setConsumeMessageBatchMaxSize(64); diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java 
b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java index 1c9eebf..5dad741 100644 --- a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java +++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/meta/TestMetaPersistManagerSample.java @@ -19,41 +19,55 @@ package org.apache.rocketmq.mqtt.ds.test.meta; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.reflect.MethodUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.mqtt.common.util.TopicUtils; import org.apache.rocketmq.mqtt.ds.meta.MetaPersistManagerSample; import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.body.Connection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import static org.mockito.ArgumentMatchers.anyString; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class TestMetaPersistManagerSample { + private static final String RMQ_NAMESPACE = "LMQ"; + private static final String KEY_LMQ_ALL_FIRST_TOPICS = "ALL_FIRST_TOPICS"; - private static final String KEY_LMQ_CONNECT_NODES = "LMQ_CONNECT_NODES"; @Test - public void refreshMeta() throws IllegalAccessException, RemotingException, InterruptedException, MQClientException, InvocationTargetException, NoSuchMethodException { + public void refreshMeta() throws IllegalAccessException, RemotingException, InterruptedException, MQClientException, + InvocationTargetException, 
NoSuchMethodException, MQBrokerException { MetaPersistManagerSample metaPersistManagerSample = new MetaPersistManagerSample(); DefaultMQAdminExt defaultMQAdminExt = mock(DefaultMQAdminExt.class); FieldUtils.writeDeclaredField(metaPersistManagerSample, "defaultMQAdminExt", defaultMQAdminExt, true); String firstTopic = "test"; String wildcards = "test/2/#"; String node = "localhost"; + Connection connection = new Connection(); + connection.setClientAddr(node); + HashSet<Connection> connectNodeSet = new HashSet<>(); + connectNodeSet.add(connection); + ConsumerConnection consumerConnection = new ConsumerConnection(); + consumerConnection.setConnectionSet(connectNodeSet); + when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_ALL_FIRST_TOPICS)).thenReturn(firstTopic); when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,firstTopic)).thenReturn(wildcards); - when(defaultMQAdminExt.getKVConfig(RMQ_NAMESPACE,KEY_LMQ_CONNECT_NODES)).thenReturn(node); + when(defaultMQAdminExt.examineConsumerConnectionInfo(anyString())).thenReturn(consumerConnection); MethodUtils.invokeMethod(metaPersistManagerSample, true, "refreshMeta"); - Assert.assertTrue(firstTopic.equals(metaPersistManagerSample.getAllFirstTopics().iterator().next())); - Assert.assertTrue(TopicUtils.normalizeTopic(wildcards).equals(metaPersistManagerSample.getWildcards(firstTopic).iterator().next())); - Assert.assertTrue(node.equals(metaPersistManagerSample.getConnectNodeSet().iterator().next())); + assertEquals(firstTopic, metaPersistManagerSample.getAllFirstTopics().iterator().next()); + assertEquals(TopicUtils.normalizeTopic(wildcards), metaPersistManagerSample.getWildcards(firstTopic).iterator().next()); + assertEquals("localhost", metaPersistManagerSample.getConnectNodeSet().iterator().next()); } } diff --git a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java index 0e2d28a..fd17bd4 100644 --- 
a/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java +++ b/mqtt-ds/src/test/java/org/apache/rocketmq/mqtt/ds/test/notify/TestNotifyManager.java @@ -88,7 +88,7 @@ public class TestNotifyManager { @Before public void SetUp() throws IllegalAccessException { - notifyManager = new NotifyManager(); + notifyManager = new NotifyManager(serviceConf); FieldUtils.writeDeclaredField(notifyManager, "metaPersistManager", metaPersistManager, true); FieldUtils.writeDeclaredField(notifyManager, "firstTopicManager", firstTopicManager, true); FieldUtils.writeDeclaredField(notifyManager, "defaultMQPushConsumer", defaultMQPushConsumer, true);