This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7583fda113 fix(store): close all consume queue file handles on 
ConsumeQueueStore shutdown (#10060)
7583fda113 is described below

commit 7583fda11342da70f2b172018f54b82e258c5efe
Author: rongtong <[email protected]>
AuthorDate: Mon Feb 2 11:41:35 2026 +0800

    fix(store): close all consume queue file handles on ConsumeQueueStore 
shutdown (#10060)
    
    * fix(store): close all consume queue file handles on ConsumeQueueStore 
shutdown
    
    * remove implementation
    
    ---------
    
    Co-authored-by: RongtongJin <[email protected]>
---
 store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java  | 9 +++++++--
 .../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java  | 5 +++++
 .../java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java  | 2 ++
 .../java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java | 3 +++
 .../org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java     | 5 +++++
 5 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 02f90cef1d..2a77ede32a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -37,12 +37,11 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.ConsumeQueueStore;
 import org.apache.rocketmq.store.queue.CqUnit;
-import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
 import org.apache.rocketmq.store.queue.MultiDispatchUtils;
 import org.apache.rocketmq.store.queue.QueueOffsetOperator;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 
-public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle 
{
+public class ConsumeQueue implements ConsumeQueueInterface {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
 
     /**
@@ -1236,4 +1235,10 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
 
         flush(0);
     }
+
+    @Override
+    public boolean shutdown() {
+        this.mappedFileQueue.cleanResourcesAll();
+        return true;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 3f1dc237d6..7ad29ff538 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -1200,4 +1200,9 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
     public void initializeWithOffset(long offset, long minPhyOffset) {
         // not support now
     }
+
+    @Override
+    public boolean shutdown() {
+        return true;
+    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index e9b0312c01..d5d096becd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -171,6 +171,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
             log.error("Failed to flush all consume queues", e);
             return false;
         }
+
         return true;
     }
 
@@ -864,4 +865,5 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
             return messageStore.getBrokerConfig().getIdentifier() + 
CleanConsumeQueueService.class.getSimpleName();
         }
     }
+
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
index 95cc0887f4..89cb0b58ab 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/FileQueueLifeCycle.java
@@ -78,7 +78,10 @@ public interface FileQueueLifeCycle extends Swappable {
 
     /**
      * Does the first file exist?
+     *
      * @return true if it exists
      */
     boolean isFirstFileExist();
+
+    boolean shutdown();
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 03fa5ac912..0d58d9a693 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -507,4 +507,9 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
             ERROR_LOG.error("RocksDBConsumeQueue initializeWithOffset Failed. 
topic={}, queueId={}, offset={}", topic, queueId, offset, e);
         }
     }
+
+    @Override
+    public boolean shutdown() {
+        return true;
+    }
 }

Reply via email to