This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e20d56b9bd [ISSUE #9254] Refactor notifyMessageArriveInBatch in
RocksDBConsumeQueueStore to adapt to CombineConsumeQueueStore (#9566)
e20d56b9bd is described below
commit e20d56b9bd6f75a67e9aa85990fb711c2a07ad1a
Author: qianye <[email protected]>
AuthorDate: Wed Jul 23 16:11:56 2025 +0800
[ISSUE #9254] Refactor notifyMessageArriveInBatch in
RocksDBConsumeQueueStore to adapt to CombineConsumeQueueStore (#9566)
---
.../java/org/apache/rocketmq/store/DefaultMessageStore.java | 12 +++++++++++-
.../java/org/apache/rocketmq/store/RocksDBMessageStore.java | 1 -
.../rocketmq/store/queue/RocksDBConsumeQueueStore.java | 10 ++++++++++
3 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 99eaa4b43c..2bdd058f3f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -155,7 +155,8 @@ public class DefaultMessageStore implements MessageStore {
private final BrokerConfig brokerConfig;
private volatile boolean shutdown = true;
- protected boolean notifyMessageArriveInBatch = false;
+
+ private boolean notifyMessageArriveInBatch = false;
protected StoreCheckpoint storeCheckpoint;
private TimerMessageStore timerMessageStore;
@@ -3011,4 +3012,13 @@ public class DefaultMessageStore implements MessageStore
{
public MessageStoreStateMachine getStateMachine() {
return stateMachine;
}
+
+ public boolean isNotifyMessageArriveInBatch() {
+ return notifyMessageArriveInBatch;
+ }
+
+ public void setNotifyMessageArriveInBatch(boolean
notifyMessageArriveInBatch) {
+ this.notifyMessageArriveInBatch = notifyMessageArriveInBatch;
+ }
+
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
index 8f0a075ff4..0983dee7f9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/RocksDBMessageStore.java
@@ -31,7 +31,6 @@ public class RocksDBMessageStore extends DefaultMessageStore {
final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig, final ConcurrentMap<String, TopicConfig>
topicConfigTable) throws
IOException {
super(messageStoreConfig, brokerStatsManager, messageArrivingListener,
brokerConfig, topicConfigTable);
- notifyMessageArriveInBatch = true;
}
@Override
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 9e72b0e565..afe528dbac 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -93,8 +93,18 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
private long dispatchFromPhyOffset;
+ /**
+ * there are two threads to notify longPolling when build cq successfully
+ *
+ * @see DefaultMessageStore.ReputMessageService#doReput()
+ * @see RocksGroupCommitService#groupCommit()
+ * <p>
+ * RocksDB CQ is build by RocksGroupCommitService, so we do not need to
notify longPolling in
+ * ReputMessageService
+ */
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
super(messageStore);
+ messageStore.setNotifyMessageArriveInBatch(true);
this.storePath =
StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore,
storePath);