This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 047ef7498f Ensuring consistency between broker and nameserver data
when deleting a topic (#7066)
047ef7498f is described below
commit 047ef7498f2203a2234052603a99a114d8a65e17
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 25 14:00:22 2023 +0800
Ensuring consistency between broker and nameserver data when deleting a
topic (#7066)
Co-authored-by: 尘央 <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 11 ++
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 62 +++++++++++
.../broker/processor/AdminBrokerProcessor.java | 26 +++--
.../rocketmq/broker/topic/TopicConfigManager.java | 6 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 14 +++
.../rocketmq/common/namesrv/NamesrvConfig.java | 17 +++
.../namesrv/routeinfo/RouteInfoManager.java | 64 ++++++++++--
.../namesrv/routeinfo/RouteInfoManagerNewTest.java | 99 ++++++++++++++++++
.../rocketmq/test/util/MQAdminTestUtils.java | 37 +++++++
.../dledger/DLedgerProduceAndConsumeIT.java | 2 +-
.../test/route/CreateAndUpdateTopicIT.java | 114 +++++++++++++++++++++
11 files changed, 429 insertions(+), 23 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 196401e268..972457194f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1678,6 +1678,17 @@ public class BrokerController {
}, 1000, brokerConfig.getBrokerHeartbeatInterval(),
TimeUnit.MILLISECONDS));
}
+ public synchronized void registerSingleTopicAll(final TopicConfig
topicConfig) {
+ TopicConfig tmpTopic = topicConfig;
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ ||
!PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ // Copy the topic config and modify the perm
+ tmpTopic = new TopicConfig(topicConfig);
+ tmpTopic.setPerm(topicConfig.getPerm() &
this.brokerConfig.getBrokerPermission());
+ }
+
this.brokerOuterAPI.registerSingleTopicAll(this.brokerConfig.getBrokerName(),
tmpTopic, 3000);
+ }
+
public synchronized void registerIncrementBrokerData(TopicConfig
topicConfig, DataVersion dataVersion) {
this.registerIncrementBrokerData(Collections.singletonList(topicConfig),
dataVersion);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index b6273e9ed5..1793a83c05 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -120,12 +121,14 @@ import
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionRequ
import
org.apache.rocketmq.remoting.protocol.header.namesrv.QueryDataVersionResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.namesrv.RegisterTopicRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.rpc.ClientMetadata;
import org.apache.rocketmq.remoting.rpc.RpcClient;
@@ -614,6 +617,65 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark(),
brokerAddr);
}
+ /**
+ * Register the topic route info of single topic to all name server nodes.
+ * This method is used to replace incremental broker registration feature.
+ */
+ public void registerSingleTopicAll(
+ final String brokerName,
+ final TopicConfig topicConfig,
+ final int timeoutMills) {
+ String topic = topicConfig.getTopicName();
+ RegisterTopicRequestHeader requestHeader = new
RegisterTopicRequestHeader();
+ requestHeader.setTopic(topic);
+
+ TopicRouteData topicRouteData = new TopicRouteData();
+ List<QueueData> queueDatas = new ArrayList<>();
+ topicRouteData.setQueueDatas(queueDatas);
+
+ final QueueData queueData = new QueueData();
+ queueData.setBrokerName(brokerName);
+ queueData.setPerm(topicConfig.getPerm());
+ queueData.setReadQueueNums(topicConfig.getReadQueueNums());
+ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
+ queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());
+ queueDatas.add(queueData);
+ final byte[] topicRouteBody = topicRouteData.encode();
+
+
+ List<String> nameServerAddressList =
this.remotingClient.getNameServerAddressList();
+ final CountDownLatch countDownLatch = new
CountDownLatch(nameServerAddressList.size());
+ for (final String namesrvAddr : nameServerAddressList) {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.REGISTER_TOPIC_IN_NAMESRV,
requestHeader);
+ request.setBody(topicRouteBody);
+
+ try {
+ brokerOuterExecutor.execute(() -> {
+ try {
+ RemotingCommand response =
BrokerOuterAPI.this.remotingClient.invokeSync(namesrvAddr, request,
timeoutMills);
+ assert response != null;
+ LOGGER.info("Register single topic %s to broker %s
with response code %s", topic, brokerName, response.getCode());
+ } catch (Exception e) {
+ LOGGER.warn(String.format("Register single topic %s to
broker %s exception", topic, brokerName), e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.warn("Execute single topic registration task failed,
topic {}, broker name {}", topic, brokerName);
+ countDownLatch.countDown();
+ }
+
+ }
+
+ try {
+ if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
+ LOGGER.warn("Registration single topic to one or more name
servers timeout. Timeout threshold: {}ms", timeoutMills);
+ }
+ } catch (InterruptedException ignore) {
+ }
+ }
+
public List<Boolean> needRegister(
final String clusterName,
final String brokerAddr,
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 892a713308..569a1c57bd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -441,13 +441,18 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
try {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig,
this.brokerController.getTopicConfigManager().getDataVersion());
+ if
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig,
this.brokerController.getTopicConfigManager().getDataVersion());
+ }
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
LOGGER.error("Update / create topic failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}
+
return response;
}
@@ -769,7 +774,8 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
return response;
}
- private synchronized RemotingCommand
updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx, RemotingCommand
request) {
+ private synchronized RemotingCommand
updateColdDataFlowCtrGroupConfig(ChannelHandlerContext ctx,
+ RemotingCommand request) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
LOGGER.info("updateColdDataFlowCtrGroupConfig called by {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -876,7 +882,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
MessageStore messageStore =
this.brokerController.getMessageStore();
if (messageStore instanceof DefaultMessageStore) {
- DefaultMessageStore defaultMessageStore =
(DefaultMessageStore)messageStore;
+ DefaultMessageStore defaultMessageStore =
(DefaultMessageStore) messageStore;
if (mode == LibC.MADV_NORMAL) {
defaultMessageStore.getMessageStoreConfig().setDataReadAheadEnable(true);
} else {
@@ -1835,13 +1841,13 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
/**
* Reset consumer offset.
*
- * @param topic Required, not null.
- * @param group Required, not null.
- * @param queueId if target queue ID is negative, all message queues
will be reset;
- * otherwise, only the target queue would get reset.
- * @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being;
- * otherwise, binary search is performed to locate target
offset.
- * @param offset Target offset to reset to if target queue ID is
properly provided.
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues will
be reset; otherwise, only the target queue
+ * would get reset.
+ * @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being; otherwise,
+ * binary search is performed to locate target offset.
+ * @param offset Target offset to reset to if target queue ID is properly
provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int
queueId, long timestamp, Long offset) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index e5fdd8675f..e905305129 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -305,7 +305,11 @@ public class TopicConfigManager extends ConfigManager {
log.error("createTopicIfAbsent ", e);
}
if (createNew && register) {
- this.brokerController.registerIncrementBrokerData(topicConfig,
dataVersion);
+ if
(brokerController.getBrokerConfig().isEnableSingleTopicRegister()) {
+ this.brokerController.registerSingleTopicAll(topicConfig);
+ } else {
+ this.brokerController.registerIncrementBrokerData(topicConfig,
dataVersion);
+ }
}
return this.topicConfigTable.get(topicConfig.getTopicName());
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index a4d82d1c53..02c692e2b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -386,6 +386,12 @@ public class BrokerConfig extends BrokerIdentity {
*/
private boolean popResponseReturnActualRetryTopic = false;
+ /**
+ * If both the deleteTopicWithBrokerRegistration flag in the NameServer
configuration and this flag are set to true,
+ * it guarantees the ultimate consistency of data between the broker and
the nameserver during topic deletion.
+ */
+ private boolean enableSingleTopicRegister = false;
+
public long getMaxPopPollingSize() {
return maxPopPollingSize;
}
@@ -1689,4 +1695,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setPopResponseReturnActualRetryTopic(boolean
popResponseReturnActualRetryTopic) {
this.popResponseReturnActualRetryTopic =
popResponseReturnActualRetryTopic;
}
+
+ public boolean isEnableSingleTopicRegister() {
+ return enableSingleTopicRegister;
+ }
+
+ public void setEnableSingleTopicRegister(boolean
enableSingleTopicRegister) {
+ this.enableSingleTopicRegister = enableSingleTopicRegister;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
index 700febfe27..5b8a6dedb7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -82,6 +82,15 @@ public class NamesrvConfig {
private int waitSecondsForService = 45;
+ /**
+ * If enable this flag, the topics that don't exist in broker registration
payload will be deleted from name server.
+ *
+ * WARNING:
+ * 1. Enable this flag and "enableSingleTopicRegister" of broker config
meanwhile to avoid losing topic route info unexpectedly.
+ * 2. This flag does not support static topic currently.
+ */
+ private boolean deleteTopicWithBrokerRegistration = false;
+
public boolean isOrderMessageEnable() {
return orderMessageEnable;
}
@@ -241,4 +250,12 @@ public class NamesrvConfig {
public void setWaitSecondsForService(int waitSecondsForService) {
this.waitSecondsForService = waitSecondsForService;
}
+
+ public boolean isDeleteTopicWithBrokerRegistration() {
+ return deleteTopicWithBrokerRegistration;
+ }
+
+ public void setDeleteTopicWithBrokerRegistration(boolean
deleteTopicWithBrokerRegistration) {
+ this.deleteTopicWithBrokerRegistration =
deleteTopicWithBrokerRegistration;
+ }
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ac27d76ce1..0055a1cc8e 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -121,9 +121,18 @@ public class RouteInfoManager {
if (queueDatas == null || queueDatas.isEmpty()) {
return;
}
+
try {
this.lock.writeLock().lockInterruptibly();
if (this.topicQueueTable.containsKey(topic)) {
+ Map<String, QueueData> queueDataMap =
this.topicQueueTable.get(topic);
+ for (QueueData queueData : queueDatas) {
+ if
(!this.brokerAddrTable.containsKey(queueData.getBrokerName())) {
+ log.warn("Register topic contains illegal broker, {},
{}", topic, queueData);
+ return;
+ }
+ queueDataMap.put(queueData.getBrokerName(), queueData);
+ }
log.info("Topic route already exist.{}, {}", topic,
this.topicQueueTable.get(topic));
} else {
// check and construct queue data map
@@ -299,7 +308,32 @@ public class RouteInfoManager {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
+
if (tcTable != null) {
+
+ TopicConfigAndMappingSerializeWrapper
mappingSerializeWrapper =
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
+ Map<String, TopicQueueMappingInfo>
topicQueueMappingInfoMap =
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
+
+ // Delete the topics that don't exist in tcTable from the
current broker
+ // Static topic is not supported currently
+ if (namesrvConfig.isDeleteTopicWithBrokerRegistration() &&
topicQueueMappingInfoMap.isEmpty()) {
+ final Set<String> oldTopicSet =
topicSetOfBrokerName(brokerName);
+ final Set<String> newTopicSet = tcTable.keySet();
+ final Sets.SetView<String> toDeleteTopics =
Sets.difference(oldTopicSet, newTopicSet);
+ for (final String toDeleteTopic : toDeleteTopics) {
+ Map<String, QueueData> queueDataMap =
topicQueueTable.get(toDeleteTopic);
+ final QueueData removedQD =
queueDataMap.remove(brokerName);
+ if (removedQD != null) {
+ log.info("deleteTopic, remove one broker's
topic {} {} {}", brokerName, toDeleteTopic, removedQD);
+ }
+
+ if (queueDataMap.isEmpty()) {
+ log.info("deleteTopic, remove the topic all
queue {}", toDeleteTopic);
+ topicQueueTable.remove(toDeleteTopic);
+ }
+ }
+ }
+
for (Map.Entry<String, TopicConfig> entry :
tcTable.entrySet()) {
if (registerFirst ||
this.isTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion(), brokerName,
@@ -312,19 +346,17 @@ public class RouteInfoManager {
this.createAndUpdateQueueData(brokerName,
topicConfig);
}
}
- }
- if (this.isBrokerTopicConfigChanged(clusterName, brokerAddr,
topicConfigWrapper.getDataVersion()) || registerFirst) {
- TopicConfigAndMappingSerializeWrapper
mappingSerializeWrapper =
TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper);
- Map<String, TopicQueueMappingInfo>
topicQueueMappingInfoMap =
mappingSerializeWrapper.getTopicQueueMappingInfoMap();
- //the topicQueueMappingInfoMap should never be null, but
can be empty
- for (Map.Entry<String, TopicQueueMappingInfo> entry :
topicQueueMappingInfoMap.entrySet()) {
- if
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
- topicQueueMappingInfoTable.put(entry.getKey(), new
HashMap<>());
+ if (this.isBrokerTopicConfigChanged(clusterName,
brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) {
+ //the topicQueueMappingInfoMap should never be null,
but can be empty
+ for (Map.Entry<String, TopicQueueMappingInfo> entry :
topicQueueMappingInfoMap.entrySet()) {
+ if
(!topicQueueMappingInfoTable.containsKey(entry.getKey())) {
+ topicQueueMappingInfoTable.put(entry.getKey(),
new HashMap<>());
+ }
+ //Note asset brokerName equal
entry.getValue().getBname()
+ //here use the mappingDetail.bname
+
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(),
entry.getValue());
}
- //Note asset brokerName equal
entry.getValue().getBname()
- //here use the mappingDetail.bname
-
topicQueueMappingInfoTable.get(entry.getKey()).put(entry.getValue().getBname(),
entry.getValue());
}
}
}
@@ -374,6 +406,16 @@ public class RouteInfoManager {
return result;
}
+ private Set<String> topicSetOfBrokerName(final String brokerName) {
+ Set<String> topicOfBroker = new HashSet<>();
+ for (final Entry<String, Map<String, QueueData>> entry :
this.topicQueueTable.entrySet()) {
+ if (entry.getValue().containsKey(brokerName)) {
+ topicOfBroker.add(entry.getKey());
+ }
+ }
+ return topicOfBroker;
+ }
+
public BrokerMemberGroup getBrokerMemberGroup(String clusterName, String
brokerName) {
BrokerMemberGroup groupMember = new BrokerMemberGroup(clusterName,
brokerName);
try {
diff --git
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
index b53519e5f6..6002d1f5a4 100644
---
a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
+++
b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerNewTest.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,6 +38,7 @@ import org.apache.rocketmq.remoting.protocol.body.TopicList;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.remoting.protocol.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.junit.After;
import org.junit.Before;
@@ -624,6 +626,92 @@ public class RouteInfoManagerNewTest {
.containsValues(BrokerBasicInfo.defaultBroker().brokerAddr,
BrokerBasicInfo.slaveBroker().brokerAddr);
}
+ @Test
+ public void keepTopicWithBrokerRegistration() {
+ RegisterBrokerResult masterResult =
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic",
"TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ masterResult =
registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(), "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+ }
+
+ @Test
+ public void deleteTopicWithBrokerRegistration() {
+ config.setDeleteTopicWithBrokerRegistration(true);
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(),
"TestTopic", "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ registerBrokerWithNormalTopic(BrokerBasicInfo.defaultBroker(),
"TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+ }
+
+ @Test
+ public void deleteTopicWithBrokerRegistration2() {
+ // Register two brokers and delete a specific one by one
+ config.setDeleteTopicWithBrokerRegistration(true);
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+ final BrokerBasicInfo master2 =
BrokerBasicInfo.defaultBroker().name(DEFAULT_BROKER + 1).addr(DEFAULT_ADDR + 9);
+
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+ registerBrokerWithNormalTopic(master2, "TestTopic", "TestTopic1");
+
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(2);
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+
+
+ registerBrokerWithNormalTopic(master1, "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas()).hasSize(1);
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic").getBrokerDatas().get(0).getBrokerName())
+ .isEqualTo(master2.brokerName);
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+
+ registerBrokerWithNormalTopic(master2, "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1").getBrokerDatas()).hasSize(2);
+ }
+
+ @Test
+ public void registerSingleTopicWithBrokerRegistration() {
+ config.setDeleteTopicWithBrokerRegistration(true);
+ final BrokerBasicInfo master1 = BrokerBasicInfo.defaultBroker();
+
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic");
+
+ // Single topic registration failed because there is no broker
connection exists
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
+ // Register broker with TestTopic first and then register single topic
TestTopic1
+ registerBrokerWithNormalTopic(master1, "TestTopic");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ // Register the two topics to keep the route info
+ registerBrokerWithNormalTopic(master1, "TestTopic", "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ // Cancel the TestTopic1 with broker registration
+ registerBrokerWithNormalTopic(master1, "TestTopic");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNotNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+ // Add TestTopic1 and cancel all the topics with broker un-registration
+ registerSingleTopicWithBrokerName(master1.brokerName, "TestTopic1");
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNotNull();
+
+ routeInfoManager.unregisterBroker(master1.clusterName,
master1.brokerAddr, master1.brokerName, 0);
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic")).isNull();
+
assertThat(routeInfoManager.pickupTopicRouteData("TestTopic1")).isNull();
+
+
+ }
+
private RegisterBrokerResult registerBrokerWithNormalTopic(BrokerBasicInfo
brokerInfo, String... topics) {
ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap =
new ConcurrentHashMap<>();
TopicConfig baseTopic = new TopicConfig("baseTopic");
@@ -711,6 +799,17 @@ public class RouteInfoManagerNewTest {
return registerBrokerResult;
}
+ private void registerSingleTopicWithBrokerName(String brokerName,
String... topics) {
+ for (final String topic : topics) {
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName(brokerName);
+ queueData.setReadQueueNums(8);
+ queueData.setWriteQueueNums(8);
+ queueData.setPerm(6);
+ routeInfoManager.registerTopic(topic,
Collections.singletonList(queueData));
+ }
+ }
+
static class BrokerBasicInfo {
String clusterName;
String brokerName;
diff --git
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 11b00a72c6..d3d5de9e27 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.util;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -38,6 +39,7 @@ import
org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingOne;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
@@ -319,4 +321,39 @@ public class MQAdminTestUtils {
}
return consumeStats;
}
+
+ /**
+ * Delete topic from broker only without cleaning route info from name
server forwardly
+ *
+ * @param nameSrvAddr the namesrv addr to connect
+ * @param brokerName the specific broker
+ * @param topic the specific topic to delete
+ */
+ public static void deleteTopicFromBrokerOnly(String nameSrvAddr, String
brokerName, String topic) {
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
+
+ try {
+ mqAdminExt.start();
+ String brokerAddr =
CommandUtil.fetchMasterAddrByBrokerName(mqAdminExt, brokerName);
+ mqAdminExt.deleteTopicInBroker(Collections.singleton(brokerAddr),
topic);
+ } catch (Exception ignored) {
+ } finally {
+ mqAdminExt.shutdown();
+ }
+ }
+
+ public static TopicRouteData examineTopicRouteInfo(String nameSrvAddr,
String topicName) {
+ DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+ mqAdminExt.setNamesrvAddr(nameSrvAddr);
+ TopicRouteData route = null;
+ try {
+ mqAdminExt.start();
+ route = mqAdminExt.examineTopicRouteInfo(topicName);
+ } catch (Exception ignored) {
+ } finally {
+ mqAdminExt.shutdown();
+ }
+ return route;
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
similarity index 99%
rename from
test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
rename to
test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
index 9e142eb617..43fefd6166 100644
---
a/test/src/test/java/org/apache/rocketmq/test/base/dledger/DLedgerProduceAndConsumeIT.java
+++
b/test/src/test/java/org/apache/rocketmq/test/dledger/DLedgerProduceAndConsumeIT.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.test.base.dledger;
+package org.apache.rocketmq.test.dledger;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
diff --git
a/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
new file mode 100644
index 0000000000..7e3c7b871d
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/route/CreateAndUpdateTopicIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.route;
+
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CreateAndUpdateTopicIT extends BaseConf {
+
+ @Test
+ public void testCreateOrUpdateTopic_EnableSingleTopicRegistration() {
+ String topic = "test-topic-without-broker-registration";
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+ final boolean createResult =
MQAdminTestUtils.createTopic(NAMESRV_ADDR, CLUSTER_NAME, topic, 8, null);
+ assertThat(createResult).isTrue();
+
+ TopicRouteData route =
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, topic);
+ assertThat(route.getBrokerDatas()).hasSize(3);
+ assertThat(route.getQueueDatas()).hasSize(3);
+
+
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+
+ }
+
+ @Test
+ public void testDeleteTopicFromNameSrvWithBrokerRegistration() {
+
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+ String testTopic1 = "test-topic-keep-route";
+ String testTopic2 = "test-topic-delete-route";
+
+ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR,
CLUSTER_NAME, testTopic1, 8, null);
+ assertThat(createResult).isTrue();
+
+
+ createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR,
CLUSTER_NAME, testTopic2, 8, null);
+ assertThat(createResult).isTrue();
+
+
+ TopicRouteData route =
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic2);
+ assertThat(route.getBrokerDatas()).hasSize(3);
+
+ MQAdminTestUtils.deleteTopicFromBrokerOnly(NAMESRV_ADDR, BROKER1_NAME,
testTopic2);
+
+ // Deletion is lazy, trigger broker registration
+ brokerController1.registerBrokerAll(false, false, true);
+
+ // The route info of testTopic2 will be removed from broker1 after the
registration
+ route = MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR,
testTopic2);
+ assertThat(route.getBrokerDatas()).hasSize(2);
+
assertThat(route.getQueueDatas().get(0).getBrokerName()).isEqualTo(BROKER2_NAME);
+
assertThat(route.getQueueDatas().get(1).getBrokerName()).isEqualTo(BROKER3_NAME);
+
+
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+ }
+
+ @Test
+ public void testStaticTopicNotAffected() throws Exception {
+
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(true);
+ brokerController1.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController2.getBrokerConfig().setEnableSingleTopicRegister(true);
+ brokerController3.getBrokerConfig().setEnableSingleTopicRegister(true);
+
+ String testTopic = "test-topic-not-affected";
+ String testStaticTopic = "test-static-topic";
+
+ boolean createResult = MQAdminTestUtils.createTopic(NAMESRV_ADDR,
CLUSTER_NAME, testTopic, 8, null);
+ assertThat(createResult).isTrue();
+
+ TopicRouteData route =
MQAdminTestUtils.examineTopicRouteInfo(NAMESRV_ADDR, testTopic);
+ assertThat(route.getBrokerDatas()).hasSize(3);
+ assertThat(route.getQueueDatas()).hasSize(3);
+
+ MQAdminTestUtils.createStaticTopicWithCommand(testStaticTopic, 10,
null, CLUSTER_NAME, NAMESRV_ADDR);
+
+ assertThat(route.getBrokerDatas()).hasSize(3);
+ assertThat(route.getQueueDatas()).hasSize(3);
+
+
brokerController1.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController2.getBrokerConfig().setEnableSingleTopicRegister(false);
+
brokerController3.getBrokerConfig().setEnableSingleTopicRegister(false);
+
namesrvController.getNamesrvConfig().setDeleteTopicWithBrokerRegistration(false);
+ }
+}