This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 575616ae5cc [to dev/1.3] Separate fully dirty files by partially dirty files task (#15342)
575616ae5cc is described below

commit 575616ae5ccb418d6e408b0b7632a12b543ee818
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 16 12:33:30 2025 +0800

    [to dev/1.3] Separate fully dirty files by partially dirty files task (#15342)
    
    * separate fully dirty files by partially dirty files task
    
    * fix concurrency bug
---
 .../compaction/schedule/CompactionScheduler.java   |  16 ++--
 .../selector/impl/SettleSelectorImpl.java          | 103 ++++++++++-----------
 .../settle/SettleCompactionSelectorTest.java       |  20 ++--
 3 files changed, 67 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 44678a6356a..f2c8e789ab1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -160,9 +160,7 @@ public class CompactionScheduler {
     long startTime = System.currentTimeMillis();
     List<InnerSpaceCompactionTask> innerSpaceTaskList =
         innerSpaceCompactionSelector.selectInnerSpaceTask(
-            sequence
-                ? 
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)
-                : 
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+            tsFileManager.getTsFileListSnapshot(timePartition, sequence));
     CompactionMetrics.getInstance()
         .updateCompactionTaskSelectionTimeCost(
             sequence ? CompactionTaskType.INNER_SEQ : 
CompactionTaskType.INNER_UNSEQ,
@@ -245,8 +243,8 @@ public class CompactionScheduler {
 
     List<CrossCompactionTaskResource> selectedTasks =
         selector.selectInsertionCrossSpaceTask(
-            
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
-            
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+            tsFileManager.getTsFileListSnapshot(timePartition, true),
+            tsFileManager.getTsFileListSnapshot(timePartition, false));
     if (selectedTasks.isEmpty()) {
       return 0;
     }
@@ -287,8 +285,8 @@ public class CompactionScheduler {
 
     List<CrossCompactionTaskResource> taskList =
         crossSpaceCompactionSelector.selectCrossSpaceTask(
-            
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition),
-            
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition));
+            tsFileManager.getTsFileListSnapshot(timePartition, true),
+            tsFileManager.getTsFileListSnapshot(timePartition, false));
     List<Long> memoryCost =
         taskList.stream()
             .map(CrossCompactionTaskResource::getTotalMemoryCost)
@@ -335,12 +333,12 @@ public class CompactionScheduler {
     if (config.isEnableSeqSpaceCompaction()) {
       taskList.addAll(
           settleSelector.selectSettleTask(
-              
tsFileManager.getOrCreateSequenceListByTimePartition(timePartition)));
+              tsFileManager.getTsFileListSnapshot(timePartition, true)));
     }
     if (config.isEnableUnseqSpaceCompaction()) {
       taskList.addAll(
           settleSelector.selectSettleTask(
-              
tsFileManager.getOrCreateUnsequenceListByTimePartition(timePartition)));
+              tsFileManager.getTsFileListSnapshot(timePartition, false)));
     }
     CompactionMetrics.getInstance()
         .updateCompactionTaskSelectionTimeCost(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
index d6703ceeb74..9a5dff0afea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java
@@ -98,24 +98,38 @@ public class SettleSelectorImpl implements ISettleSelector {
     }
   }
 
