This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/refine_cross_selection
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/xingtanzjr/refine_cross_selection by this push:
new dbf2cedd80 spotless
dbf2cedd80 is described below
commit dbf2cedd807ddb6751f9fc812c2b9adba336826b
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed Dec 21 23:45:40 2022 +0800
spotless
---
.../cross/rewrite/CrossCompactionTaskResource.java | 6 ++-
.../rewrite/CrossSpaceCompactionCandidate.java | 30 ++++++++++----
.../RewriteCrossSpaceCompactionSelector.java | 48 ++++++++++++++--------
.../cross/RewriteCompactionFileSelectorTest.java | 3 +-
4 files changed, 58 insertions(+), 29 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossCompactionTaskResource.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossCompactionTaskResource.java
index 3664ca6492..8be9220209 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossCompactionTaskResource.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossCompactionTaskResource.java
@@ -44,7 +44,8 @@ public class CrossCompactionTaskResource {
return seqFiles;
}
- public void putResources(TsFileResource unseqFile, List<TsFileResource>
seqFiles, long memoryCost) {
+ public void putResources(
+ TsFileResource unseqFile, List<TsFileResource> seqFiles, long
memoryCost) {
addUnseqFile(unseqFile);
addTargetSeqFiles(seqFiles);
increaseMemoryCost(memoryCost);
@@ -90,7 +91,8 @@ public class CrossCompactionTaskResource {
}
public boolean isValid() {
- // Regarding current implementation of cross compaction task, the
unseqFiles and seqFiles should not be empty.
+ // Regarding current implementation of cross compaction task, the
unseqFiles and seqFiles should
+ // not be empty.
// It should be changed once the task execution is optimized.
// See https://issues.apache.org/jira/browse/IOTDB-5263
return !unseqFiles.isEmpty() && !seqFiles.isEmpty();
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java
index 285de9430a..4ff2759af4 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/CrossSpaceCompactionCandidate.java
@@ -87,7 +87,9 @@ public class CrossSpaceCompactionCandidate {
}
}
if (unseqDeviceInfo.endTime <= seqDeviceInfo.endTime) {
- // When scanning the target seqFiles for unseqFile, we traverse them
one by one no matter whether it is selected or not. But we only add the
unselected seqFiles to next split to avoid duplication selection
+ // When scanning the target seqFiles for unseqFile, we traverse them
one by one no matter
+ // whether they are selected or not. But we only add the unselected
seqFiles to the next split to
+ // avoid duplicate selection
if (!seqFile.selected) {
ret.add(seqFile);
}
@@ -102,7 +104,8 @@ public class CrossSpaceCompactionCandidate {
}
}
}
- // mark candidates in next split as selected even though it may not be
added to the final TaskResource
+ // mark candidates in the next split as selected even though they may not be
added to the final
+ // TaskResource
unseqFile.markAsSelected();
for (TsFileResourceCandidate fileCandidate : ret) {
fileCandidate.markAsSelected();
@@ -113,7 +116,7 @@ public class CrossSpaceCompactionCandidate {
private List<TsFileResourceCandidate> copySeqResource(List<TsFileResource>
seqFiles) {
List<TsFileResourceCandidate> ret = new ArrayList<>();
- for(TsFileResource resource : seqFiles) {
+ for (TsFileResource resource : seqFiles) {
ret.add(new TsFileResourceCandidate(resource));
}
return ret;
@@ -128,7 +131,9 @@ public class CrossSpaceCompactionCandidate {
private List<TsFileResourceCandidate>
filterUnseqResource(List<TsFileResource> unseqResources) {
List<TsFileResourceCandidate> ret = new ArrayList<>();
for (TsFileResource resource : unseqResources) {
- if (resource.getStatus() != TsFileResourceStatus.CLOSED ||
!resource.getTsFile().exists() || resource.isDeleted()) {
+ if (resource.getStatus() != TsFileResourceStatus.CLOSED
+ || !resource.getTsFile().exists()
+ || resource.isDeleted()) {
break;
} else if (!resource.isDeleted() && resource.stillLives(ttlLowerBound)) {
ret.add(new TsFileResourceCandidate(resource));
@@ -138,7 +143,9 @@ public class CrossSpaceCompactionCandidate {
}
public List<TsFileResource> getSeqFiles() {
- return seqFiles.stream().map(tsFileResourceCandidate ->
tsFileResourceCandidate.resource).collect(Collectors.toList());
+ return seqFiles.stream()
+ .map(tsFileResourceCandidate -> tsFileResourceCandidate.resource)
+ .collect(Collectors.toList());
}
public List<TsFileResourceCandidate> getUnseqFileCandidates() {
@@ -146,14 +153,17 @@ public class CrossSpaceCompactionCandidate {
}
public List<TsFileResource> getUnseqFiles() {
- return unseqFiles.stream().map(tsFileResourceCandidate ->
tsFileResourceCandidate.resource).collect(Collectors.toList());
+ return unseqFiles.stream()
+ .map(tsFileResourceCandidate -> tsFileResourceCandidate.resource)
+ .collect(Collectors.toList());
}
protected static class CrossCompactionTaskResourceSplit {
protected TsFileResourceCandidate unseqFile;
protected List<TsFileResourceCandidate> seqFiles;
- public CrossCompactionTaskResourceSplit(TsFileResourceCandidate unseqFile,
List<TsFileResourceCandidate> seqFiles) {
+ public CrossCompactionTaskResourceSplit(
+ TsFileResourceCandidate unseqFile, List<TsFileResourceCandidate>
seqFiles) {
this.unseqFile = unseqFile;
this.seqFiles = seqFiles;
}
@@ -163,10 +173,12 @@ public class CrossSpaceCompactionCandidate {
protected TsFileResource resource;
protected boolean selected;
protected boolean isValidCandidate;
+
protected TsFileResourceCandidate(TsFileResource tsFileResource) {
this.resource = tsFileResource;
this.selected = false;
- // although we do the judgement here, the task should be validated
before executing because the status of file may be changed after the task is
submitted to queue
+ // although we do the judgement here, the task should be validated
before executing because
+ // the status of file may be changed after the task is submitted to queue
this.isValidCandidate = tsFileResource.isClosed() &&
tsFileResource.getTsFile().exists();
}
@@ -192,4 +204,4 @@ public class CrossSpaceCompactionCandidate {
protected long startTime;
protected long endTime;
}
-}
\ No newline at end of file
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
index bf55a199dc..ca21afe699 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/RewriteCrossSpaceCompactionSelector.java
@@ -23,21 +23,19 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.cross.ICrossSpaceSelector;
+import
org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionCandidate.CrossCompactionTaskResourceSplit;
+import
org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionCandidate.TsFileResourceCandidate;
import
org.apache.iotdb.db.engine.compaction.cross.utils.AbstractCompactionEstimator;
import org.apache.iotdb.db.engine.compaction.task.ICompactionSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
-import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.rescon.SystemInfo;
-import
org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionCandidate.TsFileResourceCandidate;
-import
org.apache.iotdb.db.engine.compaction.cross.rewrite.CrossSpaceCompactionCandidate.CrossCompactionTaskResourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -108,7 +106,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
* @return two lists of TsFileResource, the former is selected seqFiles and
the latter is selected
* unseqFiles or an empty array if there are no proper candidates by the
budget.
*/
- private CrossCompactionTaskResource
selectOneTaskResources(CrossSpaceCompactionCandidate candidate) throws
MergeException {
+ private CrossCompactionTaskResource selectOneTaskResources(
+ CrossSpaceCompactionCandidate candidate) throws MergeException {
long startTime = System.currentTimeMillis();
try {
LOGGER.debug(
@@ -153,26 +152,39 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
* exceed the memory overhead preset by the system for the compaction
thread, put them into the
* selectedSeqFiles and selectedUnseqFiles.
*/
- private CrossCompactionTaskResource
executeTaskResourceSelection(CrossSpaceCompactionCandidate candidate) throws
IOException {
+ private CrossCompactionTaskResource executeTaskResourceSelection(
+ CrossSpaceCompactionCandidate candidate) throws IOException {
CrossCompactionTaskResource taskResource = new
CrossCompactionTaskResource();
while (candidate.hasNextSplit()) {
CrossCompactionTaskResourceSplit split = candidate.nextSplit();
TsFileResource unseqFile = split.unseqFile.resource;
- List<TsFileResource> targetSeqFiles = split.seqFiles.stream().map(c ->
c.resource).collect(Collectors.toList());
- long memoryCost =
compactionEstimator.estimateCrossCompactionMemory(targetSeqFiles, unseqFile);
+ List<TsFileResource> targetSeqFiles =
+ split.seqFiles.stream().map(c ->
c.resource).collect(Collectors.toList());
+ long memoryCost =
+ compactionEstimator.estimateCrossCompactionMemory(targetSeqFiles,
unseqFile);
if (!canAddToTaskResource(taskResource, unseqFile, targetSeqFiles,
memoryCost)) {
break;
}
taskResource.putResources(unseqFile, targetSeqFiles, memoryCost);
- LOGGER.debug("Adding a new unseqFile {} and seqFiles {} as candidates,
new cost {}, total cost {}", unseqFile, targetSeqFiles, memoryCost, totalCost);
+ LOGGER.debug(
+ "Adding a new unseqFile {} and seqFiles {} as candidates, new cost
{}, total cost {}",
+ unseqFile,
+ targetSeqFiles,
+ memoryCost,
+ totalCost);
}
return taskResource;
}
// TODO: (xingtanzjr) need to confirm whether we should strictly guarantee
the conditions
- // If we guarantee the condition strictly, the smallest collection of cross
task resource may not satisfied
- private boolean canAddToTaskResource(CrossCompactionTaskResource
taskResource, TsFileResource unseqFile, List<TsFileResource> seqFiles, long
memoryCost) {
+ // If we guarantee the condition strictly, the smallest collection of cross
task resource may not
+ // be satisfied
+ private boolean canAddToTaskResource(
+ CrossCompactionTaskResource taskResource,
+ TsFileResource unseqFile,
+ List<TsFileResource> seqFiles,
+ long memoryCost) {
long totalFileSize = unseqFile.getTsFileSize();
for (TsFileResource f : seqFiles) {
totalFileSize += f.getTsFileSize();
@@ -186,7 +198,8 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
}
private boolean canSubmitCrossTask() {
- return config.isEnableCrossSpaceCompaction() &&
(CompactionTaskManager.currentTaskNum.get() <
config.getCompactionThreadCount());
+ return config.isEnableCrossSpaceCompaction()
+ && (CompactionTaskManager.currentTaskNum.get() <
config.getCompactionThreadCount());
}
/**
@@ -208,14 +221,16 @@ public class RewriteCrossSpaceCompactionSelector
implements ICrossSpaceSelector
}
// TODO: (xingtanzjr) need to confirm what this ttl is used for
long ttlLowerBound = System.currentTimeMillis() - Long.MAX_VALUE;
- // we record the variable `candidate` here is used for selecting more than
one CrossCompactionTaskResources in this method
- CrossSpaceCompactionCandidate candidate = new
CrossSpaceCompactionCandidate(sequenceFileList, unsequenceFileList,
ttlLowerBound);
+ // we keep the variable `candidate` here so that it can be used for selecting more than
one
+ // CrossCompactionTaskResource in this method
+ CrossSpaceCompactionCandidate candidate =
+ new CrossSpaceCompactionCandidate(sequenceFileList,
unsequenceFileList, ttlLowerBound);
try {
CrossCompactionTaskResource taskResources =
selectOneTaskResources(candidate);
if (!taskResources.isValid()) {
LOGGER.info(
- "{} [Compaction] Cannot select any files, because source files
may be occupied by other compaction threads.",
- logicalStorageGroupName + "-" + dataRegionId);
+ "{} [Compaction] Cannot select any files, because source files may
be occupied by other compaction threads.",
+ logicalStorageGroupName + "-" + dataRegionId);
return Collections.emptyList();
}
@@ -226,7 +241,6 @@ public class RewriteCrossSpaceCompactionSelector implements
ICrossSpaceSelector
taskResources.getUnseqFiles().size());
return Collections.singletonList(taskResources);
-
} catch (MergeException e) {
LOGGER.error("{} cannot select file for cross space compaction",
logicalStorageGroupName, e);
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
index c8683bfa53..3bed15c076 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCompactionFileSelectorTest.java
@@ -299,7 +299,8 @@ public class RewriteCompactionFileSelectorTest extends
MergeTest {
prepareFile(unseqList.get(1), 0, 100, 20);
prepareFile(unseqList.get(2), 99, 1, 30);
- CrossSpaceCompactionCandidate resource = new
CrossSpaceCompactionCandidate(seqList, unseqList);
+ CrossSpaceCompactionCandidate resource =
+ new CrossSpaceCompactionCandidate(seqList, unseqList);
// the budget is enough to select unseq0 and unseq2, but not unseq1
// the first selection should only contain seq0 and unseq0
long originMemoryBudget =
SystemInfo.getInstance().getMemorySizeForCompaction();