This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new d1974c5535 [ISSUE #8269] Support pop consumption filter in long polling service (#8271) d1974c5535 is described below commit d1974c55353488095e5122f5ce361c150611f21a Author: lizhimins <707364...@qq.com> AuthorDate: Thu Jun 6 20:19:06 2024 +0800 [ISSUE #8269] Support pop consumption filter in long polling service (#8271) --- .../longpolling/NotifyMessageArrivingListener.java | 11 ++++-- .../broker/longpolling/PopLongPollingService.java | 44 ++++++++++++++++++---- .../rocketmq/broker/longpolling/PopRequest.java | 25 +++++++++--- .../broker/processor/AckMessageProcessor.java | 13 +++---- .../broker/processor/NotificationProcessor.java | 11 +++++- .../broker/processor/PopMessageProcessor.java | 40 ++++++++++++++------ 6 files changed, 107 insertions(+), 37 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java index e55ed2778a..1ddb9f4f8e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java @@ -36,9 +36,12 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { - this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode, - msgStoreTime, filterBitMap, properties); - this.popMessageProcessor.notifyMessageArriving(topic, queueId); - this.notificationProcessor.notifyMessageArriving(topic, queueId); + + this.pullRequestHoldService.notifyMessageArriving( + topic, queueId, logicOffset, tagsCode, msgStoreTime, filterBitMap, properties); + this.popMessageProcessor.notifyMessageArriving( + topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + this.notificationProcessor.notifyMessageArriving( + topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index a768fe4b9c..b5179114f3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -35,6 +35,9 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.ConsumeQueueExt; +import org.apache.rocketmq.store.MessageFilter; import static org.apache.rocketmq.broker.longpolling.PollingResult.NOT_POLLING; import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_FULL; @@ -147,39 +150,61 @@ public class PopLongPollingService extends ServiceThread { } public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId) { + this.notifyMessageArrivingWithRetryTopic(topic, queueId, null, 0L, null, null); + } + + public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { String notifyTopic; if (KeyBuilder.isPopRetryTopicV2(topic)) { notifyTopic = KeyBuilder.parseNormalTopic(topic); } else { notifyTopic = topic; } - notifyMessageArriving(notifyTopic, queueId); + notifyMessageArriving(notifyTopic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); } - public void notifyMessageArriving(final String topic, final int queueId) { + public void notifyMessageArriving(final String topic, final int queueId, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic); if (cids == null) { return; } for (Map.Entry<String, Byte> cid : cids.entrySet()) { if (queueId >= 0) { - notifyMessageArriving(topic, cid.getKey(), -1); + notifyMessageArriving(topic, -1, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties); } - notifyMessageArriving(topic, cid.getKey(), queueId); + notifyMessageArriving(topic, queueId, cid.getKey(), tagsCode, msgStoreTime, filterBitMap, properties); } } - public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) { + public boolean notifyMessageArriving(final String topic, final int queueId, final String cid, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { ConcurrentSkipListSet<PopRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId)); if (remotingCommands == null || remotingCommands.isEmpty()) { return false; } + PopRequest popRequest = pollRemotingCommands(remotingCommands); if (popRequest == null) { return false; } + + if (popRequest.getMessageFilter() != null && popRequest.getSubscriptionData() != null) { + boolean match = popRequest.getMessageFilter().isMatchedByConsumeQueue(tagsCode, + new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)); + if (match && properties != null) { + match = popRequest.getMessageFilter().isMatchedByCommitLog(null, properties); + } + if (!match) { + remotingCommands.add(popRequest); + totalPollingNum.incrementAndGet(); + return false; + } + } + if (brokerController.getBrokerConfig().isEnablePopLog()) { - POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", popRequest); + POP_LOGGER.info("lock release, new msg arrive, wakeUp: {}", popRequest); } return wakeUp(popRequest); } @@ -221,6 +246,11 @@ public class PopLongPollingService extends ServiceThread { */ public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand, final PollingHeader requestHeader) { + return this.polling(ctx, remotingCommand, requestHeader, null, null); + } + + public PollingResult polling(final ChannelHandlerContext ctx, RemotingCommand remotingCommand, + final PollingHeader requestHeader, SubscriptionData subscriptionData, MessageFilter messageFilter) { if (requestHeader.getPollTime() <= 0 || this.isStopped()) { return NOT_POLLING; } @@ -234,7 +264,7 @@ public class PopLongPollingService extends ServiceThread { } cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE); long expired = requestHeader.getBornTime() + requestHeader.getPollTime(); - final PopRequest request = new PopRequest(remotingCommand, ctx, expired); + final PopRequest request = new PopRequest(remotingCommand, ctx, expired, subscriptionData, messageFilter); boolean isFull = totalPollingNum.get() >= this.brokerController.getBrokerConfig().getMaxPopPollingSize(); if (isFull) { POP_LOGGER.info("polling {}, result POLLING_FULL, total:{}", remotingCommand, totalPollingNum.get()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java index a45bcce9f6..0419dbf637 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java @@ -16,28 +16,35 @@ */ package org.apache.rocketmq.broker.longpolling; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.util.Comparator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.remoting.protocol.RemotingCommand; - -import io.netty.channel.Channel; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.store.MessageFilter; public class PopRequest { private static final AtomicLong COUNTER = new AtomicLong(Long.MIN_VALUE); private final RemotingCommand remotingCommand; private final ChannelHandlerContext ctx; - private final long expired; private final AtomicBoolean complete = new AtomicBoolean(false); private final long op = COUNTER.getAndIncrement(); - public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext ctx, long expired) { + private final long expired; + private final SubscriptionData subscriptionData; + private final MessageFilter messageFilter; + + public PopRequest(RemotingCommand remotingCommand, ChannelHandlerContext ctx, + long expired, SubscriptionData subscriptionData, MessageFilter messageFilter) { + this.ctx = ctx; this.remotingCommand = remotingCommand; this.expired = expired; + this.subscriptionData = subscriptionData; + this.messageFilter = messageFilter; } public Channel getChannel() { @@ -64,6 +71,14 @@ public class PopRequest { return expired; } + public SubscriptionData getSubscriptionData() { + return subscriptionData; + } + + public MessageFilter getMessageFilter() { + return messageFilter; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("PopRequest{"); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 9a56498632..6f7b7e8a24 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -297,15 +297,12 @@ public class AckMessageProcessor implements NettyRequestProcessor { qId, ackOffset, popTime); if (nextOffset > -1) { - if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset( - topic, consumeGroup, qId)) { - this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), - consumeGroup, topic, qId, nextOffset); + if (!this.brokerController.getConsumerOffsetManager().hasOffsetReset(topic, consumeGroup, qId)) { + this.brokerController.getConsumerOffsetManager().commitOffset( + channel.remoteAddress().toString(), consumeGroup, topic, qId, nextOffset); } - if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, - consumeGroup, qId, invisibleTime)) { - this.brokerController.getPopMessageProcessor().notifyMessageArriving( - topic, consumeGroup, qId); + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic, consumeGroup, qId, invisibleTime)) { + this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, qId, consumeGroup); } } else if (nextOffset == -1) { String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 6447500cbe..c82725fe1e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.Map; import java.util.Objects; import java.util.Random; import org.apache.rocketmq.broker.BrokerController; @@ -58,8 +59,16 @@ public class NotificationProcessor implements NettyRequestProcessor { return false; } + // When a new message is written to CommitLog, this method would be called. + // Suspended long polling will receive notification and be wakeup. + public void notifyMessageArriving(final String topic, final int queueId, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + this.popLongPollingService.notifyMessageArrivingWithRetryTopic( + topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); + } + public void notifyMessageArriving(final String topic, final int queueId) { - popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); + this.popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 93c04a1b8d..3df4bec984 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -26,6 +26,7 @@ import io.opentelemetry.api.common.Attributes; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Random; @@ -167,15 +168,23 @@ public class PopMessageProcessor implements NettyRequestProcessor { } public void notifyLongPollingRequestIfNeed(String topic, String group, int queueId) { + this.notifyLongPollingRequestIfNeed( + topic, group, queueId, null, 0L, null, null); + } + + public void notifyLongPollingRequestIfNeed(String topic, String group, int queueId, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { long popBufferOffset = this.brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(topic, group, queueId); long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId); long offset = Math.max(popBufferOffset, consumerOffset); if (maxOffset > offset) { - boolean notifySuccess = popLongPollingService.notifyMessageArriving(topic, group, -1); + boolean notifySuccess = popLongPollingService.notifyMessageArriving( + topic, -1, group, tagsCode, msgStoreTime, filterBitMap, properties); if (!notifySuccess) { // notify pop queue - notifySuccess = popLongPollingService.notifyMessageArriving(topic, group, queueId); + notifySuccess = popLongPollingService.notifyMessageArriving( + topic, queueId, group, tagsCode, msgStoreTime, filterBitMap, properties); } this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, queueId); if (this.brokerController.getBrokerConfig().isEnablePopLog()) { @@ -185,12 +194,15 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } - public void notifyMessageArriving(final String topic, final int queueId) { - popLongPollingService.notifyMessageArrivingWithRetryTopic(topic, queueId); + public void notifyMessageArriving(final String topic, final int queueId, + Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + popLongPollingService.notifyMessageArrivingWithRetryTopic( + topic, queueId, tagsCode, msgStoreTime, filterBitMap, properties); } - public boolean notifyMessageArriving(final String topic, final String cid, final int queueId) { - return popLongPollingService.notifyMessageArriving(topic, cid, queueId); + public void notifyMessageArriving(final String topic, final int queueId, final String cid) { + popLongPollingService.notifyMessageArriving( + topic, queueId, cid, null, 0L, null, null); } @Override @@ -292,10 +304,11 @@ public class PopMessageProcessor implements NettyRequestProcessor { } BrokerConfig brokerConfig = brokerController.getBrokerConfig(); + SubscriptionData subscriptionData = null; ExpressionMessageFilter messageFilter = null; - if (requestHeader.getExp() != null && requestHeader.getExp().length() > 0) { + if (requestHeader.getExp() != null && !requestHeader.getExp().isEmpty()) { try { - SubscriptionData subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); + subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType()); brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); @@ -329,7 +342,7 @@ public class PopMessageProcessor implements NettyRequestProcessor { } } else { try { - SubscriptionData subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); + subscriptionData = FilterAPI.build(requestHeader.getTopic(), "*", ExpressionType.TAG); brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData); @@ -403,17 +416,20 @@ public class PopMessageProcessor implements NettyRequestProcessor { } final RemotingCommand finalResponse = response; + SubscriptionData finalSubscriptionData = subscriptionData; getMessageFuture.thenApply(restNum -> { if (!getMessageResult.getMessageBufferList().isEmpty()) { finalResponse.setCode(ResponseCode.SUCCESS); getMessageResult.setStatus(GetMessageStatus.FOUND); if (restNum > 0) { // all queue pop can not notify specified queue pop, and vice versa - popLongPollingService.notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(), - requestHeader.getQueueId()); + popLongPollingService.notifyMessageArriving( + requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getConsumerGroup(), + null, 0L, null, null); } } else { - PollingResult pollingResult = popLongPollingService.polling(ctx, request, new PollingHeader(requestHeader)); + PollingResult pollingResult = popLongPollingService.polling( + ctx, request, new PollingHeader(requestHeader), finalSubscriptionData, finalMessageFilter); if (PollingResult.POLLING_SUC == pollingResult) { return null; } else if (PollingResult.POLLING_FULL == pollingResult) {