This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 586a274839 [ISSUE #9279] Restrict system subscription group creation
and add pull request rejection policy (#9280)
586a274839 is described below
commit 586a2748394be4825d17af45f31731fb7d5649fc
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 17:12:36 2025 +0800
[ISSUE #9279] Restrict system subscription group creation and add pull
request rejection policy (#9280)
---
.../rocketmq/broker/coldctr/ColdDataCgCtrService.java | 2 +-
.../rocketmq/broker/processor/PullMessageProcessor.java | 13 +++++++++++++
.../broker/subscription/SubscriptionGroupManager.java | 3 ++-
.../main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 ++++++++++
common/src/main/java/org/apache/rocketmq/common/MixAll.java | 6 +++++-
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 2 +-
6 files changed, 32 insertions(+), 4 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
index 2e24930405..5b8b2fb9ce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataCgCtrService.java
@@ -187,7 +187,7 @@ public class ColdDataCgCtrService extends ServiceThread {
if (!this.messageStoreConfig.isColdDataFlowControlEnable()) {
return false;
}
- if (MixAll.isSysConsumerGroupForNoColdReadLimit(consumerGroup)) {
+ if (MixAll.isSysConsumerGroupPullMessage(consumerGroup)) {
return false;
}
AccAndTimeStamp accAndTimeStamp =
cgColdThresholdMapRuntime.get(consumerGroup);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5f0735e74c..5d947fd088 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -489,6 +489,19 @@ public class PullMessageProcessor implements NettyRequestProcessor {
this.brokerController.getConsumerFilterManager());
}
+ if (brokerController.getBrokerConfig().isRejectPullConsumerEnable()) {
+ ConsumerGroupInfo consumerGroupInfo =
+
this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ if (null == consumerGroupInfo || ConsumeType.CONSUME_ACTIVELY ==
consumerGroupInfo.getConsumeType()) {
+ if ((null == consumerGroupInfo || null ==
consumerGroupInfo.findChannel(channel))
+ &&
!MixAll.isSysConsumerGroupPullMessage(requestHeader.getConsumerGroup())) {
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+ response.setRemark("the consumer's group info not exist,
or the pull consumer is rejected by server." +
FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+ return response;
+ }
+ }
+ }
+
final MessageStore messageStore = brokerController.getMessageStore();
if (this.brokerController.getMessageStore() instanceof
DefaultMessageStore) {
DefaultMessageStore defaultMessageStore = (DefaultMessageStore)
this.brokerController.getMessageStore();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index d85342e1a1..f3e669fb3e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -260,7 +260,8 @@ public class SubscriptionGroupManager extends ConfigManager {
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String
group) {
SubscriptionGroupConfig subscriptionGroupConfig =
getSubscriptionGroupConfig(group);
if (null == subscriptionGroupConfig) {
- if
(brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() ||
MixAll.isSysConsumerGroup(group)) {
+ if
(brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup()
+ || MixAll.isSysConsumerGroupAndEnableCreate(group,
brokerController.getBrokerConfig().isEnableCreateSysGroup())) {
if (group.length() > Validators.CHARACTER_MAX_LENGTH ||
TopicValidator.isTopicOrGroupIllegal(group)) {
return null;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index b7ec944505..44f5e1eff0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -457,6 +457,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean recallMessageEnable = false;
+ private boolean enableCreateSysGroup = true;
+
public String getConfigBlackList() {
return configBlackList;
}
@@ -2016,4 +2018,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setRecallMessageEnable(boolean recallMessageEnable) {
this.recallMessageEnable = recallMessageEnable;
}
+
+ public boolean isEnableCreateSysGroup() {
+ return enableCreateSysGroup;
+ }
+
+ public void setEnableCreateSysGroup(boolean enableCreateSysGroup) {
+ this.enableCreateSysGroup = enableCreateSysGroup;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index c05a1d1926..aca9bd4ed7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -178,6 +178,10 @@ public class MixAll {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
+ public static boolean isSysConsumerGroupAndEnableCreate(final String
consumerGroup, final boolean isEnableCreateSysGroup) {
+ return isEnableCreateSysGroup && isSysConsumerGroup(consumerGroup);
+ }
+
public static boolean isPredefinedGroup(final String consumerGroup) {
return PREDEFINE_GROUP_SET.contains(consumerGroup);
}
@@ -530,7 +534,7 @@ public class MixAll {
return path.normalize().toString();
}
- public static boolean isSysConsumerGroupForNoColdReadLimit(String
consumerGroup) {
+ public static boolean isSysConsumerGroupPullMessage(String consumerGroup) {
if (DEFAULT_CONSUMER_GROUP.equals(consumerGroup)
|| TOOLS_CONSUMER_GROUP.equals(consumerGroup)
|| SCHEDULE_CONSUMER_GROUP.equals(consumerGroup)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 13af812c3f..fc6bc4213a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -917,7 +917,7 @@ public class DefaultMessageStore implements MessageStore {
continue;
}
- if
(messageStoreConfig.isColdDataFlowControlEnable() &&
!MixAll.isSysConsumerGroupForNoColdReadLimit(group) &&
!selectResult.isInCache()) {
+ if
(messageStoreConfig.isColdDataFlowControlEnable() &&
!MixAll.isSysConsumerGroupPullMessage(group) && !selectResult.isInCache()) {
getResult.setColdDataSum(getResult.getColdDataSum() + sizePy);
}