This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d1974c5535 [ISSUE #8269] Support pop consumption filter in long polling service (#8271)
d1974c5535 is described below

commit d1974c55353488095e5122f5ce361c150611f21a
Author: lizhimins <707364...@qq.com>
AuthorDate: Thu Jun 6 20:19:06 2024 +0800

    [ISSUE #8269] Support pop consumption filter in long polling service (#8271)
---
 .../longpolling/NotifyMessageArrivingListener.java | 11 ++++--
 .../broker/longpolling/PopLongPollingService.java  | 44 ++++++++++++++++++----
 .../rocketmq/broker/longpolling/PopRequest.java    | 25 +++++++++---
 .../broker/processor/AckMessageProcessor.java      | 13 +++----
 .../broker/processor/NotificationProcessor.java    | 11 +++++-
 .../broker/processor/PopMessageProcessor.java      | 40 ++++++++++++++------
 6 files changed, 107 insertions(+), 37 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index e55ed2778a..1ddb9f4f8e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -36,9 +36,12 @@ public class NotifyMessageArrivingListener implements 
MessageArrivingListener {
     @Override
     public void arriving(String topic, int queueId, long logicOffset, long 
tagsCode,
                          long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
-        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, 
logicOffset, tagsCode,
-            msgStoreTime, filterBitMap, properties);
-        this.popMessageProcessor.notifyMessageArriving(topic, queueId);
-        this.notificationProcessor.notifyMessageArriving(topic, queueId);
+
+        this.pullRequestHoldService.notifyMessageArriving(
+            topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, 
properties);
+        this.popMessageProcessor.notifyMessageArriving(
+            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+        this.notificationProcessor.notifyMessageArriving(
+            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
     }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index a768fe4b9c..b5179114f3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -35,6 +35,9 @@ import 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.MessageFilter;
 
 import static org.apache.rocketmq.broker.longpolling.PollingResult.NOT_POLLING;
 import static 
org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_FULL;
@@ -147,39 +150,61 @@ public class PopLongPollingService extends ServiceThread {
     }
 
     public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId) {
+        this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L, 
null, null);
+    }
+
+    public void notifyMessageArrivingWithRetryTopic(final String topic, final 
int queueId,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         String notifyTopic;
         if (KeyBuilder.isPopRetryTopicV2(topic)) {
             notifyTopic = KeyBuilder.parseNormalTopic(topic);
         } else {
             notifyTopic = topic;
         }
-        notifyMessageArriving(notifyTopic, queueId);
+        notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime, 
filterBitMap, properties);
     }
 
-    public void notifyMessageArriving(final String topic, final int queueId) {
+    public void notifyMessageArriving(final String topic, final int queueId,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
         if (cids == null) {
             return;
         }
         for (Map.Entry<String, Byte> cid : cids.entrySet()) {
             if (queueId >= 0) {
-                notifyMessageArriving(topic, cid.getKey(), -1);
+                notifyMessageArriving(topic, -1, cid.getKey(), tagsCode, 
msgStoreTime, filterBitMap, properties);
             }
-            notifyMessageArriving(topic, cid.getKey(), queueId);
+            notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode, 
msgStoreTime, filterBitMap, properties);
         }
     }
 
-    public boolean notifyMessageArriving(final String topic, final String cid, 
final int queueId) {
+    public boolean notifyMessageArriving(final String topic, final int 
queueId, final String cid,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         ConcurrentSkipListSet<PopRequest> remotingCommands = 
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
         if (remotingCommands == null || remotingCommands.isEmpty()) {
             return false;
         }
+
         PopRequest popRequest = pollRemotingCommands(remotingCommands);
         if (popRequest == null) {
             return false;
         }
+
+        if (popRequest.getMessageFilter() != null && 
popRequest.getSubscriptionData() != null) {
+            boolean match = 
popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+                new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, 
filterBitMap));
+            if (match && properties != null) {
+                match = 
popRequest.getMessageFilter().isMatchedByCommitLog(null, properties);
+            }
+            if (!match) {
+                remotingCommands.add(popRequest);
+                totalPollingNum.incrementAndGet();
+                return false;
+            }
+        }
+
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
-            POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", 
popRequest);
+            POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", 
popRequest);
         }
         return wakeUp(popRequest);
     }
