This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7583fda113 fix(store): close all consume queue file handles on
ConsumeQueueStore shutdown (#10060)
7583fda113 is described below
commit 7583fda11342da70f2b172018f54b82e258c5efe
Author: rongtong <[email protected]>
AuthorDate: Mon Feb 2 11:41:35 2026 +0800
fix(store): close all consume queue file handles on ConsumeQueueStore
shutdown (#10060)
* fix(store): close all consume queue file handles on ConsumeQueueStore
shutdown
* remove implementation
---------
Co-authored-by: RongtongJin <[email protected]>
---
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 9 +++++++--
.../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 5 +++++
.../java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java | 2 ++
.../java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java | 3 +++
.../org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java | 5 +++++
5 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 02f90cef1d..2a77ede32a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -37,12 +37,11 @@ import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.queue.ConsumeQueueStore;
import org.apache.rocketmq.store.queue.CqUnit;
-import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;
-public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle
{
+public class ConsumeQueue implements ConsumeQueueInterface {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
/**
@@ -1236,4 +1235,10 @@ public class ConsumeQueue implements
ConsumeQueueInterface, FileQueueLifeCycle {
flush(0);
}
+
+ @Override
+ public boolean shutdown() {
+ this.mappedFileQueue.cleanResourcesAll();
+ return true;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 3f1dc237d6..7ad29ff538 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -1200,4 +1200,9 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
public void initializeWithOffset(long offset, long minPhyOffset) {
// not support now
}
+
+ @Override
+ public boolean shutdown() {
+ return true;
+ }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index e9b0312c01..d5d096becd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -171,6 +171,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
log.error("Failed to flush all consume queues", e);
return false;
}
+
return true;
}
@@ -864,4 +865,5 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
return messageStore.getBrokerConfig().getIdentifier() +
CleanConsumeQueueService.class.getSimpleName();
}
}
+
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index 95cc0887f4..89cb0b58ab 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable {
/**
* Does the first file exist?
+ *
* @return true if it exists
*/
boolean isFirstFileExist();
+
+ boolean shutdown();
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 03fa5ac912..0d58d9a693 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -507,4 +507,9 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed.
topic={}, queueId={}, offset={}", topic, queueId, offset, e);
}
}
+
+ @Override
+ public boolean shutdown() {
+ return true;
+ }
}