This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 848f4ddc987 modify compaction
848f4ddc987 is described below
commit 848f4ddc987b1513903adbddfbd6920f4772105c
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 8 17:41:51 2025 +0800
modify compaction
---
.../main/java/org/apache/iotdb/ObjectExample.java | 6 +-
.../relational/function/scalar/GeoPenetrate.java | 2 +-
.../unary/scalar/ReadObjectColumnTransformer.java | 19 +--
.../execute/task/SettleCompactionTask.java | 8 +-
.../compaction/execute/utils/CompactionUtils.java | 171 +++++++++++++++++++++
.../execute/utils/MultiTsFileDeviceIterator.java | 8 +-
.../fast/FastAlignedSeriesCompactionExecutor.java | 10 +-
.../FastNonAlignedSeriesCompactionExecutor.java | 6 +-
.../fast/element/ChunkMetadataElement.java | 10 +-
.../org/apache/iotdb/db/utils/ObjectTypeUtils.java | 72 +++++++++
.../compaction/CompactionDeleteObjectFileTest.java | 85 ++++++++++
11 files changed, 371 insertions(+), 26 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
index 36cab4c8d0d..635141edd10 100644
--- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
@@ -82,7 +82,8 @@ public class ObjectExample {
true,
0,
Files.readAllBytes(
-
Paths.get("/Users/ht/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
+ Paths.get(
+
"/Users/shuww/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
session.insert(tablet);
tablet.reset();
@@ -99,7 +100,8 @@ public class ObjectExample {
true,
0,
Files.readAllBytes(
-
Paths.get("/Users/ht/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
+ Paths.get(
+
"/Users/shuww/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
session.insert(tablet);
tablet.reset();
//
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
index 440b3728bc4..45e512ebd27 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/scalar/GeoPenetrate.java
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import static
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.ReadObjectColumnTransformer.getObjectPathFromBinary;
+import static
org.apache.iotdb.db.utils.ObjectTypeUtils.getObjectPathFromBinary;
public class GeoPenetrate implements ScalarFunction {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
index 496e0dd0dbb..84a094e39fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/scalar/ReadObjectColumnTransformer.java
@@ -20,16 +20,14 @@
package org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
-import org.apache.iotdb.commons.exception.ObjectFileNotExist;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
import
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
import org.apache.tsfile.utils.Binary;
@@ -43,8 +41,6 @@ import java.util.Optional;
public class ReadObjectColumnTransformer extends UnaryColumnTransformer {
- private static final TierManager TIER_MANAGER = TierManager.getInstance();
-
private final Optional<FragmentInstanceContext> fragmentInstanceContext;
private long offset = 0;
private long length = -1;
@@ -110,7 +106,7 @@ public class ReadObjectColumnTransformer extends
UnaryColumnTransformer {
}
private Binary readObject(Binary binary) {
- File file = getObjectPathFromBinary(binary);
+ File file = ObjectTypeUtils.getObjectPathFromBinary(binary);
long fileSize = file.length();
if (offset >= fileSize) {
throw new UnsupportedOperationException("offset is greater than object
size");
@@ -130,15 +126,4 @@ public class ReadObjectColumnTransformer extends
UnaryColumnTransformer {
}
return new Binary(bytes);
}
-
- public static File getObjectPathFromBinary(Binary binary) {
- byte[] bytes = binary.getValues();
- String relativeObjectFilePath =
- new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
- Optional<File> file =
TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath);
- if (!file.isPresent()) {
- throw new ObjectFileNotExist(relativeObjectFilePath);
- }
- return file.get();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
index e00703e5e75..79d6a41025b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/SettleCompactionTask.java
@@ -18,10 +18,12 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.SimpleCompactionLogger;
@@ -204,7 +206,7 @@ public class SettleCompactionTask extends
InnerSpaceCompactionTask {
return isSuccess;
}
- public boolean settleWithFullyDirtyFiles() {
+ public boolean settleWithFullyDirtyFiles() throws IllegalPathException,
IOException {
if (fullyDirtyFiles.isEmpty()) {
return true;
}
@@ -213,6 +215,8 @@ public class SettleCompactionTask extends
InnerSpaceCompactionTask {
if (recoverMemoryStatus) {
tsFileManager.remove(resource, resource.isSeq());
}
+ CompactionUtils.removeDeletedObjectFiles(resource);
+
boolean res = deleteTsFileOnDisk(resource);
if (res) {
fullyDeletedSuccessNum++;
@@ -288,7 +292,7 @@ public class SettleCompactionTask extends
InnerSpaceCompactionTask {
}
}
- public void recoverFullyDirtyFiles() {
+ public void recoverFullyDirtyFiles() throws IllegalPathException,
IOException {
if (!settleWithFullyDirtyFiles()) {
throw new CompactionRecoverException("Failed to delete fully_dirty
source file.");
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
index 44905afc461..d744b14e087 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java
@@ -29,6 +29,8 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.db.service.metrics.FileMetrics;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionAlignedChunkReader;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.FullExactMatch;
@@ -39,21 +41,36 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn
import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -448,4 +465,158 @@ public class CompactionUtils {
new TimeRange(Long.MIN_VALUE, timeLowerBound));
}
}
+
+ public static void removeDeletedObjectFiles(TsFileResource resource)
+ throws IOException, IllegalPathException {
+ try (MultiTsFileDeviceIterator deviceIterator =
+ new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
+ while (deviceIterator.hasNextDevice()) {
+ deviceIterator.nextDevice();
+ deviceIterator.getReaderAndChunkMetadataForCurrentAlignedSeries();
+ }
+ }
+ }
+
+ public static void removeDeletedObjectFiles(
+ TsFileSequenceReader reader,
+ List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
+ List<ModEntry> timeMods,
+ List<List<ModEntry>> valueMods)
+ throws IOException {
+ if (alignedChunkMetadataList.isEmpty()) {
+ return;
+ }
+ List<Integer> objectColumnIndexList = new ArrayList<>();
+ List<List<ModEntry>> objectDeletionIntervalList = new ArrayList<>();
+ boolean objectColumnHasDeletion = false;
+
+ TSDataType[] dataTypes = new TSDataType[valueMods.size()];
+ for (AbstractAlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
+ boolean hasNull = false;
+ for (int i = 0; i <
alignedChunkMetadata.getValueChunkMetadataList().size(); i++) {
+ if (dataTypes[i] != null) {
+ continue;
+ }
+ IChunkMetadata chunkMetadata =
alignedChunkMetadata.getValueChunkMetadataList().get(i);
+ if (chunkMetadata == null) {
+ hasNull = true;
+ continue;
+ }
+ dataTypes[i] = chunkMetadata.getDataType();
+ if (dataTypes[i] == TSDataType.OBJECT) {
+ objectColumnIndexList.add(i);
+ List<ModEntry> deletionInterval =
ModificationUtils.sortAndMerge(valueMods.get(i));
+ objectColumnHasDeletion |= !deletionInterval.isEmpty();
+ objectDeletionIntervalList.add(deletionInterval);
+ }
+ }
+ if (!hasNull) {
+ break;
+ }
+ }
+ if (!objectColumnHasDeletion) {
+ return;
+ }
+ int[] deletionCursors = new int[objectColumnIndexList.size() + 1];
+ List<ModEntry> timeDeletionIntervalList =
ModificationUtils.sortAndMerge(timeMods);
+ for (AbstractAlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
+ CompactionUtils.removeDeletedObjectFiles(
+ reader,
+ alignedChunkMetadata,
+ objectColumnIndexList,
+ timeDeletionIntervalList,
+ objectDeletionIntervalList,
+ deletionCursors);
+ }
+ }
+
+ private static void removeDeletedObjectFiles(
+ TsFileSequenceReader reader,
+ AbstractAlignedChunkMetadata alignedChunkMetadata,
+ List<Integer> objectColumnIndexList,
+ List<ModEntry> timeDeletions,
+ List<List<ModEntry>> objectDeletions,
+ int[] deletionCursors)
+ throws IOException {
+ Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ CompactionChunkReader compactionChunkReader = new
CompactionChunkReader(timeChunk);
+ List<Pair<PageHeader, ByteBuffer>> timePages =
+ compactionChunkReader.readPageDataWithoutUncompressing();
+
+ List<Chunk> valueChunks = new ArrayList<>();
+ List<List<Pair<PageHeader, ByteBuffer>>> valuePages = new ArrayList<>();
+
+ for (int i = 0; i < objectColumnIndexList.size(); i++) {
+ int idxInAlignedChunkMetadata = objectColumnIndexList.get(i);
+ if (timeDeletions.isEmpty() && objectDeletions.get(i).isEmpty()) {
+ continue;
+ }
+ ChunkMetadata valueChunkMetadata =
+ (ChunkMetadata)
+
alignedChunkMetadata.getValueChunkMetadataList().get(idxInAlignedChunkMetadata);
+ if (valueChunkMetadata == null) {
+ continue;
+ }
+ Chunk chunk = reader.readMemChunk(valueChunkMetadata);
+ valueChunks.add(chunk);
+ valuePages.add(
+ chunk == null
+ ? null
+ : new
CompactionChunkReader(chunk).readPageDataWithoutUncompressing());
+ }
+
+ CompactionAlignedChunkReader alignedChunkReader =
+ new CompactionAlignedChunkReader(timeChunk, valueChunks, true);
+ for (int i = 0; i < timePages.size(); i++) {
+ Pair<PageHeader, ByteBuffer> timePage = timePages.get(i);
+ List<PageHeader> valuePageHeaders = new ArrayList<>(valuePages.size());
+ List<ByteBuffer> compressedValuePages = new
ArrayList<>(valuePages.size());
+ for (int j = 0; j < valuePages.size(); j++) {
+ Pair<PageHeader, ByteBuffer> valuePage = valuePages.get(j).get(i);
+ valuePageHeaders.add(valuePage.getLeft());
+ compressedValuePages.add(valuePage.getRight());
+ }
+ IPointReader pagePointReader =
+ alignedChunkReader.getPagePointReader(
+ timePage.getLeft(), valuePageHeaders, timePage.getRight(),
compressedValuePages);
+
+ while (pagePointReader.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = pagePointReader.nextTimeValuePair();
+ removeDeletedObjectFiles(timeValuePair, deletionCursors,
timeDeletions, objectDeletions);
+ }
+ }
+ }
+
+ private static void removeDeletedObjectFiles(
+ TimeValuePair timeValuePair,
+ int[] cursors,
+ List<ModEntry> timeDeletions,
+ List<List<ModEntry>> objectDeletions) {
+ long timestamp = timeValuePair.getTimestamp();
+ boolean timeDeleted = isDeleted(timestamp, timeDeletions, cursors, 0);
+ for (int i = 0; i < timeValuePair.getValues().length; i++) {
+ Binary value = (Binary) timeValuePair.getValues()[i];
+ if (value == null) {
+ continue;
+ }
+ if (timeDeleted || isDeleted(timestamp, objectDeletions.get(i), cursors,
i + 1)) {
+ ObjectTypeUtils.deleteObjectPathFromBinary(value);
+ }
+ }
+ }
+
+ private static boolean isDeleted(
+ long timestamp, List<ModEntry> deleteIntervalList, int[] deleteCursors,
int idx) {
+ while (deleteIntervalList != null && deleteCursors[idx] <
deleteIntervalList.size()) {
+ if
(deleteIntervalList.get(deleteCursors[idx]).getTimeRange().contains(timestamp))
{
+ return true;
+ } else if
(deleteIntervalList.get(deleteCursors[idx]).getTimeRange().getMax() <
timestamp) {
+ deleteCursors[idx]++;
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index ea886aafd78..1c272cb3453 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -424,7 +424,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
*/
private void applyModificationForAlignedChunkMetadataList(
TsFileResource tsFileResource, List<AbstractAlignedChunkMetadata>
alignedChunkMetadataList)
- throws IllegalPathException {
+ throws IllegalPathException, IOException {
if (alignedChunkMetadataList.isEmpty()) {
// all the value chunks is empty chunk
return;
@@ -476,6 +476,12 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
modificationList.isEmpty() ? Collections.emptyList() :
modificationList);
}
+ CompactionUtils.removeDeletedObjectFiles(
+ readerMap.get(tsFileResource),
+ alignedChunkMetadataList,
+ modificationForTimeColumn,
+ modificationForValueColumns);
+
ModificationUtils.modifyAlignedChunkMetaData(
alignedChunkMetadataList,
modificationForTimeColumn,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
index 7ad78814c5f..4817d855ca0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.exception.WriteProcessException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
@@ -171,7 +172,8 @@ public class FastAlignedSeriesCompactionExecutor extends
SeriesCompactionExecuto
new ChunkMetadataElement(
alignedChunkMetadataList.get(i),
i == alignedChunkMetadataList.size() - 1,
- fileElement));
+ fileElement,
+ measurementSchemas));
}
}
}
@@ -260,6 +262,12 @@ public class FastAlignedSeriesCompactionExecutor extends
SeriesCompactionExecuto
}
});
+ CompactionUtils.removeDeletedObjectFiles(
+ readerCacheMap.get(resource),
+ alignedChunkMetadataList,
+ timeModifications,
+ valueModifications);
+
// modify aligned chunk metadatas
ModificationUtils.modifyAlignedChunkMetaData(
alignedChunkMetadataList, timeModifications, valueModifications,
ignoreAllNullRows);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
index e2be4be0cc2..402f06994b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java
@@ -51,6 +51,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -159,7 +160,10 @@ public class FastNonAlignedSeriesCompactionExecutor
extends SeriesCompactionExec
// add into queue
chunkMetadataQueue.add(
new ChunkMetadataElement(
- chunkMetadata, i == iChunkMetadataList.size() - 1,
fileElement));
+ chunkMetadata,
+ i == iChunkMetadataList.size() - 1,
+ fileElement,
+ Collections.emptyList()));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
index 6828841f1bd..2d29e3944d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/ChunkMetadataElement.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.reader.common.MergeRead
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.util.List;
@@ -42,12 +43,19 @@ public class ChunkMetadataElement {
public boolean needForceDecodingPage;
+ // for aligned series
+ public List<IMeasurementSchema> measurementSchemasOfValueChunks;
+
public ChunkMetadataElement(
- IChunkMetadata chunkMetadata, boolean isLastChunk, FileElement
fileElement) {
+ IChunkMetadata chunkMetadata,
+ boolean isLastChunk,
+ FileElement fileElement,
+ List<IMeasurementSchema> measurementSchemasOfValueChunks) {
this.chunkMetadata = chunkMetadata;
this.startTime = chunkMetadata.getStartTime();
this.isLastChunk = isLastChunk;
this.fileElement = fileElement;
+ this.measurementSchemasOfValueChunks = measurementSchemasOfValueChunks;
}
public void clearChunks() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
new file mode 100644
index 00000000000..4b5214c738d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+
+public class ObjectTypeUtils {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ObjectTypeUtils.class);
+ private static final TierManager TIER_MANAGER = TierManager.getInstance();
+
+ private ObjectTypeUtils() {}
+
+ public static File getObjectPathFromBinary(Binary binary) {
+ byte[] bytes = binary.getValues();
+ String relativeObjectFilePath =
+ new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+ Optional<File> file =
TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath);
+ if (!file.isPresent()) {
+ throw new ObjectFileNotExist(relativeObjectFilePath);
+ }
+ return file.get();
+ }
+
+ public static Optional<File> getNullableObjectPathFromBinary(Binary binary) {
+ byte[] bytes = binary.getValues();
+ String relativeObjectFilePath =
+ new String(bytes, 8, bytes.length - 8, TSFileConfig.STRING_CHARSET);
+ return TIER_MANAGER.getAbsoluteObjectFilePath(relativeObjectFilePath);
+ }
+
+ public static void deleteObjectPathFromBinary(Binary binary) {
+ Optional<File> file =
ObjectTypeUtils.getNullableObjectPathFromBinary(binary);
+ if (!file.isPresent()) {
+ return;
+ }
+ logger.info("Remove object file {}", file.get().getAbsolutePath());
+ try {
+ Files.deleteIfExists(file.get().toPath());
+ } catch (IOException e) {
+ logger.error("Failed to remove object file {}",
file.get().getAbsolutePath(), e);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
new file mode 100644
index 00000000000..695681ea009
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionDeleteObjectFileTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class CompactionDeleteObjectFileTest extends AbstractCompactionTest {
+ @Before
+ public void setUp()
+ throws IOException, WriteProcessException, MetadataException,
InterruptedException {
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws IOException, StorageEngineException {
+ super.tearDown();
+ }
+
+ @Test
+ public void test1() throws IOException {
+ File dir = new File("/Users/shuww/Downloads/0708/1_副本");
+ List<TsFileResource> resources = new ArrayList<>();
+ for (File file : dir.listFiles()) {
+ if (!file.getName().endsWith(".tsfile")) {
+ continue;
+ }
+ TsFileResource resource = new TsFileResource(file);
+ try (ModificationFile modificationFile = resource.getExclusiveModFile())
{
+ modificationFile.write(
+ new TableDeletionEntry(
+ new DeletionPredicate(
+ "tsfile_table",
+ new IDPredicate.FullExactMatch(
+ new StringArrayDeviceID(new String[] {"tsfile_table",
"1", "5", "3"})),
+ Arrays.asList("file")),
+ new TimeRange(1, 20)));
+ }
+ resource.deserialize();
+ resources.add(resource);
+ }
+
+ InnerSpaceCompactionTask task =
+ new InnerSpaceCompactionTask(
+ 0, tsFileManager, resources, true, new
ReadChunkCompactionPerformer(), 0);
+ task.start();
+ }
+}