This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7fe0349f71 [ISSUE #10063] Notification request adds subscription expression to support on-demand wake-up (#10064)
7fe0349f71 is described below

commit 7fe0349f7167de2b97b644602e04c9fe05a84333
Author: qianye <[email protected]>
AuthorDate: Thu Feb 5 10:55:32 2026 +0800

    [ISSUE #10063] Notification request adds subscription expression to support on-demand wake-up (#10064)
---
 .../broker/processor/NotificationProcessor.java    | 102 ++++++++++++++++++---
 .../org/apache/rocketmq/common/BrokerConfig.java   |  22 ++++-
 .../protocol/header/NotificationRequestHeader.java |  19 ++++
 .../rocketmq/test/client/rmq/RMQPopClient.java     |   8 ++
 4 files changed, 137 insertions(+), 14 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 4563132fe4..24b587d1c6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -20,7 +20,11 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.Map;
 import java.util.Random;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.longpolling.PollingHeader;
 import org.apache.rocketmq.broker.longpolling.PollingResult;
 import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
@@ -29,6 +33,7 @@ import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -37,10 +42,17 @@ import 
org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
+import org.rocksdb.RocksDBException;
 
 public class NotificationProcessor implements NettyRequestProcessor {
     private static final Logger POP_LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -136,25 +148,60 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         int randomQ = random.nextInt(100);
         boolean hasMsg = false;
         BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+
+        SubscriptionData subscriptionData = null;
+        ExpressionMessageFilter messageFilter = null;
+        if (brokerConfig.isUseMessageFilterForNotification() &&
+            StringUtils.isNotEmpty(requestHeader.getExpType()) &&
+            StringUtils.isNotEmpty(requestHeader.getExp())) {
+            try {
+                // origin topic
+                subscriptionData = FilterAPI.build(
+                    requestHeader.getTopic(), requestHeader.getExp(), 
requestHeader.getExpType());
+
+                ConsumerFilterData consumerFilterData = null;
+                if 
(!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                    consumerFilterData = ConsumerFilterManager.build(
+                        requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getExp(),
+                        requestHeader.getExpType(), 
System.currentTimeMillis());
+                    if (consumerFilterData == null) {
+                        POP_LOGGER.warn("Parse the consumer's subscription[{}] 
failed, group: {}",
+                            requestHeader.getExp(), 
requestHeader.getConsumerGroup());
+                        
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+                        response.setRemark("parse the consumer's subscription 
failed");
+                        return response;
+                    }
+                }
+                messageFilter = new ExpressionMessageFilter(
+                    subscriptionData, consumerFilterData, 
brokerController.getConsumerFilterManager());
+            } catch (Exception e) {
+                POP_LOGGER.warn("Parse the consumer's subscription[{}] error, 
group: {}", requestHeader.getExp(),
+                    requestHeader.getConsumerGroup());
+                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+                response.setRemark("parse the consumer's subscription failed");
+                return response;
+            }
+        }
+
         if (requestHeader.getQueueId() < 0) {
             // read all queue
-            hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+            hasMsg = hasMsgFromTopic(topicConfig, randomQ, requestHeader, 
subscriptionData, messageFilter);
         } else {
             int queueId = requestHeader.getQueueId();
-            hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
+            hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId, subscriptionData, messageFilter);
         }
         // if it doesn't have message, fetch retry
         if (!hasMsg) {
             String retryTopic = 
KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), brokerConfig.isEnableRetryTopicV2());
-            hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader);
+            hasMsg = hasMsgFromTopic(retryTopic, randomQ, requestHeader, null, 
null);
             if (!hasMsg && brokerConfig.isEnableRetryTopicV2() && 
brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                 String retryTopicConfigV1 = 
KeyBuilder.buildPopRetryTopicV1(requestHeader.getTopic(), 
requestHeader.getConsumerGroup());
-                hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader);
+                hasMsg = hasMsgFromTopic(retryTopicConfigV1, randomQ, 
requestHeader, null, null);
             }
         }
 
         if (!hasMsg) {
-            PollingResult pollingResult = popLongPollingService.polling(ctx, 
request, new PollingHeader(requestHeader));
+            PollingResult pollingResult = popLongPollingService.polling(ctx, 
request, new PollingHeader(requestHeader), subscriptionData, messageFilter);
             if (pollingResult == PollingResult.POLLING_SUC) {
                 return null;
             } else if (pollingResult == PollingResult.POLLING_FULL) {
@@ -166,19 +213,19 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         return response;
     }
 
-    private boolean hasMsgFromTopic(String topicName, int randomQ, 
NotificationRequestHeader requestHeader)
+    private boolean hasMsgFromTopic(String topicName, int randomQ, 
NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, 
MessageFilter messageFilter)
         throws RemotingCommandException {
         TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(topicName);
-        return hasMsgFromTopic(topicConfig, randomQ, requestHeader);
+        return hasMsgFromTopic(topicConfig, randomQ, requestHeader, 
subscriptionData, messageFilter);
     }
 
-    private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, 
NotificationRequestHeader requestHeader)
+    private boolean hasMsgFromTopic(TopicConfig topicConfig, int randomQ, 
NotificationRequestHeader requestHeader, SubscriptionData subscriptionData, 
MessageFilter messageFilter)
         throws RemotingCommandException {
         boolean hasMsg;
         if (topicConfig != null) {
             for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
                 int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
-                hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId);
+                hasMsg = hasMsgFromQueue(topicConfig.getTopicName(), 
requestHeader, queueId, subscriptionData, messageFilter);
                 if (hasMsg) {
                     return true;
                 }
@@ -187,7 +234,7 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
-    private boolean hasMsgFromQueue(String targetTopic, 
NotificationRequestHeader requestHeader, int queueId) throws 
RemotingCommandException {
+    private boolean hasMsgFromQueue(String targetTopic, 
NotificationRequestHeader requestHeader, int queueId, SubscriptionData 
subscriptionData, MessageFilter messageFilter) throws RemotingCommandException {
         if (Boolean.TRUE.equals(requestHeader.getOrder())) {
             if 
(this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getAttemptId(),
 requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
                 return false;
@@ -196,9 +243,40 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         long offset = getPopOffset(targetTopic, 
requestHeader.getConsumerGroup(), queueId);
         try {
             long restNum = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(targetTopic, 
queueId) - offset;
+            int maxFilterMessageNum = 
this.brokerController.getBrokerConfig().getMaxMessageFilterNumForNotification();
+            boolean needFilter = restNum < maxFilterMessageNum &&
+                subscriptionData != null &&
+                messageFilter != null &&
+                ExpressionType.isTagType(subscriptionData.getExpressionType());
+            if (needFilter) {
+                ConsumeQueueInterface queue = 
this.brokerController.getMessageStore().getConsumeQueue(targetTopic, queueId);
+                // If the ConsumeQueue doesn't exist, it's not readable.
+                if (queue == null) {
+                    return false;
+                }
+                ReferredIterator<CqUnit> iterator = null;
+                try {
+                    // In order to take into account both the file CQ and the 
Rocksdb CQ,
+                    // the count passed here is 32.
+                    iterator = queue.iterateFrom(offset, 32);
+                    if (iterator != null) {
+                        while (iterator.hasNext()) {
+                            CqUnit cqUnit = iterator.next();
+                            if 
(messageFilter.isMatchedByConsumeQueue(cqUnit.getValidTagsCodeAsLong(), 
cqUnit.getCqExtUnit())) {
+                                return true;
+                            }
+                        }
+                        return false;
+                    }
+                } finally {
+                    if (iterator != null) {
+                        iterator.release();
+                    }
+                }
+            }
             return restNum > 0;
-        } catch (ConsumeQueueException e) {
-            throw new RemotingCommandException("Failed tp get max offset in 
queue", e);
+        } catch (ConsumeQueueException | RocksDBException e) {
+            throw new RemotingCommandException("Failed to get max offset in 
queue or iterate in queue", e);
         }
     }
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index caee5e45f2..7271c12b18 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.common;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.config.ConfigManagerVersion;
 import org.apache.rocketmq.common.constant.PermName;
@@ -24,8 +25,6 @@ import org.apache.rocketmq.common.metrics.MetricsExporterType;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 
-import java.util.concurrent.TimeUnit;
-
 public class BrokerConfig extends BrokerIdentity {
 
     private String brokerConfigPath = null;
@@ -256,6 +255,9 @@ public class BrokerConfig extends BrokerIdentity {
     private boolean useSeparateRetryQueue = false;
     private boolean realTimeNotifyConsumerChange = true;
 
+    private boolean useMessageFilterForNotification = true;
+    private int maxMessageFilterNumForNotification = 64;
+
     private boolean litePullMessageEnable = true;
 
     // The period to sync broker member group from namesrv, default value is 1 
second
@@ -2407,4 +2409,20 @@ public class BrokerConfig extends BrokerIdentity {
     public void setLiteLagLatencyTopK(int liteLagLatencyTopK) {
         this.liteLagLatencyTopK = liteLagLatencyTopK;
     }
+
+    public boolean isUseMessageFilterForNotification() {
+        return useMessageFilterForNotification;
+    }
+
+    public void setUseMessageFilterForNotification(boolean 
useMessageFilterForNotification) {
+        this.useMessageFilterForNotification = useMessageFilterForNotification;
+    }
+
+    public int getMaxMessageFilterNumForNotification() {
+        return maxMessageFilterNumForNotification;
+    }
+
+    public void setMaxMessageFilterNumForNotification(int 
maxMessageFilterNumForNotification) {
+        this.maxMessageFilterNumForNotification = 
maxMessageFilterNumForNotification;
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
index 0e484f82c0..46c5930c1d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/NotificationRequestHeader.java
@@ -44,6 +44,9 @@ public class NotificationRequestHeader extends 
TopicQueueRequestHeader {
     private Boolean order = Boolean.FALSE;
     private String attemptId;
 
+    private String expType;
+    private String exp;
+
     @CFNotNull
     @Override
     public void checkFields() throws RemotingCommandException {
@@ -108,6 +111,22 @@ public class NotificationRequestHeader extends 
TopicQueueRequestHeader {
         this.attemptId = attemptId;
     }
 
+    public String getExpType() {
+        return expType;
+    }
+
+    public void setExpType(String expType) {
+        this.expType = expType;
+    }
+
+    public String getExp() {
+        return exp;
+    }
+
+    public void setExp(String exp) {
+        this.exp = exp;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 09c60c0b45..c45a26c59d 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -199,6 +199,12 @@ public class RMQPopClient implements MQConsumer {
 
     public CompletableFuture<Boolean> notification(String brokerAddr, String 
topic,
         String consumerGroup, int queueId, Boolean order, String attemptId, 
long pollTime, long bornTime, long timeoutMillis) {
+        return notification(brokerAddr, topic, consumerGroup, queueId, order, 
attemptId, pollTime, bornTime, timeoutMillis, null, null);
+    }
+
+
+    public CompletableFuture<Boolean> notification(String brokerAddr, String 
topic,
+        String consumerGroup, int queueId, Boolean order, String attemptId, 
long pollTime, long bornTime, long timeoutMillis, String expType, String exp) {
         NotificationRequestHeader requestHeader = new 
NotificationRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(topic);
@@ -207,6 +213,8 @@ public class RMQPopClient implements MQConsumer {
         requestHeader.setBornTime(bornTime);
         requestHeader.setOrder(order);
         requestHeader.setAttemptId(attemptId);
+        requestHeader.setExpType(expType);
+        requestHeader.setExp(exp);
         return this.mqClientAPI.notification(brokerAddr, requestHeader, 
timeoutMillis);
     }
 }

Reply via email to