This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ffa747bf41a42898cf61ce3202396a52bfae21a9
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Sat Oct 7 12:34:24 2023 +0800

    add compactionWorkerTest
---
 .../compaction/schedule/CompactionWorker.java      | 77 ++++++++++++----------
 ...yControlTest.java => CompactionWorkerTest.java} | 51 +++++++++-----
 2 files changed, 75 insertions(+), 53 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
index 64be3cdd09f..e722bb1bdaf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java
@@ -53,46 +53,53 @@ public class CompactionWorker implements Runnable {
   @Override
   public void run() {
     while (!Thread.currentThread().isInterrupted()) {
-      AbstractCompactionTask task;
-      try {
-        task = compactionTaskQueue.take();
-      } catch (InterruptedException e) {
-        log.warn("CompactionThread-{} terminates because interruption", 
threadId);
-        return;
-      }
+      processOneCompactionTask();
+    }
+  }
+
+  private void processOneCompactionTask() {
+    AbstractCompactionTask task;
+    try {
+      task = compactionTaskQueue.take();
+    } catch (InterruptedException e) {
+      log.warn("CompactionThread-{} terminates because interruption", 
threadId);
+      Thread.currentThread().interrupt();
+      return;
+    }
+    long estimatedMemoryCost = 0L;
+    boolean memoryAcquired = false;
+    boolean fileHandleAcquired = false;
+    try {
       if (task == null || !task.isCompactionAllowed()) {
         log.info("Compaction task is not allowed to be executed by 
TsFileManager. Task {}", task);
         return;
       }
-      long estimatedMemoryCost = 0L;
-      boolean memoryAcquired = false;
-      boolean fileHandleAcquired = false;
-      try {
-        task.transitSourceFilesToMerging();
-        estimatedMemoryCost = task.getEstimatedMemoryCost();
-        memoryAcquired = 
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
-        fileHandleAcquired =
-            
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
-        CompactionTaskSummary summary = task.getSummary();
-        CompactionTaskFuture future = new CompactionTaskFuture(summary);
-        CompactionTaskManager.getInstance().recordTask(task, future);
-        task.start();
-      } catch (FileCannotTransitToCompactingException
-          | IOException
-          | CompactionMemoryNotEnoughException
-          | CompactionFileCountExceededException e) {
-        log.info("CompactionTask {} cannot be executed. Reason: {}", task, e);
-      } catch (InterruptedException e) {
-        log.warn("InterruptedException occurred when preparing compaction 
task. {}", task, e);
-        Thread.currentThread().interrupt();
-      } finally {
+      task.transitSourceFilesToMerging();
+      estimatedMemoryCost = task.getEstimatedMemoryCost();
+      memoryAcquired = 
SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60);
+      fileHandleAcquired =
+          
SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60);
+      CompactionTaskSummary summary = task.getSummary();
+      CompactionTaskFuture future = new CompactionTaskFuture(summary);
+      CompactionTaskManager.getInstance().recordTask(task, future);
+      task.start();
+    } catch (FileCannotTransitToCompactingException
+        | IOException
+        | CompactionMemoryNotEnoughException
+        | CompactionFileCountExceededException e) {
+      log.info("CompactionTask {} cannot be executed. Reason: {}", task, e);
+    } catch (InterruptedException e) {
+      log.warn("InterruptedException occurred when preparing compaction task. 
{}", task, e);
+      Thread.currentThread().interrupt();
+    } finally {
+      if (task != null) {
         task.resetCompactionCandidateStatusForAllSourceFiles();
-        if (memoryAcquired) {
-          
SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost);
-        }
-        if (fileHandleAcquired) {
-          
SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum());
-        }
+      }
+      if (memoryAcquired) {
+        
SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost);
+      }
+      if (fileHandleAcquired) {
+        
SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum());
       }
     }
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
similarity index 79%
rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java
rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
index 2bcde7dae9c..a6b11b38070 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction;
 
 import org.apache.iotdb.commons.exception.MetadataException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
+import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
 
 import org.junit.Assert;
