This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2219fd8c1a [ISSUE #9244] Avoid writing dirty data in consumption mode
(#9245)
2219fd8c1a is described below
commit 2219fd8c1aee3bc42f6a66394e0e4cf131006a26
Author: hqbfz <[email protected]>
AuthorDate: Mon Mar 17 17:32:41 2025 +0800
[ISSUE #9244] Avoid writing dirty data in consumption mode (#9245)
---
.../broker/processor/QueryAssignmentProcessor.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index 2f4cb7b15f..d29e3d0e06 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -33,6 +33,7 @@ import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueAssignment;
@@ -49,6 +50,7 @@ import
org.apache.rocketmq.remoting.protocol.body.QueryAssignmentRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
import
org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
public class QueryAssignmentProcessor implements NettyRequestProcessor {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -314,8 +316,20 @@ public class QueryAssignmentProcessor implements
NettyRequestProcessor {
response.setRemark("retry topic is not allowed to set mode");
return response;
}
+ TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+ response.setRemark("topic[" + topic + "] not exist");
+ return response;
+ }
final String consumerGroup = requestBody.getConsumerGroup();
+ SubscriptionGroupConfig groupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
+ if (null == groupConfig) {
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark("subscription group does not exist");
+ return response;
+ }
this.messageRequestModeManager.setMessageRequestMode(topic,
consumerGroup, requestBody);
this.messageRequestModeManager.persist();