This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch testcontainer
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f76e2e06491c54e1908e3cc895177a38b0a86708
Author: zhanglingzhe0820 <[email protected]>
AuthorDate: Fri Apr 16 16:49:51 2021 +0800

    [IOTDB-1294] Compaction mods for new mods structure (#3013)
    
    * compaction mods for new mods structure
    
    * fix ci
    
    Co-authored-by: zhanglingzhe <[email protected]>
---
 .../db/engine/compaction/TsFileManagement.java     |  3 +
 .../level/LevelCompactionTsFileManagement.java     |  3 +
 .../engine/compaction/utils/CompactionUtils.java   | 25 ++------
 .../db/engine/compaction/CompactionChunkTest.java  |  8 +--
 .../compaction/LevelCompactionMergeTest.java       | 71 ++++++++++++++++++++++
 .../engine/compaction/LevelCompactionModsTest.java | 45 +++++++++++++-
 6 files changed, 127 insertions(+), 28 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index bc734b2..a483f73 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -346,6 +346,9 @@ public abstract class TsFileManagement {
       seqFile.removeModFile();
       if (mergingModification != null) {
         for (Modification modification : 
mergingModification.getModifications()) {
+          // we have to set modification offset to MAX_VALUE, as the offset of 
source chunk may
+          // change after compaction
+          modification.setFileOffset(Long.MAX_VALUE);
           seqFile.getModFile().write(modification);
         }
         try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 86f8148..5bd7cbc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -115,6 +115,9 @@ public class LevelCompactionTsFileManagement extends 
TsFileManagement {
       try (ModificationFile modificationFile =
           new ModificationFile(targetTsFile.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
         for (Modification modification : modifications) {
+          // we have to set modification offset to MAX_VALUE, as the offset of 
source chunk may
+          // change after compaction
+          modification.setFileOffset(Long.MAX_VALUE);
           modificationFile.write(modification);
         }
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index b48d5a8..9bc3ce2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -75,19 +75,13 @@ public class CompactionUtils {
   }
 
   private static Pair<ChunkMetadata, Chunk> readByAppendMerge(
-      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap,
-      Map<String, List<Modification>> modificationCache,
-      PartialPath seriesPath,
-      List<Modification> modifications)
-      throws IOException {
+      Map<TsFileSequenceReader, List<ChunkMetadata>> readerChunkMetadataMap) 
throws IOException {
     ChunkMetadata newChunkMetadata = null;
     Chunk newChunk = null;
     for (Entry<TsFileSequenceReader, List<ChunkMetadata>> entry :
         readerChunkMetadataMap.entrySet()) {
       TsFileSequenceReader reader = entry.getKey();
       List<ChunkMetadata> chunkMetadataList = entry.getValue();
-      modifyChunkMetaDataWithCache(
-          reader, chunkMetadataList, modificationCache, seriesPath, 
modifications);
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
         Chunk chunk = reader.readMemChunk(chunkMetadata);
         if (newChunkMetadata == null) {
@@ -133,16 +127,9 @@ public class CompactionUtils {
       RateLimiter compactionWriteRateLimiter,
       Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry,
       TsFileResource targetResource,
-      RestorableTsFileIOWriter writer,
-      Map<String, List<Modification>> modificationCache,
-      List<Modification> modifications)
-      throws IOException, IllegalPathException {
-    Pair<ChunkMetadata, Chunk> chunkPair =
-        readByAppendMerge(
-            entry.getValue(),
-            modificationCache,
-            new PartialPath(device, entry.getKey()),
-            modifications);
+      RestorableTsFileIOWriter writer)
+      throws IOException {
+    Pair<ChunkMetadata, Chunk> chunkPair = readByAppendMerge(entry.getValue());
     ChunkMetadata newChunkMetadata = chunkPair.left;
     Chunk newChunk = chunkPair.right;
     if (newChunkMetadata != null && newChunk != null) {
@@ -356,9 +343,7 @@ public class CompactionUtils {
                     compactionWriteRateLimiter,
                     sensorReaderChunkMetadataListEntry,
                     targetResource,
-                    writer,
-                    modificationCache,
-                    modifications);
+                    writer);
               } else {
                 logger.debug("{} [Compaction] page too small, use deserialize 
merge", storageGroup);
                 // we have to deserialize chunks to merge pages
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
index 74f2aa5..8a61859 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionChunkTest.java
@@ -121,13 +121,7 @@ public class CompactionChunkTest extends 
LevelCompactionTest {
       for (Entry<String, Map<TsFileSequenceReader, List<ChunkMetadata>>> entry 
:
           measurementChunkMetadataMap.entrySet()) {
         CompactionUtils.writeByAppendMerge(
-            device,
-            compactionWriteRateLimiter,
-            entry,
-            targetTsfileResource,
-            writer,
-            new HashMap<>(),
-            new ArrayList<>());
+            device, compactionWriteRateLimiter, entry, targetTsfileResource, 
writer);
       }
       reader.close();
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
index a91e8fc..4e2d65c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionMergeTest.java
@@ -23,6 +23,10 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.constant.TestConstant;
 import 
org.apache.iotdb.db.engine.compaction.TsFileManagement.CompactionMergeTask;
 import 
org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -42,6 +46,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 
@@ -154,6 +159,72 @@ public class LevelCompactionMergeTest extends 
LevelCompactionTest {
     IoTDBDescriptor.getInstance().getConfig().setSeqLevelNum(prevSeqLevelNum);
   }
 
+  /**
+   * As we change the structure of mods file in 0.12, we have to check whether 
a modification record
+   * is valid by its offset in tsfile
+   */
+  @Test
+  public void testCompactionModsByOffsetAfterMerge() throws 
IllegalPathException, IOException {
+    int prevPageLimit =
+        
IoTDBDescriptor.getInstance().getConfig().getMergePagePointNumberThreshold();
+    
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(1);
+
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    TsFileResource forthSeqTsFileResource = seqResources.get(3);
+    PartialPath path =
+        new PartialPath(
+            deviceIds[0]
+                + TsFileConstant.PATH_SEPARATOR
+                + measurementSchemas[0].getMeasurementId());
+    try (ModificationFile sourceModificationFile =
+        new ModificationFile(
+            forthSeqTsFileResource.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
+      Modification modification =
+          new Deletion(path, forthSeqTsFileResource.getTsFileSize() / 10, 300, 
310);
+      sourceModificationFile.write(modification);
+    }
+    levelCompactionTsFileManagement.addAll(seqResources, true);
+    levelCompactionTsFileManagement.addAll(unseqResources, false);
+    levelCompactionTsFileManagement.forkCurrentFileList(0);
+    CompactionMergeTask compactionMergeTask =
+        levelCompactionTsFileManagement
+        .new CompactionMergeTask(this::closeCompactionMergeCallBack, 0);
+    compactionMergeWorking = true;
+    compactionMergeTask.run();
+    while (compactionMergeWorking) {
+      // wait
+    }
+    QueryContext context = new QueryContext();
+    IBatchReader tsFilesReader =
+        new SeriesRawDataBatchReader(
+            path,
+            measurementSchemas[0].getType(),
+            context,
+            levelCompactionTsFileManagement.getTsFileList(true),
+            new ArrayList<>(),
+            null,
+            null,
+            true);
+
+    long count = 0L;
+    while (tsFilesReader.hasNextBatch()) {
+      BatchData batchData = tsFilesReader.nextBatch();
+      for (int i = 0; i < batchData.length(); i++) {
+        System.out.println(batchData.getTimeByIndex(i));
+      }
+      count += batchData.length();
+    }
+    assertEquals(489, count);
+
+    List<TsFileResource> tsFileResourceList = 
levelCompactionTsFileManagement.getTsFileList(true);
+    for (TsFileResource tsFileResource : tsFileResourceList) {
+      tsFileResource.getModFile().remove();
+      tsFileResource.remove();
+    }
+    
IoTDBDescriptor.getInstance().getConfig().setMergePagePointNumberThreshold(prevPageLimit);
+  }
+
   /** close compaction merge callback, to release some locks */
   private void closeCompactionMergeCallBack() {
     this.compactionMergeWorking = false;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
index 987b7da..e3dd45f 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/LevelCompactionModsTest.java
@@ -73,7 +73,7 @@ public class LevelCompactionModsTest extends 
LevelCompactionTest {
     try (ModificationFile sourceModificationFile =
         new ModificationFile(sourceTsFileResource.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
       modification1 = new Deletion(new PartialPath(deviceIds[0], "sensor0"), 
0, 0);
-      modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), 
0, 0);
+      modification2 = new Deletion(new PartialPath(deviceIds[0], "sensor1"), 
Long.MAX_VALUE, 0);
       sourceModificationFile.write(modification1);
       sourceModificationFile.write(modification2);
       filterModifications.add(modification1);
@@ -89,4 +89,47 @@ public class LevelCompactionModsTest extends 
LevelCompactionTest {
       assertEquals(modification2, modifications.stream().findFirst().get());
     }
   }
+
+  /**
+   * As we change the structure of mods file in 0.12, we have to check whether 
a modification record
+   * is valid by its offset in tsfile
+   */
+  @Test
+  public void testCompactionModsByOffset() throws IllegalPathException, 
IOException {
+    LevelCompactionTsFileManagement levelCompactionTsFileManagement =
+        new LevelCompactionTsFileManagement(COMPACTION_TEST_SG, 
tempSGDir.getPath());
+    TsFileResource sourceTsFileResource = seqResources.get(0);
+    TsFileResource targetTsFileResource = seqResources.get(1);
+    List<Modification> filterModifications = new ArrayList<>();
+    Modification modification1;
+    Modification modification2;
+    try (ModificationFile sourceModificationFile =
+        new ModificationFile(sourceTsFileResource.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
+      modification1 =
+          new Deletion(
+              new PartialPath(deviceIds[0], "sensor0"),
+              sourceTsFileResource.getTsFileSize() / 2,
+              0,
+              100);
+      modification2 =
+          new Deletion(
+              new PartialPath(deviceIds[0], "sensor1"),
+              sourceTsFileResource.getTsFileSize() / 2,
+              0,
+              100);
+      sourceModificationFile.write(modification1);
+      sourceModificationFile.write(modification2);
+      filterModifications.add(modification1);
+    }
+    List<TsFileResource> sourceTsFileResources = new ArrayList<>();
+    sourceTsFileResources.add(sourceTsFileResource);
+    levelCompactionTsFileManagement.renameLevelFilesMods(
+        filterModifications, sourceTsFileResources, targetTsFileResource);
+    try (ModificationFile targetModificationFile =
+        new ModificationFile(targetTsFileResource.getTsFilePath() + 
ModificationFile.FILE_SUFFIX)) {
+      Collection<Modification> modifications = 
targetModificationFile.getModifications();
+      assertEquals(1, modifications.size());
+      assertEquals(Long.MAX_VALUE, 
modifications.stream().findFirst().get().getFileOffset());
+    }
+  }
 }

Reply via email to