@@ -38,7 +41,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-public class MemoryControlTest {
+public class CompactionWorkerTest {
   @Before
   public void setUp()
       throws IOException, WriteProcessException, MetadataException, 
InterruptedException {
@@ -75,8 +78,13 @@ public class MemoryControlTest {
             null,
             1024L * 1024L * 1024L * 50L,
             0);
-    boolean success = task.checkValidAndSetMerging();
-    Assert.assertFalse(success);
+    CrossSpaceCompactionTask taskMock = Mockito.spy(task);
+    Mockito.doReturn(true).when(taskMock).start();
+    FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+        Mockito.mock(FixedPriorityBlockingQueue.class);
+    Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new 
InterruptedException());
+    CompactionWorker worker = new CompactionWorker(0, mockQueue);
+    worker.run();
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionMemoryCost().get());
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionFileNumCost().get());
     for (TsFileResource tsFileResource : sequenceFiles) {
@@ -90,7 +98,7 @@ public class MemoryControlTest {
   }
 
   @Test
-  public void testFailedToAllocateFileNumInCrossTask() {
+  public void testFailedToAllocateFileNumInCrossTask() throws 
InterruptedException {
     int oldMaxCrossCompactionCandidateFileNum =
         SystemInfo.getInstance().getTotalFileLimitForCrossTask();
     SystemInfo.getInstance().setTotalFileLimitForCrossTask(2);
@@ -116,9 +124,13 @@ public class MemoryControlTest {
       CrossSpaceCompactionTask task =
           new CrossSpaceCompactionTask(
               0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000, 
0);
-
-      boolean success = task.checkValidAndSetMerging();
-      Assert.assertFalse(success);
+      CrossSpaceCompactionTask taskMock = Mockito.spy(task);
+      Mockito.doReturn(true).when(taskMock).start();
+      FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+          Mockito.mock(FixedPriorityBlockingQueue.class);
+      Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new 
InterruptedException());
+      CompactionWorker worker = new CompactionWorker(0, mockQueue);
+      worker.run();
       Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionMemoryCost().get());
       Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionFileNumCost().get());
       for (TsFileResource tsFileResource : sequenceFiles) {
@@ -140,7 +152,7 @@ public class MemoryControlTest {
    * @throws Exception
    */
   @Test
-  public void testFailedToCheckValidInCrossTask() {
+  public void testFailedToCheckValidInCrossTask() throws InterruptedException {
     List<TsFileResource> sequenceFiles = new ArrayList<>();
     for (int i = 1; i <= 10; i++) {
       sequenceFiles.add(
@@ -156,15 +168,16 @@ public class MemoryControlTest {
               TsFileResourceStatus.COMPACTION_CANDIDATE));
     }
     TsFileManager tsFileManager = Mockito.mock(TsFileManager.class);
-    Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg");
-    Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1");
-
+    Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false);
     // fail to check valid when tsfile manager is not allowed to compaction in 
cross task
     CrossSpaceCompactionTask task =
         new CrossSpaceCompactionTask(
             0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000, 0);
-    boolean success = task.checkValidAndSetMerging();
-    Assert.assertFalse(success);
+    FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+        Mockito.mock(FixedPriorityBlockingQueue.class);
+    Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new 
InterruptedException());
+    CompactionWorker worker = new CompactionWorker(0, mockQueue);
+    worker.run();
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionMemoryCost().get());
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionFileNumCost().get());
     for (TsFileResource tsFileResource : sequenceFiles) {
@@ -183,7 +196,7 @@ public class MemoryControlTest {
    * @throws Exception
    */
   @Test
-  public void testFailedToCheckValidInInnerTask() {
+  public void testFailedToCheckValidInInnerTask() throws InterruptedException {
     List<TsFileResource> sequenceFiles = new ArrayList<>();
     for (int i = 1; i <= 10; i++) {
       sequenceFiles.add(
@@ -192,14 +205,16 @@ public class MemoryControlTest {
               TsFileResourceStatus.COMPACTION_CANDIDATE));
     }
     TsFileManager tsFileManager = Mockito.mock(TsFileManager.class);
-    Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg");
-    Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1");
+    Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false);
 
     // fail to check valid when tsfile manager is not allowed to compaction in 
inner task
     InnerSpaceCompactionTask innerTask =
         new InnerSpaceCompactionTask(0L, tsFileManager, sequenceFiles, true, 
null, 0L);
-    boolean success = innerTask.checkValidAndSetMerging();
-    Assert.assertFalse(success);
+    FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue =
+        Mockito.mock(FixedPriorityBlockingQueue.class);
+    Mockito.when(mockQueue.take()).thenReturn(innerTask).thenThrow(new 
InterruptedException());
+    CompactionWorker worker = new CompactionWorker(0, mockQueue);
+    worker.run();
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionMemoryCost().get());
     Assert.assertEquals(0, 
SystemInfo.getInstance().getCompactionFileNumCost().get());
     for (TsFileResource tsFileResource : sequenceFiles) {

Reply via email to