This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new dc4c694  Fix remove partition CI Bug (#3103)
dc4c694 is described below

commit dc4c6942891090b1a804776d3f50a29ac57d6b7f
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Thu Apr 29 15:25:47 2021 +0800

    Fix remove partition CI Bug (#3103)
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   4 +-
 .../compaction/CompactionMergeTaskPoolManager.java |  38 ++-
 .../db/engine/compaction/TsFileManagement.java     |  19 +-
 .../level/LevelCompactionTsFileManagement.java     | 274 ++++++++++++---------
 .../no/NoCompactionTsFileManagement.java           | 132 ++++++----
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  14 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  44 ++--
 .../compaction/LevelCompactionCacheTest.java       |   2 +-
 .../engine/compaction/LevelCompactionLogTest.java  |   2 +-
 .../compaction/LevelCompactionMergeTest.java       |   6 +-
 .../compaction/LevelCompactionMoreDataTest.java    |   2 +-
 .../LevelCompactionTsFileManagementTest.java       |  69 ++++++
 .../NoCompactionTsFileManagementTest.java          |  71 +++++-
 13 files changed, 457 insertions(+), 220 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java 
b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 0f2eed3..e7ed671 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -934,13 +934,13 @@ public class StorageEngine implements IService {
         set.stream()
             
.sorted(Comparator.comparing(StorageGroupProcessor::getVirtualStorageGroupId))
             .collect(Collectors.toList());
-    list.forEach(storageGroupProcessor -> 
storageGroupProcessor.getTsFileManagement().readLock());
+    list.forEach(StorageGroupProcessor::readLock);
     return list;
   }
 
   /** unlock all merge lock of the storage group processor related to the 
query */
   public void mergeUnLock(List<StorageGroupProcessor> list) {
-    list.forEach(storageGroupProcessor -> 
storageGroupProcessor.getTsFileManagement().readUnLock());
+    list.forEach(StorageGroupProcessor::readUnlock);
   }
 
   static class InstanceHolder {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 5b67d9e..cdf6f4a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -32,7 +32,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -46,6 +54,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
   private static final CompactionMergeTaskPoolManager INSTANCE =
       new CompactionMergeTaskPoolManager();
   private ExecutorService pool;
+  private Map<String, Set<Future<Void>>> storageGroupTasks = new 
ConcurrentHashMap<>();
 
   public static CompactionMergeTaskPoolManager getInstance() {
     return INSTANCE;
@@ -68,6 +77,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
       pool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
       waitTermination();
+      storageGroupTasks.clear();
     }
   }
 
@@ -77,6 +87,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
       awaitTermination(pool, milliseconds);
       logger.info("Waiting for task pool to shut down");
       waitTermination();
+      storageGroupTasks.clear();
     }
   }
 
@@ -103,6 +114,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
           }
         }
       }
+      storageGroupTasks.clear();
       logger.info("All compaction task finish");
     }
   }
@@ -127,6 +139,7 @@ public class CompactionMergeTaskPoolManager implements 
IService {
       }
     }
     pool = null;
+    storageGroupTasks.clear();
     logger.info("CompactionManager stopped");
   }
 
@@ -146,9 +159,30 @@ public class CompactionMergeTaskPoolManager implements 
IService {
     return ServiceType.COMPACTION_SERVICE;
   }
 
