This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch testcontainer in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f76e2e06491c54e1908e3cc895177a38b0a86708 Author: zhanglingzhe0820 <[email protected]> AuthorDate: Fri Apr 16 16:49:51 2021 +0800 [IOTDB-1294] Compaction mods for new mods structure (#3013) * compaction mods for new mods structure * fix ci Co-authored-by: zhanglingzhe <[email protected]> --- .../db/engine/compaction/TsFileManagement.java | 3 + .../level/LevelCompactionTsFileManagement.java | 3 + .../engine/compaction/utils/CompactionUtils.java | 25 ++------ .../db/engine/compaction/CompactionChunkTest.java | 8 +-- .../compaction/LevelCompactionMergeTest.java | 71 ++++++++++++++++++++++ .../engine/compaction/LevelCompactionModsTest.java | 45 +++++++++++++- 6 files changed, 127 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java index bc734b2..a483f73 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java @@ -346,6 +346,9 @@ public abstract class TsFileManagement { seqFile.removeModFile(); if (mergingModification != null) { for (Modification modification : mergingModification.getModifications()) { + // we have to set modification offset to MAX_VALUE, as the offset of source chunk may + // change after compaction + modification.setFileOffset(Long.MAX_VALUE); seqFile.getModFile().write(modification); } try { diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java index 86f8148..5bd7cbc 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java @@ -115,6 +115,9 @@ public class LevelCompactionTsFileManagement extends TsFileManagement { try (ModificationFile modificationFile = new ModificationFile(targetTsFile.getTsFilePath() + ModificationFile.FILE_SUFFIX)) { for (Modification modification : modifications) { + // we have to set modification offset to MAX_VALUE, as the offset of source chunk may + // change after compaction + modification.setFileOffset(Long.MAX_VALUE); modificationFile.write(modification); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java index b48d5a8..9bc3ce2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java @@ -75,19 +75,13 @@ public class CompactionUtils { } private static Pair<ChunkMetadata, Chunk> readByAppendMerge( - Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap, - Map<String, List<Modification>> modificationCache, - PartialPath seriesPath, - List<Modification> modifications) - throws IOException { + Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) throws IOException { ChunkMetadata newChunkMetadata = null; Chunk newChunk = null; for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry : readerChunkMetadataMap.entrySet()) { TsFileSequenceReader reader = entry.getKey(); List<ChunkMetadata> chunkMetadataList = entry.getValue(); - modifyChunkMetaDataWithCache( - reader, chunkMetadataList, modificationCache, seriesPath, modifications); for (ChunkMetadata chunkMetadata : chunkMetadataList) { Chunk chunk = reader.readMemChunk(chunkMetadata); if (newChunkMetadata == null) { @@ -133,16 +127,9 @@ public class CompactionUtils { RateLimiter compactionWriteRateLimiter, Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry, TsFileResource targetResource, - RestorableTsFileIOWriter writer, - Map<String, List<Modification>> modificationCache, - List<Modification> modifications) - throws IOException, IllegalPathException { - Pair<ChunkMetadata, Chunk> chunkPair = - readByAppendMerge( - entry.getValue(), - modificationCache, - new PartialPath(device, entry.getKey()), - modifications); + RestorableTsFileIOWriter writer) + throws IOException { + Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue()); ChunkMetadata newChunkMetadata = chunkPair.left; Chunk newChunk = chunkPair.right; if (newChunkMetadata != null && newChunk != null) { @@ -356,9 +343,7 @@ public class CompactionUtils { compactionWriteRateLimiter, sensorReaderChunkMetadataListEntry, targetResource, - writer, - modificationCache, - modifications); + writer); } else { logger.debug("{} [Compaction] page too small, use deserialize merge", storageGroup); // we have to deserialize chunks to merge pages diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java index 74f2aa5..8a61859 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java @@ -121,13 +121,7 @@ public class CompactionChunkTest extends LevelCompactionTest { for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry : measurementChunkMetadataMap.entrySet()) { CompactionUtils.writeByAppendMerge( - device, - compactionWriteRateLimiter, - entry, - targetTsfileResource, - writer, - new HashMap<>(), - new ArrayList<>()); + device, compactionWriteRateLimiter, entry, targetTsfileResource, writer); } reader.close(); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java index a91e8fc..4e2d65c 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java @@ -23,6 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask; import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement; +import org.apache.iotdb.db.engine.modification.Deletion; +import org.apache.iotdb.db.engine.modification.Modification; +import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -42,6 +46,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertEquals; @@ -154,6 +159,72 @@ public class LevelCompactionMergeTest extends LevelCompactionTest { IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum); } + /** + * As we change the structure of mods file in 0.12, we have to check whether a modification record + * is valid by its offset in tsfile + */ + @Test + public void testCompactionModsByOffsetAfterMerge() throws IllegalPathException, IOException { + int prevPageLimit = + IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold(); + IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1); + + LevelCompactionTsFileManagement levelCompactionTsFileManagement = + new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath()); + TsFileResource forthSeqTsFileResource = seqResources.get(3); + PartialPath path = + new PartialPath( + deviceIds[0] + + TsFileConstant.PATH_SEPARATOR + + measurementSchemas[0].getMeasurementId()); + try (ModificationFile sourceModificationFile = + new ModificationFile( + forthSeqTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) { + Modification modification = + new Deletion(path, forthSeqTsFileResource.getTsFileSize() / 10, 300, 310); + sourceModificationFile.write(modification); + } + levelCompactionTsFileManagement.addAll(seqResources, true); + levelCompactionTsFileManagement.addAll(unseqResources, false); + levelCompactionTsFileManagement.forkCurrentFileList(0); + CompactionMergeTask compactionMergeTask = + levelCompactionTsFileManagement + .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0); + compactionMergeWorking = true; + compactionMergeTask.run(); + while (compactionMergeWorking) { + // wait + } + QueryContext context = new QueryContext(); + IBatchReader tsFilesReader = + new SeriesRawDataBatchReader( + path, + measurementSchemas[0].getType(), + context, + levelCompactionTsFileManagement.getTsFileList(true), + new ArrayList<>(), + null, + null, + true); + + long count = 0L; + while (tsFilesReader.hasNextBatch()) { + BatchData batchData = tsFilesReader.nextBatch(); + for (int i = 0; i < batchData.length(); i++) { + System.out.println(batchData.getTimeByIndex(i)); + } + count += batchData.length(); + } + assertEquals(489, count); + + List<TsFileResource> tsFileResourceList = levelCompactionTsFileManagement.getTsFileList(true); + for (TsFileResource tsFileResource : tsFileResourceList) { + tsFileResource.getModFile().remove(); + tsFileResource.remove(); + } + IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit); + } + /** close compaction merge callback, to release some locks */ private void closeCompactionMergeCallBack() { this.compactionMergeWorking = false; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java index 987b7da..e3dd45f 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java @@ -73,7 +73,7 @@ public class LevelCompactionModsTest extends LevelCompactionTest { try (ModificationFile sourceModificationFile = new ModificationFile(sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) { modification1 = new Deletion(new PartialPath(deviceIds[0], "sensor0"), 0, 0); - modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), 0, 0); + modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), Long.MAX_VALUE, 0); sourceModificationFile.write(modification1); sourceModificationFile.write(modification2); filterModifications.add(modification1); @@ -89,4 +89,47 @@ public class LevelCompactionModsTest extends LevelCompactionTest { assertEquals(modification2, modifications.stream().findFirst().get()); } } + + /** + * As we change the structure of mods file in 0.12, we have to check whether a modification record + * is valid by its offset in tsfile + */ + @Test + public void testCompactionModsByOffset() throws IllegalPathException, IOException { + LevelCompactionTsFileManagement levelCompactionTsFileManagement = + new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, tempSGDir.getPath()); + TsFileResource sourceTsFileResource = seqResources.get(0); + TsFileResource targetTsFileResource = seqResources.get(1); + List<Modification> filterModifications = new ArrayList<>(); + Modification modification1; + Modification modification2; + try (ModificationFile sourceModificationFile = + new ModificationFile(sourceTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) { + modification1 = + new Deletion( + new PartialPath(deviceIds[0], "sensor0"), + sourceTsFileResource.getTsFileSize() / 2, + 0, + 100); + modification2 = + new Deletion( + new PartialPath(deviceIds[0], "sensor1"), + sourceTsFileResource.getTsFileSize() / 2, + 0, + 100); + sourceModificationFile.write(modification1); + sourceModificationFile.write(modification2); + filterModifications.add(modification1); + } + List<TsFileResource> sourceTsFileResources = new ArrayList<>(); + sourceTsFileResources.add(sourceTsFileResource); + levelCompactionTsFileManagement.renameLevelFilesMods( + filterModifications, sourceTsFileResources, targetTsFileResource); + try (ModificationFile targetModificationFile = + new ModificationFile(targetTsFileResource.getTsFilePath() + ModificationFile.FILE_SUFFIX)) { + Collection<Modification> modifications = targetModificationFile.getModifications(); + assertEquals(1, modifications.size()); + assertEquals(Long.MAX_VALUE, modifications.stream().findFirst().get().getFileOffset()); + } + } }
