This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 24ca9e46b5 [ISSUE #10043] Make TimerMessageReputService thread pool 
configurable and shutdown gracefully (#10044)
24ca9e46b5 is described below

commit 24ca9e46b50c03f1978e1dca6233d8e8b2e05ade
Author: wizcraft_kris <[email protected]>
AuthorDate: Fri Jan 30 10:04:40 2026 +0800

    [ISSUE #10043] Make TimerMessageReputService thread pool configurable and 
shutdown gracefully (#10044)
---
 .../TransactionalMessageRocksDBService.java        |  6 ++---
 .../org/apache/rocketmq/common/BrokerConfig.java   | 30 ++++++++++++++++++++++
 .../rocketmq/store/config/MessageStoreConfig.java  | 27 +++++++++++++++++++
 .../timer/rocksdb/TimerMessageRocksDBStore.java    | 25 ++++++++++++------
 4 files changed, 77 insertions(+), 11 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
index 1fc38eb3d6..dbd3575d69 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
@@ -76,11 +76,11 @@ public class TransactionalMessageRocksDBService {
     private void initService() {
         this.transStatusService = new TransStatusCheckService();
         this.checkTranStatusTaskExecutor = ThreadUtils.newThreadPoolExecutor(
-            2,
-            5,
+            
brokerController.getBrokerConfig().getTransactionCheckRocksdbCoreThreads(),
+            
brokerController.getBrokerConfig().getTransactionCheckRocksdbMaxThreads(),
             100,
             TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(2000),
+            new 
ArrayBlockingQueue<>(brokerController.getBrokerConfig().getTransactionCheckRocksdbQueueCapacity()),
             new ThreadFactoryImpl("Transaction-rocksdb-msg-check-thread", 
brokerController.getBrokerIdentity()),
             new CallerRunsPolicy());
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e9c588e9d1..caee5e45f2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -298,6 +298,12 @@ public class BrokerConfig extends BrokerIdentity {
 
     private long transactionMetricFlushInterval = 10 * 1000;
 
+    private int transactionCheckRocksdbCoreThreads = 2;
+
+    private int transactionCheckRocksdbMaxThreads = 5;
+
+    private int transactionCheckRocksdbQueueCapacity = 2000;
+
     /**
      * transaction batch op message
      */
@@ -2106,6 +2112,30 @@ public class BrokerConfig extends BrokerIdentity {
         this.transactionMetricFlushInterval = transactionMetricFlushInterval;
     }
 
+    public void setTransactionCheckRocksdbCoreThreads(int 
transactionCheckRocksdbCoreThreads) {
+        this.transactionCheckRocksdbCoreThreads = 
transactionCheckRocksdbCoreThreads;
+    }
+
+    public int getTransactionCheckRocksdbCoreThreads() {
+        return transactionCheckRocksdbCoreThreads;
+    }
+
+    public int getTransactionCheckRocksdbMaxThreads() {
+        return transactionCheckRocksdbMaxThreads;
+    }
+
+    public void setTransactionCheckRocksdbMaxThreads(int 
transactionCheckRocksdbMaxThreads) {
+        this.transactionCheckRocksdbMaxThreads = 
transactionCheckRocksdbMaxThreads;
+    }
+
+    public int getTransactionCheckRocksdbQueueCapacity() {
+        return transactionCheckRocksdbQueueCapacity;
+    }
+
+    public void setTransactionCheckRocksdbQueueCapacity(int 
transactionCheckRocksdbQueueCapacity) {
+        this.transactionCheckRocksdbQueueCapacity = 
transactionCheckRocksdbQueueCapacity;
+    }
+
     public long getPopInflightMessageThreshold() {
         return popInflightMessageThreshold;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index d7f17efd64..ffc261aa17 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -116,6 +116,9 @@ public class MessageStoreConfig {
     private int timerRocksDBRollRangeHours = 2;
     private boolean timerRecallToTimeWheelEnable = true;
     private boolean timerRecallToTimelineEnable = true;
+    private int timerReputServiceCorePoolSize = 6;
+    private int timerReputServiceMaxPoolSize = 6;
+    private int timerReputServiceQueueCapacity = 10000;
 
     private boolean transRocksDBEnable = false;
     private boolean transWriteOriginTransHalfEnable = true;
@@ -2227,6 +2230,30 @@ public class MessageStoreConfig {
         this.timerRecallToTimelineEnable = timerRecallToTimelineEnable;
     }
 
+    public void setTimerReputServiceCorePoolSize(int 
timerReputServiceCorePoolSize) {
+        this.timerReputServiceCorePoolSize = timerReputServiceCorePoolSize;
+    }
+
+    public int getTimerReputServiceCorePoolSize() {
+        return timerReputServiceCorePoolSize;
+    }
+
+    public void setTimerReputServiceMaxPoolSize(int 
timerReputServiceMaxPoolSize) {
+        this.timerReputServiceMaxPoolSize = timerReputServiceMaxPoolSize;
+    }
+
+    public int getTimerReputServiceMaxPoolSize() {
+        return timerReputServiceMaxPoolSize;
+    }
+
+    public void setTimerReputServiceQueueCapacity(int 
timerReputServiceQueueCapacity) {
+        this.timerReputServiceQueueCapacity = timerReputServiceQueueCapacity;
+    }
+
+    public int getTimerReputServiceQueueCapacity() {
+        return timerReputServiceQueueCapacity;
+    }
+
     public int getTimerRocksDBRollIntervalHours() {
         return timerRocksDBRollIntervalHours;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
index ec13971d92..c48e177c9d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -506,14 +507,16 @@ public class TimerMessageRocksDBStore {
         private final BlockingQueue<List<TimerRocksDBRecord>> queue;
         private final RateLimiter rateLimiter;
         private final boolean writeCheckPoint;
-        ExecutorService executor = new ThreadPoolExecutor(
-            6,
-            6,
-            60,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(10000),
-            new ThreadPoolExecutor.CallerRunsPolicy()
-        );
+        private final ExecutorService executor =
+                ThreadUtils.newThreadPoolExecutor(
+                        storeConfig.getTimerReputServiceCorePoolSize(),
+                        storeConfig.getTimerReputServiceMaxPoolSize(),
+                        60L,
+                        TimeUnit.SECONDS,
+                        new 
LinkedBlockingQueue<>(storeConfig.getTimerReputServiceQueueCapacity()),
+                        
ThreadUtils.newGenericThreadFactory("TimerMessageReputService", false),
+                        new ThreadPoolExecutor.CallerRunsPolicy()
+                );
 
         public 
TimerMessageReputService(BlockingQueue<List<TimerRocksDBRecord>> queue, double 
maxTps, boolean writeCheckPoint) {
             this.queue = queue;
@@ -614,6 +617,12 @@ public class TimerMessageRocksDBStore {
                 return null;
             }
         }
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+            ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
+        }
     }
 
 }

Reply via email to