This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 67e1db3bdd0872fb0ebe2fbaf69ccabe9cb1e1e6
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sat Oct 7 17:44:11 2023 +0800

    fix UT
---
 .../execute/task/AbstractCompactionTask.java       |  8 ---
 .../execute/task/CrossSpaceCompactionTask.java     | 42 +------------
 .../execute/task/InnerSpaceCompactionTask.java     | 55 +----------------
 .../compaction/schedule/CompactionWorker.java      |  7 ++-
 .../compaction/CompactionTaskComparatorTest.java   | 10 ---
 .../FastCrossCompactionPerformerTest.java          | 15 +++--
 .../cross/CrossSpaceCompactionSelectorTest.java    | 72 +++++++++++++++++++---
 ...eCrossSpaceCompactionWithFastPerformerTest.java |  7 ++-
 ...sSpaceCompactionWithReadPointPerformerTest.java |  7 ++-
 .../InnerSeqCompactionWithFastPerformerTest.java   |  4 +-
 ...nerSeqCompactionWithReadChunkPerformerTest.java |  5 +-
 .../inner/InnerSpaceCompactionSelectorTest.java    | 47 +++++++++++---
 12 files changed, 137 insertions(+), 142 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
index 1db4553c92e..02412723c48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java
@@ -136,14 +136,6 @@ public abstract class AbstractCompactionTask {
 
   public abstract boolean equalsOtherTask(AbstractCompactionTask otherTask);
 
-  /**
-   * Check if the compaction task is valid (selected files are not merging, closed and exist). If
-   * the task is valid, then set the merging status of selected files to true.
-   *
-   * @return true if the task is valid else false
-   */
-  public abstract boolean checkValidAndSetMerging();
-
   public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException {
     for (TsFileResource f : getAllSourceTsFiles()) {
       if (!f.setStatus(TsFileResourceStatus.COMPACTING)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
index 4d67f8f7022..08e36eb244c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java
@@ -23,8 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -37,7 +35,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -272,11 +269,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
           false,
           true);
     } finally {
-      SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
-      SystemInfo.getInstance()
-          .decreaseCompactionFileNumCost(
-              selectedSequenceFiles.size() + selectedUnsequenceFiles.size());
-      releaseAllLocksAndResetStatus();
+      releaseAllLocks();
     }
     return isSuccess;
   }
@@ -292,7 +285,7 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
         && 
this.performer.getClass().isInstance(otherCrossCompactionTask.performer);
   }
 