-  public void submitTask(Runnable compactionMergeTask) throws 
RejectedExecutionException {
+  public void submitTask(String storageGroupName, Callable<Void> 
compactionMergeTask)
+      throws RejectedExecutionException {
     if (pool != null && !pool.isTerminated()) {
-      pool.submit(compactionMergeTask);
+      Future<Void> future = pool.submit(compactionMergeTask);
+      storageGroupTasks
+          .computeIfAbsent(storageGroupName, k -> new 
ConcurrentSkipListSet<>())
+          .add(future);
+    }
+  }
+
+  /**
+   * Abort all compactions of a storage group. The caller must acquire the 
write lock of the
+   * corresponding storage group.
+   */
+  public void abortCompaction(String storageGroup) {
+    Set<Future<Void>> subTasks =
+        storageGroupTasks.getOrDefault(storageGroup, Collections.emptySet());
+    Iterator<Future<Void>> subIterator = subTasks.iterator();
+    while (subIterator.hasNext()) {
+      Future<Void> next = subIterator.next();
+      if (!next.isDone() && !next.isCancelled()) {
+        next.cancel(true);
+      }
+      subIterator.remove();
     }
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index a483f73..3b68e37 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -44,6 +44,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -129,11 +130,11 @@ public abstract class TsFileManagement {
   /** fork current TsFile list (call this before merge) */
   public abstract void forkCurrentFileList(long timePartition) throws 
IOException;
 
-  public void readLock() {
+  protected void readLock() {
     compactionMergeLock.readLock().lock();
   }
 
-  public void readUnLock() {
+  protected void readUnLock() {
     compactionMergeLock.readLock().unlock();
   }
 
@@ -151,7 +152,7 @@ public abstract class TsFileManagement {
 
   protected abstract void merge(long timePartition);
 
-  public class CompactionMergeTask implements Runnable {
+  public class CompactionMergeTask implements Callable<Void> {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
     private long timePartitionId;
@@ -163,13 +164,14 @@ public abstract class TsFileManagement {
     }
 
     @Override
-    public void run() {
+    public Void call() {
       merge(timePartitionId);
       closeCompactionMergeCallBack.call();
+      return null;
     }
   }
 
-  public class CompactionRecoverTask implements Runnable {
+  public class CompactionRecoverTask implements Callable<Void> {
 
     private CloseCompactionMergeCallBack closeCompactionMergeCallBack;
 
@@ -178,9 +180,10 @@ public abstract class TsFileManagement {
     }
 
     @Override
-    public void run() {
+    public Void call() {
       recover();
       closeCompactionMergeCallBack.call();
+      return null;
     }
   }
 
@@ -382,8 +385,8 @@ public abstract class TsFileManagement {
       List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File 
mergeLog) {
     logger.info("{} a merge task is ending...", storageGroupName);
 
-    if (unseqFiles.isEmpty()) {
-      // merge runtime exception arose, just end this merge
+    if (Thread.currentThread().isInterrupted() || unseqFiles.isEmpty()) {
+      // merge task abort, or merge runtime exception arose, just end this 
merge
       isUnseqMerging = false;
       logger.info("{} a merge task abnormally ends", storageGroupName);
       return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 5bd7cbc..61f928d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -43,7 +43,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -51,8 +51,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 import static 
org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
@@ -80,14 +78,12 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   // First map is partition list; Second list is level list; Third list is 
file list in level;
   private final Map<Long, List<SortedSet<TsFileResource>>> 
sequenceTsFileResources =
-      new ConcurrentSkipListMap<>();
-  private final Map<Long, List<List<TsFileResource>>> 
unSequenceTsFileResources =
-      new ConcurrentSkipListMap<>();
+      new HashMap<>();
+  private final Map<Long, List<List<TsFileResource>>> 
unSequenceTsFileResources = new HashMap<>();
   private final List<List<TsFileResource>> forkedSequenceTsFileResources = new 
ArrayList<>();
   private final List<List<TsFileResource>> forkedUnSequenceTsFileResources = 
new ArrayList<>();
-  private final List<TsFileResource> sequenceRecoverTsFileResources = new 
CopyOnWriteArrayList<>();
-  private final List<TsFileResource> unSequenceRecoverTsFileResources =
-      new CopyOnWriteArrayList<>();
+  private final List<TsFileResource> sequenceRecoverTsFileResources = new 
ArrayList<>();
+  private final List<TsFileResource> unSequenceRecoverTsFileResources = new 
ArrayList<>();
 
   public LevelCompactionTsFileManagement(String storageGroupName, String 
storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -139,17 +135,13 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
     if (sequence) {
       if (sequenceTsFileResources.containsKey(timePartitionId)) {
         if (sequenceTsFileResources.get(timePartitionId).size() > level) {
-          synchronized (sequenceTsFileResources) {
-            
sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
-          }
+          
sequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
         }
       }
     } else {
       if (unSequenceTsFileResources.containsKey(timePartitionId)) {
         if (unSequenceTsFileResources.get(timePartitionId).size() > level) {
-          synchronized (unSequenceTsFileResources) {
-            
unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
-          }
+          
unSequenceTsFileResources.get(timePartitionId).get(level).removeAll(mergeTsFiles);
         }
       }
     }
@@ -173,82 +165,89 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
-    List<TsFileResource> result = new ArrayList<>();
-    if (sequence) {
-      synchronized (sequenceTsFileResources) {
+    readLock();
+    try {
+      List<TsFileResource> result = new ArrayList<>();
+      if (sequence) {
         for (long timePartition : sequenceTsFileResources.keySet()) {
           result.addAll(getTsFileListByTimePartition(true, timePartition));
         }
-      }
-    } else {
-      synchronized (unSequenceTsFileResources) {
+      } else {
         for (long timePartition : unSequenceTsFileResources.keySet()) {
           result.addAll(getTsFileListByTimePartition(false, timePartition));
         }
       }
+      return result;
+    } finally {
+      readUnLock();
     }
-    return result;
   }
 
   public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, 
long timePartition) {
-    List<TsFileResource> result = new ArrayList<>();
-    if (sequence) {
-      synchronized (sequenceTsFileResources) {
+    readLock();
+    try {
+      List<TsFileResource> result = new ArrayList<>();
+      if (sequence) {
         List<SortedSet<TsFileResource>> sequenceTsFileList =
             sequenceTsFileResources.get(timePartition);
         for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
           result.addAll(sequenceTsFileList.get(i));
         }
-      }
-    } else {
-      synchronized (unSequenceTsFileResources) {
+      } else {
         List<List<TsFileResource>> unSequenceTsFileList =
             unSequenceTsFileResources.get(timePartition);
         for (int i = unSequenceTsFileList.size() - 1; i >= 0; i--) {
           result.addAll(unSequenceTsFileList.get(i));
         }
       }
+      return result;
+    } finally {
+      readUnLock();
     }
-    return result;
   }
 
   @Override
   public Iterator<TsFileResource> getIterator(boolean sequence) {
-    return getTsFileList(sequence).iterator();
+    readLock();
+    try {
+      return getTsFileList(sequence).iterator();
+    } finally {
+      readUnLock();
+    }
   }
 
   @Override
   public void remove(TsFileResource tsFileResource, boolean sequence) {
-    if (sequence) {
-      synchronized (sequenceTsFileResources) {
+    writeLock();
+    try {
+      if (sequence) {
         for (SortedSet<TsFileResource> sequenceTsFileResource :
             sequenceTsFileResources.get(tsFileResource.getTimePartition())) {
           sequenceTsFileResource.remove(tsFileResource);
         }
-      }
-    } else {
-      synchronized (unSequenceTsFileResources) {
+      } else {
         for (List<TsFileResource> unSequenceTsFileResource :
             unSequenceTsFileResources.get(tsFileResource.getTimePartition())) {
           unSequenceTsFileResource.remove(tsFileResource);
         }
       }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public void removeAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    if (sequence) {
-      synchronized (sequenceTsFileResources) {
+    writeLock();
+    try {
+      if (sequence) {
         for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
             sequenceTsFileResources.values()) {
           for (SortedSet<TsFileResource> levelTsFileResource : 
partitionSequenceTsFileResource) {
             levelTsFileResource.removeAll(tsFileResourceList);
           }
         }
-      }
-    } else {
-      synchronized (unSequenceTsFileResources) {
+      } else {
         for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
             unSequenceTsFileResources.values()) {
           for (List<TsFileResource> levelTsFileResource : 
partitionUnSequenceTsFileResource) {
@@ -256,15 +255,18 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           }
         }
       }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public void add(TsFileResource tsFileResource, boolean sequence) {
-    long timePartitionId = tsFileResource.getTimePartition();
-    int level = getMergeLevel(tsFileResource.getTsFile());
-    if (sequence) {
-      synchronized (sequenceTsFileResources) {
+    writeLock();
+    try {
+      long timePartitionId = tsFileResource.getTimePartition();
+      int level = getMergeLevel(tsFileResource.getTsFile());
+      if (sequence) {
         if (level <= seqLevelNum - 1) {
           // current file has normal level
           sequenceTsFileResources
@@ -278,9 +280,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
               .get(seqLevelNum - 1)
               .add(tsFileResource);
         }
-      }
-    } else {
-      synchronized (unSequenceTsFileResources) {
+      } else {
         if (level <= unseqLevelNum - 1) {
           // current file has normal level
           unSequenceTsFileResources
@@ -295,101 +295,124 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
               .add(tsFileResource);
         }
       }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public void addRecover(TsFileResource tsFileResource, boolean sequence) {
     if (sequence) {
-      synchronized (sequenceRecoverTsFileResources) {
-        sequenceRecoverTsFileResources.add(tsFileResource);
-      }
+      sequenceRecoverTsFileResources.add(tsFileResource);
     } else {
-      synchronized (unSequenceTsFileResources) {
-        unSequenceRecoverTsFileResources.add(tsFileResource);
-      }
+      unSequenceRecoverTsFileResources.add(tsFileResource);
     }
   }
 
   @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    for (TsFileResource tsFileResource : tsFileResourceList) {
-      add(tsFileResource, sequence);
+    writeLock();
+    try {
+      for (TsFileResource tsFileResource : tsFileResourceList) {
+        add(tsFileResource, sequence);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public boolean contains(TsFileResource tsFileResource, boolean sequence) {
-    if (sequence) {
-      for (SortedSet<TsFileResource> sequenceTsFileResource :
-          sequenceTsFileResources.computeIfAbsent(
-              tsFileResource.getTimePartition(), 
this::newSequenceTsFileResources)) {
-        if (sequenceTsFileResource.contains(tsFileResource)) {
-          return true;
+    readLock();
+    try {
+      if (sequence) {
+        for (SortedSet<TsFileResource> sequenceTsFileResource :
+            sequenceTsFileResources.computeIfAbsent(
+                tsFileResource.getTimePartition(), 
this::newSequenceTsFileResources)) {
+          if (sequenceTsFileResource.contains(tsFileResource)) {
+            return true;
+          }
         }
-      }
-    } else {
-      for (List<TsFileResource> unSequenceTsFileResource :
-          unSequenceTsFileResources.computeIfAbsent(
-              tsFileResource.getTimePartition(), 
this::newUnSequenceTsFileResources)) {
-        if (unSequenceTsFileResource.contains(tsFileResource)) {
-          return true;
+      } else {
+        for (List<TsFileResource> unSequenceTsFileResource :
+            unSequenceTsFileResources.computeIfAbsent(
+                tsFileResource.getTimePartition(), 
this::newUnSequenceTsFileResources)) {
+          if (unSequenceTsFileResource.contains(tsFileResource)) {
+            return true;
+          }
         }
       }
+      return false;
+    } finally {
+      readUnLock();
     }
-    return false;
   }
 
   @Override
   public void clear() {
-    sequenceTsFileResources.clear();
-    unSequenceTsFileResources.clear();
+    writeLock();
+    try {
+      sequenceTsFileResources.clear();
+      unSequenceTsFileResources.clear();
+    } finally {
+      writeUnlock();
+    }
   }
 
   @Override
   @SuppressWarnings("squid:S3776")
   public boolean isEmpty(boolean sequence) {
-    if (sequence) {
-      for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
-          sequenceTsFileResources.values()) {
-        for (SortedSet<TsFileResource> sequenceTsFileResource : 
partitionSequenceTsFileResource) {
-          if (!sequenceTsFileResource.isEmpty()) {
-            return false;
+    readLock();
+    try {
+      if (sequence) {
+        for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
+            sequenceTsFileResources.values()) {
+          for (SortedSet<TsFileResource> sequenceTsFileResource : 
partitionSequenceTsFileResource) {
+            if (!sequenceTsFileResource.isEmpty()) {
+              return false;
+            }
           }
         }
-      }
-    } else {
-      for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
-          unSequenceTsFileResources.values()) {
-        for (List<TsFileResource> unSequenceTsFileResource : 
partitionUnSequenceTsFileResource) {
-          if (!unSequenceTsFileResource.isEmpty()) {
-            return false;
+      } else {
+        for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
+            unSequenceTsFileResources.values()) {
+          for (List<TsFileResource> unSequenceTsFileResource : 
partitionUnSequenceTsFileResource) {
+            if (!unSequenceTsFileResource.isEmpty()) {
+              return false;
+            }
           }
         }
       }
+      return true;
+    } finally {
+      readUnLock();
     }
-    return true;
   }
 
   @Override
   public int size(boolean sequence) {
-    int result = 0;
-    if (sequence) {
-      for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
-          sequenceTsFileResources.values()) {
-        for (int i = seqLevelNum - 1; i >= 0; i--) {
-          result += partitionSequenceTsFileResource.get(i).size();
+    readLock();
+    try {
+      int result = 0;
+      if (sequence) {
+        for (List<SortedSet<TsFileResource>> partitionSequenceTsFileResource :
+            sequenceTsFileResources.values()) {
+          for (int i = seqLevelNum - 1; i >= 0; i--) {
+            result += partitionSequenceTsFileResource.get(i).size();
+          }
         }
-      }
-    } else {
-      for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
-          unSequenceTsFileResources.values()) {
-        for (int i = unseqLevelNum - 1; i >= 0; i--) {
-          result += partitionUnSequenceTsFileResource.get(i).size();
+      } else {
+        for (List<List<TsFileResource>> partitionUnSequenceTsFileResource :
+            unSequenceTsFileResources.values()) {
+          for (int i = unseqLevelNum - 1; i >= 0; i--) {
+            result += partitionUnSequenceTsFileResource.get(i).size();
+          }
         }
       }
+      return result;
+    } finally {
+      readUnLock();
     }
-    return result;
   }
 
   /** recover files */
@@ -479,6 +502,10 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
             // complete compaction and delete source file
             writeLock();
             try {
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException(
+                    String.format("%s [Compaction] abort", storageGroupName));
+              }
               int targetLevel = getMergeLevel(targetResource.getTsFile());
               if (isSeq) {
                 
sequenceTsFileResources.get(timePartition).get(targetLevel).add(targetResource);
@@ -499,7 +526,7 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
           }
         }
       }
-    } catch (IOException | IllegalPathException e) {
+    } catch (IOException | IllegalPathException | InterruptedException e) {
       logger.error("recover level tsfile management error ", e);
     } finally {
       if (logFile.exists()) {
@@ -532,21 +559,22 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   public void forkCurrentFileList(long timePartition) {
-    synchronized (sequenceTsFileResources) {
+    readLock();
+    try {
       forkTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, 
this::newSequenceTsFileResources),
           seqLevelNum,
           seqFileNumInEachLevel);
-    }
-    // we have to copy all unseq file
-    synchronized (unSequenceTsFileResources) {
+      // we have to copy all unseq file
       forkTsFileList(
           forkedUnSequenceTsFileResources,
           unSequenceTsFileResources.computeIfAbsent(
               timePartition, this::newUnSequenceTsFileResources),
           unseqLevelNum + 1,
           unseqFileNumInEachLevel);
+    } finally {
+      readUnLock();
     }
   }
 
@@ -664,6 +692,11 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
                 toMergeTsFiles.size());
             writeLock();
             try {
+              if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException(
+                    String.format("%s [Compaction] abort", storageGroupName));
+              }
+
               if (sequence) {
                 sequenceTsFileResources.get(timePartition).get(i + 
1).add(newResource);
               } else {
@@ -710,32 +743,31 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
   }
 
   private List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
-    List<SortedSet<TsFileResource>> newSequenceTsFileResources = new 
CopyOnWriteArrayList<>();
+    List<SortedSet<TsFileResource>> newSequenceTsFileResources = new 
ArrayList<>();
     for (int i = 0; i < seqLevelNum; i++) {
       newSequenceTsFileResources.add(
-          Collections.synchronizedSortedSet(
-              new TreeSet<>(
-                  (o1, o2) -> {
-                    try {
-                      int rangeCompare =
-                          Long.compare(
-                              
Long.parseLong(o1.getTsFile().getParentFile().getName()),
-                              
Long.parseLong(o2.getTsFile().getParentFile().getName()));
-                      return rangeCompare == 0
-                          ? compareFileName(o1.getTsFile(), o2.getTsFile())
-                          : rangeCompare;
-                    } catch (NumberFormatException e) {
-                      return compareFileName(o1.getTsFile(), o2.getTsFile());
-                    }
-                  })));
+          new TreeSet<>(
+              (o1, o2) -> {
+                try {
+                  int rangeCompare =
+                      Long.compare(
+                          
Long.parseLong(o1.getTsFile().getParentFile().getName()),
+                          
Long.parseLong(o2.getTsFile().getParentFile().getName()));
+                  return rangeCompare == 0
+                      ? compareFileName(o1.getTsFile(), o2.getTsFile())
+                      : rangeCompare;
+                } catch (NumberFormatException e) {
+                  return compareFileName(o1.getTsFile(), o2.getTsFile());
+                }
+              }));
     }
     return newSequenceTsFileResources;
   }
 
   private List<List<TsFileResource>> newUnSequenceTsFileResources(Long k) {
-    List<List<TsFileResource>> newUnSequenceTsFileResources = new 
CopyOnWriteArrayList<>();
+    List<List<TsFileResource>> newUnSequenceTsFileResources = new 
ArrayList<>();
     for (int i = 0; i < unseqLevelNum; i++) {
-      newUnSequenceTsFileResources.add(new CopyOnWriteArrayList<>());
+      newUnSequenceTsFileResources.add(new ArrayList<>());
     }
     return newUnSequenceTsFileResources;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
index 6d9864d..5c3d8b1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/no/NoCompactionTsFileManagement.java
@@ -50,65 +50,75 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
   @Deprecated
   @Override
   public List<TsFileResource> getTsFileList(boolean sequence) {
-    List<TsFileResource> result = new ArrayList<>();
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    readLock();
+    try {
+      List<TsFileResource> result = new ArrayList<>();
+      if (sequence) {
         for (TreeSet<TsFileResource> tsFileResourceTreeSet : 
sequenceFileTreeSetMap.values()) {
           result.addAll(tsFileResourceTreeSet);
         }
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         for (List<TsFileResource> tsFileResourceList : 
unSequenceFileListMap.values()) {
           result.addAll(tsFileResourceList);
         }
       }
+      return result;
+    } finally {
+      readUnLock();
     }
-    return result;
   }
 
   @Override
   public List<TsFileResource> getTsFileListByTimePartition(boolean sequence, 
long timePartition) {
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
-        return new 
ArrayList<>(sequenceFileTreeSetMap.getOrDefault(timePartition, new 
TreeSet<>()));
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+    readLock();
+    try {
+      if (sequence) {
+        return new ArrayList<>(
+            sequenceFileTreeSetMap.getOrDefault(timePartition, 
newSequenceTsFileResources(0L)));
+      } else {
         return new ArrayList<>(
             unSequenceFileListMap.getOrDefault(timePartition, 
Collections.emptyList()));
       }
+    } finally {
+      readUnLock();
     }
   }
 
   @Override
   public Iterator<TsFileResource> getIterator(boolean sequence) {
-    return getTsFileList(sequence).iterator();
+    readLock();
+    try {
+      return getTsFileList(sequence).iterator();
+    } finally {
+      readUnLock();
+    }
   }
 
   @Override
   public void remove(TsFileResource tsFileResource, boolean sequence) {
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    writeLock();
+    try {
+      if (sequence) {
         TreeSet<TsFileResource> sequenceFileTreeSet =
             sequenceFileTreeSetMap.get(tsFileResource.getTimePartition());
         sequenceFileTreeSet.remove(tsFileResource);
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         List<TsFileResource> unSequenceFileList =
             unSequenceFileListMap.get(tsFileResource.getTimePartition());
         unSequenceFileList.remove(tsFileResource);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public void removeAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    if (tsFileResourceList.size() > 0) {
-      tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() - 
o2.getTimePartition()));
-      if (sequence) {
-        synchronized (sequenceFileTreeSetMap) {
+    writeLock();
+    try {
+      if (tsFileResourceList.size() > 0) {
+        tsFileResourceList.sort((o1, o2) -> (int) (o1.getTimePartition() - 
o2.getTimePartition()));
+        if (sequence) {
           long currTimePartition = 
tsFileResourceList.get(0).getTimePartition();
           int startIndex = 0;
           for (int i = 1; i < tsFileResourceList.size(); i++) {
@@ -124,9 +134,7 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
           sequenceFileTreeSetMap
               .get(currTimePartition)
               .removeAll(tsFileResourceList.subList(startIndex, 
tsFileResourceList.size()));
-        }
-      } else {
-        synchronized (unSequenceFileListMap) {
+        } else {
           long currTimePartition = 
tsFileResourceList.get(0).getTimePartition();
           int startIndex = 0;
           for (int i = 1; i < tsFileResourceList.size(); i++) {
@@ -144,24 +152,27 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
               .removeAll(tsFileResourceList.subList(startIndex, 
tsFileResourceList.size()));
         }
       }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public void add(TsFileResource tsFileResource, boolean sequence) {
-    long timePartitionId = tsFileResource.getTimePartition();
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    writeLock();
+    try {
+      long timePartitionId = tsFileResource.getTimePartition();
+      if (sequence) {
         sequenceFileTreeSetMap
             .computeIfAbsent(timePartitionId, this::newSequenceTsFileResources)
             .add(tsFileResource);
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         unSequenceFileListMap
             .computeIfAbsent(timePartitionId, 
this::newUnSequenceTsFileResources)
             .add(tsFileResource);
       }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -172,73 +183,86 @@ public class NoCompactionTsFileManagement extends 
TsFileManagement {
 
   @Override
   public void addAll(List<TsFileResource> tsFileResourceList, boolean 
sequence) {
-    for (TsFileResource tsFileResource : tsFileResourceList) {
-      add(tsFileResource, sequence);
+    writeLock();
+    try {
+      for (TsFileResource tsFileResource : tsFileResourceList) {
+        add(tsFileResource, sequence);
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
   @Override
   public boolean contains(TsFileResource tsFileResource, boolean sequence) {
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    readLock();
+    try {
+      if (sequence) {
         return sequenceFileTreeSetMap
             .getOrDefault(tsFileResource.getTimePartition(), 
newSequenceTsFileResources(0L))
             .contains(tsFileResource);
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         return unSequenceFileListMap
             .getOrDefault(tsFileResource.getTimePartition(), new ArrayList<>())
             .contains(tsFileResource);
       }
+    } finally {
+      readUnLock();
     }
   }
 
   @Override
   public void clear() {
-    sequenceFileTreeSetMap.clear();
-    unSequenceFileListMap.clear();
+    writeLock();
+    try {
+      sequenceFileTreeSetMap.clear();
+      unSequenceFileListMap.clear();
+    } finally {
+      writeUnlock();
+    }
   }
 
   @Override
   public boolean isEmpty(boolean sequence) {
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    readLock();
+    try {
+      if (sequence) {
         for (Set<TsFileResource> sequenceFileTreeSet : 
sequenceFileTreeSetMap.values()) {
           if (!sequenceFileTreeSet.isEmpty()) {
             return false;
           }
         }
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         for (List<TsFileResource> unSequenceFileList : 
unSequenceFileListMap.values()) {
           if (!unSequenceFileList.isEmpty()) {
             return false;
           }
         }
       }
+      return true;
+    } finally {
+      readUnLock();
     }
-    return true;
   }
 
   @Override
   public int size(boolean sequence) {
-    int result = 0;
-    if (sequence) {
-      synchronized (sequenceFileTreeSetMap) {
+    readLock();
+    try {
+      int result = 0;
+      if (sequence) {
         for (Set<TsFileResource> sequenceFileTreeSet : 
sequenceFileTreeSetMap.values()) {
           result += sequenceFileTreeSet.size();
         }
-      }
-    } else {
-      synchronized (unSequenceFileListMap) {
+      } else {
         for (List<TsFileResource> unSequenceFileList : 
unSequenceFileListMap.values()) {
           result += unSequenceFileList.size();
         }
       }
+      return result;
+    } finally {
+      readUnLock();
     }
-    return result;
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 15b5176..0c4241b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -164,6 +164,10 @@ public class MergeFileTask {
 
     seqFile.writeLock();
     try {
+      if (Thread.currentThread().isInterrupted()) {
+        return;
+      }
+
       
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
 
       resource.removeFileReader(seqFile);
@@ -338,16 +342,20 @@ public class MergeFileTask {
     }
     updateStartTimeAndEndTime(seqFile, fileWriter);
     resource.removeFileReader(seqFile);
-    
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
     fileWriter.endFile();
 
     updatePlanIndexes(seqFile);
-    mergeLogger.logFileMergeEnd();
-    logger.debug("{} moved unmerged chunks of {} to the new file", taskName, 
seqFile);
 
     seqFile.writeLock();
     try {
+      if (Thread.currentThread().isInterrupted()) {
+        return;
+      }
+
       seqFile.serialize();
+      mergeLogger.logFileMergeEnd();
+      logger.debug("{} moved unmerged chunks of {} to the new file", taskName, 
seqFile);
+      
FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
 
       // change tsFile name
       seqFile.getTsFile().delete();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2282543..e0e7589 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -515,6 +515,7 @@ public class StorageGroupProcessor {
       try {
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
+                logicalStorageGroupName,
                 tsFileManagement.new 
CompactionRecoverTask(this::closeCompactionMergeCallBack));
       } catch (RejectedExecutionException e) {
         this.closeCompactionMergeCallBack();
@@ -1524,7 +1525,7 @@ public class StorageGroupProcessor {
       QueryFileManager filePathsManager,
       Filter timeFilter)
       throws QueryProcessException {
-    insertLock.readLock().lock();
+    readLock();
     try {
       List<TsFileResource> seqResources =
           getFileResourceListForQuery(
@@ -1556,10 +1557,18 @@ public class StorageGroupProcessor {
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
-      insertLock.readLock().unlock();
+      readUnlock();
     }
   }
 
+  public void readLock() {
+    insertLock.readLock().lock();
+  }
+
+  public void readUnlock() {
+    insertLock.readLock().unlock();
+  }
+
   public void writeLock() {
     insertLock.writeLock().lock();
   }
@@ -1661,7 +1670,6 @@ public class StorageGroupProcessor {
     // TODO: how to avoid partial deletion?
     // FIXME: notice that if we may remove a SGProcessor out of memory, we 
need to close all opened
     // mod files in mergingModification, sequenceFileList, and 
unsequenceFileList
-    tsFileManagement.readLock();
     writeLock();
 
     // record files which are updated so that we can roll back them in case of 
exception
@@ -1704,7 +1712,6 @@ public class StorageGroupProcessor {
       throw new IOException(e);
     } finally {
       writeUnlock();
-      tsFileManagement.readUnLock();
     }
   }
 
@@ -1944,6 +1951,7 @@ public class StorageGroupProcessor {
         tsFileManagement.setForceFullMerge(fullMerge);
         CompactionMergeTaskPoolManager.getInstance()
             .submitTask(
+                logicalStorageGroupName,
                 tsFileManagement
                 .new CompactionMergeTask(this::closeCompactionMergeCallBack, 
timePartition));
       } catch (IOException | RejectedExecutionException e) {
@@ -2001,14 +2009,12 @@ public class StorageGroupProcessor {
     upgradeFileCount.getAndAdd(-1);
     // load all upgraded resources in this sg to tsFileManagement
     if (upgradeFileCount.get() == 0) {
-      tsFileManagement.writeLock();
       writeLock();
       try {
         loadUpgradedResources(upgradeSeqFileList, true);
         loadUpgradedResources(upgradeUnseqFileList, false);
       } finally {
         writeUnlock();
-        tsFileManagement.writeUnlock();
       }
       // after upgrade complete, update partitionLatestFlushedTimeForEachDevice
       for (Entry<Long, Map<String, Long>> entry :
@@ -2089,7 +2095,6 @@ public class StorageGroupProcessor {
   public void loadNewTsFileForSync(TsFileResource newTsFileResource) throws 
LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    tsFileManagement.writeLock();
     writeLock();
     try {
       if (loadTsFileByType(
@@ -2113,7 +2118,6 @@ public class StorageGroupProcessor {
       throw new LoadFileException(e);
     } finally {
       writeUnlock();
-      tsFileManagement.writeUnlock();
     }
   }
 
@@ -2161,7 +2165,6 @@ public class StorageGroupProcessor {
   public void loadNewTsFile(TsFileResource newTsFileResource) throws 
LoadFileException {
     File tsfileToBeInserted = newTsFileResource.getTsFile();
     long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
-    tsFileManagement.writeLock();
     writeLock();
     try {
       List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
@@ -2222,7 +2225,6 @@ public class StorageGroupProcessor {
       throw new LoadFileException(e);
     } finally {
       writeUnlock();
-      tsFileManagement.writeUnlock();
     }
   }
 
@@ -2612,7 +2614,6 @@ public class StorageGroupProcessor {
    *     module.
    */
   public boolean deleteTsfile(File tsfieToBeDeleted) {
-    tsFileManagement.writeLock();
     writeLock();
     TsFileResource tsFileResourceToBeDeleted = null;
     try {
@@ -2638,7 +2639,6 @@ public class StorageGroupProcessor {
       }
     } finally {
       writeUnlock();
-      tsFileManagement.writeUnlock();
     }
     if (tsFileResourceToBeDeleted == null) {
       return false;
@@ -2668,7 +2668,6 @@ public class StorageGroupProcessor {
    * @return whether the file to be moved exists. @UsedBy load external tsfile 
module.
    */
   public boolean moveTsfile(File fileToBeMoved, File targetDir) {
-    tsFileManagement.writeLock();
     writeLock();
     TsFileResource tsFileResourceToBeMoved = null;
     try {
@@ -2694,7 +2693,6 @@ public class StorageGroupProcessor {
       }
     } finally {
       writeUnlock();
-      tsFileManagement.writeUnlock();
     }
     if (tsFileResourceToBeMoved == null) {
       return false;
@@ -2801,22 +2799,21 @@ public class StorageGroupProcessor {
   /** remove all partitions that satisfy a filter. */
   public void removePartitions(TimePartitionFilter filter) {
     // this requires blocking all other activities
-    tsFileManagement.writeLock();
-    insertLock.writeLock().lock();
+    writeLock();
     try {
-      // abort ongoing merges
+      // abort ongoing comapctions and merges
+      
CompactionMergeTaskPoolManager.getInstance().abortCompaction(logicalStorageGroupName);
       MergeManager.getINSTANCE().abortMerge(logicalStorageGroupName);
       // close all working files that should be removed
       removePartitions(filter, workSequenceTsFileProcessors.entrySet());
       removePartitions(filter, workUnsequenceTsFileProcessors.entrySet());
 
       // remove data files
-      removePartitions(filter, tsFileManagement.getIterator(true));
-      removePartitions(filter, tsFileManagement.getIterator(false));
+      removePartitions(filter, tsFileManagement.getIterator(true), true);
+      removePartitions(filter, tsFileManagement.getIterator(false), false);
 
     } finally {
-      insertLock.writeLock().unlock();
-      tsFileManagement.writeUnlock();
+      writeUnlock();
     }
   }
 
@@ -2840,12 +2837,13 @@ public class StorageGroupProcessor {
   }
 
   // may remove the iterator's data
-  private void removePartitions(TimePartitionFilter filter, 
Iterator<TsFileResource> iterator) {
+  private void removePartitions(
+      TimePartitionFilter filter, Iterator<TsFileResource> iterator, boolean 
sequence) {
     while (iterator.hasNext()) {
       TsFileResource tsFileResource = iterator.next();
       if (filter.satisfy(logicalStorageGroupName, 
tsFileResource.getTimePartition())) {
         tsFileResource.remove();
-        iterator.remove();
+        tsFileManagement.remove(tsFileResource, sequence);
         updateLatestFlushTimeToPartition(tsFileResource.getTimePartition(), 
Long.MIN_VALUE);
         logger.debug("{} is removed during deleting partitions", 
tsFileResource.getTsFilePath());
       }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
index bd02b4f..46c2771 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionCacheTest.java
@@ -97,7 +97,7 @@ public class LevelCompactionCacheTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
index c39ab3a..d6d9c99 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionLogTest.java
@@ -69,7 +69,7 @@ public class LevelCompactionLogTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index 4e2d65c..3d9aecd 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -82,7 +82,7 @@ public class LevelCompactionMergeTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
@@ -126,7 +126,7 @@ public class LevelCompactionMergeTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
@@ -191,7 +191,7 @@ public class LevelCompactionMergeTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
index 55df625..1411eac 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMoreDataTest.java
@@ -203,7 +203,7 @@ public class LevelCompactionMoreDataTest extends 
LevelCompactionTest {
         levelCompactionTsFileManagement
         .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
     compactionMergeWorking = true;
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     while (compactionMergeWorking) {
       // wait
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
index 4dc40b4..7375bad 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionTsFileManagementTest.java
@@ -156,4 +156,73 @@ public class LevelCompactionTsFileManagementTest extends 
LevelCompactionTest {
     assertEquals(0, levelCompactionTsFileManagement.size(true));
     assertEquals(0, levelCompactionTsFileManagement.size(false));
   }
+
+  @Test
+  public void testIteratorRemove() {
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    for (TsFileResource tsFileResource : seqResources) {
+      levelCompactionTsFileManagement.add(tsFileResource, true);
+    }
+    levelCompactionTsFileManagement.addAll(seqResources, false);
+    assertEquals(6, 
levelCompactionTsFileManagement.getTsFileList(true).size());
+
+    Iterator<TsFileResource> tsFileResourceIterator =
+        levelCompactionTsFileManagement.getIterator(true);
+    tsFileResourceIterator.next();
+    try {
+      tsFileResourceIterator.remove();
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+    assertEquals(6, 
levelCompactionTsFileManagement.getTsFileList(true).size());
+
+    TsFileResource tsFileResource1 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    10
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 10
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 1
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    TsFileResource tsFileResource2 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    11
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 11
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 1
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    levelCompactionTsFileManagement.add(tsFileResource1, true);
+    levelCompactionTsFileManagement.add(tsFileResource2, true);
+    TsFileResource tsFileResource3 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    12
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 12
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 2
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    levelCompactionTsFileManagement.add(tsFileResource3, true);
+    Iterator<TsFileResource> tsFileResourceIterator2 =
+        levelCompactionTsFileManagement.getIterator(true);
+    int count = 0;
+    while (tsFileResourceIterator2.hasNext()) {
+      count++;
+      tsFileResourceIterator2.next();
+    }
+    assertEquals(9, count);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
index 2c14215..6407c9e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/NoCompactionTsFileManagementTest.java
@@ -154,11 +154,80 @@ public class NoCompactionTsFileManagementTest extends 
LevelCompactionTest {
     noCompactionTsFileManagement.recover();
     CompactionMergeTask compactionMergeTask =
         noCompactionTsFileManagement.new CompactionMergeTask(() -> {}, 0);
-    compactionMergeTask.run();
+    compactionMergeTask.call();
     assertEquals(1, noCompactionTsFileManagement.size(true));
     assertEquals(1, noCompactionTsFileManagement.size(false));
     noCompactionTsFileManagement.clear();
     assertEquals(0, noCompactionTsFileManagement.size(true));
     assertEquals(0, noCompactionTsFileManagement.size(false));
   }
+
+  @Test
+  public void testIteratorRemove() {
+    NoCompactionTsFileManagement noCompactionTsFileManagement =
+        new NoCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    for (TsFileResource tsFileResource : seqResources) {
+      noCompactionTsFileManagement.add(tsFileResource, true);
+    }
+    noCompactionTsFileManagement.addAll(seqResources, false);
+    assertEquals(6, noCompactionTsFileManagement.getTsFileList(true).size());
+
+    Iterator<TsFileResource> tsFileResourceIterator =
+        noCompactionTsFileManagement.getIterator(true);
+    tsFileResourceIterator.next();
+    try {
+      tsFileResourceIterator.remove();
+    } catch (UnsupportedOperationException e) {
+      // pass
+    }
+    assertEquals(6, noCompactionTsFileManagement.getTsFileList(true).size());
+
+    TsFileResource tsFileResource1 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    10
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 10
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 1
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    TsFileResource tsFileResource2 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    11
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 11
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 1
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    noCompactionTsFileManagement.add(tsFileResource1, true);
+    noCompactionTsFileManagement.add(tsFileResource2, true);
+    TsFileResource tsFileResource3 =
+        new TsFileResource(
+            new File(
+                TestConstant.BASE_OUTPUT_PATH.concat(
+                    12
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 12
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 2
+                        + IoTDBConstant.FILE_NAME_SEPARATOR
+                        + 0
+                        + ".tsfile")));
+    noCompactionTsFileManagement.add(tsFileResource3, true);
+    Iterator<TsFileResource> tsFileResourceIterator2 =
+        noCompactionTsFileManagement.getIterator(true);
+    int count = 0;
+    while (tsFileResourceIterator2.hasNext()) {
+      count++;
+      tsFileResourceIterator2.next();
+    }
+    assertEquals(9, count);
+  }
 }

Reply via email to