This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b5ab93ce7d [ISSUE #9286] Counting the filtered message when filter by SQL92 (#9287)
b5ab93ce7d is described below

commit b5ab93ce7d8d1167bf2e3be581c4a7bd21e1a9c7
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 13:48:04 2025 +0800

    [ISSUE #9286] Counting the filtered message when filter by SQL92 (#9287)
---
 .../rocketmq/broker/mqtrace/ConsumeMessageContext.java       |  9 +++++++++
 .../rocketmq/broker/processor/PullMessageProcessor.java      |  1 +
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java  |  3 +++
 .../java/org/apache/rocketmq/store/GetMessageResult.java     | 12 +++++++++++-
 4 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
index ed7bfba06d..e45f48fe5a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -46,6 +46,7 @@ public class ConsumeMessageContext {
     private BrokerStatsManager.StatsType commercialRcvStats;
     private int commercialRcvTimes;
     private int commercialRcvSize;
+    private int filterMessageCount;
 
     private String namespace;
     public String getConsumerGroup() {
@@ -231,4 +232,12 @@ public class ConsumeMessageContext {
     public void setNamespace(String namespace) {
         this.namespace = namespace;
     }
+
+    public int getFilterMessageCount() {
+        return filterMessageCount;
+    }
+
+    public void setFilterMessageCount(int filterMessageCount) {
+        this.filterMessageCount = filterMessageCount;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5b11bc2fef..5f0735e74c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -713,6 +713,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             context.setAccountOwnerParent(ownerParent);
             context.setAccountOwnerSelf(ownerSelf);
             
context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
+            
context.setFilterMessageCount(getMessageResult.getFilterMessageCount());
 
             switch (responseCode) {
                 case ResponseCode.SUCCESS:
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index d613468386..f8caf7beac 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -816,6 +816,7 @@ public class DefaultMessageStore implements MessageStore {
         long maxOffset = 0;
 
         GetMessageResult getResult = new GetMessageResult();
+        int filterMessageCount = 0;
 
         final long maxOffsetPy = this.commitLog.getMaxOffset();
 
@@ -927,6 +928,7 @@ public class DefaultMessageStore implements MessageStore {
                                 }
                                 // release...
                                 selectResult.release();
+                                filterMessageCount++;
                                 continue;
                             }
                             
this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
@@ -976,6 +978,7 @@ public class DefaultMessageStore implements MessageStore {
         getResult.setNextBeginOffset(nextBeginOffset);
         getResult.setMaxOffset(maxOffset);
         getResult.setMinOffset(minOffset);
+        getResult.setFilterMessageCount(filterMessageCount);
         return getResult;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index a7556dfb85..6f322a19e1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -43,6 +43,8 @@ public class GetMessageResult {
 
     private long coldDataSum = 0L;
 
+    private int filterMessageCount;
+
     public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
         new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0, 
Collections.emptyList(),
             Collections.emptyList(), Collections.emptyList());
@@ -177,10 +179,18 @@ public class GetMessageResult {
         this.coldDataSum = coldDataSum;
     }
 
+    public int getFilterMessageCount() {
+        return filterMessageCount;
+    }
+
+    public void setFilterMessageCount(int filterMessageCount) {
+        this.filterMessageCount = filterMessageCount;
+    }
+
     @Override
     public String toString() {
         return "GetMessageResult [status=" + status + ", nextBeginOffset=" + 
nextBeginOffset + ", minOffset="
             + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + 
bufferTotalSize + ", messageCount=" + messageCount
-            + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+            + ", filterMessageCount=" + filterMessageCount + ", 
suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
     }
 }

Reply via email to