This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 047ef7498f Ensuring consistency between broker and nameserver data 
when deleting a topic (#7066)
047ef7498f is described below

commit 047ef7498f2203a2234052603a99a114d8a65e17
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 25 14:00:22 2023 +0800

    Ensuring consistency between broker and nameserver data when deleting a 
topic (#7066)
    
    Co-authored-by: 尘央 <[email protected]>
---
 .../apache/rocketmq/broker/BrokerController.java   |  11 ++
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  62 +++++++++++
 .../broker/processor/AdminBrokerProcessor.java     |  26 +++--
 .../rocketmq/broker/topic/TopicConfigManager.java  |   6 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |  14 +++
 .../rocketmq/common/namesrv/NamesrvConfig.java     |  17 +++
 .../namesrv/routeinfo/RouteInfoManager.java        |  64 ++++++++++--
 .../namesrv/routeinfo/RouteInfoManagerNewTest.java |  99 ++++++++++++++++++
 .../rocketmq/test/util/MQAdminTestUtils.java       |  37 +++++++
 .../dledger/DLedgerProduceAndConsumeIT.java        |   2 +-
 .../test/route/CreateAndUpdateTopicIT.java         | 114 +++++++++++++++++++++
 11 files changed, 429 insertions(+), 23 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 196401e268..972457194f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1678,6 +1678,17 @@ public class BrokerController {
         }, 1000, brokerConfig.getBrokerHeartbeatInterval(), 
TimeUnit.MILLISECONDS));
     }
 
+    public synchronized void registerSingleTopicAll(final TopicConfig 
topicConfig) {
+        TopicConfig tmpTopic = topicConfig;
+        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+            || 
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            // Copy the topic config and modify the perm
+            tmpTopic = new TopicConfig(topicConfig);
+            tmpTopic.setPerm(topicConfig.getPerm() & 
this.brokerConfig.getBrokerPermission());
+        }
+        
this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(), 
tmpTopic, 3000);
+    }
+
     public synchronized void registerIncrementBrokerData(TopicConfig 
topicConfig, DataVersion dataVersion) {
         
this.registerIncrementBrokerData(Collections.singletonList(topicConfig), 
dataVersion);
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index b6273e9ed5..1793a83c05 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UnlockCallback;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -120,12 +121,14 @@ import 
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.rpc.ClientMetadata;
 import org.apache.rocketmq.remoting.rpc.RpcClient;
@@ -614,6 +617,65 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark(), 
brokerAddr);
     }
 
+    /**
+     * Register the topic route info of single topic to all name server nodes.
+     * This method is used to replace incremental broker registration feature.
+     */
+    public void registerSingleTopicAll(
+        final String brokerName,
+        final TopicConfig topicConfig,
+        final int timeoutMills) {
+        String topic = topicConfig.getTopicName();
+        RegisterTopicRequestHeader requestHeader = new 
RegisterTopicRequestHeader();
+        requestHeader.setTopic(topic);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        List<QueueData> queueDatas = new ArrayList<>();
+        topicRouteData.setQueueDatas(queueDatas);
+
+        final QueueData queueData = new QueueData();
+        queueData.setBrokerName(brokerName);
+        queueData.setPerm(topicConfig.getPerm());
+        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
+        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
+        queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
+        queueDatas.add(queueData);
+        final byte[] topicRouteBody = topicRouteData.encode();
+
+
+        List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
+        final CountDownLatch countDownLatch = new 
CountDownLatch(nameServerAddressList.size());
+        for (final String namesrvAddr : nameServerAddressList) {
+            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV, 
requestHeader);
+            request.setBody(topicRouteBody);
+
+            try {
+                brokerOuterExecutor.execute(() -> {
+                    try {
+                        RemotingCommand response = 
BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request, 
timeoutMills);
+                        assert response != null;
+                        LOGGER.info("Register single topic %s to broker %s 
with response code %s", topic, brokerName, response.getCode());
+                    } catch (Exception e) {
+                        LOGGER.warn(String.format("Register single topic %s to 
broker %s exception", topic, brokerName), e);
+                    } finally {
+                        countDownLatch.countDown();
+                    }
+                });
+            } catch (Exception e) {
+                LOGGER.warn("Execute single topic registration task failed, 
topic {}, broker name {}", topic, brokerName);
+                countDownLatch.countDown();
+            }
+
+        }
+
+        try {
+            if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
+                LOGGER.warn("Registration single topic to one or more name 
servers timeout. Timeout threshold: {}ms", timeoutMills);
+            }
+        } catch (InterruptedException ignore) {
+        }
+    }
+
     public List<Boolean> needRegister(
         final String clusterName,
         final String brokerAddr,
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 892a713308..569a1c57bd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
 
         try {
             
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-            this.brokerController.registerIncrementBrokerData(topicConfig, 
this.brokerController.getTopicConfigManager().getDataVersion());
+            if 
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+                this.brokerController.registerSingleTopicAll(topicConfig);
+            } else {
+                this.brokerController.registerIncrementBrokerData(topicConfig, 
this.brokerController.getTopicConfigManager().getDataVersion());
+            }
             response.setCode(ResponseCode.SUCCESS);
         } catch (Exception e) {
             LOGGER.error("Update / create topic failed for [{}]", request, e);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(e.getMessage());
         }
+
         return response;
     }
 
@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private synchronized RemotingCommand 
updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand 
request) {
+    private synchronized RemotingCommand 
updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}", 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
             }
             MessageStore messageStore = 