-  static class PartiallyDirtyResource {
-    List<TsFileResource> resources = new ArrayList<>();
-    long totalFileSize = 0;
-
-    public boolean add(TsFileResource resource, long dirtyDataSize) {
-      resources.add(resource);
-      totalFileSize += resource.getTsFileSize();
-      totalFileSize -= dirtyDataSize;
+  static class SettleTaskResource {
+
+    List<TsFileResource> fullyDirtyResources = new ArrayList<>();
+    List<TsFileResource> partiallyDirtyResources = new ArrayList<>();
+    long totalPartiallyDirtyFileSize = 0;
+
+    public void addFullyDirtyResource(TsFileResource resource) {
+      fullyDirtyResources.add(resource);
+    }
+
+    public boolean addPartiallyDirtyResource(TsFileResource resource, long 
dirtyDataSize) {
+      partiallyDirtyResources.add(resource);
+      totalPartiallyDirtyFileSize += resource.getTsFileSize();
+      totalPartiallyDirtyFileSize -= dirtyDataSize;
       return checkHasReachedThreshold();
     }
 
-    public List<TsFileResource> getResources() {
-      return resources;
+    public List<TsFileResource> getFullyDirtyResources() {
+      return fullyDirtyResources;
+    }
+
+    public List<TsFileResource> getPartiallyDirtyResources() {
+      return partiallyDirtyResources;
     }
 
     public boolean checkHasReachedThreshold() {
-      return resources.size() >= config.getInnerCompactionCandidateFileNum()
-          || totalFileSize >= config.getTargetCompactionFileSize();
+      return partiallyDirtyResources.size() >= 
config.getInnerCompactionCandidateFileNum()
+          || totalPartiallyDirtyFileSize >= 
config.getTargetCompactionFileSize();
+    }
+
+    public boolean isEmpty() {
+      return fullyDirtyResources.isEmpty() && 
partiallyDirtyResources.isEmpty();
     }
   }
 
@@ -129,9 +143,8 @@ public class SettleSelectorImpl implements ISettleSelector {
   }
 
   private List<SettleCompactionTask> selectTasks(List<TsFileResource> 
resources) {
-    List<TsFileResource> fullyDirtyResource = new ArrayList<>();
-    List<PartiallyDirtyResource> partiallyDirtyResourceList = new 
ArrayList<>();
-    PartiallyDirtyResource partiallyDirtyResource = new 
PartiallyDirtyResource();
+    List<SettleTaskResource> partiallyDirtyResourceList = new ArrayList<>();
+    SettleTaskResource settleTaskResource = new SettleTaskResource();
     try {
       for (TsFileResource resource : resources) {
         boolean shouldStop = false;
@@ -148,21 +161,22 @@ public class SettleSelectorImpl implements ISettleSelector {
 
         switch (fileDirtyInfo.status) {
           case FULLY_DIRTY:
-            fullyDirtyResource.add(resource);
+            settleTaskResource.addFullyDirtyResource(resource);
             break;
           case PARTIALLY_DIRTY:
-            shouldStop = partiallyDirtyResource.add(resource, 
fileDirtyInfo.dirtyDataSize);
+            shouldStop =
+                settleTaskResource.addPartiallyDirtyResource(resource, 
fileDirtyInfo.dirtyDataSize);
             break;
           case NOT_SATISFIED:
-            shouldStop = !partiallyDirtyResource.getResources().isEmpty();
+            shouldStop = 
!settleTaskResource.getPartiallyDirtyResources().isEmpty();
             break;
           default:
             // do nothing
         }
 
         if (shouldStop) {
-          partiallyDirtyResourceList.add(partiallyDirtyResource);
-          partiallyDirtyResource = new PartiallyDirtyResource();
+          partiallyDirtyResourceList.add(settleTaskResource);
+          settleTaskResource = new SettleTaskResource();
           if (!heavySelect) {
             // Non-heavy selection is triggered more frequently. In order to 
avoid selecting too
             // many files containing mods for compaction when the disk is 
insufficient, the number
@@ -171,8 +185,8 @@ public class SettleSelectorImpl implements ISettleSelector {
           }
         }
       }
-      partiallyDirtyResourceList.add(partiallyDirtyResource);
-      return createTask(fullyDirtyResource, partiallyDirtyResourceList);
+      partiallyDirtyResourceList.add(settleTaskResource);
+      return createTask(partiallyDirtyResourceList);
     } catch (Exception e) {
       LOGGER.error(
           "{}-{} cannot select file for settle compaction", storageGroupName, 
dataRegionId, e);
@@ -278,39 +292,22 @@ public class SettleSelectorImpl implements ISettleSelector {
     return false;
   }
 
-  private List<SettleCompactionTask> createTask(
-      List<TsFileResource> fullyDirtyResources,
-      List<PartiallyDirtyResource> partiallyDirtyResourceList) {
+  private List<SettleCompactionTask> createTask(List<SettleTaskResource> 
settleTaskResourceList) {
     List<SettleCompactionTask> tasks = new ArrayList<>();
-    for (int i = 0; i < partiallyDirtyResourceList.size(); i++) {
-      if (i == 0) {
-        if (fullyDirtyResources.isEmpty()
-            && partiallyDirtyResourceList.get(i).getResources().isEmpty()) {
-          continue;
-        }
-        tasks.add(
-            new SettleCompactionTask(
-                timePartition,
-                tsFileManager,
-                fullyDirtyResources,
-                partiallyDirtyResourceList.get(i).getResources(),
-                isSeq,
-                createCompactionPerformer(),
-                tsFileManager.getNextCompactionTaskId()));
-      } else {
-        if (partiallyDirtyResourceList.get(i).getResources().isEmpty()) {
-          continue;
-        }
-        tasks.add(
-            new SettleCompactionTask(
-                timePartition,
-                tsFileManager,
-                Collections.emptyList(),
-                partiallyDirtyResourceList.get(i).getResources(),
-                isSeq,
-                createCompactionPerformer(),
-                tsFileManager.getNextCompactionTaskId()));
+    for (SettleTaskResource settleTaskResource : settleTaskResourceList) {
+      if (settleTaskResource.isEmpty()) {
+        continue;
       }
+      SettleCompactionTask task =
+          new SettleCompactionTask(
+              timePartition,
+              tsFileManager,
+              settleTaskResource.getFullyDirtyResources(),
+              settleTaskResource.getPartiallyDirtyResources(),
+              isSeq,
+              createCompactionPerformer(),
+              tsFileManager.getNextCompactionTaskId());
+      tasks.add(task);
     }
     return tasks;
   }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
index 514ee54aa3f..4f0edeb6de3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/settle/SettleCompactionSelectorTest.java
@@ -444,14 +444,14 @@ public class SettleCompactionSelectorTest extends AbstractCompactionTest {
     }
     seqTasks = settleSelector.selectSettleTask(seqResources);
     Assert.assertEquals(2, seqTasks.size());
-    Assert.assertEquals(5, seqTasks.get(0).getFullyDirtyFiles().size());
+    Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
     Assert.assertEquals(3, seqTasks.get(0).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(1), 
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(3), 
seqTasks.get(0).getPartiallyDirtyFiles().get(1));
     Assert.assertEquals(seqResources.get(5), 
seqTasks.get(0).getPartiallyDirtyFiles().get(2));
     Assert.assertEquals(seqResources.get(7), 
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(9), 
seqTasks.get(1).getPartiallyDirtyFiles().get(1));
-    Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+    Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
     Assert.assertEquals(2, seqTasks.get(1).getPartiallyDirtyFiles().size());
     Assert.assertTrue(seqTasks.get(0).start());
     Assert.assertTrue(seqTasks.get(1).start());
@@ -575,15 +575,15 @@ public class SettleCompactionSelectorTest extends AbstractCompactionTest {
         new SettleSelectorImpl(true, COMPACTION_TEST_SG, "0", 0, 
tsFileManager);
     List<SettleCompactionTask> seqTasks = 
settleSelector.selectSettleTask(seqResources);
     Assert.assertEquals(3, seqTasks.size());
-    Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
+    Assert.assertEquals(0, seqTasks.get(0).getFullyDirtyFiles().size());
     Assert.assertEquals(1, seqTasks.get(0).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(0), 
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
 
-    Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+    Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
     Assert.assertEquals(1, seqTasks.get(1).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(4), 
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
 
-    Assert.assertEquals(0, seqTasks.get(2).getFullyDirtyFiles().size());
+    Assert.assertEquals(1, seqTasks.get(2).getFullyDirtyFiles().size());
     Assert.assertEquals(2, seqTasks.get(2).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(6), 
seqTasks.get(2).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(8), 
seqTasks.get(2).getPartiallyDirtyFiles().get(1));
@@ -981,14 +981,14 @@ public class SettleCompactionSelectorTest extends AbstractCompactionTest {
     }
     seqTasks = settleSelector.selectSettleTask(seqResources);
     Assert.assertEquals(2, seqTasks.size());
-    Assert.assertEquals(5, seqTasks.get(0).getFullyDirtyFiles().size());
+    Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
     Assert.assertEquals(3, seqTasks.get(0).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(1), 
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(3), 
seqTasks.get(0).getPartiallyDirtyFiles().get(1));
     Assert.assertEquals(seqResources.get(5), 
seqTasks.get(0).getPartiallyDirtyFiles().get(2));
     Assert.assertEquals(seqResources.get(7), 
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(9), 
seqTasks.get(1).getPartiallyDirtyFiles().get(1));
-    Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+    Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
     Assert.assertEquals(2, seqTasks.get(1).getPartiallyDirtyFiles().size());
     Assert.assertTrue(seqTasks.get(0).start());
     Assert.assertTrue(seqTasks.get(1).start());
@@ -1114,15 +1114,15 @@ public class SettleCompactionSelectorTest extends AbstractCompactionTest {
         new SettleSelectorImpl(true, COMPACTION_TEST_SG, "0", 0, 
tsFileManager);
     List<SettleCompactionTask> seqTasks = 
settleSelector.selectSettleTask(seqResources);
     Assert.assertEquals(3, seqTasks.size());
-    Assert.assertEquals(3, seqTasks.get(0).getFullyDirtyFiles().size());
+    Assert.assertEquals(0, seqTasks.get(0).getFullyDirtyFiles().size());
     Assert.assertEquals(1, seqTasks.get(0).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(0), 
seqTasks.get(0).getPartiallyDirtyFiles().get(0));
 
-    Assert.assertEquals(0, seqTasks.get(1).getFullyDirtyFiles().size());
+    Assert.assertEquals(2, seqTasks.get(1).getFullyDirtyFiles().size());
     Assert.assertEquals(1, seqTasks.get(1).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(4), 
seqTasks.get(1).getPartiallyDirtyFiles().get(0));
 
-    Assert.assertEquals(0, seqTasks.get(2).getFullyDirtyFiles().size());
+    Assert.assertEquals(1, seqTasks.get(2).getFullyDirtyFiles().size());
     Assert.assertEquals(2, seqTasks.get(2).getPartiallyDirtyFiles().size());
     Assert.assertEquals(seqResources.get(6), 
seqTasks.get(2).getPartiallyDirtyFiles().get(0));
     Assert.assertEquals(seqResources.get(8), 
seqTasks.get(2).getPartiallyDirtyFiles().get(1));

Reply via email to