RongtongJin commented on code in PR #9549:
URL: https://github.com/apache/rocketmq/pull/9549#discussion_r2370810785
##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java:
##########
@@ -98,6 +102,10 @@ public TopicRouteService(MQClientAPIFactory
mqClientAPIFactory) {
public @Nullable MessageQueueView reload(@NonNull String key,
@NonNull MessageQueueView oldValue) throws Exception {
try {
+ if (routeChangeNotifier != null) {
Review Comment:
同时打开开关
##########
broker/src/main/java/org/apache/rocketmq/broker/route/RouteEventService.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.route;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import com.alibaba.fastjson2.JSON;
+
+public class RouteEventService {
+ private static final Logger LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+ private static final int MAX_TOPICS_PER_EVENT = 5000;
+
+ public RouteEventService(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ LOG.info("RouteEventService initialized for broker: {}",
+ brokerController.getBrokerConfig().getBrokerName());
+ }
+
+ public void publishEvent(RouteEventType eventType) {
+ if
(!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+ return;
+ }
+
+ if (brokerController.getTopicConfigManager() == null) {
+ return;
+ }
+
+ Set<String> topics =
brokerController.getTopicConfigManager().getTopicConfigTable().keySet();
+ publishEventInternal(eventType, topics);
+ }
+
+ public void publishEvent(RouteEventType eventType, String topicName) {
+ if
(!brokerController.getBrokerConfig().isEnableRouteChangeNotification()) {
+ return;
+ }
+
+ if (topicName == null) {
+ return;
+ }
+
+ publishEventInternal(eventType, Collections.singleton(topicName));
+ }
+
+ private void publishEventInternal(RouteEventType eventType, Set<String>
topics) {
+ try {
+ if (topics == null || topics.isEmpty()) {
+ sendEvent(eventType, null);
+ return;
+ }
+
+ List<String> topicList = new ArrayList<>(topics);
+ partitionTopics(topicList, MAX_TOPICS_PER_EVENT)
+ .forEach(batch -> sendEvent(eventType, batch));
+
+ LOG.info("[{}]: published event for {} topics", eventType,
topics.size());
+ } catch (Exception e) {
+ LOG.error("Failed to publish {} event for topics: {}", eventType,
topics, e);
+ }
+ }
+
+ private void sendEvent(RouteEventType eventType, List<String> topics) {
+ Map<String, Object> eventData = createEventData(eventType, topics);
+ MessageExtBrokerInner msg = createEventMessage(eventData);
+ TopicPublishInfo routeInfo =
brokerController.getTopicRouteInfoManager()
+ .tryToFindTopicPublishInfo(TopicValidator.RMQ_ROUTE_EVENT_TOPIC);
+
+ if (routeInfo == null || !routeInfo.ok()) {
+ LOG.warn("No route info for ROUTE_EVENT_TOPIC");
+ return;
+ }
+
+ String currentBroker =
brokerController.getBrokerConfig().getBrokerName();
+ Set<String> processedBrokers = new HashSet<>();
+ processedBrokers.add(currentBroker);
+
+ for (MessageQueue mq : routeInfo.getMessageQueueList()) {
+ if (processedBrokers.contains(mq.getBrokerName())) {
+ continue;
+ }
+
+ try {
+ SendResult result = brokerController.getEscapeBridge()
+ .putMessageToRemoteBroker(msg, mq.getBrokerName());
+
+ if (result != null && result.getSendStatus() ==
SendStatus.SEND_OK) {
+ LOG.debug("Event sent to broker: {}", mq.getBrokerName());
+ }
+ processedBrokers.add(mq.getBrokerName());
Review Comment:
只需发送一个broker成功即可。
这里可以根据事件不同做不同处理
1.如果是broker start、shutdown事件,要往有路由(排除自己)的其中一个broker发送成功即可
2.如果是topic change事件,其实往自己本地发送即可
##########
proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java:
##########
@@ -134,6 +142,12 @@ public String resolve(String name) {
}
}
}, serviceDetector);
+
+ this.routeChangeNotifier = new RouteChangeNotifier(
+ this.topicCache,
+ this.cacheRefreshExecutor
Review Comment:
这里使用单独的线程池吧,避免相互影响,线程池可以放在RouteChangeNotifier初始化
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]