This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eca660  [IOTDB-2024] Add annotation to compaction (#4523)
7eca660 is described below

commit 7eca6605bdf59fe9798d0da8d0ae67b903088b9c
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Dec 3 15:41:07 2021 +0800

    [IOTDB-2024] Add annotation to compaction (#4523)
---
 .../engine/compaction/CompactionTaskManager.java   | 16 ++++++++++++++-
 .../cross/inplace/InplaceCompactionSelector.java   |  8 ++++++++
 .../inplace/selector/MaxFileMergeFileSelector.java | 20 +++++++++++++++++++
 .../sizetiered/SizeTieredCompactionSelector.java   | 23 +++++++++++++++++++++-
 4 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 959b9fc..8eac781 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -52,16 +52,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class CompactionTaskManager implements IService {
   private static final Logger logger = LoggerFactory.getLogger("COMPACTION");
   private static final CompactionTaskManager INSTANCE = new 
CompactionTaskManager();
+
+  // The thread pool that executes the compaction task. The default number of 
threads for this pool
+  // is 10.
   private WrappedScheduledExecutorService taskExecutionPool;
   public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
-  // TODO: record the task in time partition
   private MinMaxPriorityQueue<AbstractCompactionTask> 
candidateCompactionTaskQueue =
       MinMaxPriorityQueue.orderedBy(new 
CompactionTaskComparator()).maximumSize(1000).create();
+  // <logicalStorageGroupName, futureSet>; used to terminate all compaction tasks under the
+  // logicalStorageGroup.
   private Map<String, Set<Future<Void>>> storageGroupTasks = new 
ConcurrentHashMap<>();
   private Map<String, Map<Long, Set<Future<Void>>>> compactionTaskFutures =
       new ConcurrentHashMap<>();
   private List<AbstractCompactionTask> runningCompactionTaskList = new 
ArrayList<>();
+
+  // The thread pool that periodically fetches compaction tasks from
+  // candidateCompactionTaskQueue and submits them to taskExecutionPool. The default number of
+  // threads for this pool is 1.
   private ScheduledExecutorService compactionTaskSubmissionThreadPool;
+
   private final long TASK_SUBMIT_INTERVAL =
       
IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionInterval();
 
@@ -81,6 +90,11 @@ public class CompactionTaskManager implements IService {
       currentTaskNum = new AtomicInteger(0);
       compactionTaskSubmissionThreadPool =
           IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
ThreadName.COMPACTION_SERVICE.getName());
+
+      // Periodically do the following: fetch the highest priority task from the
+      // candidateCompactionTaskQueue, check that all tsfiles in the compaction task are valid, and
+      // if there is a thread available in the taskExecutionPool, submit the compaction task to the
+      // taskExecutionPool and perform the compaction.
       compactionTaskSubmissionThreadPool.scheduleWithFixedDelay(
           this::submitTaskFromTaskQueue,
           TASK_SUBMIT_INTERVAL,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
index dcc494c..6190fb4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
@@ -62,6 +62,14 @@ public class InplaceCompactionSelector extends 
AbstractCrossSpaceCompactionSelec
         taskFactory);
   }
 
+  /**
+   * This method creates a specific file selector according to the file 
selection strategy of
+   * crossSpace compaction, uses the file selector to select all unseqFiles 
and seqFiles to be
+   * compacted under the time partition of the virtual storage group, and 
creates a compaction task
+   * for them. The task is put into the candidateCompactionTaskQueue of the {@link
+   * CompactionTaskManager}.
+   *
+   * @return whether files to compact were found and a compaction task was submitted
+   */
   @Override
   public boolean selectAndSubmit() {
     boolean taskSubmitted = false;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
index bda4cc4..0d4df93 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
@@ -130,6 +130,17 @@ public class MaxFileMergeFileSelector implements 
ICrossSpaceMergeFileSelector {
     return new List[] {selectedSeqFiles, selectedUnseqFiles};
   }
 
+  /**
+   * In a preset time (30 seconds), for each unseqFile, find the list of 
seqFiles that overlap with
+   * it and have not been selected by the file selector of this compaction 
task. After finding each
+   * unseqFile and its corresponding overlap seqFile list, estimate the 
additional memory overhead
+   * that may be added by compacting them (preferably using the loop 
estimate), and if it does not
+   * exceed the memory overhead preset by the system for the compaction 
thread, put them into the
+   * selectedSeqFiles and selectedUnseqFiles.
+   *
+   * @param useTightBound whether to use the tight estimate rather than the loop estimate
+   * @throws IOException
+   */
   void select(boolean useTightBound) throws IOException {
     tmpSelectedSeqFiles = new HashSet<>();
     seqSelected = new boolean[resource.getSeqFiles().size()];
@@ -221,6 +232,15 @@ public class MaxFileMergeFileSelector implements 
ICrossSpaceMergeFileSelector {
     return isClosedAndNotMerging;
   }
 
+  /**
+   * Put the index of each seqFile that overlaps with the specific unseqFile and has not been
+   * selected by the file selector of the compaction task into the tmpSelectedSeqFiles set. To
+   * determine whether an overlap exists, traverse each device ChunkGroup in the unseqFile and
+   * check whether it overlaps with the ChunkGroup of the same device in each seqFile that has not
+   * been selected by the compaction task; if so, select this seqFile.
+   *
+   * @param unseqFile the tsFileResource of unseqFile to be compacted
+   */
   private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
     int tmpSelectedNum = 0;
     for (String deviceId : unseqFile.getDevices()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index db665bd..0786503 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -72,6 +72,14 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
         taskFactory);
   }
 
+  /**
+   * This method searches for batches of files to be compacted from layer 0 to the highest layer. If
+   * at least one batch of files to be compacted is found on a certain layer, it does not search
+   * higher layers. It creates a compaction task for each batch of files and puts it into the
+   * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+   *
+   * @return whether files to compact were found and a compaction task was submitted
+   */
   @Override
   public boolean selectAndSubmit() {
     LOGGER.debug(
@@ -105,6 +113,20 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
     return true;
   }
 
+  /**
+   * This method searches all files on the given level. If there are consecutive files on the
+   * level that meet the system preset conditions (the number exceeds 10 or the total file size
+   * exceeds 2G), a compaction task is created for the batch of files and placed in the
+   * taskPriorityQueue, and the search continues with the next batch. If at least one batch of
+   * files to be compacted is found on this level, it returns false (indicating that higher levels
+   * will not be searched); otherwise it returns true.
+   *
+   * @param level the level to be searched
+   * @param taskPriorityQueue it stores the batches of files to be compacted 
and the total size of
+   *     each batch
+   * @return whether to continue the search to higher levels
+   * @throws IOException
+   */
   private boolean selectLevelTask(
       int level, PriorityQueue<Pair<List<TsFileResource>, Long>> 
taskPriorityQueue)
       throws IOException {
@@ -113,7 +135,6 @@ public class SizeTieredCompactionSelector extends 
AbstractInnerSpaceCompactionSe
     long selectedFileSize = 0L;
     long targetCompactionFileSize = config.getTargetCompactionFileSize();
 
-    // this iterator traverses the list in reverse order
     for (TsFileResource currentFile : tsFileResources) {
       TsFileNameGenerator.TsFileName currentName =
           TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());

Reply via email to