This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch compaction_worker_refactor_0928
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 67e1db3bdd0872fb0ebe2fbaf69ccabe9cb1e1e6 Author: Jinrui.Zhang <[email protected]> AuthorDate: Sat Oct 7 17:44:11 2023 +0800 fix UT --- .../execute/task/AbstractCompactionTask.java | 8 --- .../execute/task/CrossSpaceCompactionTask.java | 42 +------------ .../execute/task/InnerSpaceCompactionTask.java | 55 +---------------- .../compaction/schedule/CompactionWorker.java | 7 ++- .../compaction/CompactionTaskComparatorTest.java | 10 --- .../FastCrossCompactionPerformerTest.java | 15 +++-- .../cross/CrossSpaceCompactionSelectorTest.java | 72 +++++++++++++++++++--- ...eCrossSpaceCompactionWithFastPerformerTest.java | 7 ++- ...sSpaceCompactionWithReadPointPerformerTest.java | 7 ++- .../InnerSeqCompactionWithFastPerformerTest.java | 4 +- ...nerSeqCompactionWithReadChunkPerformerTest.java | 5 +- .../inner/InnerSpaceCompactionSelectorTest.java | 47 +++++++++++--- 12 files changed, 137 insertions(+), 142 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 1db4553c92e..02412723c48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -136,14 +136,6 @@ public abstract class AbstractCompactionTask { public abstract boolean equalsOtherTask(AbstractCompactionTask otherTask); - /** - * Check if the compaction task is valid (selected files are not merging, closed and exist). If - * the task is valid, then set the merging status of selected files to true. 
- * - * @return true if the task is valid else false - */ - public abstract boolean checkValidAndSetMerging(); - public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException { for (TsFileResource f : getAllSourceTsFiles()) { if (!f.setStatus(TsFileResourceStatus.COMPACTING)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 4d67f8f7022..08e36eb244c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -23,8 +23,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; @@ -37,7 +35,6 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList; import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; -import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -272,11 +269,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { false, true); } finally { - SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); - SystemInfo.getInstance() - .decreaseCompactionFileNumCost( - selectedSequenceFiles.size() + selectedUnsequenceFiles.size()); - releaseAllLocksAndResetStatus(); + releaseAllLocks(); } return isSuccess; } @@ -292,7 +285,7 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { && this.performer.getClass().isInstance(otherCrossCompactionTask.performer); } - private void releaseAllLocksAndResetStatus() { + private void releaseAllLocks() { for (TsFileResource tsFileResource : holdWriteLockList) { tsFileResource.writeUnlock(); } @@ -349,37 +342,6 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { } } - @Override - public boolean checkValidAndSetMerging() { - if (!tsFileManager.isAllowCompaction()) { - resetCompactionCandidateStatusForAllSourceFiles(); - return false; - } - if (!isDiskSpaceCheckPassed()) { - LOGGER.debug( - "cross compaction task start check failed because disk free ratio is less than disk_space_warning_threshold"); - return false; - } - try { - SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60); - SystemInfo.getInstance() - .addCompactionFileNum(selectedSequenceFiles.size() + selectedUnsequenceFiles.size(), 60); - } catch (Exception e) { - if (e instanceof InterruptedException) { - LOGGER.warn("Interrupted when allocating memory for compaction", e); - Thread.currentThread().interrupt(); - } else if (e instanceof CompactionMemoryNotEnoughException) { - LOGGER.info("No enough memory for current compaction task {}", this, e); - } 
else if (e instanceof CompactionFileCountExceededException) { - LOGGER.info("No enough file num for current compaction task {}", this, e); - SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); - } - resetCompactionCandidateStatusForAllSourceFiles(); - return false; - } - return true; - } - @Override public long getEstimatedMemoryCost() { return memoryCost; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index f52048c24ff..e9eb19e58f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -24,8 +24,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.service.metrics.FileMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionExceptionHandler; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; @@ -42,7 +40,6 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; -import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException; @@ -116,7 +113,7 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) { if (this.performer instanceof ReadChunkCompactionPerformer) { innerSpaceEstimator = new ReadChunkInnerCompactionEstimator(); - } else if (!sequence && this.performer instanceof FastCompactionInnerCompactionEstimator) { + } else if (!sequence && this.performer instanceof FastCompactionPerformer) { innerSpaceEstimator = new FastCompactionInnerCompactionEstimator(); } } @@ -450,57 +447,9 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { } } - @Override - public boolean checkValidAndSetMerging() { - if (!tsFileManager.isAllowCompaction()) { - resetCompactionCandidateStatusForAllSourceFiles(); - return false; - } - if (!isDiskSpaceCheckPassed()) { - LOGGER.debug( - "inner compaction task start check failed because disk free ratio is less than disk_space_warning_threshold"); - return false; - } - try { - for (int i = 0; i < selectedTsFileResourceList.size(); ++i) { - TsFileResource resource = selectedTsFileResourceList.get(i); - if (!resource.setStatus(TsFileResourceStatus.COMPACTING)) { - releaseAllLocks(); - return false; - } - } - if (innerSpaceEstimator != null) { - memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList); - } - SystemInfo.getInstance().addCompactionMemoryCost(memoryCost, 60); - SystemInfo.getInstance().addCompactionFileNum(selectedTsFileResourceList.size(), 60); - } catch (Exception e) { - 
if (e instanceof InterruptedException) { - LOGGER.warn("Interrupted when allocating memory for compaction", e); - Thread.currentThread().interrupt(); - } else if (e instanceof CompactionMemoryNotEnoughException) { - LOGGER.warn("No enough memory for current compaction task {}", this, e); - } else if (e instanceof CompactionFileCountExceededException) { - LOGGER.warn("No enough file num for current compaction task {}", this, e); - SystemInfo.getInstance().resetCompactionMemoryCost(memoryCost); - } - releaseAllLocks(); - return false; - } finally { - try { - if (innerSpaceEstimator != null) { - innerSpaceEstimator.close(); - } - } catch (IOException e) { - LOGGER.warn("Failed to close InnerSpaceCompactionMemoryEstimator"); - } - } - return true; - } - @Override public long getEstimatedMemoryCost() throws IOException { - if (memoryCost == 0L) { + if (innerSpaceEstimator != null && memoryCost == 0L) { memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList); } return memoryCost; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java index e722bb1bdaf..36421205502 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; @@ -75,8 +76,10 @@ public class CompactionWorker implements Runnable { return; } task.transitSourceFilesToMerging(); - estimatedMemoryCost = task.getEstimatedMemoryCost(); - memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60); + if (IoTDBDescriptor.getInstance().getConfig().isEnableCompactionMemControl()) { + estimatedMemoryCost = task.getEstimatedMemoryCost(); + memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60); + } fileHandleAcquired = SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60); CompactionTaskSummary summary = task.getSummary(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java index 8ac9f67ad83..a1efb673e21 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionTaskComparatorTest.java @@ -494,11 +494,6 @@ public class CompactionTaskComparatorTest { public boolean equalsOtherTask(AbstractCompactionTask other) { return false; } - - @Override - public boolean checkValidAndSetMerging() { - return true; - } } private static class FakeCrossSpaceCompactionTask extends CrossSpaceCompactionTask { @@ -529,11 +524,6 @@ public class CompactionTaskComparatorTest { public boolean equalsOtherTask(AbstractCompactionTask other) { return false; } - - @Override - public boolean checkValidAndSetMerging() { - return true; - } } private static class FakedTsFileResource extends TsFileResource { diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java index 6c5c336e6d9..d67bf6be322 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java @@ -27,17 +27,20 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import 
org.apache.iotdb.db.tools.validate.TsFileValidationTool; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -52,6 +55,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -4029,7 +4033,7 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest { @Test public void testReleaseFileNumAndMemoryAfterCrossTask() - throws IOException, MetadataException, WriteProcessException { + throws IOException, MetadataException, WriteProcessException, InterruptedException { int oldMaxCrossCompactionCandidateFileNum = SystemInfo.getInstance().getTotalFileLimitForCrossTask(); SystemInfo.getInstance().setTotalFileLimitForCrossTask(15); @@ -4050,9 +4054,12 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest { 1000, 0); Assert.assertTrue(task.setSourceFilesToCompactionCandidate()); - boolean success = task.checkValidAndSetMerging(); - Assert.assertTrue(success); - Assert.assertTrue(task.start()); + + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()).thenReturn(task).thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); } finally { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java index ac4f8d4a939..8b9358ac969 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionSelectorTest.java @@ -22,7 +22,10 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CrossSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.RewriteCrossSpaceCompactionSelector; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossCompactionTaskResource; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.CrossSpaceCompactionCandidate; @@ -30,12 +33,14 @@ import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManag import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.junit.After; import org.junit.Assert; 
import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.util.List; @@ -176,7 +181,7 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { @Test public void testSelectWithTooManySourceFiles() - throws IOException, MetadataException, WriteProcessException { + throws IOException, MetadataException, WriteProcessException, InterruptedException { int oldMaxFileNumForCompaction = SystemInfo.getInstance().getTotalFileLimitForCrossTask(); SystemInfo.getInstance().setTotalFileLimitForCrossTask(1); SystemInfo.getInstance().getCompactionFileNumCost().set(0); @@ -207,7 +212,14 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { // set file status to COMPACTION_CANDIDATE Assert.assertTrue(crossSpaceCompactionTask.setSourceFilesToCompactionCandidate()); - Assert.assertFalse(crossSpaceCompactionTask.checkValidAndSetMerging()); + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(crossSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); + Assert.assertEquals(0, SystemInfo.getInstance().getCompactionMemoryCost().get()); Assert.assertEquals(0, SystemInfo.getInstance().getCompactionFileNumCost().get()); for (TsFileResource resource : seqResources) { @@ -564,9 +576,19 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (crossSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be invalid."); + try { + crossSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("cross space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + 
Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(crossSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); for (int i = 0; i < seqResources.size(); i++) { TsFileResource resource = seqResources.get(i); @@ -906,9 +928,19 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (crossSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be invalid."); + try { + crossSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("cross space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(crossSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); for (int i = 0; i < seqResources.size(); i++) { TsFileResource resource = seqResources.get(i); @@ -1382,9 +1414,19 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (crossSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be invalid."); + try { + crossSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("cross space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(crossSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); for (int i = 0; i < 
unseqResources.size(); i++) { TsFileResource resource = unseqResources.get(i); @@ -1723,9 +1765,19 @@ public class CrossSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (crossSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be invalid."); + try { + crossSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("cross space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(crossSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); for (int i = 0; i < unseqResources.size(); i++) { TsFileResource resource = unseqResources.get(i); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java index a0738f8f36b..5d9f43868ce 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithFastPerformerTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -596,7 +597,8 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo 0, 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); + unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data in source file during compaction vsgp.deleteByDevice( new PartialPath( @@ -714,7 +716,8 @@ public class RewriteCrossSpaceCompactionWithFastPerformerTest extends AbstractCo 0, 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); + unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data in source file during compaction vsgp.deleteByDevice( new PartialPath( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java index 89ac14094da..24bf2305eff 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/RewriteCrossSpaceCompactionWithReadPointPerformerTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -596,7 +597,8 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr 0, 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); + unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data in source file during compaction vsgp.deleteByDevice( new PartialPath( @@ -714,7 +716,8 @@ public class RewriteCrossSpaceCompactionWithReadPointPerformerTest extends Abstr 0, 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + seqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); + unseqResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data in source file during compaction vsgp.deleteByDevice( new PartialPath( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java index b0ac91e2fed..e8c01e2b0e1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java +++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithFastPerformerTest.java @@ -39,6 +39,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType; import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; @@ -1136,7 +1137,8 @@ public class InnerSeqCompactionWithFastPerformerTest { 0, vsgp.getTsFileResourceManager(), sourceResources, true, performer, 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + // set the source files to COMPACTING manually to simulate the concurrent scenario + sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data during compaction vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0); vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java index 59537af962f..83b5933c60e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSeqCompactionWithReadChunkPerformerTest.java @@ -39,6 +39,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTimeseriesType; import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; @@ -1059,7 +1060,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest { @Test public void testCompactionWithDeletionsDuringCompactions() - throws MetadataException, IOException, DataRegionException { + throws MetadataException, IOException, DataRegionException, InterruptedException { // create source seq files List<TsFileResource> sourceResources = new ArrayList<>(); List<List<Long>> chunkPagePointsNum = new ArrayList<>(); @@ -1102,7 +1103,7 @@ public class InnerSeqCompactionWithReadChunkPerformerTest { new ReadChunkCompactionPerformer(), 0); task.setSourceFilesToCompactionCandidate(); - task.checkValidAndSetMerging(); + sourceResources.forEach(f -> f.setStatus(TsFileResourceStatus.COMPACTING)); // delete data during compaction vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1200, 0); vsgp.deleteByDevice(new PartialPath(fullPaths[0]), 0, 1800, 0); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java index 1c347c7f67e..5307364747d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java +++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/InnerSpaceCompactionSelectorTest.java @@ -23,20 +23,25 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionWorker; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SizeTieredCompactionSelector; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.io.IOException; import java.util.List; @@ -286,9 +291,20 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (innerSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space 
compaction task should be invalid."); + try { + innerSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("inner space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(innerSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); + for (int i = 0; i < task.size(); i++) { TsFileResource resource = task.get(i); if (i == 1) { @@ -300,8 +316,10 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { } } } else { - if (!innerSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be valid."); + try { + innerSpaceCompactionTask.transitSourceFilesToMerging(); + } catch (FileCannotTransitToCompactingException e) { + Assert.fail("inner space compaction task should be valid."); } for (int i = 0; i < task.size(); i++) { TsFileResource resource = task.get(i); @@ -579,9 +597,20 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { cd1.countDown(); cd2.await(); - if (innerSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be invalid."); + try { + innerSpaceCompactionTask.transitSourceFilesToMerging(); + Assert.fail("inner space compaction task should be invalid."); + } catch (FileCannotTransitToCompactingException e) { + } + FixedPriorityBlockingQueue<AbstractCompactionTask> mockQueue = + Mockito.mock(FixedPriorityBlockingQueue.class); + Mockito.when(mockQueue.take()) + .thenReturn(innerSpaceCompactionTask) + .thenThrow(new InterruptedException()); + CompactionWorker worker = new CompactionWorker(0, mockQueue); + worker.run(); + for (int i = 0; i < task.size(); i++) { TsFileResource resource = 
task.get(i); if (i == 1) { @@ -593,8 +622,10 @@ public class InnerSpaceCompactionSelectorTest extends AbstractCompactionTest { } } } else { - if (!innerSpaceCompactionTask.checkValidAndSetMerging()) { - throw new RuntimeException("cross space compaction task should be valid."); + try { + innerSpaceCompactionTask.transitSourceFilesToMerging(); + } catch (FileCannotTransitToCompactingException e) { + Assert.fail("inner space compaction task should be valid."); } for (int i = 0; i < task.size(); i++) { TsFileResource resource = task.get(i);
