This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 87a416e6d2 [IOTDB-4666]Get the devices on one leaf node at a time
while compacting (#7646)
87a416e6d2 is described below
commit 87a416e6d2b10b7d269f0985ef51b90dd398280e
Author: 周沛辰 <[email protected]>
AuthorDate: Wed Nov 9 11:55:30 2022 +0800
[IOTDB-4666]Get the devices on one leaf node at a time while compacting
(#7646)
---
.../inner/utils/MultiTsFileDeviceIterator.java | 4 +-
.../engine/compaction/AbstractCompactionTest.java | 21 +++
.../ReadPointCompactionPerformerTest.java | 153 ++++++++++-----------
.../utils/MultiTsFileDeviceIteratorTest.java | 120 ++++++++++++++++
.../iotdb/tsfile/read/TsFileDeviceIterator.java | 34 ++++-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 96 +++++++++++--
.../iotdb/tsfile/utils/TsFileGeneratorUtils.java | 2 +-
7 files changed, 327 insertions(+), 103 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index 365fe4e916..a70f844511 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -49,7 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class MultiTsFileDeviceIterator implements AutoCloseable {
- // sorted from the newest to the oldest
+
private final List<TsFileResource> tsFileResources;
private final Map<TsFileResource, TsFileSequenceReader> readerMap = new
HashMap<>();
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap =
new HashMap<>();
@@ -59,6 +59,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
/** Used for inner space compaction. */
public MultiTsFileDeviceIterator(List<TsFileResource> tsFileResources)
throws IOException {
this.tsFileResources = new ArrayList<>(tsFileResources);
+ // sort the files from the oldest to the newest
Collections.sort(this.tsFileResources, TsFileResource::compareFileName);
try {
for (TsFileResource tsFileResource : this.tsFileResources) {
@@ -81,6 +82,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
List<TsFileResource> seqResources, List<TsFileResource> unseqResources)
throws IOException {
this.tsFileResources = new ArrayList<>(seqResources);
tsFileResources.addAll(unseqResources);
+ // sort the files from the newest to the oldest
Collections.sort(this.tsFileResources,
TsFileResource::compareFileNameByDesc);
for (TsFileResource tsFileResource : tsFileResources) {
TsFileSequenceReader reader =
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
index 9b69007ea5..cdd863ff22 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/AbstractCompactionTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.compaction;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
@@ -25,6 +26,7 @@ import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
+import
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -37,6 +39,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
import org.apache.commons.io.FileUtils;
@@ -45,7 +48,9 @@ import org.junit.Assert;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
@@ -69,6 +74,9 @@ public class AbstractCompactionTest {
private static final int oldMaxCrossCompactionFileNum =
IoTDBDescriptor.getInstance().getConfig().getMaxCrossCompactionCandidateFileNum();
+ private final int oldMaxDegreeOfIndexNode =
+ TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode();
+
protected static File STORAGE_GROUP_DIR =
new File(
TestConstant.BASE_OUTPUT_PATH
@@ -339,6 +347,7 @@ public class AbstractCompactionTest {
.setMaxCrossCompactionCandidateFileNum(oldMaxCrossCompactionFileNum);
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(oldChunkGroupSize);
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(oldPagePointSize);
+
TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(oldMaxDegreeOfIndexNode);
EnvironmentUtils.cleanAllDir();
if (SEQ_DIRS.exists()) {
FileUtils.deleteDirectory(SEQ_DIRS);
@@ -371,6 +380,18 @@ public class AbstractCompactionTest {
}
}
+ protected void generateModsFile(
+ List<String> seriesPaths, List<TsFileResource> resources, long
startValue, long endValue)
+ throws IllegalPathException, IOException {
+ for (TsFileResource resource : resources) {
+ Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
+ for (String path : seriesPaths) {
+ deleteMap.put(path, new Pair<>(startValue, endValue));
+ }
+ CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
+ }
+ }
+
protected void setDataType(TSDataType dataType) {
this.dataType = dataType;
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
index 1771931f65..abfbf6a20e 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/ReadPointCompactionPerformerTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.engine.compaction;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
@@ -73,12 +72,15 @@ import static org.junit.Assert.assertEquals;
public class ReadPointCompactionPerformerTest extends AbstractCompactionTest {
private final String oldThreadName = Thread.currentThread().getName();
+ private final ICompactionPerformer performer = new
ReadPointCompactionPerformer();
+
@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException,
InterruptedException {
super.setUp();
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(2);
}
@After
@@ -131,8 +133,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources,
true);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -222,8 +224,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources,
true);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -328,8 +330,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -436,8 +438,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -575,8 +577,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -703,8 +705,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -817,8 +819,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -901,8 +903,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources,
true);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1012,8 +1014,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources,
true);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1133,8 +1135,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources,
true);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1260,8 +1262,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1436,8 +1438,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1593,8 +1595,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1702,8 +1704,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(unseqResources,
false);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, true, COMPACTION_TEST_SG);
@@ -1798,8 +1800,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -1914,8 +1916,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2112,8 +2114,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2307,8 +2309,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2492,8 +2494,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2667,8 +2669,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2841,8 +2843,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -2971,8 +2973,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3175,8 +3177,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3414,8 +3416,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3530,8 +3532,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3656,8 +3658,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3796,8 +3798,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -3915,8 +3917,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -4068,8 +4070,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -4279,8 +4281,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -4548,8 +4550,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -4832,8 +4834,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -4943,8 +4945,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -5072,8 +5074,8 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
List<TsFileResource> targetResources =
CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources);
- ICompactionPerformer performer =
- new ReadPointCompactionPerformer(seqResources, unseqResources,
targetResources);
+ performer.setTargetFiles(targetResources);
+ performer.setSourceFiles(seqResources, unseqResources);
performer.setSummary(new CompactionTaskSummary());
performer.perform();
CompactionUtils.moveTargetFile(targetResources, false, COMPACTION_TEST_SG);
@@ -5150,7 +5152,6 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
COMPACTION_TEST_SG + PATH_SEPARATOR + "d1000" + i +
PATH_SEPARATOR + "s" + j,
iterator.currentTime());
count++;
- System.out.println(iterator.currentTime());
iterator.next();
}
}
@@ -5341,18 +5342,6 @@ public class ReadPointCompactionPerformerTest extends
AbstractCompactionTest {
}
}
- private void generateModsFile(
- List<String> seriesPaths, List<TsFileResource> resources, long
startValue, long endValue)
- throws IllegalPathException, IOException {
- for (TsFileResource resource : resources) {
- Map<String, Pair<Long, Long>> deleteMap = new HashMap<>();
- for (String path : seriesPaths) {
- deleteMap.put(path, new Pair<>(startValue, endValue));
- }
- CompactionFileGeneratorUtils.generateMods(deleteMap, resource, false);
- }
- }
-
/**
* Check whether target file contain empty chunk group or not. Assert fail
if it contains empty
* chunk group whose deviceID is not in the deviceIdList.
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
new file mode 100644
index 0000000000..0e699da1e2
--- /dev/null
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/MultiTsFileDeviceIteratorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.compaction.utils;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MultiTsFileDeviceIteratorTest extends AbstractCompactionTest {
+
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ super.setUp();
+ IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(1024);
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ for (TsFileResource tsFileResource : seqResources) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
+ }
+ for (TsFileResource tsFileResource : unseqResources) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(tsFileResource.getTsFilePath());
+ }
+ }
+
+ @Test
+ public void
getNonAlignedDevicesFromDifferentFilesWithFourLayersInNodeTreeTest()
+ throws MetadataException, IOException, WriteProcessException {
+ TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(3);
+ registerTimeseriesInMManger(30, 3, false);
+ createFiles(3, 10, 3, 100, 0, 0, 50, 50, false, true);
+ createFiles(4, 5, 3, 100, 1000, 0, 50, 50, false, true);
+ createFiles(2, 15, 3, 100, 1000, 0, 50, 50, false, false);
+ createFiles(3, 30, 3, 100, 1000, 0, 50, 50, false, false);
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + i);
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertFalse(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ }
+
+ @Test
+ public void getAlignedDevicesFromDifferentFilesWithOneLayerInNodeTreeTest()
+ throws MetadataException, IOException, WriteProcessException {
+ registerTimeseriesInMManger(30, 3, false);
+ createFiles(3, 10, 3, 100, 0, 0, 50, 50, true, true);
+ createFiles(4, 5, 3, 100, 1000, 0, 50, 50, true, true);
+ createFiles(2, 15, 3, 100, 1000, 0, 50, 50, true, false);
+ createFiles(3, 30, 3, 100, 1000, 0, 50, 50, true, false);
+
+ // sort the deviceId in lexicographical order from small to large
+ List<String> deviceIds = new ArrayList<>();
+ for (int i = 0; i < 30; i++) {
+ deviceIds.add("root.testsg.d" + (i +
TsFileGeneratorUtils.getAlignDeviceOffset()));
+ }
+ deviceIds.sort(String::compareTo);
+
+ int deviceNum = 0;
+ try (MultiTsFileDeviceIterator multiTsFileDeviceIterator =
+ new MultiTsFileDeviceIterator(seqResources, unseqResources)) {
+ while (multiTsFileDeviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> deviceInfo =
multiTsFileDeviceIterator.nextDevice();
+ Assert.assertEquals(deviceIds.get(deviceNum), deviceInfo.left);
+ Assert.assertTrue(deviceInfo.right);
+ deviceNum++;
+ }
+ }
+ Assert.assertEquals(30, deviceNum);
+ }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 562a3bb493..a487d55b2a 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -25,19 +25,28 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
public class TsFileDeviceIterator implements Iterator<Pair<String, Boolean>> {
private final TsFileSequenceReader reader;
- private final Queue<Pair<String, Pair<Long, Long>>> queue;
+
+ // device -> firstMeasurmentNode offset
+ private final Queue<Pair<String, long[]>> queue;
private Pair<String, Boolean> currentDevice = null;
private MetadataIndexNode measurementNode;
+ // <startOffset, endOffset>, device leaf node offset in this file
+ private final List<long[]> leafDeviceNodeOffsetList;
+
public TsFileDeviceIterator(
- TsFileSequenceReader reader, Queue<Pair<String, Pair<Long, Long>>>
queue) {
+ TsFileSequenceReader reader,
+ List<long[]> leafDeviceNodeOffsetList,
+ Queue<Pair<String, long[]>> queue) {
this.reader = reader;
this.queue = queue;
+ this.leafDeviceNodeOffsetList = leafDeviceNodeOffsetList;
}
public Pair<String, Boolean> current() {
@@ -46,7 +55,22 @@ public class TsFileDeviceIterator implements
Iterator<Pair<String, Boolean>> {
@Override
public boolean hasNext() {
- return !queue.isEmpty();
+ if (!queue.isEmpty()) {
+ return true;
+ } else if (leafDeviceNodeOffsetList.size() == 0) {
+ // device queue is empty and all device leaf node has been read
+ return false;
+ } else {
+ // queue is empty but there are still some devices on leaf node not
being read yet
+ long[] nextDeviceLeafNodeOffset = leafDeviceNodeOffsetList.remove(0);
+ try {
+ reader.getDevicesAndEntriesOfOneLeafNode(
+ nextDeviceLeafNodeOffset[0], nextDeviceLeafNodeOffset[1], queue);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return true;
+ }
}
@Override
@@ -54,12 +78,12 @@ public class TsFileDeviceIterator implements
Iterator<Pair<String, Boolean>> {
if (!hasNext()) {
throw new NoSuchElementException();
}
- Pair<String, Pair<Long, Long>> startEndPair = queue.remove();
+ Pair<String, long[]> startEndPair = queue.remove();
try {
// get the first measurment node of this device, to know if the device
is alignd
this.measurementNode =
MetadataIndexNode.deserializeFrom(
- reader.readData(startEndPair.right.left,
startEndPair.right.right));
+ reader.readData(startEndPair.right[0], startEndPair.right[1]));
boolean isAligned = reader.isAlignedDevice(measurementNode);
currentDevice = new Pair<>(startEndPair.left, isAligned);
return currentDevice;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index e713f5c9cb..8927c43bce 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -634,37 +634,105 @@ public class TsFileSequenceReader implements
AutoCloseable {
/**
* @return an iterator of "device, isAligned" list, in which names of
devices are ordered in
- * dictionary order, and isAligned represents whether the device is
aligned
+ * dictionary order, and isAligned represents whether the device is
aligned. Only read devices
+ * on one device leaf node each time to save memory.
*/
public TsFileDeviceIterator getAllDevicesIteratorWithIsAligned() throws
IOException {
readFileMetadata();
-
+ Queue<Pair<String, long[]>> queue = new LinkedList<>();
+ List<long[]> leafDeviceNodeOffsets = new ArrayList<>();
MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
- Queue<Pair<String, Pair<Long, Long>>> queue = new LinkedList<>();
- getAllDevicesWithIsAligned(metadataIndexNode, queue);
+ if
(metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+ // the first node of index tree is device leaf node, then get the
devices directly
+ getDevicesOfLeafNode(metadataIndexNode, queue);
+ } else {
+ // get all device leaf node offset
+ getAllDeviceLeafNodeOffset(metadataIndexNode, leafDeviceNodeOffsets);
+ }
- return new TsFileDeviceIterator(this, queue);
+ return new TsFileDeviceIterator(this, leafDeviceNodeOffsets, queue);
}
- private void getAllDevicesWithIsAligned(
- MetadataIndexNode metadataIndexNode, Queue<Pair<String, Pair<Long,
Long>>> queue)
+ /**
+ * Get devices and first measurement node offset.
+ *
+ * @param startOffset start offset of device leaf node
+ * @param endOffset end offset of device leaf node
+ * @param measurementNodeOffsetQueue device -> first measurement node offset
+ */
+ public void getDevicesAndEntriesOfOneLeafNode(
+ Long startOffset, Long endOffset, Queue<Pair<String, long[]>>
measurementNodeOffsetQueue)
throws IOException {
try {
- int metadataIndexListSize = metadataIndexNode.getChildren().size();
+ ByteBuffer nextBuffer = readData(startOffset, endOffset);
+ MetadataIndexNode deviceLeafNode =
MetadataIndexNode.deserializeFrom(nextBuffer);
+ getDevicesOfLeafNode(deviceLeafNode, measurementNodeOffsetQueue);
+ } catch (Exception e) {
+ logger.error("Something error happened while getting all devices of file
{}", file);
+ throw e;
+ }
+ }
+ /**
+ * Get all devices and its corresponding entries on the specific device leaf
node.
+ *
+ * @param deviceLeafNode this node must be device leaf node
+ */
+ private void getDevicesOfLeafNode(
+ MetadataIndexNode deviceLeafNode, Queue<Pair<String, long[]>>
measurementNodeOffsetQueue) {
+ if
(!deviceLeafNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
+ throw new RuntimeException("the first param should be device leaf
node.");
+ }
+ List<MetadataIndexEntry> childrenEntries = deviceLeafNode.getChildren();
+ for (int i = 0; i < childrenEntries.size(); i++) {
+ MetadataIndexEntry deviceEntry = childrenEntries.get(i);
+ long childStartOffset = deviceEntry.getOffset();
+ long childEndOffset =
+ i == childrenEntries.size() - 1
+ ? deviceLeafNode.getEndOffset()
+ : childrenEntries.get(i + 1).getOffset();
+ long[] offset = {childStartOffset, childEndOffset};
+ measurementNodeOffsetQueue.add(new Pair<>(deviceEntry.getName(),
offset));
+ }
+ }
+
+ /**
+ * Get the device leaf node offset under the specific device internal node.
+ *
+ * @param deviceInternalNode this node must be device internal node
+ */
+ private void getAllDeviceLeafNodeOffset(
+ MetadataIndexNode deviceInternalNode, List<long[]>
leafDeviceNodeOffsets) throws IOException {
+ if
(!deviceInternalNode.getNodeType().equals(MetadataIndexNodeType.INTERNAL_DEVICE))
{
+ throw new RuntimeException("the first param should be device internal
node.");
+ }
+ try {
+ int metadataIndexListSize = deviceInternalNode.getChildren().size();
+ boolean isCurrentLayerLeafNode = false;
for (int i = 0; i < metadataIndexListSize; i++) {
- MetadataIndexEntry entry = metadataIndexNode.getChildren().get(i);
+ MetadataIndexEntry entry = deviceInternalNode.getChildren().get(i);
long startOffset = entry.getOffset();
- long endOffset = metadataIndexNode.getEndOffset();
+ long endOffset = deviceInternalNode.getEndOffset();
if (i != metadataIndexListSize - 1) {
- endOffset = metadataIndexNode.getChildren().get(i + 1).getOffset();
+ endOffset = deviceInternalNode.getChildren().get(i + 1).getOffset();
+ }
+ if (i == 0) {
+ // check is current layer device leaf node or device internal node.
Just need to check the
+ // first entry, because the rest are the same
+ MetadataIndexNodeType nodeType =
+ MetadataIndexNodeType.deserialize(
+ ReadWriteIOUtils.readByte(readData(endOffset - 1,
endOffset)));
+ isCurrentLayerLeafNode =
nodeType.equals(MetadataIndexNodeType.LEAF_DEVICE);
}
- if
(metadataIndexNode.getNodeType().equals(MetadataIndexNodeType.LEAF_DEVICE)) {
- queue.add(new Pair<>(entry.getName(), new Pair<>(startOffset,
endOffset)));
+ if (isCurrentLayerLeafNode) {
+ // is device leaf node
+ long[] offset = {startOffset, endOffset};
+ leafDeviceNodeOffsets.add(offset);
continue;
}
ByteBuffer nextBuffer = readData(startOffset, endOffset);
-
getAllDevicesWithIsAligned(MetadataIndexNode.deserializeFrom(nextBuffer),
queue);
+ getAllDeviceLeafNodeOffset(
+ MetadataIndexNode.deserializeFrom(nextBuffer),
leafDeviceNodeOffsets);
}
} catch (Exception e) {
logger.error("Something error happened while getting all devices of file
{}", file);
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
index 85bf39a6e7..42e8714d45 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileGeneratorUtils.java
@@ -45,7 +45,7 @@ import static
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARA
public class TsFileGeneratorUtils {
private static final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
public static final String testStorageGroup = "root.testsg";
- private static int alignDeviceOffset = 10000;
+ public static int alignDeviceOffset = 10000;
public static void writeWithTsRecord(
TsFileWriter tsFileWriter,