This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fc2ff7d80a4 Fix compaction worker stopped after drop database (#12357)
fc2ff7d80a4 is described below
commit fc2ff7d80a469dd07e6a5f4c5dce6b27cef85c77
Author: shuwenwei <[email protected]>
AuthorDate: Thu Apr 18 09:57:28 2024 +0800
Fix compaction worker stopped after drop database (#12357)
* fix compaction worker stopped after drop database
* fix restart
---
.../compaction/schedule/CompactionTaskManager.java | 9 +++++++++
.../dataregion/compaction/schedule/CompactionWorker.java | 12 ++++++++++--
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
index 7fa2e1158aa..aa3073bdce7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskManager.java
@@ -69,6 +69,7 @@ public class CompactionTaskManager implements IService {
// The thread pool that executes the compaction task. The default number of threads for this pool
// is 10.
private WrappedThreadPoolExecutor taskExecutionPool;
+ private volatile boolean stopAllCompactionWorker = false;
// The thread pool that executes the sub compaction task.
private WrappedThreadPoolExecutor subCompactionTaskExecutionPool;
@@ -90,6 +91,10 @@ public class CompactionTaskManager implements IService {
return INSTANCE;
}
+ public boolean isStopAllCompactionWorker() {
+ return stopAllCompactionWorker;
+ }
+
@Override
public synchronized void start() {
if (taskExecutionPool == null
@@ -125,6 +130,7 @@ public class CompactionTaskManager implements IService {
@Override
public void stop() {
+ stopAllCompactionWorker = true;
if (taskExecutionPool != null) {
subCompactionTaskExecutionPool.shutdownNow();
taskExecutionPool.shutdownNow();
@@ -137,6 +143,7 @@ public class CompactionTaskManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
+ stopAllCompactionWorker = true;
if (taskExecutionPool != null) {
awaitTermination(subCompactionTaskExecutionPool, milliseconds);
awaitTermination(taskExecutionPool, milliseconds);
@@ -412,6 +419,7 @@ public class CompactionTaskManager implements IService {
}
public void restart() throws InterruptedException {
+ stopAllCompactionWorker = true;
if (IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount() >
0) {
if (subCompactionTaskExecutionPool != null) {
this.subCompactionTaskExecutionPool.shutdownNow();
@@ -438,6 +446,7 @@ public class CompactionTaskManager implements IService {
init = true;
}
init = true;
+ stopAllCompactionWorker = false;
logger.info("Compaction task manager started.");
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index 903c16a5898..455f375962d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -50,14 +50,22 @@ public class CompactionWorker implements Runnable {
@SuppressWarnings("squid:S2142")
@Override
public void run() {
- while (!Thread.currentThread().isInterrupted()) {
+ while (true) {
+ if (Thread.currentThread().isInterrupted()) {
+ // If the interrupt is caused by `drop database`, clear the status
+ if (!CompactionTaskManager.getInstance().isStopAllCompactionWorker()) {
+ Thread.interrupted();
+ continue;
+ }
+ return;
+ }
AbstractCompactionTask task;
try {
task = compactionTaskQueue.take();
} catch (InterruptedException e) {
LOGGER.warn("CompactionThread-{} terminates because interruption",
threadId);
Thread.currentThread().interrupt();
- return;
+ continue;
}
processOneCompactionTask(task);
}