@@ -221,6 +246,11 @@ public class PopLongPollingService extends ServiceThread {
      */
     public PollingResult polling(final ChannelHandlerContext ctx, 
RemotingCommand remotingCommand,
         final PollingHeader requestHeader) {
+        return this.polling(ctx, remotingCommand, requestHeader, null, null);
+    }
+
+    public PollingResult polling(final ChannelHandlerContext ctx, 
RemotingCommand remotingCommand,
+        final PollingHeader requestHeader, SubscriptionData subscriptionData, 
MessageFilter messageFilter) {
         if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
             return NOT_POLLING;
         }
@@ -234,7 +264,7 @@ public class PopLongPollingService extends ServiceThread {
         }
         cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
         long expired = requestHeader.getBornTime() + 
requestHeader.getPollTime();
-        final PopRequest request = new PopRequest(remotingCommand, ctx, 
expired);
+        final PopRequest request = new PopRequest(remotingCommand, ctx, 
expired, subscriptionData, messageFilter);
         boolean isFull = totalPollingNum.get() >= 
this.brokerController.getBrokerConfig().getMaxPopPollingSize();
         if (isFull) {
             POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", 
remotingCommand, totalPollingNum.get());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
index a45bcce9f6..0419dbf637 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
@@ -16,28 +16,35 @@
  */
 package org.apache.rocketmq.broker.longpolling;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.MessageFilter;
 
 public class PopRequest {
     private static final AtomicLong COUNTER = new AtomicLong(Long.MIN_VALUE);
 
     private final RemotingCommand remotingCommand;
     private final ChannelHandlerContext ctx;
-    private final long expired;
     private final AtomicBoolean complete = new AtomicBoolean(false);
     private final long op = COUNTER.getAndIncrement();
 
-    public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext 
ctx, long expired) {
+    private final long expired;
+    private final SubscriptionData subscriptionData;
+    private final MessageFilter messageFilter;
+
+    public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext 
ctx,
+        long expired, SubscriptionData subscriptionData, MessageFilter 
messageFilter) {
+
         this.ctx = ctx;
         this.remotingCommand = remotingCommand;
         this.expired = expired;
+        this.subscriptionData = subscriptionData;
+        this.messageFilter = messageFilter;
     }
 
     public Channel getChannel() {
@@ -64,6 +71,14 @@ public class PopRequest {
         return expired;
     }
 
+    public SubscriptionData getSubscriptionData() {
+        return subscriptionData;
+    }
+
+    public MessageFilter getMessageFilter() {
+        return messageFilter;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("PopRequest{");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 9a56498632..6f7b7e8a24 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -297,15 +297,12 @@ public class AckMessageProcessor implements 
NettyRequestProcessor {
                 qId, ackOffset,
                 popTime);
             if (nextOffset > -1) {
-                if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
-                    topic, consumeGroup, qId)) {
-                    
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
-                        consumeGroup, topic, qId, nextOffset);
+                if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic, 
consumeGroup, qId)) {
+                    
this.brokerController.getConsumerOffsetManager().commitOffset(
+                        channel.remoteAddress().toString(), consumeGroup, 
topic, qId, nextOffset);
                 }
