This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_worker_refactor_0928 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ffa747bf41a42898cf61ce3202396a52bfae21a9 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sat Oct 7 12:34:24 2023 +0800 add compactionWorkerTest --- .../compaction/schedule/CompactionWorker.java | 77 ++++++++++++---------- ...yControlTest.java => CompactionWorkerTest.java} | 51 +++++++++----- 2 files changed, 75 insertions(+), 53 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java index 64be3cdd09f..e722bb1bdaf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java @@ -53,46 +53,53 @@ public class CompactionWorker implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { - AbstractCompactionTask task; - try { - task = compactionTaskQueue.take(); - } catch (InterruptedException e) { - log.warn("CompactionThread-{} terminates because interruption", threadId); - return; - } + processOneCompactionTask(); + } + } + + private void processOneCompactionTask() { + AbstractCompactionTask task; + try { + task = compactionTaskQueue.take(); + } catch (InterruptedException e) { + log.warn("CompactionThread-{} terminates because interruption", threadId); + Thread.currentThread().interrupt(); + return; + } + long estimatedMemoryCost = 0L; + boolean memoryAcquired = false; + boolean fileHandleAcquired = false; + try { if (task == null || !task.isCompactionAllowed()) { log.info("Compaction task is not allowed to be executed by TsFileManager. Task {}", task); return; } - long estimatedMemoryCost = 0L; - boolean memoryAcquired = false; - boolean fileHandleAcquired = false; - try { - task.transitSourceFilesToMerging(); - estimatedMemoryCost = task.getEstimatedMemoryCost(); - memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60); - fileHandleAcquired = - SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60); - CompactionTaskSummary summary = task.getSummary(); - CompactionTaskFuture future = new CompactionTaskFuture(summary); - CompactionTaskManager.getInstance().recordTask(task, future); - task.start(); - } catch (FileCannotTransitToCompactingException - | IOException - | CompactionMemoryNotEnoughException - | CompactionFileCountExceededException e) { - log.info("CompactionTask {} cannot be executed. Reason: {}", task, e); - } catch (InterruptedException e) { - log.warn("InterruptedException occurred when preparing compaction task. {}", task, e); - Thread.currentThread().interrupt(); - } finally { + task.transitSourceFilesToMerging(); + estimatedMemoryCost = task.getEstimatedMemoryCost(); + memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60); + fileHandleAcquired = + SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60); + CompactionTaskSummary summary = task.getSummary(); + CompactionTaskFuture future = new CompactionTaskFuture(summary); + CompactionTaskManager.getInstance().recordTask(task, future); + task.start(); + } catch (FileCannotTransitToCompactingException + | IOException + | CompactionMemoryNotEnoughException + | CompactionFileCountExceededException e) { + log.info("CompactionTask {} cannot be executed. Reason: {}", task, e); + } catch (InterruptedException e) { + log.warn("InterruptedException occurred when preparing compaction task. {}", task, e); + Thread.currentThread().interrupt(); + } finally { + if (task != null) { task.resetCompactionCandidateStatusForAllSourceFiles(); - if (memoryAcquired) { - SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost); - } - if (fileHandleAcquired) { - SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum()); - } + } + if (memoryAcquired) { + SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost); + } + if (fileHandleAcquired) { + SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum()); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java similarity index 79% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java index 2bcde7dae9c..a6b11b38070 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/MemoryControlTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWorkerTest.java @@ -20,12 +20,15 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.junit.Assert; @@ -38,7 +41,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -public class MemoryControlTest { +public class CompactionWorkerTest { @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { @@ -75,8 +78,13 @@ public class MemoryControlTest { null, 1024L * 1024L * 1024L * 50L, 0); - boolean success = task.checkValidAndSetMerging(); - Assert.assertFalse(success); + CrossSpaceCompactionTask taskMock = Mockito.spy(task); + Mockito.doReturn(true).when(taskMock).start(); + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); for (TsFileResource tsFileResource : sequenceFiles) { @@ -90,7 +98,7 @@ public class MemoryControlTest { } @Test - public void testFailedToAllocateFileNumInCrossTask() { + public void testFailedToAllocateFileNumInCrossTask() throws InterruptedException { int oldMaxCrossCompactionCandidateFileNum = SystemInfo.getInstance().getTotalFileLimitForCrossTask(); SystemInfo.getInstance().setTotalFileLimitForCrossTask(2); @@ -116,9 +124,13 @@ public class MemoryControlTest { CrossSpaceCompactionTask task = new CrossSpaceCompactionTask( 0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000, 0); - - boolean success = task.checkValidAndSetMerging(); - Assert.assertFalse(success); + CrossSpaceCompactionTask taskMock = Mockito.spy(task); + Mockito.doReturn(true).when(taskMock).start(); + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()).thenReturn(taskMock).thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); for (TsFileResource tsFileResource : sequenceFiles) { @@ -140,7 +152,7 @@ public class MemoryControlTest { * @throws Exception */ @Test - public void testFailedToCheckValidInCrossTask() { + public void testFailedToCheckValidInCrossTask() throws InterruptedException { List<TsFileResource> sequenceFiles = new ArrayList<>(); for (int i = 1; i <= 10; i++) { sequenceFiles.add( @@ -156,15 +168,16 @@ public class MemoryControlTest { TsFileResourceStatus.COMPACTION_CANDIDATE)); } TsFileManager tsFileManager = Mockito.mock(TsFileManager.class); - Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg"); - Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1"); - + Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false); // fail to check valid when tsfile manager is not allowed to compaction in cross task CrossSpaceCompactionTask task = new CrossSpaceCompactionTask( 0L, tsFileManager, sequenceFiles, unsequenceFiles, null, 1000, 0); - boolean success = task.checkValidAndSetMerging(); - Assert.assertFalse(success); + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); for (TsFileResource tsFileResource : sequenceFiles) { @@ -183,7 +196,7 @@ public class MemoryControlTest { * @throws Exception */ @Test - public void testFailedToCheckValidInInnerTask() { + public void testFailedToCheckValidInInnerTask() throws InterruptedException { List<TsFileResource> sequenceFiles = new ArrayList<>(); for (int i = 1; i <= 10; i++) { sequenceFiles.add( @@ -192,14 +205,16 @@ public class MemoryControlTest { TsFileResourceStatus.COMPACTION_CANDIDATE)); } TsFileManager tsFileManager = Mockito.mock(TsFileManager.class); - Mockito.when(tsFileManager.getStorageGroupName()).thenReturn("root.sg"); - Mockito.when(tsFileManager.getDataRegionId()).thenReturn("1"); + Mockito.when(tsFileManager.isAllowCompaction()).thenReturn(false); // fail to check valid when tsfile manager is not allowed to compaction in inner task InnerSpaceCompactionTask innerTask = new InnerSpaceCompactionTask(0L, tsFileManager, sequenceFiles, true, null, 0L); - boolean success = innerTask.checkValidAndSetMerging(); - Assert.assertFalse(success); + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()).thenReturn(innerTask).thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); for (TsFileResource tsFileResource : sequenceFiles) {
