This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b5ab93ce7d [ISSUE #9286] Counting the filtered message when filter by
SQL92 (#9287)
b5ab93ce7d is described below
commit b5ab93ce7d8d1167bf2e3be581c4a7bd21e1a9c7
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 13:48:04 2025 +0800
[ISSUE #9286] Counting the filtered message when filter by SQL92 (#9287)
---
.../rocketmq/broker/mqtrace/ConsumeMessageContext.java | 9 +++++++++
.../rocketmq/broker/processor/PullMessageProcessor.java | 1 +
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 3 +++
.../java/org/apache/rocketmq/store/GetMessageResult.java | 12 +++++++++++-
4 files changed, 24 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
index ed7bfba06d..e45f48fe5a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -46,6 +46,7 @@ public class ConsumeMessageContext {
private BrokerStatsManager.StatsType commercialRcvStats;
private int commercialRcvTimes;
private int commercialRcvSize;
+ private int filterMessageCount;
private String namespace;
public String getConsumerGroup() {
@@ -231,4 +232,12 @@ public class ConsumeMessageContext {
public void setNamespace(String namespace) {
this.namespace = namespace;
}
+
+ public int getFilterMessageCount() {
+ return filterMessageCount;
+ }
+
+ public void setFilterMessageCount(int filterMessageCount) {
+ this.filterMessageCount = filterMessageCount;
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 5b11bc2fef..5f0735e74c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -713,6 +713,7 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
context.setAccountOwnerParent(ownerParent);
context.setAccountOwnerSelf(ownerSelf);
context.setNamespace(NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()));
+
context.setFilterMessageCount(getMessageResult.getFilterMessageCount());
switch (responseCode) {
case ResponseCode.SUCCESS:
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index d613468386..f8caf7beac 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -816,6 +816,7 @@ public class DefaultMessageStore implements MessageStore {
long maxOffset = 0;
GetMessageResult getResult = new GetMessageResult();
+ int filterMessageCount = 0;
final long maxOffsetPy = this.commitLog.getMaxOffset();
@@ -927,6 +928,7 @@ public class DefaultMessageStore implements MessageStore {
}
// release...
selectResult.release();
+ filterMessageCount++;
continue;
}
this.storeStatsService.getGetMessageTransferredMsgCount().add(cqUnit.getBatchNum());
@@ -976,6 +978,7 @@ public class DefaultMessageStore implements MessageStore {
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
+ getResult.setFilterMessageCount(filterMessageCount);
return getResult;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index a7556dfb85..6f322a19e1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -43,6 +43,8 @@ public class GetMessageResult {
private long coldDataSum = 0L;
+ private int filterMessageCount;
+
public static final GetMessageResult NO_MATCH_LOGIC_QUEUE =
new GetMessageResult(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE, 0, 0, 0,
Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
@@ -177,10 +179,18 @@ public class GetMessageResult {
this.coldDataSum = coldDataSum;
}
+ public int getFilterMessageCount() {
+ return filterMessageCount;
+ }
+
+ public void setFilterMessageCount(int filterMessageCount) {
+ this.filterMessageCount = filterMessageCount;
+ }
+
@Override
public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" +
nextBeginOffset + ", minOffset="
+ minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" +
bufferTotalSize + ", messageCount=" + messageCount
- + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+ + ", filterMessageCount=" + filterMessageCount + ",
suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
}
}