This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7eca660 [IOTDB-2024] Add annotation to compaction (#4523)
7eca660 is described below
commit 7eca6605bdf59fe9798d0da8d0ae67b903088b9c
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Dec 3 15:41:07 2021 +0800
[IOTDB-2024] Add annotation to compaction (#4523)
---
.../engine/compaction/CompactionTaskManager.java | 16 ++++++++++++++-
.../cross/inplace/InplaceCompactionSelector.java | 8 ++++++++
.../inplace/selector/MaxFileMergeFileSelector.java | 20 +++++++++++++++++++
.../sizetiered/SizeTieredCompactionSelector.java | 23 +++++++++++++++++++++-
4 files changed, 65 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 959b9fc..8eac781 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -52,16 +52,25 @@ import java.util.concurrent.atomic.AtomicInteger;
public class CompactionTaskManager implements IService {
private static final Logger logger = LoggerFactory.getLogger("COMPACTION");
private static final CompactionTaskManager INSTANCE = new
CompactionTaskManager();
+
+ // The thread pool that executes the compaction task. The default number of
threads for this pool
+ // is 10.
private WrappedScheduledExecutorService taskExecutionPool;
public static volatile AtomicInteger currentTaskNum = new AtomicInteger(0);
- // TODO: record the task in time partition
private MinMaxPriorityQueue<AbstractCompactionTask>
candidateCompactionTaskQueue =
MinMaxPriorityQueue.orderedBy(new
CompactionTaskComparator()).maximumSize(1000).create();
+ // <logicalStorageGroupName,futureSet>, it is used to terminate all
compaction tasks under the
+ // logicalStorageGroup
private Map<String, Set<Future<Void>>> storageGroupTasks = new
ConcurrentHashMap<>();
private Map<String, Map<Long, Set<Future<Void>>>> compactionTaskFutures =
new ConcurrentHashMap<>();
private List<AbstractCompactionTask> runningCompactionTaskList = new
ArrayList<>();
+
+ // The thread pool that periodically fetches compaction tasks from
+ // candidateCompactionTaskQueue and submits them to taskExecutionPool for
execution. The default number of threads for this pool
+ // is 1.
private ScheduledExecutorService compactionTaskSubmissionThreadPool;
+
private final long TASK_SUBMIT_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getCompactionSubmissionInterval();
@@ -81,6 +90,11 @@ public class CompactionTaskManager implements IService {
currentTaskNum = new AtomicInteger(0);
compactionTaskSubmissionThreadPool =
IoTDBThreadPoolFactory.newScheduledThreadPool(1,
ThreadName.COMPACTION_SERVICE.getName());
+
+ // Periodically do the following: fetch the highest priority task from
the
+ // candidateCompactionTaskQueue, check that all tsfiles in the
compaction task are valid, and
+ // if there is thread space available in the taskExecutionPool, put the
compaction task
+ // into the taskExecutionPool and perform the compaction.
compactionTaskSubmissionThreadPool.scheduleWithFixedDelay(
this::submitTaskFromTaskQueue,
TASK_SUBMIT_INTERVAL,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
index dcc494c..6190fb4 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/InplaceCompactionSelector.java
@@ -62,6 +62,14 @@ public class InplaceCompactionSelector extends
AbstractCrossSpaceCompactionSelec
taskFactory);
}
+ /**
+ * This method creates a specific file selector according to the file
selection strategy of
+ * crossSpace compaction, uses the file selector to select all unseqFiles
and seqFiles to be
+ * compacted under the time partition of the virtual storage group, and
creates a compaction task
+ * for them. The task is put into the compactionTaskQueue of the {@link
CompactionTaskManager}.
+ *
+ * @return whether files were selected and a compaction task was submitted for them
+ */
@Override
public boolean selectAndSubmit() {
boolean taskSubmitted = false;
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
index bda4cc4..0d4df93 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/inplace/selector/MaxFileMergeFileSelector.java
@@ -130,6 +130,17 @@ public class MaxFileMergeFileSelector implements
ICrossSpaceMergeFileSelector {
return new List[] {selectedSeqFiles, selectedUnseqFiles};
}
+ /**
+ * In a preset time (30 seconds), for each unseqFile, find the list of
seqFiles that overlap with
+ * it and have not been selected by the file selector of this compaction
task. After finding each
+ * unseqFile and its corresponding overlap seqFile list, estimate the
additional memory overhead
+ * that may be added by compacting them (preferably using the loop
estimate), and if it does not
+ * exceed the memory overhead preset by the system for the compaction
thread, put them into the
+ * selectedSeqFiles and selectedUnseqFiles.
+ *
+ * @param useTightBound whether to use the tight estimate rather than the loop estimate
+ * @throws IOException
+ */
void select(boolean useTightBound) throws IOException {
tmpSelectedSeqFiles = new HashSet<>();
seqSelected = new boolean[resource.getSeqFiles().size()];
@@ -221,6 +232,15 @@ public class MaxFileMergeFileSelector implements
ICrossSpaceMergeFileSelector {
return isClosedAndNotMerging;
}
+ /**
+ * Put the index of the seqFile that has an overlap with the specific
unseqFile and has not been
+ * selected by the file selector of the compaction task into the
tmpSelectedSeqFiles list. To
+ * determine whether an overlap exists, traverse each device ChunkGroup in
the unseqFile and
+ * check whether it overlaps with the same device's ChunkGroup in each
seqFile that has not been
+ * selected by the compaction task; if so, select this seqFile.
+ *
+ * @param unseqFile the tsFileResource of unseqFile to be compacted
+ */
private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
int tmpSelectedNum = 0;
for (String deviceId : unseqFile.getDevices()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
index db665bd..0786503 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/sizetiered/SizeTieredCompactionSelector.java
@@ -72,6 +72,14 @@ public class SizeTieredCompactionSelector extends
AbstractInnerSpaceCompactionSe
taskFactory);
}
+ /**
+ * This method searches for a batch of files to be compacted from layer 0 to
the highest layer. If
+ * there are more than a batch of files to be merged on a certain layer, it
does not search to
+ * higher layers. It creates a compaction task for each batch of files and
puts it into the
+ * candidateCompactionTaskQueue of the {@link CompactionTaskManager}.
+ *
+ * @return whether files were selected and a compaction task was submitted for them
+ */
@Override
public boolean selectAndSubmit() {
LOGGER.debug(
@@ -105,6 +113,20 @@ public class SizeTieredCompactionSelector extends
AbstractInnerSpaceCompactionSe
return true;
}
+ /**
+ * This method searches for all files on the given level. If there are
consecutive files on the
+ * level that meet the system preset conditions (the number exceeds 10 or
the total file size
+ * exceeds 2G), a compaction task is created for the batch of files and
placed in the
+ * taskPriorityQueue, and the search continues with the next batch. If
at least one batch of
+ * files to be compacted is found on this layer, it will return false
(indicating that it will no
+ * longer search for higher layers), otherwise it will return true.
+ *
+ * @param level the level to be searched
+ * @param taskPriorityQueue it stores the batches of files to be compacted
and the total size of
+ * each batch
+ * @return return whether to continue the search to higher levels
+ * @throws IOException
+ */
private boolean selectLevelTask(
int level, PriorityQueue<Pair<List<TsFileResource>, Long>>
taskPriorityQueue)
throws IOException {
@@ -113,7 +135,6 @@ public class SizeTieredCompactionSelector extends
AbstractInnerSpaceCompactionSe
long selectedFileSize = 0L;
long targetCompactionFileSize = config.getTargetCompactionFileSize();
- // this iterator traverses the list in reverse order
for (TsFileResource currentFile : tsFileResources) {
TsFileNameGenerator.TsFileName currentName =
TsFileNameGenerator.getTsFileName(currentFile.getTsFile().getName());