This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch force_ci/support_schema_evolution in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ce18f8d5c54a71f98e5bba3b692b2a68c2e2b219 Author: Tian Jiang <[email protected]> AuthorDate: Tue Jan 13 16:22:37 2026 +0800 Support sevo in FastCompactionPerformer and cross compaction --- .../execution/operator/source/SeriesScanUtil.java | 4 +- .../performer/impl/FastCompactionPerformer.java | 10 +- .../subtask/FastCompactionPerformerSubTask.java | 9 +- .../execute/utils/MultiTsFileDeviceIterator.java | 4 +- .../utils/ReorderedTsFileDeviceIterator.java | 1 - ...BatchedFastAlignedSeriesCompactionExecutor.java | 24 +- .../fast/FastAlignedSeriesCompactionExecutor.java | 30 +- .../FastNonAlignedSeriesCompactionExecutor.java | 6 +- .../executor/fast/SeriesCompactionExecutor.java | 11 +- .../writer/AbstractCrossCompactionWriter.java | 20 +- .../compaction/io/CompactionTsFileWriter.java | 18 +- .../dataregion/read/QueryDataSource.java | 4 +- .../read/control/QueryResourceManager.java | 7 +- .../dataregion/tsfile/TsFileResource.java | 5 +- .../dataregion/tsfile/evolution/EvolvedSchema.java | 11 + .../compaction/CompactionWithSevoTest.java | 674 +++++++++++++++++++++ .../FastInnerCompactionPerformerTest.java | 21 + .../compaction/ReadChunkInnerCompactionTest.java | 289 --------- 18 files changed, 809 insertions(+), 339 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index b232b1377d6..01513207ee3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1782,7 +1782,7 @@ public class SeriesScanUtil implements Accountable { public boolean hasNextUnseqResource() { while (dataSource.hasNextUnseqResource(curUnseqFileIndex, false, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isUnSeqSatisfied( - deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { + deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) { break; } curUnseqFileIndex++; @@ -1911,7 +1911,7 @@ public class SeriesScanUtil implements Accountable { public boolean hasNextUnseqResource() { while (dataSource.hasNextUnseqResource(curUnseqFileIndex, true, deviceID, maxTsFileSetEndVersion)) { if (dataSource.isUnSeqSatisfied( - deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false)) { + deviceID, curUnseqFileIndex, scanOptions.getGlobalTimeFilter(), false, maxTsFileSetEndVersion)) { break; } curUnseqFileIndex++; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index 4c4b88e765f..2acf0afa17f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -213,20 +213,14 @@ public class FastCompactionPerformer maxTsFileSetEndVersionAndMinResource.left); IDeviceID originalDevice = device; if (evolvedSchema != null) { - originalDevice = evolvedSchema.rewriteToFinal(device); + originalDevice = evolvedSchema.rewriteToOriginal(device); } return x.definitelyNotContains(originalDevice) || !x.isDeviceAlive(originalDevice, ttl); }); // checked above sortedSourceFiles.sort(Comparator.comparingLong(x -> { - EvolvedSchema evolvedSchema = x.getMergedEvolvedSchema( - maxTsFileSetEndVersionAndMinResource.left); - IDeviceID originalDevice = device; - if (evolvedSchema != null) { - originalDevice = evolvedSchema.rewriteToFinal(device); - } //noinspection OptionalGetWithoutIsPresent - return x.getStartTime(originalDevice).get(); + return x.getStartTime(device, maxTsFileSetEndVersionAndMinResource.left).get(); })); ModEntry ttlDeletion = null; if (ttl != Long.MAX_VALUE) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java index 33393370641..1050e40bd56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/subtask/FastCompactionPerformerSubTask.java @@ -144,7 +144,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { sortedSourceFiles, deviceId, subTaskId, - summary); + summary, + maxTsFileSetEndVersionAndMinResource); for (String measurement : measurements) { seriesCompactionExecutor.setNewMeasurement(timeseriesMetadataOffsetMap.get(measurement)); seriesCompactionExecutor.execute(); @@ -166,7 +167,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { subTaskId, measurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); } else { seriesCompactionExecutor = new FastAlignedSeriesCompactionExecutor( @@ -179,7 +181,8 @@ public class FastCompactionPerformerSubTask implements Callable<Void> { subTaskId, measurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); } seriesCompactionExecutor.execute(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index 13c83cbf21b..b5878853187 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -554,7 +554,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { } IDeviceID device = currentDevice.getLeft(); ModEntry ttlDeletion = null; - Optional<Long> startTime = tsFileResource.getStartTime(device); + Optional<Long> startTime = tsFileResource.getStartTime(device, maxTsFileSetEndVersion); if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) { ttlDeletion = CompactionUtils.convertTtlToDeletion(device, timeLowerBoundForCurrentDevice); } @@ -780,7 +780,7 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { Map<String, List<ChunkMetadata>> chunkMetadataListMap = chunkMetadataCacheMap.get(reader); ModEntry ttlDeletion = null; - Optional<Long> startTime = resource.getStartTime(device); + Optional<Long> startTime = resource.getStartTime(device, maxTsFileSetEndVersion); if (startTime.isPresent() && startTime.get() < timeLowerBoundForCurrentDevice) { ttlDeletion = new TreeDeletionEntry( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java index 3b9bf6ab8da..1a05fedb2a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/ReorderedTsFileDeviceIterator.java @@ -35,7 +35,6 @@ public class ReorderedTsFileDeviceIterator extends TransformedTsFileDeviceIterat private void collectAndSort() throws IOException { while (super.hasNext()) { Pair<IDeviceID, Boolean> next = super.next(); - next.left = transformer.apply(next.left); deviceIDAndFirstMeasurementNodeList.add( new Pair<>(next, super.getFirstMeasurementNodeOfCurrentDevice())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java index 0c1f12e9886..787452be2dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/BatchedFastAlignedSeriesCompactionExecutor.java @@ -89,7 +89,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor int subTaskId, List<IMeasurementSchema> measurementSchemas, FastCompactionTaskSummary summary, - boolean ignoreAllNullRows) { + boolean ignoreAllNullRows, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { super( compactionWriter, timeseriesMetadataOffsetMap, @@ -100,7 +101,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor subTaskId, measurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); timeSchema = measurementSchemas.remove(0); valueMeasurementSchemas = measurementSchemas; this.batchColumnSelection = @@ -171,7 +173,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor subTaskId, selectedMeasurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); executor.execute(); LOGGER.debug( "[Batch Compaction] current device is {}, first batch compacted time chunk is {}", @@ -199,7 +202,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor subTaskId, currentBatchMeasurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); executor.execute(); } } @@ -230,7 +234,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor int subTaskId, List<IMeasurementSchema> measurementSchemas, FastCompactionTaskSummary summary, - boolean ignoreAllNullRows) { + boolean ignoreAllNullRows, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { super( compactionWriter, timeseriesMetadataOffsetMap, @@ -241,7 +246,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor subTaskId, measurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); isBatchedCompaction = true; } @@ -340,7 +346,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor int subTaskId, List<IMeasurementSchema> measurementSchemas, FastCompactionTaskSummary summary, - boolean ignoreAllNullRows) { + boolean ignoreAllNullRows, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { super( compactionWriter, timeseriesMetadataOffsetMap, @@ -351,7 +358,8 @@ public class BatchedFastAlignedSeriesCompactionExecutor subTaskId, measurementSchemas, summary, - ignoreAllNullRows); + ignoreAllNullRows, + maxTsFileSetEndVersionAndMinResource); isBatchedCompaction = true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index 4e71c03d25f..faab7c3d91d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.EvolvedSchema; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; @@ -87,9 +88,10 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto int subTaskId, List<IMeasurementSchema> measurementSchemas, FastCompactionTaskSummary summary, - boolean ignoreAllNullRows) { + boolean ignoreAllNullRows, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { super( - compactionWriter, readerCacheMap, modificationCacheMap, deviceId, true, subTaskId, summary); + compactionWriter, readerCacheMap, modificationCacheMap, deviceId, true, subTaskId, summary, maxTsFileSetEndVersionAndMinResource); this.timeseriesMetadataOffsetMap = timeseriesMetadataOffsetMap; this.measurementSchemas = measurementSchemas; this.timeColumnMeasurementSchema = measurementSchemas.get(0); @@ -185,6 +187,9 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto // read time chunk metadatas and value chunk metadatas in the current file List<IChunkMetadata> timeChunkMetadatas = null; List<List<IChunkMetadata>> valueChunkMetadatas = new ArrayList<>(); + EvolvedSchema evolvedSchema = resource.getMergedEvolvedSchema( + maxTsFileSetEndVersionAndMinResource.getLeft()); + for (Map.Entry<String, Map<TsFileResource, Pair<Long, Long>>> entry : timeseriesMetadataOffsetMap.entrySet()) { String measurementID = entry.getKey(); @@ -213,7 +218,7 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto .get(resource) .getChunkMetadataListByTimeseriesMetadataOffset( timeseriesOffsetInCurrentFile.left, timeseriesOffsetInCurrentFile.right); - if (isValueChunkDataTypeMatchSchema(valueColumnChunkMetadataList)) { + if (isValueChunkDataTypeMatchSchema(valueColumnChunkMetadataList, evolvedSchema)) { valueChunkMetadatas.add(valueColumnChunkMetadataList); } else { valueChunkMetadatas.add(null); @@ -267,17 +272,29 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto // modify aligned chunk metadatas ModificationUtils.modifyAlignedChunkMetaData( alignedChunkMetadataList, timeModifications, valueModifications, ignoreAllNullRows); + + if (evolvedSchema != null) { + String originalTableName = evolvedSchema.getOriginalTableName(deviceId.getTableName()); + for (AbstractAlignedChunkMetadata abstractAlignedChunkMetadata : alignedChunkMetadataList) { + evolvedSchema.rewriteToFinal(abstractAlignedChunkMetadata, originalTableName); + } + } } return alignedChunkMetadataList; } private boolean isValueChunkDataTypeMatchSchema( - List<IChunkMetadata> chunkMetadataListOfOneValueColumn) { + List<IChunkMetadata> chunkMetadataListOfOneValueColumn, + EvolvedSchema evolvedSchema) { for (IChunkMetadata chunkMetadata : chunkMetadataListOfOneValueColumn) { if (chunkMetadata == null) { continue; } String measurement = chunkMetadata.getMeasurementUid(); + if (evolvedSchema != null) { + String originalTableName = evolvedSchema.getOriginalTableName(deviceId.getTableName()); + measurement = evolvedSchema.getFinalColumnName(originalTableName, measurement); + } IMeasurementSchema schema = measurementSchemaMap.get(measurement); return schema.getType() == chunkMetadata.getDataType(); } @@ -362,7 +379,10 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto valueChunks.add(null); continue; } - valueChunks.add(readChunk(reader, (ChunkMetadata) valueChunkMetadata)); + Chunk chunk = readChunk(reader, (ChunkMetadata) valueChunkMetadata); + // the column may be renamed, enqueue with the final column name + chunk.getHeader().setMeasurementID(valueChunkMetadata.getMeasurementUid()); + valueChunks.add(chunk); } chunkMetadataElement.valueChunks = valueChunks; setForceDecoding(chunkMetadataElement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java index b07407c7db6..fdd5f43f42f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java @@ -77,7 +77,8 @@ public class FastNonAlignedSeriesCompactionExecutor extends SeriesCompactionExec List<TsFileResource> sortedSourceFiles, IDeviceID deviceId, int subTaskId, - FastCompactionTaskSummary summary) { + FastCompactionTaskSummary summary, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { super( compactionWriter, readerCacheMap, @@ -85,7 +86,8 @@ public class FastNonAlignedSeriesCompactionExecutor extends SeriesCompactionExec deviceId, false, subTaskId, - summary); + summary, + maxTsFileSetEndVersionAndMinResource); this.sortResources = sortedSourceFiles; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index b3073bd3d25..6525132fef6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; +import org.apache.tsfile.utils.Pair; public abstract class SeriesCompactionExecutor { @@ -97,6 +98,8 @@ public abstract class SeriesCompactionExecutor { protected boolean isAligned; + protected final Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource; + protected SeriesCompactionExecutor( AbstractCompactionWriter compactionWriter, Map<TsFileResource, TsFileSequenceReader> readerCacheMap, @@ -105,7 +108,8 @@ public abstract class SeriesCompactionExecutor { IDeviceID deviceId, boolean isAligned, int subTaskId, - FastCompactionTaskSummary summary) { + FastCompactionTaskSummary summary, + Pair<Long, TsFileResource> maxTsFileSetEndVersionAndMinResource) { this.compactionWriter = compactionWriter; this.subTaskId = subTaskId; this.deviceId = deviceId; @@ -128,6 +132,7 @@ public abstract class SeriesCompactionExecutor { int timeCompare = Long.compare(o1.getStartTime(), o2.getStartTime()); return timeCompare != 0 ? timeCompare : o2.getPriority().compareTo(o1.getPriority()); }); + this.maxTsFileSetEndVersionAndMinResource = maxTsFileSetEndVersionAndMinResource; } public abstract void execute() @@ -350,12 +355,12 @@ public abstract class SeriesCompactionExecutor { */ protected List<FileElement> findOverlapFiles(FileElement fileToCheck) { List<FileElement> overlappedFiles = new ArrayList<>(); - Optional<Long> endTimeInCheckingFile = fileToCheck.resource.getEndTime(deviceId); + Optional<Long> endTimeInCheckingFile = fileToCheck.resource.getEndTime(deviceId, maxTsFileSetEndVersionAndMinResource.left); for (FileElement otherFile : fileList) { if (!endTimeInCheckingFile.isPresent()) { continue; } - Optional<Long> startTimeInOtherFile = otherFile.resource.getStartTime(deviceId); + Optional<Long> startTimeInOtherFile = otherFile.resource.getStartTime(deviceId, maxTsFileSetEndVersionAndMinResource.left); if (startTimeInOtherFile.isPresent() && startTimeInOtherFile.get() <= endTimeInCheckingFile.get()) { if (!otherFile.isSelected) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 5c4a56ce54a..3c0fe38ae1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -76,6 +76,8 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr private final EncryptParameter encryptParameter; + private final long maxTsFileSetEndVersion; + @TestOnly protected AbstractCrossCompactionWriter( List<TsFileResource> targetResources, List<TsFileResource> seqFileResources) @@ -116,6 +118,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr } this.seqTsFileResources = seqFileResources; this.targetResources = targetResources; + this.maxTsFileSetEndVersion = maxTsFileSetEndVersion; } @Override @@ -236,10 +239,17 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr private void checkIsDeviceExistAndGetDeviceEndTime() throws IOException { int fileIndex = 0; while (fileIndex < seqTsFileResources.size()) { - ITimeIndex timeIndex = seqTsFileResources.get(fileIndex).getTimeIndex(); + TsFileResource tsFileResource = seqTsFileResources.get(fileIndex); + EvolvedSchema evolvedSchema = tsFileResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); + IDeviceID originalDeviceId = deviceId; + if (evolvedSchema != null) { + originalDeviceId = evolvedSchema.rewriteToOriginal(deviceId); + } + + ITimeIndex timeIndex = tsFileResource.getTimeIndex(); if (timeIndex.getTimeIndexType() != ITimeIndex.FILE_TIME_INDEX_TYPE) { // the timeIndexType of resource is deviceTimeIndex - Optional<Long> endTime = timeIndex.getEndTime(deviceId); + Optional<Long> endTime = timeIndex.getEndTime(originalDeviceId); currentDeviceEndTime[fileIndex] = endTime.orElse(Long.MIN_VALUE); isCurrentDeviceExistedInSourceSeqFiles[fileIndex] = endTime.isPresent(); } else { @@ -248,7 +258,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr // Fast compaction get reader from cache map, while read point compaction get reader from // FileReaderManager Map<String, TimeseriesMetadata> deviceMetadataMap = - getFileReader(seqTsFileResources.get(fileIndex)).readDeviceMetadata(deviceId); + getFileReader(tsFileResource).readDeviceMetadata(originalDeviceId); for (Map.Entry<String, TimeseriesMetadata> entry : deviceMetadataMap.entrySet()) { long tmpStartTime = entry.getValue().getStatistics().getStartTime(); long tmpEndTime = entry.getValue().getStatistics().getEndTime(); @@ -287,7 +297,9 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr targetResource.setTsFileManager(minVersionResource.getTsFileManager()); EvolvedSchema evolvedSchema = targetResource.getMergedEvolvedSchema(maxTsFileSetEndVersion); - schema = evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new); + if (evolvedSchema != null) { + schema = evolvedSchema.rewriteToOriginal(schema, CompactionTableSchema::new); + } compactionTsFileWriter.setSchema(schema); } else { compactionTsFileWriter.setSchema(schema); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java index efe33d228f6..cd3bbefcbf7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/io/CompactionTsFileWriter.java @@ -55,7 +55,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { private volatile boolean isWritingAligned = false; private boolean isEmptyTargetFile = true; - private IDeviceID currentDeviceId; + private IDeviceID currentOriginalDeviceId; private final TsFileResource tsFileResource; private final EvolvedSchema evolvedSchema; @@ -114,7 +114,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { ? null : measurementName -> evolvedSchema.getOriginalColumnName( - evolvedSchema.getFinalTableName(currentDeviceId.getTableName()), measurementName)); + evolvedSchema.getFinalTableName(currentOriginalDeviceId.getTableName()), measurementName)); long writtenDataSize = this.getPos() - beforeOffset; CompactionMetrics.getInstance() .recordWriteInfo( @@ -130,11 +130,13 @@ public class CompactionTsFileWriter extends TsFileIOWriter { isEmptyTargetFile = false; } if (evolvedSchema != null) { + String finalTableName = evolvedSchema.getFinalTableName( + currentOriginalDeviceId.getTableName()); chunk .getHeader() .setMeasurementID( evolvedSchema.getOriginalColumnName( - currentDeviceId.getTableName(), chunk.getHeader().getMeasurementID())); + finalTableName, chunk.getHeader().getMeasurementID())); } super.writeChunk(chunk, chunkMetadata); long writtenDataSize = this.getPos() - beforeOffset; @@ -155,7 +157,7 @@ public class CompactionTsFileWriter extends TsFileIOWriter { throws IOException { if (evolvedSchema != null) { measurementId = - evolvedSchema.getOriginalColumnName(currentDeviceId.getTableName(), measurementId); + evolvedSchema.getOriginalColumnName(currentOriginalDeviceId.getTableName(), measurementId); } long beforeOffset = this.getPos(); super.writeEmptyValueChunk( @@ -177,21 +179,21 @@ public class CompactionTsFileWriter extends TsFileIOWriter { if (evolvedSchema != null) { deviceId = evolvedSchema.rewriteToOriginal(deviceId); } - currentDeviceId = deviceId; + currentOriginalDeviceId = deviceId; return super.startChunkGroup(deviceId); } @Override public void endChunkGroup() throws IOException { - if (currentDeviceId == null || chunkMetadataList.isEmpty()) { + if (currentOriginalDeviceId == null || chunkMetadataList.isEmpty()) { return; } - String tableName = currentDeviceId.getTableName(); + String tableName = currentOriginalDeviceId.getTableName(); TableSchema tableSchema = getSchema().getTableSchemaMap().get(tableName); boolean generateTableSchemaForCurrentChunkGroup = tableSchema != null; setGenerateTableSchema(generateTableSchemaForCurrentChunkGroup); super.endChunkGroup(); - currentDeviceId = null; + currentOriginalDeviceId = null; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java index b659585d9fe..b1f20ab81fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java @@ -168,7 +168,7 @@ public class QueryDataSource implements IQueryDataSource { } public boolean isUnSeqSatisfied( - IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug) { + IDeviceID deviceID, int curIndex, Filter timeFilter, boolean debug, long maxTsFileSetEndVersion) { if (curIndex != this.curUnSeqIndex) { throw new IllegalArgumentException( String.format( @@ -178,7 +178,7 @@ public class QueryDataSource implements IQueryDataSource { TsFileResource tsFileResource = unseqResources.get(unSeqFileOrderIndex[curIndex]); curUnSeqSatisfied = tsFileResource != null - && (isSingleDevice || tsFileResource.isSatisfied(deviceID, timeFilter, false, debug)); + && (isSingleDevice || tsFileResource.isSatisfied(deviceID, timeFilter, false, debug, maxTsFileSetEndVersion)); } return curUnSeqSatisfied; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/QueryResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/QueryResourceManager.java index 930d68b4e89..391775c8a81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/QueryResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/control/QueryResourceManager.java @@ -56,7 +56,12 @@ public class QueryResourceManager { * queryId = xx + Long.MIN_VALUE */ public long assignCompactionQueryId() { - long threadNum = Long.parseLong((Thread.currentThread().getName().split("-"))[5]); + long threadNum = 0; + try { + threadNum = Long.parseLong((Thread.currentThread().getName().split("-"))[5]); + } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) { + // test environment, ignore it + } long queryId = Long.MIN_VALUE + threadNum; filePathsManager.addQueryId(queryId); return queryId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index de55bcd209f..7c0c653caf5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -1720,7 +1720,10 @@ public class TsFileResource implements PersistentResource, Cloneable { for (TsFileResource tsFileResource : tsFileResources) { List<TsFileSet> tsFileSets = tsFileResource.getTsFileSets(); if (tsFileSets.isEmpty()) { - continue; + // include the newest files that does not belong to any file sets, + // should apply all schema evolution + maxTsFileSetEndVersion = Long.MAX_VALUE; + break; } TsFileSet lastTsFileSet = tsFileSets.get(tsFileSets.size() - 1); if (lastTsFileSet.getEndVersion() > maxTsFileSetEndVersion) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java index 3dca08b13d7..8c04b4805f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/evolution/EvolvedSchema.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEn import org.apache.iotdb.db.storageengine.dataregion.modification.TagPredicate; import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; @@ -383,4 +385,13 @@ public class EvolvedSchema { } return copySchema; } + + public void rewriteToFinal(AbstractAlignedChunkMetadata abstractAlignedChunkMetadata, + String originalTableName) { + for (IChunkMetadata iChunkMetadata : abstractAlignedChunkMetadata.getValueChunkMetadataList()) { + if (iChunkMetadata != null) { + iChunkMetadata.setMeasurementUid(getFinalColumnName(originalTableName, iChunkMetadata.getMeasurementUid())); + } + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWithSevoTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWithSevoTest.java new file mode 100644 index 00000000000..9ba186ab9af --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionWithSevoTest.java @@ -0,0 +1,674 @@ +package org.apache.iotdb.db.storageengine.dataregion.compaction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadPointCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; +import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionFileGeneratorUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; +import org.apache.iotdb.db.utils.EncryptDBUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.junit.Test; + +public class CompactionWithSevoTest extends AbstractCompactionTest{ + + @Test + public void testReadChunkCompactionPerformer() throws Exception { + testInner(targets ->new ReadChunkCompactionPerformer(seqResources, targets, EncryptDBUtils.getDefaultFirstEncryptParam()), CompactionTaskSummary::new); + } + + @Test + public void testReadPointCompactionPerformerSeq() throws Exception { + testInner(targets ->new ReadPointCompactionPerformer(seqResources, Collections.emptyList(), targets), CompactionTaskSummary::new); + } + + @Test + public void testReadPointCompactionPerformerUnseq() throws Exception { + testInner(targets ->new ReadPointCompactionPerformer(Collections.emptyList(), seqResources, targets), CompactionTaskSummary::new); + } + + @Test + public void testReadPointCompactionPerformerCross() throws Exception { + testCross(targets ->new ReadPointCompactionPerformer(seqResources, unseqResources, targets), CompactionTaskSummary::new); + } + + @Test + public void testFastCompactionPerformerSeq() throws Exception { + testInner(targets -> new FastCompactionPerformer(seqResources, Collections.emptyList(), targets, EncryptDBUtils.getDefaultFirstEncryptParam()), FastCompactionTaskSummary::new); + } + + @Test + public void testFastCompactionPerformerUnseq() throws Exception { + testInner(targets -> new FastCompactionPerformer(Collections.emptyList(), seqResources, targets, EncryptDBUtils.getDefaultFirstEncryptParam()), FastCompactionTaskSummary::new); + } + + @Test + public void testFastCompactionPerformerCross() throws Exception { + testCross(targets -> new FastCompactionPerformer(seqResources, unseqResources, targets, EncryptDBUtils.getDefaultFirstEncryptParam()), FastCompactionTaskSummary::new); + } + + private void genSourceFiles() throws Exception{ + String fileSetDir = + TestConstant.BASE_OUTPUT_PATH + File.separator + TsFileSet.FILE_SET_DIR_NAME; + // seq-file1: + // table1[s1, s2, s3] + // table2[s1, s2, s3] + File seqf1 = new File(SEQ_DIRS, "0-1-0-0.tsfile"); + TableSchema tableSchema1_1 = + new TableSchema( + "table1", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema1_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(seqf1)) { + tsFileWriter.registerTableSchema(tableSchema1_1); + tsFileWriter.registerTableSchema(tableSchema1_2); + + Tablet tablet1 = new Tablet(tableSchema1_1.getTableName(), tableSchema1_1.getColumnSchemas()); + tablet1.addTimestamp(0, 0); + tablet1.addValue(0, 0, 1); + tablet1.addValue(0, 1, 2); + tablet1.addValue(0, 2, 3); + + Tablet tablet2 = new Tablet(tableSchema1_2.getTableName(), tableSchema1_2.getColumnSchemas()); + tablet2.addTimestamp(0, 0); + tablet2.addValue(0, 0, 101); + tablet2.addValue(0, 1, 102); + tablet2.addValue(0, 2, 103); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource1 = new TsFileResource(seqf1); + resource1.setTsFileManager(tsFileManager); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); + resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); + resource1.close(); + + // rename table1 -> table0 + TsFileSet tsFileSet1 = new TsFileSet(1, fileSetDir, false); + tsFileSet1.appendSchemaEvolution( + Collections.singletonList(new TableRename("table1", "table0"))); + tsFileManager.addTsFileSet(tsFileSet1, 0); + + // seq-file2: + // table0[s1, s2, s3] + // table2[s1, s2, s3] + File seqf2 = new File(SEQ_DIRS, "0-2-0-0.tsfile"); + TableSchema tableSchema2_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema2_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(seqf2)) { + tsFileWriter.registerTableSchema(tableSchema2_1); + tsFileWriter.registerTableSchema(tableSchema2_2); + + Tablet tablet1 = new Tablet(tableSchema2_1.getTableName(), tableSchema2_1.getColumnSchemas()); + tablet1.addTimestamp(0, 1); + tablet1.addValue(0, 0, 11); + tablet1.addValue(0, 1, 12); + tablet1.addValue(0, 2, 13); + + Tablet tablet2 = new Tablet(tableSchema2_2.getTableName(), tableSchema2_2.getColumnSchemas()); + tablet2.addTimestamp(0, 1); + tablet2.addValue(0, 0, 111); + tablet2.addValue(0, 1, 112); + tablet2.addValue(0, 2, 113); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource2 = new TsFileResource(seqf2); + resource2.setTsFileManager(tsFileManager); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); + resource2.close(); + + + // rename table0.s1 -> table0.s0 + TsFileSet tsFileSet2 = new TsFileSet(2, fileSetDir, false); + tsFileSet2.appendSchemaEvolution( + Collections.singletonList(new ColumnRename("table0", "s1", "s0"))); + tsFileManager.addTsFileSet(tsFileSet2, 0); + + // seq-file3: + // table0[s0, s2, s3] + // table2[s1, s2, s3] + File seqf3 = new File(SEQ_DIRS, "0-3-0-0.tsfile"); + TableSchema tableSchema3_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s0") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema3_2 = + new TableSchema( + "table2", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(seqf3)) { + tsFileWriter.registerTableSchema(tableSchema3_1); + tsFileWriter.registerTableSchema(tableSchema3_2); + + Tablet tablet1 = new Tablet(tableSchema3_1.getTableName(), tableSchema3_1.getColumnSchemas()); + tablet1.addTimestamp(0, 2); + tablet1.addValue(0, 0, 21); + tablet1.addValue(0, 1, 22); + tablet1.addValue(0, 2, 23); + + Tablet tablet2 = new Tablet(tableSchema3_2.getTableName(), tableSchema3_2.getColumnSchemas()); + tablet2.addTimestamp(0, 2); + tablet2.addValue(0, 0, 121); + tablet2.addValue(0, 1, 122); + tablet2.addValue(0, 2, 123); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource3 = new TsFileResource(seqf3); + resource3.setTsFileManager(tsFileManager); + resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); + resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); + resource3.close(); + + // rename table2 -> table1 + TsFileSet tsFileSet3 = new TsFileSet(3, fileSetDir, false); + tsFileSet3.appendSchemaEvolution( + Collections.singletonList(new TableRename("table2", "table1"))); + tsFileManager.addTsFileSet(tsFileSet3, 0); + + seqResources.add(resource1); + seqResources.add(resource2); + seqResources.add(resource3); + + // unseq-file4: + // table0[s0, s2, s3] + // table1[s1, s2, s3] + File unseqf4 = new File(UNSEQ_DIRS, "0-4-0-0.tsfile"); + TableSchema tableSchema4_1 = + new TableSchema( + "table0", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s0") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + TableSchema tableSchema4_2 = + new TableSchema( + "table1", + Arrays.asList( + new ColumnSchemaBuilder() + .name("s1") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s2") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build(), + new ColumnSchemaBuilder() + .name("s3") + .dataType(TSDataType.INT32) + .category(ColumnCategory.FIELD) + .build())); + try (TsFileWriter tsFileWriter = new TsFileWriter(unseqf4)) { + tsFileWriter.registerTableSchema(tableSchema4_1); + tsFileWriter.registerTableSchema(tableSchema4_2); + + Tablet tablet1 = new Tablet(tableSchema4_1.getTableName(), tableSchema4_1.getColumnSchemas()); + tablet1.addTimestamp(0, 1); + tablet1.addValue(0, 0, 1011); + tablet1.addValue(0, 1, 1012); + tablet1.addValue(0, 2, 1013); + + Tablet tablet2 = new Tablet(tableSchema4_2.getTableName(), tableSchema4_2.getColumnSchemas()); + tablet2.addTimestamp(0, 1); + tablet2.addValue(0, 0, 1111); + tablet2.addValue(0, 1, 1112); + tablet2.addValue(0, 2, 1113); + + tsFileWriter.writeTable(tablet1); + tsFileWriter.writeTable(tablet2); + } + TsFileResource resource4 = new TsFileResource(unseqf4); + resource4.setTsFileManager(tsFileManager); + resource4.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource4.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); + resource4.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 1); + resource4.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 1); + resource4.close(); + unseqResources.add(resource4); + } + + private void testCross(Function<List<TsFileResource>, ICompactionPerformer> compactionPerformerFunction, Supplier<CompactionTaskSummary> summarySupplier) throws Exception { + genSourceFiles(); + List<TsFileResource> targetResources; + ICompactionPerformer performer; + + targetResources = CompactionFileGeneratorUtils.getCrossCompactionTargetTsFileResources(seqResources); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + performer = compactionPerformerFunction.apply(targetResources); + performer.setSummary(summarySupplier.get()); + performer.perform(); + + // target(version=1): + // table1[s1, s2, s3] + // table2[s1, s2, s3] + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table0 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table0 not found", e.getMessage()); + } + + // table1.s0 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table1 + ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(0, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(j + 1, resultSet.getLong(j + 2)); + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(0, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + j + 1, resultSet.getLong(j + 2)); + } + } + + // target(version=2): + // table0[s1, s2, s3] + // table2[s1, s2, s3] + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(1).getTsFile()).build()) { + // table1 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table1 not found", e.getMessage()); + } + + // table0.s0 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table0 + ResultSet resultSet = tsFileReader.query("table0", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(1, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(1010 + j + 1, resultSet.getLong(j + 2)); + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(1, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(1110 + j + 1, resultSet.getLong(j + 2)); + } + } + + // target(version=2): + // table0[s0, s2, s3] + // table2[s1, s2, s3] + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(2).getTsFile()).build()) { + // table1 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table1 not found", e.getMessage()); + } + + // table0.s1 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s1"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s1", e.getMessage()); + } + + // check data of table0 + ResultSet resultSet = tsFileReader.query("table0", Arrays.asList("s0", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(2, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(20 + j + 1, resultSet.getLong(j + 2)); + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + assertTrue(resultSet.next()); + assertEquals(2, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + 20 + j + 1, resultSet.getLong(j + 2)); + } + } + } + + private void testInner(Function<List<TsFileResource>, ICompactionPerformer> compactionPerformerFunction, Supplier<CompactionTaskSummary> summarySupplier) throws Exception { + genSourceFiles(); + List<TsFileResource> targetResources; + ICompactionPerformer performer; + + // target(version=1): + // table1[s1, s2, s3] + // table2[s1, s2, s3] + targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + performer = compactionPerformerFunction.apply(targetResources); + performer.setSummary(summarySupplier.get()); + performer.perform(); + + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table0 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table0 not found", e.getMessage()); + } + + // table1.s0 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table1 + ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + } + + // target(version=2): + // table0[s1, s2, s3] + // table2[s1, s2, s3] + targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources.subList(1, + seqResources.size()), true); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + performer = compactionPerformerFunction.apply(targetResources); + performer.setSummary(summarySupplier.get()); + performer.perform(); + + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table1 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table1 not found", e.getMessage()); + } + + // table0.s0 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s0", e.getMessage()); + } + + // check data of table0 + ResultSet resultSet = tsFileReader.query("table0", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + } + + // target(version=2): + // table0[s0, s2, s3] + // table2[s1, s2, s3] + targetResources = CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources.subList(2, + seqResources.size()), true); + targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); + + performer = compactionPerformerFunction.apply(targetResources); + performer.setSummary(summarySupplier.get()); + performer.perform(); + + try (ITsFileReader tsFileReader = + new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { + // table1 should not exist + try { + tsFileReader.query("table1", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table1 should not exist"); + } catch (NoTableException e) { + assertEquals("Table table1 not found", e.getMessage()); + } + + // table0.s1 should not exist + try { + tsFileReader.query("table0", Collections.singletonList("s1"), Long.MIN_VALUE, Long.MAX_VALUE); + fail("table0.s0 should not exist"); + } catch (NoMeasurementException e) { + assertEquals("No measurement for s1", e.getMessage()); + } + + // check data of table0 + ResultSet resultSet = tsFileReader.query("table0", Arrays.asList("s0", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + + // check data of table2 + resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), + Long.MIN_VALUE, Long.MAX_VALUE); + for (int i = 0; i < 3; i++) { + assertTrue(resultSet.next()); + assertEquals(i, resultSet.getLong(1)); + for (int j = 0; j < 3; j++) { + assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); + } + } + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java index d75d7c4ede7..e3aae24bcf7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastInnerCompactionPerformerTest.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.storageengine.dataregion.compaction; +import java.io.File; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.AlignedFullPath; @@ -26,7 +27,10 @@ import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.IDataBlockReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.SeriesDataBlockReader; @@ -34,16 +38,26 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionF import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.ColumnRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.evolution.TableRename; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.fileset.TsFileSet; +import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.ColumnSchemaBuilder; import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TimeValuePair; @@ -53,11 +67,16 @@ import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.IBatchDataIterator; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.query.dataset.ResultSet; import org.apache.tsfile.read.reader.IPointReader; import org.apache.tsfile.read.reader.chunk.AlignedChunkReader; +import org.apache.tsfile.read.v4.ITsFileReader; +import org.apache.tsfile.read.v4.TsFileReaderBuilder; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; @@ -75,6 +94,8 @@ import java.util.Map; import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; @SuppressWarnings("OptionalGetWithoutIsPresent") public class FastInnerCompactionPerformerTest extends AbstractCompactionTest { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java index f9c5f2d1852..e72436207a7 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/ReadChunkInnerCompactionTest.java @@ -996,293 +996,4 @@ public class ReadChunkInnerCompactionTest extends AbstractCompactionTest { "Unknown data type " + valuePageWriter.getStatistics().getType()); } } - - @Test - public void testWithSevoFile() throws Exception { - String fileSetDir = - TestConstant.BASE_OUTPUT_PATH + File.separator + TsFileSet.FILE_SET_DIR_NAME; - // file1: - // table1[s1, s2, s3] - // table2[s1, s2, s3] - File f1 = new File(SEQ_DIRS, "0-1-0-0.tsfile"); - TableSchema tableSchema1_1 = - new TableSchema( - "table1", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s1") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - TableSchema tableSchema1_2 = - new TableSchema( - "table2", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s1") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - try (TsFileWriter tsFileWriter = new TsFileWriter(f1)) { - tsFileWriter.registerTableSchema(tableSchema1_1); - tsFileWriter.registerTableSchema(tableSchema1_2); - - Tablet tablet1 = new Tablet(tableSchema1_1.getTableName(), tableSchema1_1.getColumnSchemas()); - tablet1.addTimestamp(0, 0); - tablet1.addValue(0, 0, 1); - tablet1.addValue(0, 1, 2); - tablet1.addValue(0, 2, 3); - - Tablet tablet2 = new Tablet(tableSchema1_2.getTableName(), tableSchema1_2.getColumnSchemas()); - tablet2.addTimestamp(0, 0); - tablet2.addValue(0, 0, 101); - tablet2.addValue(0, 1, 102); - tablet2.addValue(0, 2, 103); - - tsFileWriter.writeTable(tablet1); - tsFileWriter.writeTable(tablet2); - } - TsFileResource resource1 = new TsFileResource(f1); - resource1.setTsFileManager(tsFileManager); - resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); - resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table1"}), 0); - resource1.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); - resource1.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 0); - resource1.close(); - - // rename table1 -> table0 - TsFileSet tsFileSet1 = new TsFileSet(1, fileSetDir, false); - tsFileSet1.appendSchemaEvolution( - Collections.singletonList(new TableRename("table1", "table0"))); - tsFileManager.addTsFileSet(tsFileSet1, 0); - - // file2: - // table0[s1, s2, s3] - // table2[s1, s2, s3] - File f2 = new File(SEQ_DIRS, "0-2-0-0.tsfile"); - TableSchema tableSchema2_1 = - new TableSchema( - "table0", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s1") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - TableSchema tableSchema2_2 = - new TableSchema( - "table2", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s1") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - try (TsFileWriter tsFileWriter = new TsFileWriter(f2)) { - tsFileWriter.registerTableSchema(tableSchema2_1); - tsFileWriter.registerTableSchema(tableSchema2_2); - - Tablet tablet1 = new Tablet(tableSchema2_1.getTableName(), tableSchema2_1.getColumnSchemas()); - tablet1.addTimestamp(0, 1); - tablet1.addValue(0, 0, 11); - tablet1.addValue(0, 1, 12); - tablet1.addValue(0, 2, 13); - - Tablet tablet2 = new Tablet(tableSchema2_2.getTableName(), tableSchema2_2.getColumnSchemas()); - tablet2.addTimestamp(0, 1); - tablet2.addValue(0, 0, 111); - tablet2.addValue(0, 1, 112); - tablet2.addValue(0, 2, 113); - - tsFileWriter.writeTable(tablet1); - tsFileWriter.writeTable(tablet2); - } - TsFileResource resource2 = new TsFileResource(f2); - resource2.setTsFileManager(tsFileManager); - resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); - resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 1); - resource2.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); - resource2.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 1); - resource2.close(); - - - // rename table0.s1 -> table0.s0 - TsFileSet tsFileSet2 = new TsFileSet(2, fileSetDir, false); - tsFileSet2.appendSchemaEvolution( - Collections.singletonList(new ColumnRename("table0", "s1", "s0"))); - tsFileManager.addTsFileSet(tsFileSet2, 0); - - // file3: - // table0[s0, s2, s3] - // table2[s1, s2, s3] - File f3 = new File(SEQ_DIRS, "0-3-0-0.tsfile"); - TableSchema tableSchema3_1 = - new TableSchema( - "table0", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s0") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - TableSchema tableSchema3_2 = - new TableSchema( - "table2", - Arrays.asList( - new ColumnSchemaBuilder() - .name("s1") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s2") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build(), - new ColumnSchemaBuilder() - .name("s3") - .dataType(TSDataType.INT32) - .category(ColumnCategory.FIELD) - .build())); - try (TsFileWriter tsFileWriter = new TsFileWriter(f3)) { - tsFileWriter.registerTableSchema(tableSchema3_1); - tsFileWriter.registerTableSchema(tableSchema3_2); - - Tablet tablet1 = new Tablet(tableSchema3_1.getTableName(), tableSchema3_1.getColumnSchemas()); - tablet1.addTimestamp(0, 2); - tablet1.addValue(0, 0, 21); - tablet1.addValue(0, 1, 22); - tablet1.addValue(0, 2, 23); - - Tablet tablet2 = new Tablet(tableSchema3_2.getTableName(), tableSchema3_2.getColumnSchemas()); - tablet2.addTimestamp(0, 2); - tablet2.addValue(0, 0, 121); - tablet2.addValue(0, 1, 122); - tablet2.addValue(0, 2, 123); - - tsFileWriter.writeTable(tablet1); - tsFileWriter.writeTable(tablet2); - } - TsFileResource resource3 = new TsFileResource(f3); - resource3.setTsFileManager(tsFileManager); - resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); - resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table0"}), 2); - resource3.updateStartTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); - resource3.updateEndTime(Factory.DEFAULT_FACTORY.create(new String[]{"table2"}), 2); - resource3.close(); - - // rename table2 -> table1 - TsFileSet tsFileSet3 = new TsFileSet(3, fileSetDir, false); - tsFileSet3.appendSchemaEvolution( - Collections.singletonList(new TableRename("table2", "table1"))); - tsFileManager.addTsFileSet(tsFileSet3, 0); - - // perform compaction - seqResources.add(resource1); - seqResources.add(resource2); - seqResources.add(resource3); - - List<TsFileResource> targetResources = - CompactionFileGeneratorUtils.getInnerCompactionTargetTsFileResources(seqResources, true); - targetResources.forEach(s -> s.setTsFileManager(tsFileManager)); - - ICompactionPerformer performer = - new ReadChunkCompactionPerformer(seqResources, targetResources, EncryptDBUtils.getDefaultFirstEncryptParam()); - performer.setSummary(new CompactionTaskSummary()); - performer.perform(); - - // target(version=1): - // table1[s1, s2, s3] - // table2[s1, s2, s3] - try (ITsFileReader tsFileReader = - new TsFileReaderBuilder().file(targetResources.get(0).getTsFile()).build()) { - // table1 should not exist - try { - tsFileReader.query("table0", Collections.singletonList("s2"), Long.MIN_VALUE, Long.MAX_VALUE); - fail("table0 should not exist"); - } catch (NoTableException e) { - assertEquals("Table table0 not found", e.getMessage()); - } - - // table1.s0 should not exist - try { - tsFileReader.query("table1", Collections.singletonList("s0"), Long.MIN_VALUE, Long.MAX_VALUE); - fail("table1.s0 should not exist"); - } catch (NoMeasurementException e) { - assertEquals("No measurement for s0", e.getMessage()); - } - - // check data of table1 - ResultSet resultSet = tsFileReader.query("table1", Arrays.asList("s1", "s2", "s3"), - Long.MIN_VALUE, Long.MAX_VALUE); - for (int i = 0; i < 3; i++) { - assertTrue(resultSet.next()); - assertEquals(i, resultSet.getLong(1)); - for (int j = 0; j < 3; j++) { - assertEquals(i * 10 + j + 1, resultSet.getLong(j + 2)); - } - } - - // check data of table2 - resultSet = tsFileReader.query("table2", Arrays.asList("s1", "s2", "s3"), - Long.MIN_VALUE, Long.MAX_VALUE); - for (int i = 0; i < 3; i++) { - assertTrue(resultSet.next()); - assertEquals(i, resultSet.getLong(1)); - for (int j = 0; j < 3; j++) { - assertEquals(100 + i * 10 + j + 1, resultSet.getLong(j + 2)); - } - } - } - } }