-                if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
-                    consumeGroup, qId, invisibleTime)) {
-                    
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
-                        topic, consumeGroup, qId);
+                if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, 
consumeGroup, qId, invisibleTime)) {
+                    
this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, 
qId, consumeGroup);
                 }
             } else if (nextOffset == -1) {
                 String errorInfo = String.format("offset is illegal, key:%s, 
old:%d, commit:%d, next:%d, %s",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 6447500cbe..c82725fe1e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
@@ -58,8 +59,16 @@ public class NotificationProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
+    // When a new message is written to CommitLog, this method would be called.
+    // Suspended long polling will receive notification and be wakeup.
+    public void notifyMessageArriving(final String topic, final int queueId,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
+        this.popLongPollingService.notifyMessageArrivingWithRetryTopic(
+            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
+    }
+
     public void notifyMessageArriving(final String topic, final int queueId) {
-        popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, 
queueId);
+        this.popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, 
queueId);
     }
 
     @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 93c04a1b8d..3df4bec984 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -26,6 +26,7 @@ import io.opentelemetry.api.common.Attributes;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Random;
@@ -167,15 +168,23 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
     }
 
     public void notifyLongPollingRequestIfNeed(String topic, String group, int 
queueId) {
+        this.notifyLongPollingRequestIfNeed(
+            topic, group, queueId, null, 0L, null, null);
+    }
+
+    public void notifyLongPollingRequestIfNeed(String topic, String group, int 
queueId,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
         long popBufferOffset = 
this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(topic,
 group, queueId);
         long consumerOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, 
queueId);
         long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
         long offset = Math.max(popBufferOffset, consumerOffset);
         if (maxOffset > offset) {
-            boolean notifySuccess = 
popLongPollingService.notifyMessageArriving(topic, group, -1);
+            boolean notifySuccess = 
popLongPollingService.notifyMessageArriving(
+                topic, -1, group, tagsCode, msgStoreTime, filterBitMap, 
properties);
             if (!notifySuccess) {
                 // notify pop queue
-                notifySuccess = 
popLongPollingService.notifyMessageArriving(topic, group, queueId);
+                notifySuccess = popLongPollingService.notifyMessageArriving(
+                    topic, queueId, group, tagsCode, msgStoreTime, 
filterBitMap, properties);
             }
             
this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, 
queueId);
             if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -185,12 +194,15 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
     }
 
-    public void notifyMessageArriving(final String topic, final int queueId) {
-        popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, 
queueId);
+    public void notifyMessageArriving(final String topic, final int queueId,
+        Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
+        popLongPollingService.notifyMessageArrivingWithRetryTopic(
+            topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties);
     }
 
-    public boolean notifyMessageArriving(final String topic, final String cid, 
final int queueId) {
-        return popLongPollingService.notifyMessageArriving(topic, cid, 
queueId);
+    public void notifyMessageArriving(final String topic, final int queueId, 
final String cid) {
+        popLongPollingService.notifyMessageArriving(
+            topic, queueId, cid, null, 0L, null, null);
     }
 
     @Override
@@ -292,10 +304,11 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
 
         BrokerConfig brokerConfig = brokerController.getBrokerConfig();
+        SubscriptionData subscriptionData = null;
         ExpressionMessageFilter messageFilter = null;
-        if (requestHeader.getExp() != null && requestHeader.getExp().length() 
> 0) {
+        if (requestHeader.getExp() != null && 
!requestHeader.getExp().isEmpty()) {
             try {
-                SubscriptionData subscriptionData = 
FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), 
requestHeader.getExpType());
+                subscriptionData = FilterAPI.build(requestHeader.getTopic(), 
requestHeader.getExp(), requestHeader.getExpType());
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     requestHeader.getTopic(), subscriptionData);
 
@@ -329,7 +342,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
             }
         } else {
             try {
-                SubscriptionData subscriptionData = 
FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG);
+                subscriptionData = FilterAPI.build(requestHeader.getTopic(), 
"*", ExpressionType.TAG);
                 
brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
                     requestHeader.getTopic(), subscriptionData);
 
@@ -403,17 +416,20 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         }
 
         final RemotingCommand finalResponse = response;
+        SubscriptionData finalSubscriptionData = subscriptionData;
         getMessageFuture.thenApply(restNum -> {
             if (!getMessageResult.getMessageBufferList().isEmpty()) {
                 finalResponse.setCode(ResponseCode.SUCCESS);
                 getMessageResult.setStatus(GetMessageStatus.FOUND);
                 if (restNum > 0) {
                     // all queue pop can not notify specified queue pop, and 
vice versa
-                    
popLongPollingService.notifyMessageArriving(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(),
-                        requestHeader.getQueueId());
+                    popLongPollingService.notifyMessageArriving(
+                        requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getConsumerGroup(),
+                        null, 0L, null, null);
                 }
             } else {
-                PollingResult pollingResult = 
popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader));
+                PollingResult pollingResult = popLongPollingService.polling(
+                    ctx, request, new PollingHeader(requestHeader), 
finalSubscriptionData, finalMessageFilter);
                 if (PollingResult.POLLING_SUC == pollingResult) {
                     return null;
                 } else if (PollingResult.POLLING_FULL == pollingResult) {

Reply via email to