This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9399399292b Fix IoTConsensus LogDispatcherThread Stop Using Future
Cancel (#12370)
9399399292b is described below
commit 9399399292b3a600140c043f2b3224885e314d36
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Apr 18 19:15:38 2024 +0800
Fix IoTConsensus LogDispatcherThread Stop Using Future Cancel (#12370)
* fix LogDispatcherThread stop
* remove parallel
---
.../consensus/iot/logdispatcher/LogDispatcher.java | 27 +++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index ba869d7c596..8f08fccaa18 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,8 +46,12 @@ import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -97,12 +101,13 @@ public class LogDispatcher {
public synchronized void start() {
if (!threads.isEmpty()) {
- threads.forEach(executorService::submit);
+ threads.forEach(thread ->
thread.setFuture(executorService.submit(thread)));
}
}
public synchronized void stop() {
if (!threads.isEmpty()) {
+ threads.forEach(LogDispatcherThread::stop);
executorService.shutdownNow();
int timeout = 10;
try {
@@ -113,7 +118,6 @@ public class LogDispatcher {
Thread.currentThread().interrupt();
logger.error("Unexpected Interruption when closing LogDispatcher
service ");
}
- threads.forEach(LogDispatcherThread::stop);
}
stopped = true;
}
@@ -129,7 +133,7 @@ public class LogDispatcher {
if (this.executorService == null) {
initLogSyncThreadPool();
}
- executorService.submit(thread);
+ thread.setFuture(executorService.submit(thread));
}
public synchronized void removeLogDispatcherThread(Peer peer) throws
IOException {
@@ -227,6 +231,8 @@ public class LogDispatcher {
private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
+ private Future<?> future;
+
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long
initialSyncIndex) {
this.peer = peer;
this.config = config;
@@ -251,6 +257,10 @@ public class LogDispatcher {
return controller.getCurrentIndex();
}
+ public void setFuture(Future<?> future) {
+ this.future = future;
+ }
+
public long getLastFlushedSyncIndex() {
return controller.getLastFlushedIndex();
}
@@ -298,6 +308,17 @@ public class LogDispatcher {
public void stop() {
stopped = true;
+ if (!future.cancel(true)) {
+ logger.warn("LogDispatcherThread Future for {} is not stopped", peer);
+ }
+ try {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e)
{
+ Thread.currentThread().interrupt();
+ logger.warn("LogDispatcherThread Future for {} is not stopped", peer,
e);
+ } catch (CancellationException ignored) {
+ // ignore because it is expected
+ }
long requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
requestSize += indexedConsensusRequest.getSerializedSize();