This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 848f4ddc987 modify compaction
848f4ddc987 is described below

commit 848f4ddc987b1513903adbddfbd6920f4772105c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 8 17:41:51 2025 +0800

    modify compaction
---
 .../main/java/org/apache/iotdb/ObjectExample.java  |   6 +-
 .../relational/function/scalar/GeoPenetrate.java   |   2 +-
 .../unary/scalar/ReadObjectColumnTransformer.java  |  19 +--
 .../execute/task/SettleCompactionTask.java         |   8 +-
 .../compaction/execute/utils/CompactionUtils.java  | 171 +++++++++++++++++++++
 .../execute/utils/MultiTsFileDeviceIterator.java   |   8 +-
 .../fast/FastAlignedSeriesCompactionExecutor.java  |  10 +-
 .../FastNonAlignedSeriesCompactionExecutor.java    |   6 +-
 .../fast/element/ChunkMetadataElement.java         |  10 +-
 .../org/apache/iotdb/db/utils/ObjectTypeUtils.java |  72 +++++++++
 .../compaction/CompactionDeleteObjectFileTest.java |  85 ++++++++++
 11 files changed, 371 insertions(+), 26 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
index 36cab4c8d0d..635141edd10 100644
--- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
@@ -82,7 +82,8 @@ public class ObjectExample {
           true,
           0,
           Files.readAllBytes(
-              
Paths.get("/Users/ht/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
+              Paths.get(
+                  
"/Users/shuww/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
       session.insert(tablet);
       tablet.reset();
 
@@ -99,7 +100,8 @@ public class ObjectExample {
           true,
           0,
           Files.readAllBytes(
-              
Paths.get("/Users/ht/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
+              Paths.get(
+                  
"/Users/shuww/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
       session.insert(tablet);
       tablet.reset();
       //
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
index 440b3728bc4..45e512ebd27 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import static 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ReadObjectColumnTransformer.getObjectPathFromBinary;
+import static 
org.apache.iotdb.db.utils.ObjectTypeUtils.getObjectPathFromBinary;
 
 public class GeoPenetrate implements ScalarFunction {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 496e0dd0dbb..84a094e39fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -20,16 +20,14 @@
 package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
 
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.commons.exception.ObjectFileNotExist;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
 import org.apache.tsfile.utils.Binary;
@@ -43,8 +41,6 @@ import java.util.Optional;
 
 public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
 
-  private static final TierManager TIER_MANAGER = TierManager.getInstance();
-
   private final Optional<FragmentInstanceContext> fragmentInstanceContext;
   private long offset = 0;
   private long length = -1;
@@ -110,7 +106,7 @@ public class ReadObjectColumnTransformer extends 
UnaryColumnTransformer {
   }
 
   private Binary readObject(Binary binary) {
-    File file = getObjectPathFromBinary(binary);
+    File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
     long fileSize = file.length();
     if (offset >= fileSize) {
       throw new UnsupportedOperationException("offset is greater than object 
size");
@@ -130,15 +126,4 @@ public class ReadObjectColumnTransformer extends 
UnaryColumnTransformer {
     }
     return new Binary(bytes);
   }
-
-  public static File getObjectPathFromBinary(Binary binary) {
-    byte[] bytes = binary.getValues();
-    String relativeObjectFilePath =
-        new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
-    Optional<File> file = 
TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath);
-    if (!file.isPresent()) {
-      throw new ObjectFileNotExist(relativeObjectFilePath);
-    }
-    return file.get();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
index e00703e5e75..79d6a41025b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
@@ -18,10 +18,12 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
@@ -204,7 +206,7 @@ public class SettleCompactionTask extends 
InnerSpaceCompactionTask {
     return isSuccess;
   }
 
-  public boolean settleWithFullyDirtyFiles() {
+  public boolean settleWithFullyDirtyFiles() throws IllegalPathException, 
IOException {
     if (fullyDirtyFiles.isEmpty()) {
       return true;
     }
@@ -213,6 +215,8 @@ public class SettleCompactionTask extends 
InnerSpaceCompactionTask {
       if (recoverMemoryStatus) {
         tsFileManager.remove(resource, resource.isSeq());
       }
+      CompactionUtils.removeDeletedObjectFiles(resource);
+
       boolean res = deleteTsFileOnDisk(resource);
       if (res) {
         fullyDeletedSuccessNum++;
@@ -288,7 +292,7 @@ public class SettleCompactionTask extends 
InnerSpaceCompactionTask {
     }
   }
 
-  public void recoverFullyDirtyFiles() {
+  public void recoverFullyDirtyFiles() throws IllegalPathException, 
IOException {
     if (!settleWithFullyDirtyFiles()) {
       throw new CompactionRecoverException("Failed to delete fully_dirty 
source file.");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 44905afc461..d744b14e087 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionAlignedChunkReader;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch;
@@ -39,21 +41,36 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.SystemMetric;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -448,4 +465,158 @@ public class CompactionUtils {
           new TimeRange(Long.MIN_VALUE, timeLowerBound));
     }
   }
+
+  /**
+   * Visits every device of a single TsFile and loads the reader and chunk metadata of its aligned
+   * series; the loaded result itself is discarded.
+   *
+   * <p>NOTE(review): presumably the load is performed for its side effect of applying the file's
+   * modifications, which is where deleted object files are removed - confirm against
+   * MultiTsFileDeviceIterator. Every device is handled through the aligned-series accessor;
+   * confirm that callers only pass files whose devices are aligned.
+   *
+   * @param resource the TsFile to scan
+   * @throws IOException if the file cannot be read
+   * @throws IllegalPathException propagated from the device iterator
+   */
+  public static void removeDeletedObjectFiles(TsFileResource resource)
+      throws IOException, IllegalPathException {
+    List<TsFileResource> singleFile = Collections.singletonList(resource);
+    try (MultiTsFileDeviceIterator devices = new MultiTsFileDeviceIterator(singleFile)) {
+      while (devices.hasNextDevice()) {
+        // Advance to the device first; the accessor below works on the current device.
+        devices.nextDevice();
+        devices.getReaderAndChunkMetadataForCurrentAlignedSeries();
+      }
+    }
+  }
+
+  /**
+   * Finds the OBJECT-typed value columns of one aligned series and, when any of them has value
+   * deletions, scans every aligned chunk so the object files of deleted points can be removed.
+   *
+   * <p>The data type of a value column is taken from the first chunk metadata in which that column
+   * is non-null; the search over chunks stops as soon as a chunk leaves no column unresolved. For
+   * each OBJECT column the matching entry of {@code valueMods} is passed through {@code
+   * ModificationUtils.sortAndMerge}. If no OBJECT column has a non-empty deletion list, the method
+   * returns without reading any chunk data.
+   *
+   * @param reader reader of the TsFile that owns the chunks
+   * @param alignedChunkMetadataList aligned chunk metadata of the series; nothing happens if empty
+   * @param timeMods deletions of the time column
+   * @param valueMods deletions per value column, indexed like the value chunk metadata list
+   * @throws IOException if chunk data cannot be read
+   */
+  public static void removeDeletedObjectFiles(
+      TsFileSequenceReader reader,
+      List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
+      List<ModEntry> timeMods,
+      List<List<ModEntry>> valueMods)
+      throws IOException {
+    if (alignedChunkMetadataList.isEmpty()) {
+      return;
+    }
+    List<Integer> objectColumns = new ArrayList<>();
+    List<List<ModEntry>> objectColumnDeletions = new ArrayList<>();
+    boolean anyObjectDeletion = false;
+
+    // resolvedTypes[column] stays null until a chunk with a non-null metadata for it is seen.
+    TSDataType[] resolvedTypes = new TSDataType[valueMods.size()];
+    for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
+      List<? extends IChunkMetadata> valueChunkMetadataList =
+          alignedChunkMetadata.getValueChunkMetadataList();
+      boolean unresolvedColumnRemains = false;
+      for (int column = 0; column < valueChunkMetadataList.size(); column++) {
+        if (resolvedTypes[column] != null) {
+          continue;
+        }
+        IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(column);
+        if (valueChunkMetadata == null) {
+          unresolvedColumnRemains = true;
+          continue;
+        }
+        TSDataType type = valueChunkMetadata.getDataType();
+        resolvedTypes[column] = type;
+        if (type != TSDataType.OBJECT) {
+          continue;
+        }
+        objectColumns.add(column);
+        List<ModEntry> mergedDeletions = ModificationUtils.sortAndMerge(valueMods.get(column));
+        if (!mergedDeletions.isEmpty()) {
+          anyObjectDeletion = true;
+        }
+        objectColumnDeletions.add(mergedDeletions);
+      }
+      if (!unresolvedColumnRemains) {
+        break;
+      }
+    }
+    if (!anyObjectDeletion) {
+      return;
+    }
+
+    // Slot 0 is the cursor of the time deletions; slot i + 1 belongs to objectColumns.get(i).
+    int[] cursors = new int[objectColumns.size() + 1];
+    List<ModEntry> mergedTimeDeletions = ModificationUtils.sortAndMerge(timeMods);
+    for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
+      CompactionUtils.removeDeletedObjectFiles(
+          reader,
+          alignedChunkMetadata,
+          objectColumns,
+          mergedTimeDeletions,
+          objectColumnDeletions,
+          cursors);
+    }
+  }
+
+  /**
+   * Scans one aligned chunk and removes the object file of every deleted point found in its OBJECT
+   * columns.
+   *
+   * <p>Only columns that may contain deleted points and that are present in this chunk are read.
+   * Because columns can be skipped, the position of a value produced by the point reader is not
+   * necessarily the position of its column in {@code objectColumnIndexList}. {@code selectedSlots}
+   * records that mapping so every value is checked against its own deletion list and cursor.
+   *
+   * @param reader reader of the TsFile that owns the chunk
+   * @param alignedChunkMetadata metadata of the aligned chunk to scan
+   * @param objectColumnIndexList indexes of the OBJECT columns in the value chunk metadata list
+   * @param timeDeletions deletions of the time column
+   * @param objectDeletions deletions per OBJECT column, parallel to {@code objectColumnIndexList}
+   * @param deletionCursors one cursor per deletion list: slot 0 for {@code timeDeletions}, slot
+   *     {@code i + 1} for {@code objectDeletions.get(i)}; the caller reuses it across chunks
+   * @throws IOException if chunk data cannot be read
+   */
+  private static void removeDeletedObjectFiles(
+      TsFileSequenceReader reader,
+      AbstractAlignedChunkMetadata alignedChunkMetadata,
+      List<Integer> objectColumnIndexList,
+      List<ModEntry> timeDeletions,
+      List<List<ModEntry>> objectDeletions,
+      int[] deletionCursors)
+      throws IOException {
+    // selectedSlots.get(k) is the position in objectColumnIndexList / objectDeletions of the k-th
+    // value chunk that is actually read.
+    List<Integer> selectedSlots = new ArrayList<>();
+    List<Chunk> valueChunks = new ArrayList<>();
+    List<List<Pair<PageHeader, ByteBuffer>>> valuePages = new ArrayList<>();
+
+    for (int slot = 0; slot < objectColumnIndexList.size(); slot++) {
+      if (timeDeletions.isEmpty() && objectDeletions.get(slot).isEmpty()) {
+        // No deletion can hit this column.
+        continue;
+      }
+      int columnIndex = objectColumnIndexList.get(slot);
+      ChunkMetadata valueChunkMetadata =
+          (ChunkMetadata) alignedChunkMetadata.getValueChunkMetadataList().get(columnIndex);
+      if (valueChunkMetadata == null) {
+        // The column has no data in this chunk.
+        continue;
+      }
+      Chunk chunk = reader.readMemChunk(valueChunkMetadata);
+      if (chunk == null) {
+        continue;
+      }
+      selectedSlots.add(slot);
+      valueChunks.add(chunk);
+      valuePages.add(new CompactionChunkReader(chunk).readPageDataWithoutUncompressing());
+    }
+    if (valueChunks.isEmpty()) {
+      // Nothing to check in this chunk, so the time chunk does not need to be read either.
+      return;
+    }
+
+    Chunk timeChunk =
+        reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
+    List<Pair<PageHeader, ByteBuffer>> timePages =
+        new CompactionChunkReader(timeChunk).readPageDataWithoutUncompressing();
+    CompactionAlignedChunkReader alignedChunkReader =
+        new CompactionAlignedChunkReader(timeChunk, valueChunks, true);
+
+    for (int pageIndex = 0; pageIndex < timePages.size(); pageIndex++) {
+      Pair<PageHeader, ByteBuffer> timePage = timePages.get(pageIndex);
+      List<PageHeader> valuePageHeaders = new ArrayList<>(valuePages.size());
+      List<ByteBuffer> compressedValuePages = new ArrayList<>(valuePages.size());
+      // NOTE(review): assumes every value chunk has one page per time page - confirm.
+      for (List<Pair<PageHeader, ByteBuffer>> pagesOfColumn : valuePages) {
+        Pair<PageHeader, ByteBuffer> valuePage = pagesOfColumn.get(pageIndex);
+        valuePageHeaders.add(valuePage.getLeft());
+        compressedValuePages.add(valuePage.getRight());
+      }
+      IPointReader pagePointReader =
+          alignedChunkReader.getPagePointReader(
+              timePage.getLeft(), valuePageHeaders, timePage.getRight(), compressedValuePages);
+
+      while (pagePointReader.hasNextTimeValuePair()) {
+        removeDeletedObjectFiles(
+            pagePointReader.nextTimeValuePair(),
+            selectedSlots,
+            deletionCursors,
+            timeDeletions,
+            objectDeletions);
+      }
+    }
+  }
+
+  /**
+   * Removes the object files referenced by the deleted values of one row.
+   *
+   * @param timeValuePair the row; value {@code k} belongs to the column at {@code
+   *     selectedSlots.get(k)}
+   * @param selectedSlots for each value position, the matching position in {@code objectDeletions}
+   * @param cursors deletion cursors: slot 0 for the time column, slot {@code s + 1} for {@code
+   *     objectDeletions.get(s)}
+   * @param timeDeletions deletions of the time column
+   * @param objectDeletions deletions per OBJECT column
+   */
+  private static void removeDeletedObjectFiles(
+      TimeValuePair timeValuePair,
+      List<Integer> selectedSlots,
+      int[] cursors,
+      List<ModEntry> timeDeletions,
+      List<List<ModEntry>> objectDeletions) {
+    long timestamp = timeValuePair.getTimestamp();
+    boolean timeDeleted = isDeleted(timestamp, timeDeletions, cursors, 0);
+    Object[] values = timeValuePair.getValues();
+    for (int i = 0; i < values.length; i++) {
+      Binary value = (Binary) values[i];
+      if (value == null) {
+        continue;
+      }
+      int slot = selectedSlots.get(i);
+      if (timeDeleted || isDeleted(timestamp, objectDeletions.get(slot), cursors, slot + 1)) {
+        ObjectTypeUtils.deleteObjectPathFromBinary(value);
+      }
+    }
+  }
+
+  /**
+   * Tells whether {@code timestamp} falls inside one of the given deletion intervals, resuming the
+   * search at {@code deleteCursors[idx]}.
+   *
+   * <p>The cursor is moved past every interval whose maximum is smaller than {@code timestamp} and
+   * is left on the interval that decided the answer.
+   *
+   * <p>NOTE(review): the forward-only cursor presumably relies on intervals being sorted and on
+   * timestamps being queried in non-decreasing order - confirm against callers.
+   *
+   * @param timestamp the point time to test
+   * @param deleteIntervalList deletion intervals; {@code null} is treated as "no deletions"
+   * @param deleteCursors cursor array updated in place
+   * @param idx slot of {@code deleteCursors} that belongs to {@code deleteIntervalList}
+   * @return {@code true} if an interval contains {@code timestamp}
+   */
+  private static boolean isDeleted(
+      long timestamp, List<ModEntry> deleteIntervalList, int[] deleteCursors, int idx) {
+    if (deleteIntervalList == null) {
+      return false;
+    }
+    for (int total = deleteIntervalList.size(); deleteCursors[idx] < total; deleteCursors[idx]++) {
+      TimeRange range = deleteIntervalList.get(deleteCursors[idx]).getTimeRange();
+      if (range.contains(timestamp)) {
+        return true;
+      }
+      if (range.getMax() >= timestamp) {
+        // The interval ends at or after the timestamp but does not contain it.
+        return false;
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index ea886aafd78..1c272cb3453 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -424,7 +424,7 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
    */
   private void applyModificationForAlignedChunkMetadataList(
       TsFileResource tsFileResource, List<AbstractAlignedChunkMetadata> 
alignedChunkMetadataList)
-      throws IllegalPathException {
+      throws IllegalPathException, IOException {
     if (alignedChunkMetadataList.isEmpty()) {
       // all the value chunks is empty chunk
       return;
@@ -476,6 +476,12 @@ public class MultiTsFileDeviceIterator implements 
AutoCloseable {
           modificationList.isEmpty() ? Collections.emptyList() : 
modificationList);
     }
 
+    CompactionUtils.removeDeletedObjectFiles(
+        readerMap.get(tsFileResource),
+        alignedChunkMetadataList,
+        modificationForTimeColumn,
+        modificationForValueColumns);
+
     ModificationUtils.modifyAlignedChunkMetaData(
         alignedChunkMetadataList,
         modificationForTimeColumn,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
index 7ad78814c5f..4817d855ca0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
@@ -171,7 +172,8 @@ public class FastAlignedSeriesCompactionExecutor extends 
SeriesCompactionExecuto
             new ChunkMetadataElement(
                 alignedChunkMetadataList.get(i),
                 i == alignedChunkMetadataList.size() - 1,
-                fileElement));
+                fileElement,
+                measurementSchemas));
       }
     }
   }
@@ -260,6 +262,12 @@ public class FastAlignedSeriesCompactionExecutor extends 
SeriesCompactionExecuto
                 }
               });
 
+      CompactionUtils.removeDeletedObjectFiles(
+          readerCacheMap.get(resource),
+          alignedChunkMetadataList,
+          timeModifications,
+          valueModifications);
+
       // modify aligned chunk metadatas
       ModificationUtils.modifyAlignedChunkMetaData(
           alignedChunkMetadataList, timeModifications, valueModifications, 
ignoreAllNullRows);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
index e2be4be0cc2..402f06994b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
@@ -51,6 +51,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -159,7 +160,10 @@ public class FastNonAlignedSeriesCompactionExecutor 
extends SeriesCompactionExec
         // add into queue
         chunkMetadataQueue.add(
             new ChunkMetadataElement(
-                chunkMetadata, i == iChunkMetadataList.size() - 1, 
fileElement));
+                chunkMetadata,
+                i == iChunkMetadataList.size() - 1,
+                fileElement,
+                Collections.emptyList()));
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
index 6828841f1bd..2d29e3944d8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeRead
 
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 
 import java.util.List;
 
@@ -42,12 +43,19 @@ public class ChunkMetadataElement {
 
   public boolean needForceDecodingPage;
 
+  // for aligned series
+  public List<IMeasurementSchema> measurementSchemasOfValueChunks;
+
   public ChunkMetadataElement(
-      IChunkMetadata chunkMetadata, boolean isLastChunk, FileElement 
fileElement) {
+      IChunkMetadata chunkMetadata,
+      boolean isLastChunk,
+      FileElement fileElement,
+      List<IMeasurementSchema> measurementSchemasOfValueChunks) {
     this.chunkMetadata = chunkMetadata;
     this.startTime = chunkMetadata.getStartTime();
     this.isLastChunk = isLastChunk;
     this.fileElement = fileElement;
+    this.measurementSchemasOfValueChunks = measurementSchemasOfValueChunks;
   }
 
   public void clearChunks() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
new file mode 100644
index 00000000000..4b5214c738d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+
+/** Static helpers for locating and deleting the files referenced by OBJECT-typed values. */
+public class ObjectTypeUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(ObjectTypeUtils.class);
+  private static final TierManager TIER_MANAGER = TierManager.getInstance();
+
+  private ObjectTypeUtils() {}
+
+  /**
+   * Resolves the object file referenced by {@code binary}.
+   *
+   * @param binary the stored value of an OBJECT column
+   * @return the file reported by the tier manager for the decoded relative path
+   * @throws ObjectFileNotExist if the tier manager reports no file for that path
+   */
+  public static File getObjectPathFromBinary(Binary binary) {
+    String relativeObjectFilePath = decodeRelativePath(binary);
+    return TIER_MANAGER
+        .getAbsoluteObjectFilePath(relativeObjectFilePath)
+        .orElseThrow(() -> new ObjectFileNotExist(relativeObjectFilePath));
+  }
+
+  /**
+   * Same lookup as {@link #getObjectPathFromBinary(Binary)}, but a missing file yields an empty
+   * {@link Optional} instead of an exception.
+   */
+  public static Optional<File> getNullableObjectPathFromBinary(Binary binary) {
+    return TIER_MANAGER.getAbsoluteObjectFilePath(decodeRelativePath(binary));
+  }
+
+  /**
+   * Deletes the object file referenced by {@code binary}, if the tier manager can locate it.
+   *
+   * <p>An {@link IOException} raised by the deletion is logged and not rethrown.
+   */
+  public static void deleteObjectPathFromBinary(Binary binary) {
+    Optional<File> located = ObjectTypeUtils.getNullableObjectPathFromBinary(binary);
+    if (located.isPresent()) {
+      File objectFile = located.get();
+      logger.info("Remove object file {}", objectFile.getAbsolutePath());
+      try {
+        Files.deleteIfExists(objectFile.toPath());
+      } catch (IOException e) {
+        logger.error("Failed to remove object file {}", objectFile.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  /**
+   * Decodes the relative object file path carried by {@code binary}: everything after the first 8
+   * bytes, read with {@code TSFileConfig.STRING_CHARSET}.
+   *
+   * <p>NOTE(review): the skipped 8 bytes are presumably a fixed-size header such as the object
+   * length - confirm against the writer of OBJECT values.
+   */
+  private static String decodeRelativePath(Binary binary) {
+    byte[] bytes = binary.getValues();
+    return new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
new file mode 100644
index 00000000000..695681ea009
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Runs an inner-space compaction over TsFiles that each carry a table deletion on an OBJECT
+ * column.
+ *
+ * <p>NOTE(review): the input directory is an absolute path on a developer machine and the test
+ * makes no assertions; it should be replaced by generated fixtures and explicit checks.
+ */
+public class CompactionDeleteObjectFileTest extends AbstractCompactionTest {
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+  }
+
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    super.tearDown();
+  }
+
+  @Test
+  public void test1() throws IOException {
+    File dir = new File("/Users/shuww/Downloads/0708/1_副本");
+    // listFiles() returns null when the directory is missing or unreadable. Skip the test in that
+    // case instead of failing with a NullPointerException on machines without the fixture.
+    File[] files = dir.listFiles();
+    org.junit.Assume.assumeTrue("fixture directory is not available: " + dir, files != null);
+
+    List<TsFileResource> resources = new ArrayList<>();
+    for (File file : files) {
+      if (!file.getName().endsWith(".tsfile")) {
+        continue;
+      }
+      TsFileResource resource = new TsFileResource(file);
+      // Record a deletion of column "file" for one device over the time range [1, 20].
+      try (ModificationFile modificationFile = resource.getExclusiveModFile()) {
+        modificationFile.write(
+            new TableDeletionEntry(
+                new DeletionPredicate(
+                    "tsfile_table",
+                    new IDPredicate.FullExactMatch(
+                        new StringArrayDeviceID(new String[] {"tsfile_table", "1", "5", "3"})),
+                    Arrays.asList("file")),
+                new TimeRange(1, 20)));
+      }
+      resource.deserialize();
+      resources.add(resource);
+    }
+
+    InnerSpaceCompactionTask task =
+        new InnerSpaceCompactionTask(
+            0, tsFileManager, resources, true, new ReadChunkCompactionPerformer(), 0);
+    task.start();
+  }
+}

Reply via email to