This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 1a18dbb807 [To rel/0.13][IOTDB-5183]Fix CI OOM (#8412)
1a18dbb807 is described below
commit 1a18dbb807d7bab64c63a8d666b5812ed96881a5
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Dec 14 20:53:36 2022 +0800
[To rel/0.13][IOTDB-5183]Fix CI OOM (#8412)
---
.../iotdb/db/engine/compaction/CompactionTaskManager.java | 12 +++++++++++-
.../db/engine/compaction/cross/CrossSpaceCompactionTest.java | 2 ++
.../db/engine/compaction/inner/InnerSeqCompactionTest.java | 5 +++++
.../db/engine/compaction/inner/InnerUnseqCompactionTest.java | 5 +++++
4 files changed, 23 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 070f0224d1..ddac1512ec 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -134,6 +134,7 @@ public class CompactionTaskManager implements IService {
@Override
public void stop() {
if (taskExecutionPool != null) {
+ subCompactionTaskExecutionPool.shutdownNow();
taskExecutionPool.shutdownNow();
compactionTaskSubmissionThreadPool.shutdownNow();
logger.info("Waiting for task taskExecutionPool to shut down");
@@ -146,6 +147,7 @@ public class CompactionTaskManager implements IService {
@Override
public void waitAndStop(long milliseconds) {
if (taskExecutionPool != null) {
+ awaitTermination(subCompactionTaskExecutionPool, milliseconds);
awaitTermination(taskExecutionPool, milliseconds);
awaitTermination(compactionTaskSubmissionThreadPool, milliseconds);
logger.info("Waiting for task taskExecutionPool to shut down in {} ms",
milliseconds);
@@ -184,7 +186,7 @@ public class CompactionTaskManager implements IService {
private void waitTermination() {
long startTime = System.currentTimeMillis();
- while (!taskExecutionPool.isTerminated()) {
+ while (!subCompactionTaskExecutionPool.isTerminated() ||
!taskExecutionPool.isTerminated()) {
int timeMillis = 0;
try {
Thread.sleep(200);
@@ -202,6 +204,7 @@ public class CompactionTaskManager implements IService {
}
}
taskExecutionPool = null;
+ subCompactionTaskExecutionPool = null;
storageGroupTasks.clear();
logger.info("CompactionManager stopped");
}
@@ -369,6 +372,8 @@ public class CompactionTaskManager implements IService {
public void restart() throws InterruptedException {
if
(IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread() > 0)
{
if (taskExecutionPool != null) {
+ subCompactionTaskExecutionPool.shutdownNow();
+ subCompactionTaskExecutionPool.awaitTermination(Long.MAX_VALUE,
TimeUnit.MILLISECONDS);
this.taskExecutionPool.shutdownNow();
this.taskExecutionPool.awaitTermination(Long.MAX_VALUE,
TimeUnit.MILLISECONDS);
}
@@ -377,6 +382,11 @@ public class CompactionTaskManager implements IService {
IoTDBThreadPoolFactory.newScheduledThreadPool(
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread(),
ThreadName.COMPACTION_SERVICE.getName());
+ this.subCompactionTaskExecutionPool =
+ IoTDBThreadPoolFactory.newScheduledThreadPool(
+
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread()
+ *
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(),
+ ThreadName.COMPACTION_SUB_SERVICE.getName());
this.compactionTaskSubmissionThreadPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(1,
ThreadName.COMPACTION_SERVICE.getName());
candidateCompactionTaskQueue.regsitPollLastHook(
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
index eadfd6b71e..2e0905ff39 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/CrossSpaceCompactionTest.java
@@ -110,6 +110,7 @@ public class CrossSpaceCompactionTest {
Collections.emptyMap());
}
CompactionTaskManager.getInstance().start();
+ IoTDB.activated = true;
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
}
@@ -121,6 +122,7 @@ public class CrossSpaceCompactionTest {
TimeSeriesMetadataCache.getInstance().clear();
IoTDB.metaManager.clear();
CompactionTaskManager.getInstance().stop();
+ IoTDB.activated = false;
EnvironmentUtils.cleanAllDir();
Thread.currentThread().setName(oldThreadName);
new CompactionConfigRestorer().restoreCompactionConfig();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index d7703a1580..03142c891d 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
@@ -98,10 +99,14 @@ public class InnerSeqCompactionTest {
TSFileDescriptor.getInstance().getConfig().getCompressor(),
Collections.emptyMap());
}
+ CompactionTaskManager.getInstance().start();
+ IoTDB.activated = true;
}
@After
public void tearDown() throws IOException, StorageEngineException {
+ IoTDB.activated = false;
+ CompactionTaskManager.getInstance().stop();
new CompactionConfigRestorer().restoreCompactionConfig();
CompactionClearUtils.clearAllCompactionFiles();
ChunkCache.getInstance().clear();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 14707e6b96..a3ce39baa9 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.inner;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
import
org.apache.iotdb.db.engine.compaction.inner.utils.InnerSpaceCompactionUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils;
@@ -110,10 +111,14 @@ public class InnerUnseqCompactionTest {
Collections.emptyMap());
}
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+ CompactionTaskManager.getInstance().start();
+ IoTDB.activated = true;
}
@After
public void tearDown() throws IOException, StorageEngineException {
+ IoTDB.activated = false;
+ CompactionTaskManager.getInstance().stop();
new CompactionConfigRestorer().restoreCompactionConfig();
CompactionClearUtils.clearAllCompactionFiles();
ChunkCache.getInstance().clear();