This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9173def404 [ISSUE #8348] Allow custom fast-failure queues to be added in BrokerFastFailure (#8347)
9173def404 is described below

commit 9173def4040f00ea5a4f1e913382b8ccd2b08d17
Author: rongtong <jinrongton...@mails.ucas.ac.cn>
AuthorDate: Thu Jul 4 16:39:53 2024 +0800

    [ISSUE #8348] Allow custom fast-failure queues to be added in BrokerFastFailure (#8347)
---
 .../apache/rocketmq/broker/BrokerController.java   |  2 +
 .../rocketmq/broker/latency/BrokerFastFailure.java | 45 +++++++-------
 .../broker/latency/BrokerFastFailureTest.java      | 68 +++++++++++++++++++++-
 3 files changed, 94 insertions(+), 21 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 76224db5cb..145a952230 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -2519,4 +2519,6 @@ public class BrokerController {
     public void setColdDataCgCtrService(ColdDataCgCtrService coldDataCgCtrService) {
         this.coldDataCgCtrService = coldDataCgCtrService;
     }
+
+
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0135ac929a..ce8fdd8857 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -16,11 +16,15 @@
  */
 package org.apache.rocketmq.broker.latency;
 
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -42,13 +46,26 @@ public class BrokerFastFailure {
 
     private volatile long jstackTime = System.currentTimeMillis();
 
+    private final List<Pair<BlockingQueue<Runnable>, Supplier<Long>>> cleanExpiredRequestQueueList = new ArrayList<>();
+
     public BrokerFastFailure(final BrokerController brokerController) {
         this.brokerController = brokerController;
+        initCleanExpiredRequestQueueList();
         this.scheduledExecutorService = ThreadUtils.newScheduledThreadPool(1,
             new ThreadFactoryImpl("BrokerFastFailureScheduledThread", true,
                 brokerController == null ? null : brokerController.getBrokerConfig()));
     }
 
+    private void initCleanExpiredRequestQueueList() {
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getSendThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getPullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getLitePullThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getHeartbeatThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getEndTransactionThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAckThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue()));
+        cleanExpiredRequestQueueList.add(new Pair<>(this.brokerController.getAdminBrokerThreadPoolQueue(), () -> this.brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue()));
+    }
+
     public static RequestTask castRunnable(final Runnable runnable) {
         try {
             if (runnable instanceof FutureTaskExt) {
@@ -98,26 +115,9 @@ public class BrokerFastFailure {
             }
         }
 
-        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
-            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
-            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getLitePullThreadPoolQueue(),
-            this.brokerController.getBrokerConfig().getWaitTimeMillsInLitePullQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
-            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
-            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getAckThreadPoolQueue(),
-            brokerController.getBrokerConfig().getWaitTimeMillsInAckQueue());
-
-        cleanExpiredRequestInQueue(this.brokerController.getAdminBrokerThreadPoolQueue(),
-            brokerController.getBrokerConfig().getWaitTimeMillsInAdminBrokerQueue());
+        for (Pair<BlockingQueue<Runnable>, Supplier<Long>> pair : cleanExpiredRequestQueueList) {
+            cleanExpiredRequestInQueue(pair.getObject1(), pair.getObject2().get());
+        }
     }
 
     void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
@@ -154,6 +154,11 @@ public class BrokerFastFailure {
         }
     }
 
+    public synchronized void addCleanExpiredRequestQueue(BlockingQueue<Runnable> cleanExpiredRequestQueue,
+        Supplier<Long> maxWaitTimeMillsInQueueSupplier) {
+        cleanExpiredRequestQueueList.add(new Pair<>(cleanExpiredRequestQueue, maxWaitTimeMillsInQueueSupplier));
+    }
+
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
index 31b547cf1b..2216a1d50c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -19,16 +19,46 @@ package org.apache.rocketmq.broker.latency;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.future.FutureTaskExt;
 import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class BrokerFastFailureTest {
+
+    private BrokerController brokerController;
+
+    private final BrokerConfig brokerConfig = new BrokerConfig();
+
+    private MessageStore messageStore;
+
+    @Before
+    public void setUp() {
+        brokerController = Mockito.mock(BrokerController.class);
+        messageStore = Mockito.mock(DefaultMessageStore.class);
+        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+        Mockito.when(brokerController.getSendThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getPullThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getLitePullThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getHeartbeatThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getEndTransactionThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getAdminBrokerThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getAckThreadPoolQueue()).thenReturn(queue);
+        Mockito.when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        Mockito.when(messageStore.isOSPageCacheBusy()).thenReturn(false);
+        Mockito.when(brokerController.getMessageStore()).thenReturn(messageStore);
+    }
+
     @Test
     public void testCleanExpiredRequestInQueue() throws Exception {
-        BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
+        BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);
 
         BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
         brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
@@ -63,4 +93,40 @@ public class BrokerFastFailureTest {
         assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
     }
 
+    @Test
+    public void testCleanExpiredCustomRequestInQueue() throws Exception {
+        BrokerFastFailure brokerFastFailure = new BrokerFastFailure(brokerController);
+        brokerFastFailure.start();
+        brokerConfig.setWaitTimeMillsInAckQueue(10);
+        BlockingQueue<Runnable> customThreadPoolQueue = new LinkedBlockingQueue<>();
+        brokerFastFailure.addCleanExpiredRequestQueue(customThreadPoolQueue, () -> brokerConfig.getWaitTimeMillsInAckQueue());
+
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+
+            }
+        };
+        RequestTask requestTask = new RequestTask(runnable, null, null);
+        customThreadPoolQueue.add(new FutureTaskExt<>(requestTask, null));
+
+        Thread.sleep(2000);
+
+        assertThat(customThreadPoolQueue.size()).isEqualTo(0);
+        assertThat(requestTask.isStopRun()).isEqualTo(true);
+
+        brokerConfig.setWaitTimeMillsInAckQueue(10000);
+
+        RequestTask requestTask2 = new RequestTask(runnable, null, null);
+        customThreadPoolQueue.add(new FutureTaskExt<>(requestTask2, null));
+
+        Thread.sleep(1000);
+
+        assertThat(customThreadPoolQueue.size()).isEqualTo(1);
+        assertThat(((FutureTaskExt) customThreadPoolQueue.peek()).getRunnable()).isEqualTo(requestTask2);
+
+        brokerFastFailure.shutdown();
+
+    }
+
 }
\ No newline at end of file

Reply via email to