This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9399399292b Fix IoTConsensus LogDispatcherThread Stop Using Future Cancel (#12370)
9399399292b is described below

commit 9399399292b3a600140c043f2b3224885e314d36
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Apr 18 19:15:38 2024 +0800

    Fix IoTConsensus LogDispatcherThread Stop Using Future Cancel (#12370)
    
    * fix LogDispatcherThread stop
    
    * remove parallel
---
 .../consensus/iot/logdispatcher/LogDispatcher.java | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index ba869d7c596..8f08fccaa18 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,8 +46,12 @@ import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -97,12 +101,13 @@ public class LogDispatcher {
 
   public synchronized void start() {
     if (!threads.isEmpty()) {
-      threads.forEach(executorService::submit);
+      threads.forEach(thread -> thread.setFuture(executorService.submit(thread)));
     }
   }
 
   public synchronized void stop() {
     if (!threads.isEmpty()) {
+      threads.forEach(LogDispatcherThread::stop);
       executorService.shutdownNow();
       int timeout = 10;
       try {
@@ -113,7 +118,6 @@ public class LogDispatcher {
         Thread.currentThread().interrupt();
         logger.error("Unexpected Interruption when closing LogDispatcher service ");
       }
-      threads.forEach(LogDispatcherThread::stop);
     }
     stopped = true;
   }
@@ -129,7 +133,7 @@ public class LogDispatcher {
     if (this.executorService == null) {
       initLogSyncThreadPool();
     }
-    executorService.submit(thread);
+    thread.setFuture(executorService.submit(thread));
   }
 
   public synchronized void removeLogDispatcherThread(Peer peer) throws IOException {
@@ -227,6 +231,8 @@ public class LogDispatcher {
 
     private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
 
+    private Future<?> future;
+
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
       this.config = config;
@@ -251,6 +257,10 @@ public class LogDispatcher {
       return controller.getCurrentIndex();
     }
 
+    public void setFuture(Future<?> future) {
+      this.future = future;
+    }
+
     public long getLastFlushedSyncIndex() {
       return controller.getLastFlushedIndex();
     }
@@ -298,6 +308,17 @@ public class LogDispatcher {
 
     public void stop() {
       stopped = true;
+      if (!future.cancel(true)) {
+        logger.warn("LogDispatcherThread Future for {} is not stopped", peer);
+      }
+      try {
+        future.get(30, TimeUnit.SECONDS);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        Thread.currentThread().interrupt();
+        logger.warn("LogDispatcherThread Future for {} is not stopped", peer, e);
+      } catch (CancellationException ignored) {
+        // ignore because it is expected
+      }
       long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
         requestSize += indexedConsensusRequest.getSerializedSize();

Reply via email to