this.brokerController.getMessageStore();
             if (messageStore instanceof DefaultMessageStore) {
-                DefaultMessageStore defaultMessageStore = 
(DefaultMessageStore)messageStore;
+                DefaultMessageStore defaultMessageStore = 
(DefaultMessageStore) messageStore;
                 if (mode == LibC.MADV_NORMAL) {
                     
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
                 } else {
@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
     /**
      * Reset consumer offset.
      *
-     * @param topic     Required, not null.
-     * @param group     Required, not null.
-     * @param queueId   if target queue ID is negative, all message queues 
will be reset;
-     *                  otherwise, only the target queue would get reset.
-     * @param timestamp if timestamp is negative, offset would be reset to 
broker offset at the time being;
-     *                  otherwise, binary search is performed to locate target 
offset.
-     * @param offset    Target offset to reset to if target queue ID is 
properly provided.
+     * @param topic Required, not null.
+     * @param group Required, not null.
+     * @param queueId if target queue ID is negative, all message queues will 
be reset; otherwise, only the target queue
+     * would get reset.
+     * @param timestamp if timestamp is negative, offset would be reset to 
broker offset at the time being; otherwise,
+     * binary search is performed to locate target offset.
+     * @param offset Target offset to reset to if target queue ID is properly 
provided.
      * @return Affected queues and their new offset
      */
     private RemotingCommand resetOffsetInner(String topic, String group, int 
queueId, long timestamp, Long offset) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index e5fdd8675f..e905305129 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager {
             log.error("createTopicIfAbsent ", e);
         }
         if (createNew && register) {
-            this.brokerController.registerIncrementBrokerData(topicConfig, 
dataVersion);
+            if 
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+                this.brokerController.registerSingleTopicAll(topicConfig);
+            } else {
+                this.brokerController.registerIncrementBrokerData(topicConfig, 
dataVersion);
+            }
         }
         return this.topicConfigTable.get(topicConfig.getTopicName());
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a4d82d1c53..02c692e2b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean popResponseReturnActualRetryTopic = false;
 
+    /**
+     * If both the deleteTopicWithBrokerRegistration flag in the NameServer 
configuration and this flag are set to true,
+     * it guarantees the ultimate consistency of data between the broker and 
the nameserver during topic deletion.
+     */
+    private boolean enableSingleTopicRegister = false;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setPopResponseReturnActualRetryTopic(boolean 
popResponseReturnActualRetryTopic) {
         this.popResponseReturnActualRetryTopic = 
popResponseReturnActualRetryTopic;
     }
+
+    public boolean isEnableSingleTopicRegister() {
+        return enableSingleTopicRegister;
+    }
+
+    public void setEnableSingleTopicRegister(boolean 
enableSingleTopicRegister) {
+        this.enableSingleTopicRegister = enableSingleTopicRegister;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 700febfe27..5b8a6dedb7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -82,6 +82,15 @@ public class NamesrvConfig {
 
     private int waitSecondsForService = 45;
 
+    /**
+     * If enable this flag, the topics that don't exist in broker registration 
payload will be deleted from name server.
+     *
+     * WARNING:
+     * 1. Enable this flag and "enableSingleTopicRegister" of broker config 
meanwhile to avoid losing topic route info unexpectedly.
+     * 2. This flag does not support static topic currently.
+     */
+    private boolean deleteTopicWithBrokerRegistration = false;
+
     public boolean isOrderMessageEnable() {
         return orderMessageEnable;
     }
@@ -241,4 +250,12 @@ public class NamesrvConfig {
     public void setWaitSecondsForService(int waitSecondsForService) {
         this.waitSecondsForService = waitSecondsForService;
     }
+
+    public boolean isDeleteTopicWithBrokerRegistration() {
+        return deleteTopicWithBrokerRegistration;
+    }
+
+    public void setDeleteTopicWithBrokerRegistration(boolean 
deleteTopicWithBrokerRegistration) {
+        this.deleteTopicWithBrokerRegistration = 
deleteTopicWithBrokerRegistration;
+    }
 }
diff --git 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ac27d76ce1..0055a1cc8e 100644
--- 
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ 
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -121,9 +121,18 @@ public class RouteInfoManager {
         if (queueDatas == null || queueDatas.isEmpty()) {
             return;
         }
+
         try {
             this.lock.writeLock().lockInterruptibly();
             if (this.topicQueueTable.containsKey(topic)) {
+                Map<String, QueueData> queueDataMap  = 
this.topicQueueTable.get(topic);
+                for (QueueData queueData : queueDatas) {
+                    if 
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
+                        log.warn("Register topic contains illegal broker, {}, 
{}", topic, queueData);
+                        return;
+                    }
+                    queueDataMap.put(queueData.getBrokerName(), queueData);
+                }
                 log.info("Topic route already exist.{}, {}", topic, 
this.topicQueueTable.get(topic));
             } else {
                 // check and construct queue data map
@@ -299,7 +308,32 @@ public class RouteInfoManager {
 
                 ConcurrentMap<String, TopicConfig> tcTable =
                     topicConfigWrapper.getTopicConfigTable();
+
                 if (tcTable != null) {
+
+                    TopicConfigAndMappingSerializeWrapper 
mappingSerializeWrapper = 
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+                    Map<String, TopicQueueMappingInfo> 
topicQueueMappingInfoMap = 
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+
+                    // Delete the topics that don't exist in tcTable from the 
current broker
+                    // Static topic is not supported currently
+                    if (namesrvConfig.isDeleteTopicWithBrokerRegistration() && 
topicQueueMappingInfoMap.isEmpty()) {
+                        final Set<String> oldTopicSet = 
topicSetOfBrokerName(brokerName);
+                        final Set<String> newTopicSet = tcTable.keySet();
+                        final Sets.SetView<String> toDeleteTopics = 
Sets.difference(oldTopicSet, newTopicSet);
+                        for (final String toDeleteTopic : toDeleteTopics) {
+                            Map<String, QueueData> queueDataMap = 
topicQueueTable.get(toDeleteTopic);
+                            final QueueData removedQD = 
queueDataMap.remove(brokerName);
+                            if (removedQD != null) {
+                                log.info("deleteTopic, remove one broker's 
topic {} {} {}", brokerName, toDeleteTopic, removedQD);
+                            }
+
+                            if (queueDataMap.isEmpty()) {
+                                log.info("deleteTopic, remove the topic all 
queue {}", toDeleteTopic);
+                                topicQueueTable.remove(toDeleteTopic);
+                            }
+                        }
+                    }
+
                     for (Map.Entry<String, TopicConfig> entry : 
tcTable.entrySet()) {
                         if (registerFirst || 
this.isTopicConfigChanged(clusterName, brokerAddr,
                             topicConfigWrapper.getDataVersion(), brokerName,
@@ -312,19 +346,17 @@ public class RouteInfoManager {
                             this.createAndUpdateQueueData(brokerName, 
topicConfig);
                         }
                     }
-                }
 
-                if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr, 
topicConfigWrapper.getDataVersion()) || registerFirst) {
-                    TopicConfigAndMappingSerializeWrapper 
mappingSerializeWrapper = 
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
-                    Map<String, TopicQueueMappingInfo> 
topicQueueMappingInfoMap = 
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
-                    //the topicQueueMappingInfoMap should never be null, but 
can be empty
-                    for (Map.Entry<String, TopicQueueMappingInfo> entry : 
topicQueueMappingInfoMap.entrySet()) {
-                        if 
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
-                            topicQueueMappingInfoTable.put(entry.getKey(), new 
HashMap<>());
+                    if (this.isBrokerTopicConfigChanged(clusterName, 
brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
+                        //the topicQueueMappingInfoMap should never be null, 
but can be empty
+                        for (Map.Entry<String, TopicQueueMappingInfo> entry : 
topicQueueMappingInfoMap.entrySet()) {
+                            if 
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+                                topicQueueMappingInfoTable.put(entry.getKey(), 
new HashMap<>());
+                            }
+                            //Note asset brokerName equal 
entry.getValue().getBname()
+                            //here use the mappingDetail.bname
+                            
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), 
entry.getValue());
                         }
-                        //Note asset brokerName equal 
entry.getValue().getBname()
-                        //here use the mappingDetail.bname
-                        
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(), 
entry.getValue());
                     }
                 }
             }
@@ -374,6 +406,16 @@ public class RouteInfoManager {
         return result;
     }
 
+    private Set<String> topicSetOfBrokerName(final String brokerName) {
+        Set<String> topicOfBroker = new HashSet<>();
+        for (final Entry<String, Map<String, QueueData>> entry : 
this.topicQueueTable.entrySet()) {
+            if (entry.getValue().containsKey(brokerName)) {
+                topicOfBroker.add(entry.getKey());
+            }
+        }
+        return topicOfBroker;
+    }
+
     public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String 
brokerName) {
         BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName, 
brokerName);
         try {
diff --git 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index b53519e5f6..6002d1f5a4 100644
--- 
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++ 
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import org.junit.After;
 import org.junit.Before;
@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest {
             .containsValues(BrokerBasicInfo.defaultBroker().brokerAddr, 
BrokerBasicInfo.slaveBroker().brokerAddr);
     }
 
+    @Test
+    public void keepTopicWithBrokerRegistration() {
+        RegisterBrokerResult masterResult = 
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic", 
"TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+        masterResult = 
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(),  "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+    }
+
+    @Test
+    public void deleteTopicWithBrokerRegistration() {
+        config.setDeleteTopicWithBrokerRegistration(true);
+        registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), 
"TestTopic", "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+        registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(),  
"TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+    }
+
+    @Test
+    public void deleteTopicWithBrokerRegistration2() {
+        // Register two brokers and delete a specific one by one
+        config.setDeleteTopicWithBrokerRegistration(true);
+        final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+        final BrokerBasicInfo master2 = 
BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9);
+
+        registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+        registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1");
+
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2);
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+
+
+        registerBrokerWithNormalTopic(master1,  "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1);
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName())
+            .isEqualTo(master2.brokerName);
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+
+        registerBrokerWithNormalTopic(master2,  "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+    }
+
+    @Test
+    public void registerSingleTopicWithBrokerRegistration() {
+        config.setDeleteTopicWithBrokerRegistration(true);
+        final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+
+        registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic");
+
+        // Single topic registration failed because there is no broker 
connection exists
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
+        // Register broker with TestTopic first and then register single topic 
TestTopic1
+        registerBrokerWithNormalTopic(master1, "TestTopic");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
+        registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+        // Register the two topics to keep the route info
+        registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+        // Cancel the TestTopic1 with broker registration
+        registerBrokerWithNormalTopic(master1, "TestTopic");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+        // Add TestTopic1 and cancel all the topics with broker un-registration
+        registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+        routeInfoManager.unregisterBroker(master1.clusterName, 
master1.brokerAddr, master1.brokerName, 0);
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+        
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+
+    }
+
     private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo 
brokerInfo, String... topics) {
         ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = 
new ConcurrentHashMap<>();
         TopicConfig baseTopic = new TopicConfig("baseTopic");
@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest {
         return registerBrokerResult;
     }
 
+    private void registerSingleTopicWithBrokerName(String brokerName, 
String... topics) {
+        for (final String topic : topics) {
+            QueueData queueData = new QueueData();
+            queueData.setBrokerName(brokerName);
+            queueData.setReadQueueNums(8);
+            queueData.setWriteQueueNums(8);
+            queueData.setPerm(6);
+            routeInfoManager.registerTopic(topic, 
Collections.singletonList(queueData));
+        }
+    }
+
     static class BrokerBasicInfo {
         String clusterName;
         String brokerName;
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 11b00a72c6..d3d5de9e27 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.test.util;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +39,7 @@ import 
org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
 import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne;
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
@@ -319,4 +321,39 @@ public class MQAdminTestUtils {
         }
         return consumeStats;
     }
+
+    /**
+     * Delete topic from broker only without cleaning route info from name 
server forwardly
+     *
+     * @param nameSrvAddr the namesrv addr to connect
+     * @param brokerName the specific broker
+     * @param topic the specific topic to delete
+     */
+    public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String 
brokerName, String topic) {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+
+        try {
+            mqAdminExt.start();
+            String brokerAddr = 
CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName);
+            mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr), 
topic);
+        } catch (Exception ignored) {
+        } finally {
+            mqAdminExt.shutdown();
+        }
+    }
+
+    public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr, 
String topicName) {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        TopicRouteData route = null;
+        try {
+            mqAdminExt.start();
+            route = mqAdminExt.examineTopicRouteInfo(topicName);
+        } catch (Exception ignored) {
+        } finally {
+            mqAdminExt.shutdown();
+        }
+        return route;
+    }
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
similarity index 99%
rename from 
test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
rename to 
test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
index 9e142eb617..43fefd6166 100644
--- 
a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++ 
b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.test.base.dledger;
+package org.apache.rocketmq.test.dledger;
 
 import java.util.UUID;
 import org.apache.rocketmq.broker.BrokerController;
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java 
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
new file mode 100644
index 0000000000..7e3c7b871d
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.route;
+
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CreateAndUpdateTopicIT extends BaseConf {
+
+    @Test
+    public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
+        String topic = "test-topic-without-broker-registration";
+        brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+        final boolean createResult = 
MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null);
+        assertThat(createResult).isTrue();
+
+        TopicRouteData route = 
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic);
+        assertThat(route.getBrokerDatas()).hasSize(3);
+        assertThat(route.getQueueDatas()).hasSize(3);
+
+        
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+
+    }
+
+    @Test
+    public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
+        
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+        brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+        String testTopic1 = "test-topic-keep-route";
+        String testTopic2 = "test-topic-delete-route";
+
+        boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, 
CLUSTER_NAME, testTopic1, 8, null);
+        assertThat(createResult).isTrue();
+
+
+        createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, 
CLUSTER_NAME, testTopic2, 8, null);
+        assertThat(createResult).isTrue();
+
+
+        TopicRouteData route = 
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
+        assertThat(route.getBrokerDatas()).hasSize(3);
+
+        MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME, 
testTopic2);
+
+        // Deletion is lazy, trigger broker registration
+        brokerController1.registerBrokerAll(false, false, true);
+
+        // The route info of testTopic2 will be removed from broker1 after the 
registration
+        route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, 
testTopic2);
+        assertThat(route.getBrokerDatas()).hasSize(2);
+        
assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
+        
assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
+
+        
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+    }
+
+    @Test
+    public void testStaticTopicNotAffected() throws Exception {
+        
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+        brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+        brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+        String testTopic = "test-topic-not-affected";
+        String testStaticTopic = "test-static-topic";
+
+        boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR, 
CLUSTER_NAME, testTopic, 8, null);
+        assertThat(createResult).isTrue();
+
+        TopicRouteData route = 
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
+        assertThat(route.getBrokerDatas()).hasSize(3);
+        assertThat(route.getQueueDatas()).hasSize(3);
+
+        MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10, 
null, CLUSTER_NAME, NAMESRV_ADDR);
+
+        assertThat(route.getBrokerDatas()).hasSize(3);
+        assertThat(route.getQueueDatas()).hasSize(3);
+
+        
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+        
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+    }
+}


Reply via email to