This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 575616ae5cc [to dev/1.3] Separate fully dirty files by partially dirty
files task (#15342)
575616ae5cc is described below
commit 575616ae5ccb418d6e408b0b7632a12b543ee818
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 16 12:33:30 2025 +0800
[to dev/1.3] Separate fully dirty files by partially dirty files task
(#15342)
* separate fully dirty files by partially dirty files task
* fix concurrency bug
---
.../compaction/schedule/CompactionScheduler.java | 16 ++--
.../selector/impl/SettleSelectorImpl.java | 103 ++++++++++-----------
.../settle/SettleCompactionSelectorTest.java | 20 ++--
3 files changed, 67 insertions(+), 72 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 44678a6356a..f2c8e789ab1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -160,9 +160,7 @@ public class CompactionScheduler {
long startTime = System.currentTimeMillis();
List<InnerSpaceCompactionTask> innerSpaceTaskList =
innerSpaceCompactionSelector.selectInnerSpaceTask(
- sequence
- ?
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
- :
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ tsFileManager.getTsFileListSnapshot(timePartition, sequence));
CompactionMetrics.getInstance()
.updateCompactionTaskSelectionTimeCost(
sequence ? CompactionTaskType.INNER_SEQ :
CompactionTaskType.INNER_UNSEQ,
@@ -245,8 +243,8 @@ public class CompactionScheduler {
List<CrossCompactionTaskResource> selectedTasks =
selector.selectInsertionCrossSpaceTask(
-
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
-
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ tsFileManager.getTsFileListSnapshot(timePartition, true),
+ tsFileManager.getTsFileListSnapshot(timePartition, false));
if (selectedTasks.isEmpty()) {
return 0;
}
@@ -287,8 +285,8 @@ public class CompactionScheduler {
List<CrossCompactionTaskResource> taskList =
crossSpaceCompactionSelector.selectCrossSpaceTask(
-
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
-
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+ tsFileManager.getTsFileListSnapshot(timePartition, true),
+ tsFileManager.getTsFileListSnapshot(timePartition, false));
List<Long> memoryCost =
taskList.stream()
.map(CrossCompactionTaskResource::getTotalMemoryCost)
@@ -335,12 +333,12 @@ public class CompactionScheduler {
if (config.isEnableSeqSpaceCompaction()) {
taskList.addAll(
settleSelector.selectSettleTask(
-
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)));
+ tsFileManager.getTsFileListSnapshot(timePartition, true)));
}
if (config.isEnableUnseqSpaceCompaction()) {
taskList.addAll(
settleSelector.selectSettleTask(
-
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)));
+ tsFileManager.getTsFileListSnapshot(timePartition, false)));
}
CompactionMetrics.getInstance()
.updateCompactionTaskSelectionTimeCost(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
index d6703ceeb74..9a5dff0afea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
@@ -98,24 +98,38 @@ public class SettleSelectorImpl implements ISettleSelector {
}
}
- static class PartiallyDirtyResource {
- List<TsFileResource> resources = new ArrayList<>();
- long totalFileSize = 0;
-
- public boolean add(TsFileResource resource, long dirtyDataSize) {
- resources.add(resource);
- totalFileSize += resource.getTsFileSize();
- totalFileSize -= dirtyDataSize;
+ static class SettleTaskResource {
+
+ List<TsFileResource> fullyDirtyResources = new ArrayList<>();
+ List<TsFileResource> partiallyDirtyResources = new ArrayList<>();
+ long totalPartiallyDirtyFileSize = 0;
+
+ public void addFullyDirtyResource(TsFileResource resource) {
+ fullyDirtyResources.add(resource);
+ }
+
+ public boolean addPartiallyDirtyResource(TsFileResource resource, long
dirtyDataSize) {
+ partiallyDirtyResources.add(resource);
+ totalPartiallyDirtyFileSize += resource.getTsFileSize();
+ totalPartiallyDirtyFileSize -= dirtyDataSize;
return checkHasReachedThreshold();
}
- public List<TsFileResource> getResources() {
- return resources;
+ public List<TsFileResource> getFullyDirtyResources() {
+ return fullyDirtyResources;
+ }
+
+ public List<TsFileResource> getPartiallyDirtyResources() {
+ return partiallyDirtyResources;
}
public boolean checkHasReachedThreshold() {
- return resources.size() >= config.getInnerCompactionCandidateFileNum()
- || totalFileSize >= config.getTargetCompactionFileSize();
+ return partiallyDirtyResources.size() >=
config.getInnerCompactionCandidateFileNum()
+ || totalPartiallyDirtyFileSize >=
config.getTargetCompactionFileSize();
+ }
+
+ public boolean isEmpty() {
+ return fullyDirtyResources.isEmpty() &&
partiallyDirtyResources.isEmpty();
}
}
@@ -129,9 +143,8 @@ public class SettleSelectorImpl implements ISettleSelector {
}
private List<SettleCompactionTask> selectTasks(List<TsFileResource>
resources) {
- List<TsFileResource> fullyDirtyResource = new ArrayList<>();
- List<PartiallyDirtyResource> partiallyDirtyResourceList = new
ArrayList<>();
- PartiallyDirtyResource partiallyDirtyResource = new
PartiallyDirtyResource();
+ List<SettleTaskResource> partiallyDirtyResourceList = new ArrayList<>();
+ SettleTaskResource settleTaskResource = new SettleTaskResource();
try {
for (TsFileResource resource : resources) {
boolean shouldStop = false;
@@ -148,21 +161,22 @@ public class SettleSelectorImpl implements
ISettleSelector {
switch (fileDirtyInfo.status) {
case FULLY_DIRTY:
- fullyDirtyResource.add(resource);
+ settleTaskResource.addFullyDirtyResource(resource);
break;
case PARTIALLY_DIRTY:
- shouldStop = partiallyDirtyResource.add(resource,
fileDirtyInfo.dirtyDataSize);
+ shouldStop =
+ settleTaskResource.addPartiallyDirtyResource(resource,
fileDirtyInfo.dirtyDataSize);
break;
case NOT_SATISFIED:
- shouldStop = !partiallyDirtyResource.getResources().isEmpty();
+ shouldStop =
!settleTaskResource.getPartiallyDirtyResources().isEmpty();
break;
default:
// do nothing
}
if (shouldStop) {
- partiallyDirtyResourceList.add(partiallyDirtyResource);
- partiallyDirtyResource = new PartiallyDirtyResource();
+ partiallyDirtyResourceList.add(settleTaskResource);
+ settleTaskResource = new SettleTaskResource();
if (!heavySelect) {
// Non-heavy selection is triggered more frequently. In order to
avoid selecting too
// many files containing mods for compaction when the disk is
insufficient, the number
@@ -171,8 +185,8 @@ public class SettleSelectorImpl implements ISettleSelector {
}
}
}
- partiallyDirtyResourceList.add(partiallyDirtyResource);
- return createTask(fullyDirtyResource, partiallyDirtyResourceList);
+ partiallyDirtyResourceList.add(settleTaskResource);
+ return createTask(partiallyDirtyResourceList);
} catch (Exception e) {
LOGGER.error(
"{}-{} cannot select file for settle compaction", storageGroupName,
dataRegionId, e);
@@ -278,39 +292,22 @@ public class SettleSelectorImpl implements
ISettleSelector {
return false;
}
- private List<SettleCompactionTask> createTask(
- List<TsFileResource> fullyDirtyResources,
- List<PartiallyDirtyResource> partiallyDirtyResourceList) {
+ private List<SettleCompactionTask> createTask(List<SettleTaskResource>
settleTaskResourceList) {
List<SettleCompactionTask> tasks = new ArrayList<>();
- for (int i = 0; i < partiallyDirtyResourceList.size(); i++) {
- if (i == 0) {
- if (fullyDirtyResources.isEmpty()
- && partiallyDirtyResourceList.get(i).getResources().isEmpty()) {
- continue;
- }
- tasks.add(
- new SettleCompactionTask(
- timePartition,
- tsFileManager,
- fullyDirtyResources,
- partiallyDirtyResourceList.get(i).getResources(),
- isSeq,
- createCompactionPerformer(),
- tsFileManager.getNextCompactionTaskId()));
- } else {
- if (partiallyDirtyResourceList.get(i).getResources().isEmpty()) {
- continue;
- }
- tasks.add(
- new SettleCompactionTask(
- timePartition,
- tsFileManager,
- Collections.emptyList(),
- partiallyDirtyResourceList.get(i).getResources(),
- isSeq,
- createCompactionPerformer(),
- tsFileManager.getNextCompactionTaskId()));
+ for (SettleTaskResource settleTaskResource : settleTaskResourceList) {
+ if (settleTaskResource.isEmpty()) {
+ continue;
}
+ SettleCompactionTask task =
+ new SettleCompactionTask(
+ timePartition,
+ tsFileManager,
+ settleTaskResource.getFullyDirtyResources(),
+ settleTaskResource.getPartiallyDirtyResources(),
+ isSeq,
+ createCompactionPerformer(),
+ tsFileManager.getNextCompactionTaskId());
+ tasks.add(task);
}
return tasks;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
index 514ee54aa3f..4f0edeb6de3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
@@ -444,14 +444,14 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
}
seqTasks = settleSelector.selectSettleTask(seqResources);
Assert.assertEquals(2, seqTasks.size());
- Assert.assertEquals(5, seqTasks.get(0).getFullyDirtyFiles().size());
+ Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
Assert.assertEquals(3, seqTasks.get(0).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(1),
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(3),
seqTasks.get(0).getPartiallyDirtyFiles().get(1));
Assert.assertEquals(seqResources.get(5),
seqTasks.get(0).getPartiallyDirtyFiles().get(2));
Assert.assertEquals(seqResources.get(7),
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(9),
seqTasks.get(1).getPartiallyDirtyFiles().get(1));
- Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+ Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
Assert.assertEquals(2, seqTasks.get(1).getPartiallyDirtyFiles().size());
Assert.assertTrue(seqTasks.get(0).start());
Assert.assertTrue(seqTasks.get(1).start());
@@ -575,15 +575,15 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
new SettleSelectorImpl(true, COMPACTION_TEST_SG, "0", 0,
tsFileManager);
List<SettleCompactionTask> seqTasks =
settleSelector.selectSettleTask(seqResources);
Assert.assertEquals(3, seqTasks.size());
- Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
+ Assert.assertEquals(0, seqTasks.get(0).getFullyDirtyFiles().size());
Assert.assertEquals(1, seqTasks.get(0).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(0),
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
- Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+ Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
Assert.assertEquals(1, seqTasks.get(1).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(4),
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
- Assert.assertEquals(0, seqTasks.get(2).getFullyDirtyFiles().size());
+ Assert.assertEquals(1, seqTasks.get(2).getFullyDirtyFiles().size());
Assert.assertEquals(2, seqTasks.get(2).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(6),
seqTasks.get(2).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(8),
seqTasks.get(2).getPartiallyDirtyFiles().get(1));
@@ -981,14 +981,14 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
}
seqTasks = settleSelector.selectSettleTask(seqResources);
Assert.assertEquals(2, seqTasks.size());
- Assert.assertEquals(5, seqTasks.get(0).getFullyDirtyFiles().size());
+ Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
Assert.assertEquals(3, seqTasks.get(0).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(1),
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(3),
seqTasks.get(0).getPartiallyDirtyFiles().get(1));
Assert.assertEquals(seqResources.get(5),
seqTasks.get(0).getPartiallyDirtyFiles().get(2));
Assert.assertEquals(seqResources.get(7),
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(9),
seqTasks.get(1).getPartiallyDirtyFiles().get(1));
- Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+ Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
Assert.assertEquals(2, seqTasks.get(1).getPartiallyDirtyFiles().size());
Assert.assertTrue(seqTasks.get(0).start());
Assert.assertTrue(seqTasks.get(1).start());
@@ -1114,15 +1114,15 @@ public class SettleCompactionSelectorTest extends
AbstractCompactionTest {
new SettleSelectorImpl(true, COMPACTION_TEST_SG, "0", 0,
tsFileManager);
List<SettleCompactionTask> seqTasks =
settleSelector.selectSettleTask(seqResources);
Assert.assertEquals(3, seqTasks.size());
- Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
+ Assert.assertEquals(0, seqTasks.get(0).getFullyDirtyFiles().size());
Assert.assertEquals(1, seqTasks.get(0).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(0),
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
- Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+ Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
Assert.assertEquals(1, seqTasks.get(1).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(4),
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
- Assert.assertEquals(0, seqTasks.get(2).getFullyDirtyFiles().size());
+ Assert.assertEquals(1, seqTasks.get(2).getFullyDirtyFiles().size());
Assert.assertEquals(2, seqTasks.get(2).getPartiallyDirtyFiles().size());
Assert.assertEquals(seqResources.get(6),
seqTasks.get(2).getPartiallyDirtyFiles().get(0));
Assert.assertEquals(seqResources.get(8),
seqTasks.get(2).getPartiallyDirtyFiles().get(1));