-  private void releaseAllLocksAndResetStatus() {
+  private void releaseAllLocks() {
     for (TsFileResource tsFileResource : holdWriteLockList) {
       tsFileResource.writeUnlock();
     }
@@ -349,37 +342,6 @@ public class CrossSpaceCompactionTask extends 
AbstractCompactionTask {
     }
   }
 
-  @Override
-  public boolean checkValidAndSetMerging() {
-    if (!tsFileManager.isAllowCompaction()) {
-      resetCompactionCandidateStatusForAllSourceFiles();
-      return false;
-    }
-    if (!isDiskSpaceCheckPassed()) {
-      LOGGER.debug(
-          "cross compaction task start check failed because disk free ratio is 
less than disk_space_warning_threshold");
-      return false;
-    }
-    try {
-      SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
-      SystemInfo.getInstance()
-          .addCompactionFileNum(selectedSequenceFiles.size() + 
selectedUnsequenceFiles.size(), 60);
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        LOGGER.warn("Interrupted when allocating memory for compaction", e);
-        Thread.currentThread().interrupt();
-      } else if (e instanceof CompactionMemoryNotEnoughException) {
-        LOGGER.info("No enough memory for current compaction task {}", this, 
e);
-      } else if (e instanceof CompactionFileCountExceededException) {
-        LOGGER.info("No enough file num for current compaction task {}", this, 
e);
-        SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
-      }
-      resetCompactionCandidateStatusForAllSourceFiles();
-      return false;
-    }
-    return true;
-  }
-
   @Override
   public long getEstimatedMemoryCost() {
     return memoryCost;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
index f52048c24ff..e9eb19e58f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
-import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
@@ -42,7 +40,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
-import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 
@@ -116,7 +113,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask {
     if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
       if (this.performer instanceof ReadChunkCompactionPerformer) {
         innerSpaceEstimator = new ReadChunkInnerCompactionEstimator();
-      } else if (!sequence && this.performer instanceof FastCompactionInnerCompactionEstimator) {
+      } else if (!sequence && this.performer instanceof FastCompactionPerformer) {
         innerSpaceEstimator = new FastCompactionInnerCompactionEstimator();
       }
     }
@@ -450,57 +447,9 @@ public class InnerSpaceCompactionTask extends 
AbstractCompactionTask {
     }
   }
 
-  @Override
-  public boolean checkValidAndSetMerging() {
-    if (!tsFileManager.isAllowCompaction()) {
-      resetCompactionCandidateStatusForAllSourceFiles();
-      return false;
-    }
-    if (!isDiskSpaceCheckPassed()) {
-      LOGGER.debug(
-          "inner compaction task start check failed because disk free ratio is 
less than disk_space_warning_threshold");
-      return false;
-    }
-    try {
-      for (int i = 0; i < selectedTsFileResourceList.size(); ++i) {
-        TsFileResource resource = selectedTsFileResourceList.get(i);
-        if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) {
-          releaseAllLocks();
-          return false;
-        }
-      }
-      if (innerSpaceEstimator != null) {
-        memoryCost = 
innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
-      }
-      SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60);
-      
SystemInfo.getInstance().addCompactionFileNum(selectedTsFileResourceList.size(),
 60);
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        LOGGER.warn("Interrupted when allocating memory for compaction", e);
-        Thread.currentThread().interrupt();
-      } else if (e instanceof CompactionMemoryNotEnoughException) {
-        LOGGER.warn("No enough memory for current compaction task {}", this, 
e);
-      } else if (e instanceof CompactionFileCountExceededException) {
-        LOGGER.warn("No enough file num for current compaction task {}", this, 
e);
-        SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost);
-      }
-      releaseAllLocks();
-      return false;
-    } finally {
-      try {
-        if (innerSpaceEstimator != null) {
-          innerSpaceEstimator.close();
-        }
-      } catch (IOException e) {
-        LOGGER.warn("Failed to close InnerSpaceCompactionMemoryEstimator");
-      }
-    }
-    return true;
-  }
-
   @Override
   public long getEstimatedMemoryCost() throws IOException {
-    if (memoryCost == 0L) {
+    if (innerSpaceEstimator != null && memoryCost == 0L) {
       memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList);
     }
     return memoryCost;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index e722bb1bdaf..36421205502 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule;
 
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
@@ -75,8 +76,10 @@ public class CompactionWorker implements Runnable {
         return;
       }
       task.transitSourceFilesToMerging();
-      estimatedMemoryCost = task.getEstimatedMemoryCost();
-      memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+      if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) {
+        estimatedMemoryCost = task.getEstimatedMemoryCost();
+        memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+      }
       fileHandleAcquired =
           SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
       CompactionTaskSummary summary = task.getSummary();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
index 8ac9f67ad83..a1efb673e21 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java
@@ -494,11 +494,6 @@ public class CompactionTaskComparatorTest {
     public boolean equalsOtherTask(AbstractCompactionTask other) {
       return false;
     }
-
-    @Override
-    public boolean checkValidAndSetMerging() {
-      return true;
-    }
   }
 
   private static class FakeCrossSpaceCompactionTask extends 
