This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7fc5452e0d [ISSUE #9970] Refactor the MessageQueueSelector to support
more flexible queue selection strategy (#9971)
7fc5452e0d is described below
commit 7fc5452e0d9b4f5b86a7babc2ccad2c60cf0dac1
Author: qianye <[email protected]>
AuthorDate: Wed Jan 7 15:40:09 2026 +0800
[ISSUE #9970] Refactor the MessageQueueSelector to support more flexible
queue selection strategy (#9971)
---
.../RocksdbGroupConfigTransferTest.java | 52 +--
.../rocketmq/client/latency/MQFaultStrategy.java | 8 +-
.../service/route/AddressableMessageQueue.java | 49 +--
.../route/DefaultMessageQueuePriorityProvider.java | 25 ++
.../proxy/service/route/MessageQueuePenalizer.java | 134 ++++++
.../route/MessageQueuePriorityProvider.java | 84 ++++
.../proxy/service/route/MessageQueueSelector.java | 115 ++---
.../proxy/service/route/MessageQueueView.java | 22 +-
.../proxy/service/route/TopicRouteService.java | 64 +--
.../grpc/v2/producer/SendMessageActivityTest.java | 8 +-
.../service/route/MessageQueuePenalizerTest.java | 472 +++++++++++++++++++++
.../route/MessageQueuePriorityProviderTest.java | 311 ++++++++++++++
.../service/route/MessageQueueSelectorTest.java | 8 +-
13 files changed, 1168 insertions(+), 184 deletions(-)
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 4fbec13860..b476cb205e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.broker.subscription;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
@@ -34,15 +43,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -78,24 +78,28 @@ public class RocksdbGroupConfigTransferTest {
if (notToBeExecuted()) {
return;
}
- Path pathToBeDeleted = Paths.get(basePath);
-
- try {
- Files.walk(pathToBeDeleted)
- .sorted(Comparator.reverseOrder())
- .forEach(path -> {
- try {
- Files.delete(path);
- } catch (IOException e) {
- // ignore
- }
- });
- } catch (IOException e) {
- // ignore
- }
+
if (rocksDBSubscriptionGroupManager != null) {
rocksDBSubscriptionGroupManager.stop();
}
+
+ Path root = Paths.get(basePath);
+ if (Files.notExists(root)) {
+ return;
+ }
+
+ try (Stream<Path> walk = Files.walk(root)) {
+ walk.sorted(Comparator.reverseOrder())
+ .forEach(p -> {
+ try {
+ Files.deleteIfExists(p);
+ } catch (IOException e) {
+ // ignore
+ }
+ });
+ } catch (IOException e) {
+ // ignore
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index 69fb533e5a..76875378df 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -21,8 +21,9 @@ import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
-public class MQFaultStrategy {
+public class MQFaultStrategy implements StartAndShutdown {
private LatencyFaultTolerance<String> latencyFaultTolerance;
private volatile boolean sendLatencyFaultEnable;
private volatile boolean startDetectorEnable;
@@ -130,6 +131,11 @@ public class MQFaultStrategy {
this.latencyFaultTolerance.startDetector();
}
+ @Override
+ public void start() throws Exception {
+ this.startDetector();
+ }
+
public void shutdown() {
this.latencyFaultTolerance.shutdown();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
index ca877f3278..19f2c0db85 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
@@ -17,22 +17,27 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.base.MoreObjects;
-import java.util.Objects;
import org.apache.rocketmq.common.message.MessageQueue;
-public class AddressableMessageQueue implements
Comparable<AddressableMessageQueue> {
-
- private final MessageQueue messageQueue;
+public class AddressableMessageQueue extends MessageQueue {
private final String brokerAddr;
public AddressableMessageQueue(MessageQueue messageQueue, String
brokerAddr) {
- this.messageQueue = messageQueue;
+ super(messageQueue);
this.brokerAddr = brokerAddr;
}
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return new MessageQueue(getTopic(), getBrokerName(), getQueueId());
+ }
+
@Override
- public int compareTo(AddressableMessageQueue o) {
- return messageQueue.compareTo(o.messageQueue);
+ public int hashCode() {
+ return super.hashCode();
}
@Override
@@ -43,39 +48,13 @@ public class AddressableMessageQueue implements
Comparable<AddressableMessageQue
if (!(o instanceof AddressableMessageQueue)) {
return false;
}
- AddressableMessageQueue queue = (AddressableMessageQueue) o;
- return Objects.equals(messageQueue, queue.messageQueue);
- }
-
- @Override
- public int hashCode() {
- return messageQueue == null ? 1 : messageQueue.hashCode();
- }
-
- public int getQueueId() {
- return this.messageQueue.getQueueId();
- }
-
- public String getBrokerName() {
- return this.messageQueue.getBrokerName();
- }
-
- public String getTopic() {
- return messageQueue.getTopic();
- }
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
- public String getBrokerAddr() {
- return brokerAddr;
+ return super.equals(o);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("messageQueue", messageQueue)
+ .add("messageQueue", super.toString())
.add("brokerAddr", brokerAddr)
.toString();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
new file mode 100644
index 0000000000..90b0114f61
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+public class DefaultMessageQueuePriorityProvider implements
MessageQueuePriorityProvider<AddressableMessageQueue> {
+ @Override
+ public int priorityOf(AddressableMessageQueue queue) {
+ return 0;
+ }
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
new file mode 100644
index 0000000000..d53056971d
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+@FunctionalInterface
+public interface MessageQueuePenalizer<Q extends MessageQueue> {
+
+ /**
+ * Returns the penalty value for the given MessageQueue; lower is better.
+ */
+ int penaltyOf(Q messageQueue);
+
+ /**
+ * Aggregates penalties from multiple penalizers for the same MessageQueue
(by summing them up).
+ */
+ static <Q extends MessageQueue> int evaluatePenalty(Q messageQueue,
List<MessageQueuePenalizer<Q>> penalizers) {
+ Objects.requireNonNull(messageQueue, "messageQueue");
+ if (penalizers == null || penalizers.isEmpty()) {
+ return 0;
+ }
+ int sum = 0;
+ for (MessageQueuePenalizer<Q> p : penalizers) {
+ sum += p.penaltyOf(messageQueue);
+ }
+ return sum;
+ }
+
+ /**
+ * Selects the queue with the lowest evaluated penalty from the given
queue list.
+ *
+ * <p>The method iterates through all queues exactly once, but starts from
a rotating index
+ * derived from {@code startIndex} (round-robin) to avoid always scanning
from position 0 .</p>
+ *
+ * <p>For each queue, it computes a penalty via {@link #evaluatePenalty}
using
+ * the provided {@code penalizers}. The queue with the smallest penalty is
selected.</p>
+ *
+ * <p>Short-circuit rule: if any queue has a {@code penalty<= 0}, it is
returned immediately,
+ * since no better result than 0 is expected.</p>
+ *
+ * @param queues candidate queues to select from
+ * @param penalizers penalty evaluators applied to each queue
+ * @param startIndex atomic counter used to determine the rotating start
position (round-robin)
+ * @param <Q> queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if
{@code queues} is null/empty
+ */
+ static <Q extends MessageQueue> Pair<Q, Integer>
selectLeastPenalty(List<Q> queues,
+ List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
+ if (queues == null || queues.isEmpty()) {
+ return null;
+ }
+ Q bestQueue = null;
+ int bestPenalty = Integer.MAX_VALUE;
+
+ for (int i = 0; i < queues.size(); i++) {
+ int index = Math.floorMod(startIndex.getAndIncrement(),
queues.size());
+ Q messageQueue = queues.get(index);
+ int penalty = evaluatePenalty(messageQueue, penalizers);
+
+ // Short-circuit: cannot do better than 0
+ if (penalty <= 0) {
+ return Pair.of(messageQueue, penalty);
+ }
+
+ if (penalty < bestPenalty) {
+ bestPenalty = penalty;
+ bestQueue = messageQueue;
+ }
+ }
+ return Pair.of(bestQueue, bestPenalty);
+ }
+
+ /**
+ * Selects a queue with the lowest computed penalty from multiple priority
groups.
+ *
+ * <p>The input {@code queuesWithPriority} is a list of queue groups
ordered by priority.
+ * For each priority group, this method delegates to {@link
#selectLeastPenalty} to pick the best queue
+ * within that group and obtain its penalty.</p>
+ *
+ * <p>Short-circuit rule: if any priority group yields a queue whose
{@code penalty <= 0},
+ * that result is returned immediately.</p>
+ *
+ * <p>Otherwise, it returns the queue with the smallest positive penalty
among all groups.
+ * If multiple groups produce the same minimum penalty, the first
encountered one wins.</p>
+ *
+ * @param queuesWithPriority priority-ordered groups of queues; each inner
list represents one priority level
+ * @param penalizers penalty calculators used by {@code
selectLeastPenalty} to score queues
+ * @param startIndex round-robin start index forwarded to {@code
selectLeastPenalty} to reduce contention/hotspots
+ * @param <Q> queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if
{@code queuesWithPriority} is null/empty
+ */
+ static <Q extends MessageQueue> Pair<Q, Integer>
selectLeastPenaltyWithPriority(List<List<Q>> queuesWithPriority,
+ List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
+ if (queuesWithPriority == null || queuesWithPriority.isEmpty()) {
+ return null;
+ }
+ if (queuesWithPriority.size() == 1) {
+ return selectLeastPenalty(queuesWithPriority.get(0), penalizers,
startIndex);
+ }
+ Q bestQueue = null;
+ int bestPenalty = Integer.MAX_VALUE;
+ for (List<Q> queues : queuesWithPriority) {
+ Pair<Q, Integer> queueAndPenalty = selectLeastPenalty(queues,
penalizers, startIndex);
+ int penalty = queueAndPenalty.getRight();
+ if (queueAndPenalty.getRight() <= 0) {
+ return queueAndPenalty;
+ }
+ if (penalty < bestPenalty) {
+ bestPenalty = penalty;
+ bestQueue = queueAndPenalty.getLeft();
+ }
+ }
+ return Pair.of(bestQueue, bestPenalty);
+ }
+}
\ No newline at end of file
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
new file mode 100644
index 0000000000..57b6e65fe5
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * A functional interface for providing priority values for message queues.
+ * This interface allows custom priority determination logic to be applied to
message queues,
+ * enabling queue selection and routing based on priority levels.
+ * <p>
+ * The priority value follows the convention that smaller numeric values
indicate higher priority.
+ * For example, priority 0 is higher than priority 1.
+ * </p>
+ *
+ * @param <Q> the type of message queue, must extend {@link MessageQueue}
+ */
+@FunctionalInterface
+public interface MessageQueuePriorityProvider<Q extends MessageQueue> {
+
+ /**
+ * Determines the priority value of the given message queue.
+ * <p>
+ * Smaller values indicate higher priority. For example:
+ * <ul>
+ * <li>Priority 0: Highest priority</li>
+ * <li>Priority 1: Medium priority</li>
+ * <li>Priority 2: Lower priority</li>
+ * </ul>
+ * </p>
+ *
+ * @param q the message queue to evaluate
+ * @return the priority value, where smaller values indicate higher
priority
+ */
+ int priorityOf(Q q);
+
+ /**
+ * Groups message queues by their priority levels and returns them in
priority order.
+ * <p>
+ * This static utility method takes a list of message queues and a
priority provider,
+ * then organizes the queues into groups based on their priority values.
+ * The returned list is ordered from highest priority to lowest priority.
+ * </p>
+ *
+ * @param <Q> the type of message queue, must extend {@link
MessageQueue}
+ * @param queues the list of message queues to group by priority, can be
null or empty
+ * @param provider the priority provider to determine the priority of each
queue
+ * @return a list of lists, where each inner list contains queues of the
same priority level,
+ * ordered from highest priority (smallest value) to lowest
priority (largest value).
+ * Returns an empty list if the input queues are null or empty.
+ */
+ static <Q extends MessageQueue> List<List<Q>> buildPriorityGroups(List<Q>
queues, MessageQueuePriorityProvider<Q> provider) {
+ if (queues == null || queues.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Map<Integer, List<Q>> buckets = new TreeMap<>();
+ for (Q q : queues) {
+ int p = provider.priorityOf(q);
+ buckets.computeIfAbsent(p, k -> new ArrayList<>()).add(q);
+ }
+ return new ArrayList<>(buckets.values());
+ }
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
index f25fb907ef..0b028fa461 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.math.IntMath;
import java.util.ArrayList;
import java.util.Collections;
@@ -30,13 +29,16 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import static
org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenaltyWithPriority;
+import static
org.apache.rocketmq.proxy.service.route.MessageQueuePriorityProvider.buildPriorityGroups;
+
public class MessageQueueSelector {
private static final int BROKER_ACTING_QUEUE_ID = -1;
@@ -47,9 +49,18 @@ public class MessageQueueSelector {
private final Map<String, AddressableMessageQueue> brokerNameQueueMap =
new ConcurrentHashMap<>();
private final AtomicInteger queueIndex;
private final AtomicInteger brokerIndex;
- private MQFaultStrategy mqFaultStrategy;
+ private final List<MessageQueuePenalizer<AddressableMessageQueue>>
penalizers = new ArrayList<>();
+
+ // ordered by priority asc (smaller => higher priority)
+ private final List<List<AddressableMessageQueue>> queuesWithPriority;
+ private final List<List<AddressableMessageQueue>>
brokerActingQueuesWithPriority;
+
+ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean
read) {
+ this(topicRouteWrapper, read, null);
+ }
- public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper,
MQFaultStrategy mqFaultStrategy, boolean read) {
+ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean
read,
+ MessageQueuePriorityProvider<AddressableMessageQueue>
priorityProvider) {
if (read) {
this.queues.addAll(buildRead(topicRouteWrapper));
} else {
@@ -59,7 +70,12 @@ public class MessageQueueSelector {
Random random = new Random();
this.queueIndex = new AtomicInteger(random.nextInt());
this.brokerIndex = new AtomicInteger(random.nextInt());
- this.mqFaultStrategy = mqFaultStrategy;
+
+ if (priorityProvider == null) {
+ priorityProvider = new DefaultMessageQueuePriorityProvider();
+ }
+ this.queuesWithPriority = buildPriorityGroups(queues,
priorityProvider);
+ this.brokerActingQueuesWithPriority =
buildPriorityGroups(brokerActingQueues, priorityProvider);
}
private static List<AddressableMessageQueue> buildRead(TopicRouteWrapper
topicRoute) {
@@ -138,7 +154,7 @@ public class MessageQueueSelector {
private void buildBrokerActingQueues(String topic,
List<AddressableMessageQueue> normalQueues) {
for (AddressableMessageQueue mq : normalQueues) {
AddressableMessageQueue brokerActingQueue = new
AddressableMessageQueue(
- new MessageQueue(topic, mq.getMessageQueue().getBrokerName(),
BROKER_ACTING_QUEUE_ID),
+ new MessageQueue(topic, mq.getBrokerName(),
BROKER_ACTING_QUEUE_ID),
mq.getBrokerAddr());
if (!brokerActingQueues.contains(brokerActingQueue)) {
@@ -160,38 +176,15 @@ public class MessageQueueSelector {
}
public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) {
- if (mqFaultStrategy != null &&
mqFaultStrategy.isSendLatencyFaultEnable()) {
- List<MessageQueue> messageQueueList = null;
- MessageQueue messageQueue = null;
+ if (CollectionUtils.isNotEmpty(penalizers)) {
+ Pair<AddressableMessageQueue, Integer> queueAndPenalty;
if (onlyBroker) {
- messageQueueList =
transferAddressableQueues(brokerActingQueues);
+ queueAndPenalty =
selectLeastPenaltyWithPriority(brokerActingQueuesWithPriority, penalizers,
brokerIndex);
} else {
- messageQueueList = transferAddressableQueues(queues);
+ queueAndPenalty =
selectLeastPenaltyWithPriority(queuesWithPriority, penalizers, queueIndex);
}
- AddressableMessageQueue addressableMessageQueue = null;
-
- // use both available filter.
- messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker
? brokerIndex : queueIndex,
- mqFaultStrategy.getAvailableFilter(),
mqFaultStrategy.getReachableFilter());
- addressableMessageQueue = transferQueue2Addressable(messageQueue);
- if (addressableMessageQueue != null) {
- return addressableMessageQueue;
- }
-
- // use available filter.
- messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker
? brokerIndex : queueIndex,
- mqFaultStrategy.getAvailableFilter());
- addressableMessageQueue = transferQueue2Addressable(messageQueue);
- if (addressableMessageQueue != null) {
- return addressableMessageQueue;
- }
-
- // no available filter, then use reachable filter.
- messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker
? brokerIndex : queueIndex,
- mqFaultStrategy.getReachableFilter());
- addressableMessageQueue = transferQueue2Addressable(messageQueue);
- if (addressableMessageQueue != null) {
- return addressableMessageQueue;
+ if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) {
+ return queueAndPenalty.getLeft();
}
}
@@ -199,46 +192,6 @@ public class MessageQueueSelector {
return selectOne(onlyBroker);
}
- private MessageQueue selectOneMessageQueue(List<MessageQueue>
messageQueueList, AtomicInteger sendQueue,
TopicPublishInfo.QueueFilter...filter) {
- if (messageQueueList == null || messageQueueList.isEmpty()) {
- return null;
- }
- if (filter != null && filter.length != 0) {
- for (int i = 0; i < messageQueueList.size(); i++) {
- int index = Math.abs(sendQueue.incrementAndGet() %
messageQueueList.size());
- MessageQueue mq = messageQueueList.get(index);
- boolean filterResult = true;
- for (TopicPublishInfo.QueueFilter f: filter) {
- Preconditions.checkNotNull(f);
- filterResult &= f.filter(mq);
- }
- if (filterResult) {
- return mq;
- }
- }
- }
- return null;
- }
-
- public List<MessageQueue>
transferAddressableQueues(List<AddressableMessageQueue>
addressableMessageQueueList) {
- if (addressableMessageQueueList == null) {
- return null;
- }
-
- return addressableMessageQueueList.stream()
- .map(AddressableMessageQueue::getMessageQueue)
- .collect(Collectors.toList());
- }
-
- private AddressableMessageQueue transferQueue2Addressable(MessageQueue
messageQueue) {
- for (AddressableMessageQueue amq: queues) {
- if (amq.getMessageQueue().equals(messageQueue)) {
- return amq;
- }
- }
- return null;
- }
-
public AddressableMessageQueue selectNextOne(AddressableMessageQueue last)
{
boolean onlyBroker = last.getQueueId() < 0;
AddressableMessageQueue newOne = last;
@@ -275,12 +228,10 @@ public class MessageQueueSelector {
return brokerActingQueues;
}
- public MQFaultStrategy getMQFaultStrategy() {
- return mqFaultStrategy;
- }
-
- public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
- this.mqFaultStrategy = mqFaultStrategy;
+ public void addPenalizer(MessageQueuePenalizer<AddressableMessageQueue>
penalizer) {
+ if (penalizer != null) {
+ this.penalizers.add(penalizer);
+ }
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index 898e529f8c..a0d768d6da 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -17,7 +17,8 @@
package org.apache.rocketmq.proxy.service.route;
import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
public class MessageQueueView {
@@ -27,11 +28,24 @@ public class MessageQueueView {
private final MessageQueueSelector writeSelector;
private final TopicRouteWrapper topicRouteWrapper;
- public MessageQueueView(String topic, TopicRouteData topicRouteData,
MQFaultStrategy mqFaultStrategy) {
+
+ public MessageQueueView(String topic, TopicRouteData topicRouteData,
List<MessageQueuePenalizer<AddressableMessageQueue>> penalizer) {
+ this(topic, topicRouteData, penalizer, null);
+ }
+
+ public MessageQueueView(String topic, TopicRouteData topicRouteData,
List<MessageQueuePenalizer<AddressableMessageQueue>> penalizer,
+ MessageQueuePriorityProvider<AddressableMessageQueue>
priorityProvider) {
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
- this.readSelector = new MessageQueueSelector(topicRouteWrapper,
mqFaultStrategy, true);
- this.writeSelector = new MessageQueueSelector(topicRouteWrapper,
mqFaultStrategy, false);
+ this.readSelector = new MessageQueueSelector(topicRouteWrapper, true,
priorityProvider);
+ this.writeSelector = new MessageQueueSelector(topicRouteWrapper,
false, priorityProvider);
+
+ if (CollectionUtils.isNotEmpty(penalizer)) {
+ for (MessageQueuePenalizer<AddressableMessageQueue> p : penalizer)
{
+ this.readSelector.addPenalizer(p);
+ this.writeSelector.addPenalizer(p);
+ }
+ }
}
public TopicRouteData getTopicRouteData() {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index bcdf8140bc..dae3005746 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.proxy.service.route;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-
+import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.ClientConfig;
@@ -32,12 +32,10 @@ import
org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.latency.Resolver;
import org.apache.rocketmq.client.latency.ServiceDetector;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.Address;
@@ -53,19 +51,15 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public abstract class TopicRouteService extends AbstractStartAndShutdown {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
- private final MQClientAPIFactory mqClientAPIFactory;
- private MQFaultStrategy mqFaultStrategy;
-
+ private final MQFaultStrategy mqFaultStrategy;
protected final LoadingCache<String /* topicName */, MessageQueueView>
topicCache;
- protected final ScheduledExecutorService scheduledExecutorService;
protected final ThreadPoolExecutor cacheRefreshExecutor;
+ protected final List<MessageQueuePenalizer<AddressableMessageQueue>>
penalizers = new ArrayList<>();
+ protected MessageQueuePriorityProvider<AddressableMessageQueue>
priorityProvider = new DefaultMessageQueuePriorityProvider();
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
- this.scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor(
- new ThreadFactoryImpl("TopicRouteService_")
- );
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
config.getTopicRouteServiceThreadPoolNums(),
config.getTopicRouteServiceThreadPoolNums(),
@@ -74,7 +68,6 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
"TopicRouteCacheRefresh",
config.getTopicRouteServiceThreadPoolQueueCapacity()
);
- this.mqClientAPIFactory = mqClientAPIFactory;
this.topicCache =
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
.expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(),
TimeUnit.SECONDS)
@@ -134,6 +127,8 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
}
}
}, serviceDetector);
+
+
this.penalizers.addAll(buildPenalizerByMQFaultStrategy(mqFaultStrategy));
this.init();
}
@@ -146,22 +141,7 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
}
protected void init() {
- this.appendShutdown(this.scheduledExecutorService::shutdown);
- this.appendStartAndShutdown(this.mqClientAPIFactory);
- }
-
- @Override
- public void shutdown() throws Exception {
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- mqFaultStrategy.shutdown();
- }
- }
-
- @Override
- public void start() throws Exception {
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- this.mqFaultStrategy.startDetector();
- }
+ this.appendStartAndShutdown(this.mqFaultStrategy);
}
public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig
proxyConfig) {
@@ -220,10 +200,36 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
protected MessageQueueView buildMessageQueueView(String topic,
TopicRouteData topicRouteData) {
if (isTopicRouteValid(topicRouteData)) {
- MessageQueueView tmp = new MessageQueueView(topic, topicRouteData,
TopicRouteService.this.getMqFaultStrategy());
+ MessageQueueView tmp = new MessageQueueView(topic, topicRouteData,
this.penalizers, this.priorityProvider);
log.debug("load topic route from namesrv. topic: {}, queue: {}",
topic, tmp);
return tmp;
}
return MessageQueueView.WRAPPED_EMPTY_QUEUE;
}
+
+ public void
setPriorityProvider(MessageQueuePriorityProvider<AddressableMessageQueue>
priorityProvider) {
+ this.priorityProvider = priorityProvider;
+ }
+
+ public void addPenalizer(MessageQueuePenalizer<AddressableMessageQueue>
penalizer) {
+ this.penalizers.add(penalizer);
+ }
+
+ @VisibleForTesting
+ public static List<MessageQueuePenalizer<AddressableMessageQueue>>
buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
+ List<MessageQueuePenalizer<AddressableMessageQueue>> penalizers = new
ArrayList<>();
+ penalizers.add(messageQueue -> {
+ if (!mqFaultStrategy.isSendLatencyFaultEnable() ||
mqFaultStrategy.getAvailableFilter().filter(messageQueue)) {
+ return 0;
+ }
+ return 10;
+ });
+ penalizers.add(messageQueue -> {
+ if (!mqFaultStrategy.isSendLatencyFaultEnable() ||
mqFaultStrategy.getReachableFilter().filter(messageQueue)) {
+ return 0;
+ }
+ return 100;
+ });
+ return penalizers;
+ }
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index a64867ddfe..870aa0424f 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -59,6 +59,7 @@ import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
+import static
org.apache.rocketmq.proxy.service.route.TopicRouteService.buildPenalizerByMQFaultStrategy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
@@ -379,7 +380,7 @@ public class SendMessageActivityTest extends
BaseActivityTest {
MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC,
topicRouteData, topicRouteService.getMqFaultStrategy());
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC,
topicRouteData, null);
AddressableMessageQueue firstSelect =
selector.select(ProxyContext.create(), messageQueueView);
AddressableMessageQueue secondSelect =
selector.select(ProxyContext.create(), messageQueueView);
@@ -415,10 +416,7 @@ public class SendMessageActivityTest extends
BaseActivityTest {
mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true);
mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false);
- TopicRouteService topicRouteService = mock(TopicRouteService.class);
-
when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
- MessageQueueView messageQueueView = new MessageQueueView(TOPIC,
topicRouteData, topicRouteService.getMqFaultStrategy());
-
+ MessageQueueView messageQueueView = new MessageQueueView(TOPIC,
topicRouteData, buildPenalizerByMQFaultStrategy(mqFaultStrategy));
AddressableMessageQueue firstSelect =
selector.select(ProxyContext.create(), messageQueueView);
assertEquals(firstSelect.getBrokerName(), BROKER_NAME2);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
new file mode 100644
index 0000000000..f31d973cce
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MessageQueuePenalizerTest {
+
+ /**
+ * Test evaluatePenalty with null messageQueue should throw
NullPointerException
+ */
+ @Test(expected = NullPointerException.class)
+ public void testEvaluatePenalty_NullMessageQueue() {
+ List<MessageQueuePenalizer<MessageQueue>> penalizers = new
ArrayList<>();
+ penalizers.add(mq -> 10);
+ MessageQueuePenalizer.evaluatePenalty(null, penalizers);
+ }
+
+ /**
+ * Test evaluatePenalty with null penalizers should return 0
+ */
+ @Test
+ public void testEvaluatePenalty_NullPenalizers() {
+ MessageQueue mq = new MessageQueue("topic", "broker", 0);
+ int penalty = MessageQueuePenalizer.evaluatePenalty(mq, null);
+ assertEquals(0, penalty);
+ }
+
+ /**
+ * Test evaluatePenalty with empty penalizers should return 0
+ */
+ @Test
+ public void testEvaluatePenalty_EmptyPenalizers() {
+ MessageQueue mq = new MessageQueue("topic", "broker", 0);
+ int penalty = MessageQueuePenalizer.evaluatePenalty(mq,
Collections.emptyList());
+ assertEquals(0, penalty);
+ }
+
+ /**
+ * Test evaluatePenalty aggregates penalties from multiple penalizers by
summing them up
+ */
+ @Test
+ public void testEvaluatePenalty_MultiplePenalizers() {
+ MessageQueue mq = new MessageQueue("topic", "broker", 0);
+ List<MessageQueuePenalizer<MessageQueue>> penalizers = Arrays.asList(
+ q -> 10,
+ q -> 20,
+ q -> 5
+ );
+ int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers);
+ assertEquals(35, penalty);
+ }
+
+ /**
+ * Test evaluatePenalty with negative penalties (sum should still work)
+ */
+ @Test
+ public void testEvaluatePenalty_NegativePenalties() {
+ MessageQueue mq = new MessageQueue("topic", "broker", 0);
+ List<MessageQueuePenalizer<MessageQueue>> penalizers = Arrays.asList(
+ q -> -5,
+ q -> 10,
+ q -> -3
+ );
+ int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers);
+ assertEquals(2, penalty);
+ }
+
+ /**
+ * Test selectLeastPenalty with null queues should return null
+ */
+ @Test
+ public void testSelectLeastPenalty_NullQueues() {
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(null, penalizers, startIndex);
+ assertNull(result);
+ }
+
+ /**
+ * Test selectLeastPenalty with empty queues should return null
+ */
+ @Test
+ public void testSelectLeastPenalty_EmptyQueues() {
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(
+ Collections.emptyList(), penalizers, startIndex);
+ assertNull(result);
+ }
+
+ /**
+ * Test selectLeastPenalty selects the queue with the lowest penalty
+ */
+ @Test
+ public void testSelectLeastPenalty_LowestPenalty() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ // Penalizer that assigns different penalties based on queue id
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 10 : 30)
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq1, result.getLeft());
+ assertEquals(10, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenalty short-circuits when penalty <= 0
+ */
+ @Test
+ public void testSelectLeastPenalty_ShortCircuitZeroPenalty() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ // mq1 has penalty 0, should short-circuit
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 0 : 30)
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq1, result.getLeft());
+ assertEquals(0, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenalty short-circuits when penalty is negative
+ */
+ @Test
+ public void testSelectLeastPenalty_ShortCircuitNegativePenalty() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ // mq1 has penalty -5, should short-circuit
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? -5 : 30)
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq1, result.getLeft());
+ assertEquals(-5, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenalty with round-robin behavior (rotating start index)
+ * Verifies that startIndex affects the iteration order
+ */
+ @Test
+ public void testSelectLeastPenalty_RoundRobinStartIndex() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ // All queues have penalty 0, so whichever is encountered first will
be returned
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 0);
+
+ // Starting from index 0
+ AtomicInteger startIndex1 = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result1 =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex1);
+ assertNotNull(result1);
+ assertEquals(mq0, result1.getLeft());
+
+ // Starting from index 1
+ AtomicInteger startIndex2 = new AtomicInteger(1);
+ Pair<MessageQueue, Integer> result2 =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex2);
+ assertNotNull(result2);
+ assertEquals(mq1, result2.getLeft());
+
+ // Starting from index 2
+ AtomicInteger startIndex3 = new AtomicInteger(2);
+ Pair<MessageQueue, Integer> result3 =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex3);
+ assertNotNull(result3);
+ assertEquals(mq2, result3.getLeft());
+ }
+
+ /**
+ * Test selectLeastPenalty increments startIndex for each iteration
+ */
+ @Test
+ public void testSelectLeastPenalty_IncrementStartIndex() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ MessageQueuePenalizer.selectLeastPenalty(queues, penalizers,
startIndex);
+
+ // After iterating through 3 queues, startIndex should be incremented
3 times
+ assertEquals(3, startIndex.get());
+ }
+
+ /**
+ * Test selectLeastPenalty handles startIndex wrapping with Math.floorMod
+ */
+ @Test
+ public void testSelectLeastPenalty_StartIndexWrapping() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 0);
+
+ // Start with large index to test wrapping
+ AtomicInteger startIndex = new AtomicInteger(100);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+ assertNotNull(result);
+ // 100 % 3 = 1, so should start from mq1
+ assertEquals(mq1, result.getLeft());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority with null queuesWithPriority should
return null
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_NullQueues() {
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ null, penalizers, startIndex);
+ assertNull(result);
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority with empty queuesWithPriority
should return null
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_EmptyQueues() {
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ Collections.emptyList(), penalizers, startIndex);
+ assertNull(result);
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority with single priority group
delegates to selectLeastPenalty
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_SinglePriorityGroup() {
+ MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+ List<MessageQueue> queues = Arrays.asList(mq0, mq1);
+
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getQueueId() == 0 ? 20 : 10
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ Collections.singletonList(queues), penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq1, result.getLeft());
+ assertEquals(10, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority selects queue with lowest penalty
across multiple priority groups
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_MultiplePriorityGroups() {
+ // Priority group 1 (higher priority)
+ MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker-high", 1);
+ List<MessageQueue> highPriorityQueues = Arrays.asList(mq0, mq1);
+
+ // Priority group 2 (lower priority)
+ MessageQueue mq2 = new MessageQueue("topic", "broker-low", 0);
+ MessageQueue mq3 = new MessageQueue("topic", "broker-low", 1);
+ List<MessageQueue> lowPriorityQueues = Arrays.asList(mq2, mq3);
+
+ List<List<MessageQueue>> queuesWithPriority =
Arrays.asList(highPriorityQueues, lowPriorityQueues);
+
+ // Assign penalties: high-priority queues have higher penalties,
low-priority have lower
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getBrokerName().equals("broker-high") ? 50 : 10
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ queuesWithPriority, penalizers, startIndex);
+
+ assertNotNull(result);
+ // Should select from low-priority group because it has lower penalty
+ assertTrue(result.getLeft().getBrokerName().equals("broker-low"));
+ assertEquals(10, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority short-circuits when a priority
group yields penalty <= 0
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_ShortCircuitZeroPenalty() {
+ // Priority group 1
+ MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0);
+ List<MessageQueue> highPriorityQueues = Collections.singletonList(mq0);
+
+ // Priority group 2
+ MessageQueue mq1 = new MessageQueue("topic", "broker-low", 0);
+ List<MessageQueue> lowPriorityQueues = Collections.singletonList(mq1);
+
+ List<List<MessageQueue>> queuesWithPriority =
Arrays.asList(highPriorityQueues, lowPriorityQueues);
+
+ // First group has penalty 0, should short-circuit
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getBrokerName().equals("broker-high") ? 0 : 100
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ queuesWithPriority, penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq0, result.getLeft());
+ assertEquals(0, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority when first group encounters zero
penalty during iteration
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_FirstGroupHasZeroPenalty() {
+ // Priority group 1
+ MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker1", 1);
+ List<MessageQueue> group1 = Arrays.asList(mq0, mq1);
+
+ // Priority group 2
+ MessageQueue mq2 = new MessageQueue("topic", "broker2", 0);
+ List<MessageQueue> group2 = Collections.singletonList(mq2);
+
+ List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1,
group2);
+
+ // mq1 in first group has penalty 0
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(
+ mq -> mq.getQueueId() == 1 && mq.getBrokerName().equals("broker1")
? 0 : 50
+ );
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ queuesWithPriority, penalizers, startIndex);
+
+ assertNotNull(result);
+ assertEquals(mq1, result.getLeft());
+ assertEquals(0, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority returns first encountered minimum
when multiple groups have same minimum penalty
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_SameMinimumPenalty() {
+ // Priority group 1
+ MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+ List<MessageQueue> group1 = Collections.singletonList(mq0);
+
+ // Priority group 2
+ MessageQueue mq1 = new MessageQueue("topic", "broker2", 0);
+ List<MessageQueue> group2 = Collections.singletonList(mq1);
+
+ // Priority group 3
+ MessageQueue mq2 = new MessageQueue("topic", "broker3", 0);
+ List<MessageQueue> group3 = Collections.singletonList(mq2);
+
+ List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1,
group2, group3);
+
+ // All have same penalty
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> 10);
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ queuesWithPriority, penalizers, startIndex);
+
+ assertNotNull(result);
+ // Should return first encountered (from group1)
+ assertEquals(mq0, result.getLeft());
+ assertEquals(10, result.getRight().intValue());
+ }
+
+ /**
+ * Test selectLeastPenaltyWithPriority with complex scenario:
+ * Multiple priority groups with varying penalties
+ */
+ @Test
+ public void testSelectLeastPenaltyWithPriority_ComplexScenario() {
+ // Priority group 1: penalties 100, 90
+ MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+ MessageQueue mq1 = new MessageQueue("topic", "broker1", 1);
+ List<MessageQueue> group1 = Arrays.asList(mq0, mq1);
+
+ // Priority group 2: penalties 50, 30
+ MessageQueue mq2 = new MessageQueue("topic", "broker2", 0);
+ MessageQueue mq3 = new MessageQueue("topic", "broker2", 1);
+ List<MessageQueue> group2 = Arrays.asList(mq2, mq3);
+
+ // Priority group 3: penalties 80, 20
+ MessageQueue mq4 = new MessageQueue("topic", "broker3", 0);
+ MessageQueue mq5 = new MessageQueue("topic", "broker3", 1);
+ List<MessageQueue> group3 = Arrays.asList(mq4, mq5);
+
+ List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1,
group2, group3);
+
+ List<MessageQueuePenalizer<MessageQueue>> penalizers =
Collections.singletonList(mq -> {
+ if (mq.getBrokerName().equals("broker1")) {
+ return mq.getQueueId() == 0 ? 100 : 90;
+ } else if (mq.getBrokerName().equals("broker2")) {
+ return mq.getQueueId() == 0 ? 50 : 30;
+ } else {
+ return mq.getQueueId() == 0 ? 80 : 20;
+ }
+ });
+
+ AtomicInteger startIndex = new AtomicInteger(0);
+ Pair<MessageQueue, Integer> result =
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+ queuesWithPriority, penalizers, startIndex);
+
+ assertNotNull(result);
+ // Should select mq5 from group3 with penalty 20 (the global minimum)
+ assertEquals(mq5, result.getLeft());
+ assertEquals(20, result.getRight().intValue());
+ }
+}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
new file mode 100644
index 0000000000..22f2a68e8b
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MessageQueuePriorityProviderTest {
+
+ @Test
+ public void testPriorityOfWithLambda() {
+ // Test functional interface implementation using lambda
+ MessageQueuePriorityProvider<MessageQueue> provider = mq ->
mq.getQueueId();
+
+ MessageQueue queue1 = new MessageQueue("topic", "broker", 0);
+ MessageQueue queue2 = new MessageQueue("topic", "broker", 5);
+ MessageQueue queue3 = new MessageQueue("topic", "broker", 10);
+
+ assertEquals(0, provider.priorityOf(queue1));
+ assertEquals(5, provider.priorityOf(queue2));
+ assertEquals(10, provider.priorityOf(queue3));
+ }
+
+ @Test
+ public void testPriorityOfWithConstantValue() {
+ // Test with constant priority
+ MessageQueuePriorityProvider<MessageQueue> constantProvider = mq -> 1;
+
+ MessageQueue queue1 = new MessageQueue("topic1", "broker1", 0);
+ MessageQueue queue2 = new MessageQueue("topic2", "broker2", 5);
+
+ assertEquals(1, constantProvider.priorityOf(queue1));
+ assertEquals(1, constantProvider.priorityOf(queue2));
+ }
+
+ @Test
+ public void testPriorityOfBasedOnBrokerName() {
+ // Test priority based on broker name hash
+ MessageQueuePriorityProvider<MessageQueue> brokerProvider =
+ mq -> mq.getBrokerName().hashCode() % 10;
+
+ MessageQueue queue1 = new MessageQueue("topic", "broker-a", 0);
+ MessageQueue queue2 = new MessageQueue("topic", "broker-b", 0);
+
+ int priority1 = brokerProvider.priorityOf(queue1);
+ int priority2 = brokerProvider.priorityOf(queue2);
+
+ // Priorities should be deterministic for the same broker
+ assertEquals(priority1, brokerProvider.priorityOf(queue1));
+ assertEquals(priority2, brokerProvider.priorityOf(queue2));
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithNullList() {
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(null, provider);
+
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithEmptyList() {
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(
+ Collections.emptyList(), provider);
+
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithSinglePriority() {
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+
+ List<MessageQueue> queues = Arrays.asList(
+ new MessageQueue("topic", "broker1", 0),
+ new MessageQueue("topic", "broker1", 1),
+ new MessageQueue("topic", "broker1", 2)
+ );
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertEquals(3, result.get(0).size());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithMultiplePriorities() {
+ // Priority based on queue ID: 0->high, 1->medium, 2->low
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> {
+ if (mq.getQueueId() < 2) return 0; // High priority
+ if (mq.getQueueId() < 4) return 1; // Medium priority
+ return 2; // Low priority
+ };
+
+ List<MessageQueue> queues = Arrays.asList(
+ new MessageQueue("topic", "broker", 0), // priority 0
+ new MessageQueue("topic", "broker", 1), // priority 0
+ new MessageQueue("topic", "broker", 2), // priority 1
+ new MessageQueue("topic", "broker", 3), // priority 1
+ new MessageQueue("topic", "broker", 4), // priority 2
+ new MessageQueue("topic", "broker", 5) // priority 2
+ );
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(3, result.size());
+
+ // First group (highest priority 0)
+ assertEquals(2, result.get(0).size());
+ assertEquals(0, result.get(0).get(0).getQueueId());
+ assertEquals(1, result.get(0).get(1).getQueueId());
+
+ // Second group (medium priority 1)
+ assertEquals(2, result.get(1).size());
+ assertEquals(2, result.get(1).get(0).getQueueId());
+ assertEquals(3, result.get(1).get(1).getQueueId());
+
+ // Third group (low priority 2)
+ assertEquals(2, result.get(2).size());
+ assertEquals(4, result.get(2).get(0).getQueueId());
+ assertEquals(5, result.get(2).get(1).getQueueId());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsOrderedByPriority() {
+ // Test that groups are ordered from high to low priority (ascending
numeric value)
+ MessageQueuePriorityProvider<MessageQueue> provider = mq ->
mq.getQueueId();
+
+ List<MessageQueue> queues = Arrays.asList(
+ new MessageQueue("topic", "broker", 5),
+ new MessageQueue("topic", "broker", 0),
+ new MessageQueue("topic", "broker", 3),
+ new MessageQueue("topic", "broker", 1)
+ );
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(4, result.size());
+
+ // Verify order: 0, 1, 3, 5 (ascending)
+ assertEquals(0, result.get(0).get(0).getQueueId());
+ assertEquals(1, result.get(1).get(0).getQueueId());
+ assertEquals(3, result.get(2).get(0).getQueueId());
+ assertEquals(5, result.get(3).get(0).getQueueId());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithNegativePriorities() {
+ // Test with negative priority values
+ MessageQueuePriorityProvider<MessageQueue> provider = mq ->
mq.getQueueId() - 5;
+
+ List<MessageQueue> queues = Arrays.asList(
+ new MessageQueue("topic", "broker", 0), // priority -5
+ new MessageQueue("topic", "broker", 5), // priority 0
+ new MessageQueue("topic", "broker", 10) // priority 5
+ );
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(3, result.size());
+
+ // Verify order: -5, 0, 5 (ascending)
+ assertEquals(0, result.get(0).get(0).getQueueId());
+ assertEquals(5, result.get(1).get(0).getQueueId());
+ assertEquals(10, result.get(2).get(0).getQueueId());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithMixedBrokers() {
+ // Priority based on broker name
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> {
+ if (mq.getBrokerName().equals("broker-high")) return 0;
+ if (mq.getBrokerName().equals("broker-medium")) return 1;
+ return 2;
+ };
+
+ List<MessageQueue> queues = Arrays.asList(
+ new MessageQueue("topic", "broker-high", 0),
+ new MessageQueue("topic", "broker-low", 0),
+ new MessageQueue("topic", "broker-medium", 0),
+ new MessageQueue("topic", "broker-high", 1),
+ new MessageQueue("topic", "broker-medium", 1)
+ );
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(3, result.size());
+
+ // High priority group
+ assertEquals(2, result.get(0).size());
+ assertEquals("broker-high", result.get(0).get(0).getBrokerName());
+ assertEquals("broker-high", result.get(0).get(1).getBrokerName());
+
+ // Medium priority group
+ assertEquals(2, result.get(1).size());
+ assertEquals("broker-medium", result.get(1).get(0).getBrokerName());
+
+ // Low priority group
+ assertEquals(1, result.get(2).size());
+ assertEquals("broker-low", result.get(2).get(0).getBrokerName());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsPreservesQueueOrder() {
+ // Test that queues with same priority maintain their relative order
+ MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+
+ List<MessageQueue> queues = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ queues.add(new MessageQueue("topic", "broker", i));
+ }
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertEquals(10, result.get(0).size());
+
+ // Verify order is maintained
+ for (int i = 0; i < 10; i++) {
+ assertEquals(i, result.get(0).get(i).getQueueId());
+ }
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithCustomMessageQueue() {
+ // Test with extended MessageQueue type
+ class CustomMessageQueue extends MessageQueue {
+ private int customPriority;
+
+ public CustomMessageQueue(String topic, String brokerName, int
queueId, int customPriority) {
+ super(topic, brokerName, queueId);
+ this.customPriority = customPriority;
+ }
+
+ public int getCustomPriority() {
+ return customPriority;
+ }
+ }
+
+ MessageQueuePriorityProvider<CustomMessageQueue> provider =
+ CustomMessageQueue::getCustomPriority;
+
+ List<CustomMessageQueue> queues = Arrays.asList(
+ new CustomMessageQueue("topic", "broker", 0, 2),
+ new CustomMessageQueue("topic", "broker", 1, 0),
+ new CustomMessageQueue("topic", "broker", 2, 1)
+ );
+
+ List<List<CustomMessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(3, result.size());
+
+ // Verify order by custom priority: 0, 1, 2
+ assertEquals(0, result.get(0).get(0).getCustomPriority());
+ assertEquals(1, result.get(1).get(0).getCustomPriority());
+ assertEquals(2, result.get(2).get(0).getCustomPriority());
+ }
+
+ @Test
+ public void testBuildPriorityGroupsWithLargeNumberOfQueues() {
+ // Test with large number of queues
+ MessageQueuePriorityProvider<MessageQueue> provider = mq ->
mq.getQueueId() % 5;
+
+ List<MessageQueue> queues = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ queues.add(new MessageQueue("topic", "broker", i));
+ }
+
+ List<List<MessageQueue>> result =
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+
+ assertNotNull(result);
+ assertEquals(5, result.size()); // 5 different priorities (0-4)
+
+ // Each group should have 20 queues (100 / 5)
+ for (List<MessageQueue> group : result) {
+ assertEquals(20, group.size());
+ }
+ }
+}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
index d150f87c40..e44ed28f4a 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
@@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends
BaseServiceTest {
public void testReadMessageQueue() {
queueData.setPerm(PermName.PERM_READ);
queueData.setReadQueueNums(0);
- MessageQueueSelector messageQueueSelector = new
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
+ MessageQueueSelector messageQueueSelector = new
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
assertTrue(messageQueueSelector.getQueues().isEmpty());
queueData.setPerm(PermName.PERM_READ);
queueData.setReadQueueNums(3);
- messageQueueSelector = new MessageQueueSelector(new
TopicRouteWrapper(topicRouteData, TOPIC), null, true);
+ messageQueueSelector = new MessageQueueSelector(new
TopicRouteWrapper(topicRouteData, TOPIC), true);
assertEquals(3, messageQueueSelector.getQueues().size());
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
@@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends
BaseServiceTest {
public void testWriteMessageQueue() {
queueData.setPerm(PermName.PERM_WRITE);
queueData.setReadQueueNums(0);
- MessageQueueSelector messageQueueSelector = new
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
+ MessageQueueSelector messageQueueSelector = new
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
assertTrue(messageQueueSelector.getQueues().isEmpty());
queueData.setPerm(PermName.PERM_WRITE);
queueData.setWriteQueueNums(3);
- messageQueueSelector = new MessageQueueSelector(new
TopicRouteWrapper(topicRouteData, TOPIC), null, false);
+ messageQueueSelector = new MessageQueueSelector(new
TopicRouteWrapper(topicRouteData, TOPIC), false);
assertEquals(3, messageQueueSelector.getQueues().size());
assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {