This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a0e30f5157 [IOTDB-3745]Deduplicate mods of target files in compaction (#6591)
a0e30f5157 is described below
commit a0e30f515777bf76b37b0b86e6f24910bb3f35c1
Author: 周沛辰 <[email protected]>
AuthorDate: Thu Jul 7 08:48:42 2022 +0800
[IOTDB-3745]Deduplicate mods of target files in compaction (#6591)
---
.../db/engine/compaction/CompactionUtils.java | 49 ++++++------
.../engine/storagegroup/TsFileNameGenerator.java | 23 ++++++
.../cross/RewriteCrossSpaceCompactionTest.java | 8 +-
.../compaction/inner/InnerSeqCompactionTest.java | 90 ++++++++++++++++++++++
4 files changed, 139 insertions(+), 31 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 096f03cbf4..fb143e1e6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -39,11 +39,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* This tool can be used to perform inner space or cross space compaction of
aligned and non aligned
@@ -115,9 +116,19 @@ public class CompactionUtils {
tsFileResource);
}
// update each target mods file.
- for (TsFileResource tsFileResource : targetResources) {
- updateOneTargetMods(
- tsFileResource,
seqFileInfoMap.get(tsFileResource.getTsFile().getName()), unseqResources);
+ for (TsFileResource targetResource : targetResources) {
+ TsFileResource seqFile =
seqFileInfoMap.get(targetResource.getTsFile().getName());
+ Set<Modification> modifications = new HashSet<>();
+ if (seqFile != null) {
+ // get compaction mods from its corresponding source seq file
+
modifications.addAll(ModificationFile.getCompactionMods(seqFile).getModifications());
+ }
+ // get compaction mods from all source unseq files
+ for (TsFileResource unseqFile : unseqResources) {
+
modifications.addAll(ModificationFile.getCompactionMods(unseqFile).getModifications());
+ }
+
+ updateOneTargetMods(targetResource, modifications);
}
}
@@ -127,15 +138,20 @@ public class CompactionUtils {
*/
public static void combineModsInInnerCompaction(
Collection<TsFileResource> sourceFiles, TsFileResource targetTsFile)
throws IOException {
- List<Modification> modifications = new ArrayList<>();
+ Set<Modification> modifications = new HashSet<>();
for (TsFileResource mergeTsFile : sourceFiles) {
try (ModificationFile sourceCompactionModificationFile =
ModificationFile.getCompactionMods(mergeTsFile)) {
modifications.addAll(sourceCompactionModificationFile.getModifications());
}
}
+ updateOneTargetMods(targetTsFile, modifications);
+ }
+
+ private static void updateOneTargetMods(
+ TsFileResource targetFile, Set<Modification> modifications) throws
IOException {
if (!modifications.isEmpty()) {
- try (ModificationFile modificationFile =
ModificationFile.getNormalMods(targetTsFile)) {
+ try (ModificationFile modificationFile =
ModificationFile.getNormalMods(targetFile)) {
for (Modification modification : modifications) {
// we have to set modification offset to MAX_VALUE, as the offset of
source chunk may
// change after compaction
@@ -146,27 +162,6 @@ public class CompactionUtils {
}
}
- private static void updateOneTargetMods(
- TsFileResource targetFile, TsFileResource seqFile, List<TsFileResource>
unseqFiles)
- throws IOException {
- // write mods in the seq file
- if (seqFile != null) {
- ModificationFile seqCompactionModificationFile =
ModificationFile.getCompactionMods(seqFile);
- for (Modification modification :
seqCompactionModificationFile.getModifications()) {
- targetFile.getModFile().write(modification);
- }
- }
- // write mods in all unseq files
- for (TsFileResource unseqFile : unseqFiles) {
- ModificationFile compactionUnseqModificationFile =
- ModificationFile.getCompactionMods(unseqFile);
- for (Modification modification :
compactionUnseqModificationFile.getModifications()) {
- targetFile.getModFile().write(modification);
- }
- }
- targetFile.getModFile().close();
- }
-
public static void deleteCompactionModsFile(
List<TsFileResource> selectedSeqTsFileResourceList,
List<TsFileResource> selectedUnSeqTsFileResourceList)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index 946d827461..8c15c99ed6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -122,6 +123,7 @@ public class TsFileNameGenerator {
}
}
+ @TestOnly
public static TsFileResource increaseCrossCompactionCnt(TsFileResource
tsFileResource)
throws IOException {
File tsFile = tsFileResource.getTsFile();
@@ -142,6 +144,27 @@ public class TsFileNameGenerator {
return tsFileResource;
}
+ @TestOnly
+ public static TsFileResource increaseInnerCompactionCnt(TsFileResource
tsFileResource)
+ throws IOException {
+ File tsFile = tsFileResource.getTsFile();
+ String path = tsFile.getParent();
+ TsFileName tsFileName =
getTsFileName(tsFileResource.getTsFile().getName());
+ tsFileName.setInnerCompactionCnt(tsFileName.getInnerCompactionCnt() + 1);
+ tsFileResource.setFile(
+ new File(
+ path,
+ tsFileName.time
+ + FILE_NAME_SEPARATOR
+ + tsFileName.version
+ + FILE_NAME_SEPARATOR
+ + tsFileName.innerCompactionCnt
+ + FILE_NAME_SEPARATOR
+ + tsFileName.crossCompactionCnt
+ + TSFILE_SUFFIX));
+ return tsFileResource;
+ }
+
public static File increaseCrossCompactionCnt(File tsFile) throws
IOException {
String path = tsFile.getParent();
TsFileName tsFileName = getTsFileName(tsFile.getName());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
index fa3181d854..b7d5f709d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/cross/RewriteCrossSpaceCompactionTest.java
@@ -246,7 +246,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
.replace(CROSS_COMPACTION_TMP_FILE_SUFFIX,
TsFileConstant.TSFILE_SUFFIX)));
resource.resetModFile();
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(24, resource.getModFile().getModifications().size());
+ Assert.assertEquals(4, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
for (int i = TsFileGeneratorUtils.getAlignDeviceOffset();
@@ -483,7 +483,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
continue;
}
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(180,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(30, resource.getModFile().getModifications().size());
}
FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
@@ -662,7 +662,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(6, resource.getModFile().getModifications().size());
+ Assert.assertEquals(1, resource.getModFile().getModifications().size());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
}
@@ -784,7 +784,7 @@ public class RewriteCrossSpaceCompactionTest extends AbstractCompactionTest {
new TsFileResource(
TsFileNameGenerator.increaseCrossCompactionCnt(seqResource.getTsFile()));
Assert.assertTrue(resource.getModFile().exists());
- Assert.assertEquals(12, resource.getModFile().getModifications().size());
+ Assert.assertEquals(2, resource.getModFile().getModifications().size());
Assert.assertFalse(resource.getCompactionModFile().exists());
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
index 8c52951785..09e17a763f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerSeqCompactionTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionUtils;
@@ -34,7 +35,11 @@ import org.apache.iotdb.db.engine.compaction.utils.CompactionClearUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionConfigRestorer;
import
org.apache.iotdb.db.engine.compaction.utils.CompactionFileGeneratorUtils;
import org.apache.iotdb.db.engine.compaction.utils.CompactionTimeseriesType;
+import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -46,6 +51,7 @@ import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,6 +63,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putChunk;
import static
org.apache.iotdb.db.engine.compaction.utils.CompactionCheckerUtils.putOnePageChunk;
@@ -91,6 +98,7 @@ public class InnerSeqCompactionTest {
public void setUp() throws MetadataException {
prevMaxDegreeOfIndexNode =
TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode();
TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(2);
+ EnvironmentUtils.envSetUp();
IoTDB.configManager.init();
IoTDB.schemaProcessor.setStorageGroup(new PartialPath(COMPACTION_TEST_SG));
for (String fullPath : fullPaths) {
@@ -111,6 +119,7 @@ public class InnerSeqCompactionTest {
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
IoTDB.configManager.clear();
+ EnvironmentUtils.cleanEnv();
EnvironmentUtils.cleanAllDir();
TSFileDescriptor.getInstance().getConfig().setMaxDegreeOfIndexNode(prevMaxDegreeOfIndexNode);
}
@@ -967,4 +976,85 @@ public class InnerSeqCompactionTest {
IoTDBDescriptor.getInstance().getConfig().setTargetChunkSize(prevTargetChunkSize);
}
}
+
+ @Test
+ public void testCompactionWithDeletionsDuringCompactions()
+ throws MetadataException, IOException, DataRegionException {
+ // create source seq files
+ List<TsFileResource> sourceResources = new ArrayList<>();
+ List<List<Long>> chunkPagePointsNum = new ArrayList<>();
+ List<Long> pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(100L);
+ chunkPagePointsNum.add(pagePointsNum);
+ pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(200L);
+ chunkPagePointsNum.add(pagePointsNum);
+ pagePointsNum = new ArrayList<>();
+ pagePointsNum.add(300L);
+ chunkPagePointsNum.add(pagePointsNum);
+ Set<String> paths = new HashSet<>();
+ for (int i = 0; i < fullPaths.length; i++) {
+ paths.add(fullPaths[i]);
+ }
+
+ for (int i = 0; i < 5; i++) {
+ TsFileResource tsFileResource =
+ CompactionFileGeneratorUtils.generateTsFileResource(true, i + 1);
+ CompactionFileGeneratorUtils.writeTsFile(paths, chunkPagePointsNum, i *
600L, tsFileResource);
+ sourceResources.add(tsFileResource);
+ }
+ DataRegion vsgp =
+ new DataRegion(
+ TestConstant.BASE_OUTPUT_PATH,
+ "0",
+ new TsFileFlushPolicy.DirectFlushPolicy(),
+ COMPACTION_TEST_SG);
+ vsgp.getTsFileResourceManager().addAll(sourceResources, true);
+ // delete data before compaction
+ vsgp.delete(new PartialPath(fullPaths[0]), 0, 1000, 0, null);
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0,
+ vsgp.getTsFileResourceManager(),
+ sourceResources,
+ true,
+ new ReadChunkCompactionPerformer(),
+ new AtomicInteger(0),
+ 0);
+ task.setSourceFilesToCompactionCandidate();
+ task.checkValidAndSetMerging();
+ // delete data during compaction
+ vsgp.delete(new PartialPath(fullPaths[0]), 0, 1200, 0, null);
+ vsgp.delete(new PartialPath(fullPaths[0]), 0, 1800, 0, null);
+ for (int i = 0; i < sourceResources.size() - 1; i++) {
+ TsFileResource resource = sourceResources.get(i);
+ resource.resetModFile();
+ Assert.assertTrue(resource.getCompactionModFile().exists());
+ Assert.assertTrue(resource.getModFile().exists());
+ if (i < 2) {
+ Assert.assertEquals(3,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
+ } else if (i < 3) {
+ Assert.assertEquals(2,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(2,
resource.getCompactionModFile().getModifications().size());
+ } else {
+ Assert.assertEquals(1,
resource.getModFile().getModifications().size());
+ Assert.assertEquals(1,
resource.getCompactionModFile().getModifications().size());
+ }
+ }
+ task.start();
+ for (TsFileResource resource : sourceResources) {
+ Assert.assertFalse(resource.getTsFile().exists());
+ Assert.assertFalse(resource.getModFile().exists());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
+
+ TsFileResource resource =
+ TsFileNameGenerator.increaseInnerCompactionCnt(sourceResources.get(0));
+ resource.resetModFile();
+ Assert.assertTrue(resource.getModFile().exists());
+ Assert.assertEquals(2, resource.getModFile().getModifications().size());
+ Assert.assertFalse(resource.getCompactionModFile().exists());
+ }
}