CrossSpaceCompactionTask {
@@ -529,11 +524,6 @@ public class CompactionTaskComparatorTest {
     public boolean equalsOtherTask(AbstractCompactionTask other) {
       return false;
     }
-
-    @Override
-    public boolean checkValidAndSetMerging() {
-      return true;
-    }
   }
 
   private static class FakedTsFileResource extends TsFileResource {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
index 6c5c336e6d9..d67bf6be322 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java
@@ -27,17 +27,20 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -52,6 +55,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -4029,7 +4033,7 @@ public class FastCrossCompactionPerformerTest extends 
AbstractCompactionTest {
 
   @Test
   public void testReleaseFileNumAndMemoryAfterCrossTask()
-      throws IOException, MetadataException, WriteProcessException {
+      throws IOException, MetadataException, WriteProcessException, 
InterruptedException {
     int oldMaxCrossCompactionCandidateFileNum =
         SystemInfo.getInstance().getTotalFileLimitForCrossTask();
     SystemInfo.getInstance().setTotalFileLimitForCrossTask(15);
@@ -4050,9 +4054,12 @@ public class FastCrossCompactionPerformerTest extends 
AbstractCompactionTest {
               1000,
               0);
       Assert.assertTrue(task.setSourceFilesToCompactionCandidate());
-      boolean success = task.checkValidAndSetMerging();
-      Assert.assertTrue(success);
-      Assert.assertTrue(task.start());
+
+      FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+          Mockito.mock(FixedPriorityBlockingQueue.class);
+      Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new InterruptedException());
+      CompactionWorker worker = new CompactionWorker(0, mockQueue);
+      worker.run();
       Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get());
       Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get());
     } finally {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
index ac4f8d4a939..8b9358ac969 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java
@@ -22,7 +22,10 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate;
@@ -30,12 +33,14 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.List;
@@ -176,7 +181,7 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
 
   @Test
   public void testSelectWithTooManySourceFiles()
-      throws IOException, MetadataException, WriteProcessException {
+      throws IOException, MetadataException, WriteProcessException, 
InterruptedException {
     int oldMaxFileNumForCompaction = 
SystemInfo.getInstance().getTotalFileLimitForCrossTask();
     SystemInfo.getInstance().setTotalFileLimitForCrossTask(1);
     SystemInfo.getInstance().getCompactionFileNumCost().set(0);
@@ -207,7 +212,14 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
       // set file status to COMPACTION_CANDIDATE
       
Assert.assertTrue(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate());
 
-      Assert.assertFalse(crossSpaceCompactionTask.checkValidAndSetMerging());
+      FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+          Mockito.mock(FixedPriorityBlockingQueue.class);
+      Mockito.when(mockQueue.take())
+          .thenReturn(crossSpaceCompactionTask)
+          .thenThrow(new InterruptedException());
+      CompactionWorker worker = new CompactionWorker(0, mockQueue);
+      worker.run();
+
       Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionMemoryCost().get());
       Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionFileNumCost().get());
       for (TsFileResource resource : seqResources) {
@@ -564,9 +576,19 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                 cd1.countDown();
                 cd2.await();
 
-                if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
-                  throw new RuntimeException("cross space compaction task 
should be invalid.");
+                try {
+                  crossSpaceCompactionTask.transitSourceFilesToMerging();
+                  Assert.fail("cross space compaction task should be 
invalid.");
+                } catch (FileCannotTransitToCompactingException e) {
+
                 }
+                FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                    Mockito.mock(FixedPriorityBlockingQueue.class);
+                Mockito.when(mockQueue.take())
+                    .thenReturn(crossSpaceCompactionTask)
+                    .thenThrow(new InterruptedException());
+                CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                worker.run();
 
                 for (int i = 0; i < seqResources.size(); i++) {
                   TsFileResource resource = seqResources.get(i);
@@ -906,9 +928,19 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                 cd1.countDown();
                 cd2.await();
 
-                if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
-                  throw new RuntimeException("cross space compaction task 
should be invalid.");
+                try {
+                  crossSpaceCompactionTask.transitSourceFilesToMerging();
+                  Assert.fail("cross space compaction task should be 
invalid.");
+                } catch (FileCannotTransitToCompactingException e) {
+
                 }
+                FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                    Mockito.mock(FixedPriorityBlockingQueue.class);
+                Mockito.when(mockQueue.take())
+                    .thenReturn(crossSpaceCompactionTask)
+                    .thenThrow(new InterruptedException());
+                CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                worker.run();
 
                 for (int i = 0; i < seqResources.size(); i++) {
                   TsFileResource resource = seqResources.get(i);
@@ -1382,9 +1414,19 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                 cd1.countDown();
                 cd2.await();
 
-                if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
-                  throw new RuntimeException("cross space compaction task 
should be invalid.");
+                try {
+                  crossSpaceCompactionTask.transitSourceFilesToMerging();
+                  Assert.fail("cross space compaction task should be 
invalid.");
+                } catch (FileCannotTransitToCompactingException e) {
+
                 }
+                FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                    Mockito.mock(FixedPriorityBlockingQueue.class);
+                Mockito.when(mockQueue.take())
+                    .thenReturn(crossSpaceCompactionTask)
+                    .thenThrow(new InterruptedException());
+                CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                worker.run();
 
                 for (int i = 0; i < unseqResources.size(); i++) {
                   TsFileResource resource = unseqResources.get(i);
@@ -1723,9 +1765,19 @@ public class CrossSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                 cd1.countDown();
                 cd2.await();
 
-                if (crossSpaceCompactionTask.checkValidAndSetMerging()) {
-                  throw new RuntimeException("cross space compaction task 
should be invalid.");
+                try {
+                  crossSpaceCompactionTask.transitSourceFilesToMerging();
+                  Assert.fail("cross space compaction task should be 
invalid.");
+                } catch (FileCannotTransitToCompactingException e) {
+
                 }
+                FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                    Mockito.mock(FixedPriorityBlockingQueue.class);
+                Mockito.when(mockQueue.take())
+                    .thenReturn(crossSpaceCompactionTask)
+                    .thenThrow(new InterruptedException());
+                CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                worker.run();
 
                 for (int i = 0; i < unseqResources.size(); i++) {
                   TsFileResource resource = unseqResources.get(i);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
index a0738f8f36b..5d9f43868ce 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -596,7 +597,8 @@ public class 
RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
             0,
             0);
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+    unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data in source file during compaction
     vsgp.deleteByDevice(
         new PartialPath(
@@ -714,7 +716,8 @@ public class 
RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo
             0,
             0);
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+    unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data in source file during compaction
     vsgp.deleteByDevice(
         new PartialPath(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
index 89ac14094da..24bf2305eff 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -596,7 +597,8 @@ public class 
RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
             0,
             0);
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+    unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data in source file during compaction
     vsgp.deleteByDevice(
         new PartialPath(
@@ -714,7 +716,8 @@ public class 
RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr
             0,
             0);
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
+    unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data in source file during compaction
     vsgp.deleteByDevice(
         new PartialPath(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
index b0ac91e2fed..e8c01e2b0e1 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType;
 import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1136,7 +1137,8 @@ public class InnerSeqCompactionWithFastPerformerTest {
             0, vsgp.getTsFileResourceManager(), sourceResources, true, performer, 0);
 
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    // set the source files to COMPACTING manually to simulate the concurrent scenario
+    sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data during compaction
     vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0);
     vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
index 59537af962f..83b5933c60e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java
@@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType;
 import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -1059,7 +1060,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest 
{
 
   @Test
   public void testCompactionWithDeletionsDuringCompactions()
-      throws MetadataException, IOException, DataRegionException {
+      throws MetadataException, IOException, DataRegionException, InterruptedException {
     // create source seq files
     List<TsFileResource> sourceResources = new ArrayList<>();
     List<List<Long>> chunkPagePointsNum = new ArrayList<>();
@@ -1102,7 +1103,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest 
{
             new ReadChunkCompactionPerformer(),
             0);
     task.setSourceFilesToCompactionCandidate();
-    task.checkValidAndSetMerging();
+    sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING));
     // delete data during compaction
     vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0);
     vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
index 1c347c7f67e..5307364747d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java
@@ -23,20 +23,25 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SizeTieredCompactionSelector;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.List;
@@ -286,9 +291,20 @@ public class InnerSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                     cd1.countDown();
                     cd2.await();
 
-                    if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
-                      throw new RuntimeException("cross space compaction task should be invalid.");
+                    try {
+                      innerSpaceCompactionTask.transitSourceFilesToMerging();
+                      Assert.fail("inner space compaction task should be invalid.");
+                    } catch (FileCannotTransitToCompactingException e) {
+
                     }
+                    FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                        Mockito.mock(FixedPriorityBlockingQueue.class);
+                    Mockito.when(mockQueue.take())
+                        .thenReturn(innerSpaceCompactionTask)
+                        .thenThrow(new InterruptedException());
+                    CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                    worker.run();
+
                     for (int i = 0; i < task.size(); i++) {
                       TsFileResource resource = task.get(i);
                       if (i == 1) {
@@ -300,8 +316,10 @@ public class InnerSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                       }
                     }
                   } else {
-                    if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
-                      throw new RuntimeException("cross space compaction task should be valid.");
+                    try {
+                      innerSpaceCompactionTask.transitSourceFilesToMerging();
+                    } catch (FileCannotTransitToCompactingException e) {
+                      Assert.fail("inner space compaction task should be valid.");
                     }
                     for (int i = 0; i < task.size(); i++) {
                       TsFileResource resource = task.get(i);
@@ -579,9 +597,20 @@ public class InnerSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                     cd1.countDown();
                     cd2.await();
 
-                    if (innerSpaceCompactionTask.checkValidAndSetMerging()) {
-                      throw new RuntimeException("cross space compaction task should be invalid.");
+                    try {
+                      innerSpaceCompactionTask.transitSourceFilesToMerging();
+                      Assert.fail("inner space compaction task should be invalid.");
+                    } catch (FileCannotTransitToCompactingException e) {
+
                     }
+                    FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+                        Mockito.mock(FixedPriorityBlockingQueue.class);
+                    Mockito.when(mockQueue.take())
+                        .thenReturn(innerSpaceCompactionTask)
+                        .thenThrow(new InterruptedException());
+                    CompactionWorker worker = new CompactionWorker(0, mockQueue);
+                    worker.run();
+
                     for (int i = 0; i < task.size(); i++) {
                       TsFileResource resource = task.get(i);
                       if (i == 1) {
@@ -593,8 +622,10 @@ public class InnerSpaceCompactionSelectorTest extends 
AbstractCompactionTest {
                       }
                     }
                   } else {
-                    if (!innerSpaceCompactionTask.checkValidAndSetMerging()) {
-                      throw new RuntimeException("cross space compaction task should be valid.");
+                    try {
+                      innerSpaceCompactionTask.transitSourceFilesToMerging();
+                    } catch (FileCannotTransitToCompactingException e) {
+                      Assert.fail("inner space compaction task should be valid.");
                     }
                     for (int i = 0; i < task.size(); i++) {
                       TsFileResource resource = task.get(i);


Reply via email to