This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d4ccd578da [IOTDB-2931] Remove access to metadata manager during
compaction (#5572)
d4ccd578da is described below
commit d4ccd578da58505289ca921e19bbe2c53ffc4a47
Author: Liu Xuxin <[email protected]>
AuthorDate: Wed Jun 1 20:52:31 2022 +0800
[IOTDB-2931] Remove access to metadata manager during compaction (#5572)
---
.../engine/compaction/CompactionTaskManager.java | 7 ++
.../rewrite/task/ReadPointPerformerSubTask.java | 26 ++----
.../compaction/inner/InnerSpaceCompactionTask.java | 4 +-
.../utils/AlignedSeriesCompactionExecutor.java | 24 ++++-
.../inner/utils/MultiTsFileDeviceIterator.java | 3 +
.../utils/SingleSeriesCompactionExecutor.java | 34 +++++++
.../impl/ReadChunkCompactionPerformer.java | 18 +---
.../impl/ReadPointCompactionPerformer.java | 104 +++++++++++++++++----
.../compaction/CompactionTaskManagerTest.java | 21 ++++-
.../compaction/inner/InnerUnseqCompactionTest.java | 2 +
.../ReadChunkCompactionPerformerNoAlignedTest.java | 1 -
11 files changed, 180 insertions(+), 64 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index 0523fc26ff..385e23aa0d 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -404,6 +404,13 @@ public class CompactionTaskManager implements IService {
+ " seconds for all sub compaction tasks to finish.");
}
}
+ if (this.subCompactionTaskExecutionPool != null) {
+ subCompactionTaskExecutionPool.shutdownNow();
+ if (!this.subCompactionTaskExecutionPool.awaitTermination(
+ MAX_WAITING_TIME, TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException("Failed to shutdown
subCompactionTaskExecutionPool");
+ }
+ }
this.taskExecutionPool =
(WrappedScheduledExecutorService)
IoTDBThreadPoolFactory.newScheduledThreadPool(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
index 780a04e01e..d22362746c 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/task/ReadPointPerformerSubTask.java
@@ -19,24 +19,20 @@
package org.apache.iotdb.db.engine.compaction.cross.rewrite.task;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.performer.impl.ReadPointCompactionPerformer;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -53,6 +49,7 @@ public class ReadPointPerformerSubTask implements
Callable<Void> {
private final QueryContext queryContext;
private final QueryDataSource queryDataSource;
private final AbstractCompactionWriter compactionWriter;
+ private final Map<String, MeasurementSchema> schemaMap;
private final int taskId;
public ReadPointPerformerSubTask(
@@ -61,31 +58,22 @@ public class ReadPointPerformerSubTask implements
Callable<Void> {
QueryContext queryContext,
QueryDataSource queryDataSource,
AbstractCompactionWriter compactionWriter,
+ Map<String, MeasurementSchema> schemaMap,
int taskId) {
this.device = device;
this.measurementList = measurementList;
this.queryContext = queryContext;
this.queryDataSource = queryDataSource;
this.compactionWriter = compactionWriter;
+ this.schemaMap = schemaMap;
this.taskId = taskId;
}
@Override
public Void call() throws Exception {
for (String measurement : measurementList) {
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-
measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device,
measurement));
- } else {
- measurementSchemas.add(
- IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device,
measurement)));
- }
- } catch (PathNotExistException e) {
- logger.info("A deleted path is skipped: {}", e.getMessage());
- continue;
- }
-
+ List<IMeasurementSchema> measurementSchemas =
+ Collections.singletonList(schemaMap.get(measurement));
IBatchReader dataBatchReader =
ReadPointCompactionPerformer.constructReader(
device,
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
index c519697162..4b2760ce5b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/InnerSpaceCompactionTask.java
@@ -202,7 +202,9 @@ public class InnerSpaceCompactionTask extends
AbstractCompactionTask {
// catch throwable to handle OOM errors
if (!(throwable instanceof InterruptedException)) {
LOGGER.error(
- "{} [Compaction] Meet errors in inner space compaction.",
fullStorageGroupName);
+ "{} [Compaction] Meet errors in inner space compaction.",
+ fullStorageGroupName,
+ throwable);
}
// handle exception
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index af5353153f..8ee99d9e1f 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -19,15 +19,19 @@
package org.apache.iotdb.db.engine.compaction.inner.utils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.compaction.CompactionMetricsManager;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -69,7 +73,8 @@ public class AlignedSeriesCompactionExecutor {
String device,
TsFileResource targetResource,
LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
readerAndChunkMetadataList,
- TsFileIOWriter writer) {
+ TsFileIOWriter writer)
+ throws IOException {
this.device = device;
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.writer = writer;
@@ -86,11 +91,13 @@ public class AlignedSeriesCompactionExecutor {
* @return
*/
private List<IMeasurementSchema> collectSchemaFromAlignedChunkMetadataList(
- LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
- readerAndChunkMetadataList) {
+ LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>>
readerAndChunkMetadataList)
+ throws IOException {
Set<MeasurementSchema> schemaSet = new HashSet<>();
+ Set<String> measurementSet = new HashSet<>();
for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair
:
readerAndChunkMetadataList) {
+ TsFileSequenceReader reader = readerListPair.left;
List<AlignedChunkMetadata> alignedChunkMetadataList =
readerListPair.right;
for (AlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
List<IChunkMetadata> valueChunkMetadataList =
@@ -99,9 +106,18 @@ public class AlignedSeriesCompactionExecutor {
if (chunkMetadata == null) {
continue;
}
+ if (measurementSet.contains(chunkMetadata.getMeasurementUid())) {
+ continue;
+ }
+ measurementSet.add(chunkMetadata.getMeasurementUid());
+ Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata)
chunkMetadata);
+ ChunkHeader header = chunk.getHeader();
schemaSet.add(
new MeasurementSchema(
- chunkMetadata.getMeasurementUid(),
chunkMetadata.getDataType()));
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType()));
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
index be6d2c8691..73f3c2ca45 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/MultiTsFileDeviceIterator.java
@@ -181,6 +181,9 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
TsFileSequenceReader reader = readerMap.get(tsFileResource);
List<AlignedChunkMetadata> alignedChunkMetadataList =
reader.getAlignedChunkMetadata(currentDevice.left);
+ if (alignedChunkMetadataList.size() > 0) {
+ alignedChunkMetadataList.forEach(x ->
x.setFilePath(tsFileResource.getTsFilePath()));
+ }
applyModificationForAlignedChunkMetadataList(tsFileResource,
alignedChunkMetadataList);
readerAndChunkMetadataList.add(new Pair<>(reader,
alignedChunkMetadataList));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index dd1dbea024..1bba81dd04 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.db.engine.compaction.constant.CompactionType;
import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -36,6 +37,7 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import com.google.common.util.concurrent.RateLimiter;
@@ -47,6 +49,7 @@ import java.util.List;
/** This class is used to compact one series during inner space compaction. */
public class SingleSeriesCompactionExecutor {
private String device;
+ private PartialPath series;
private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList;
private TsFileIOWriter fileWriter;
private TsFileResource targetResource;
@@ -80,6 +83,7 @@ public class SingleSeriesCompactionExecutor {
TsFileIOWriter fileWriter,
TsFileResource targetResource) {
this.device = series.getDevice();
+ this.series = series;
this.readerAndChunkMetadataList = readerAndChunkMetadataList;
this.fileWriter = fileWriter;
this.schema = measurementSchema;
@@ -89,6 +93,22 @@ public class SingleSeriesCompactionExecutor {
this.targetResource = targetResource;
}
+ public SingleSeriesCompactionExecutor(
+ PartialPath series,
+ LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList,
+ TsFileIOWriter fileWriter,
+ TsFileResource targetResource) {
+ this.device = series.getDevice();
+ this.series = series;
+ this.readerAndChunkMetadataList = readerAndChunkMetadataList;
+ this.fileWriter = fileWriter;
+ this.schema = null;
+ this.chunkWriter = null;
+ this.cachedChunk = null;
+ this.cachedChunkMetadata = null;
+ this.targetResource = targetResource;
+ }
+
/**
* This function execute the compaction of a single time series. Notice, the
result of single
* series compaction may contain more than one chunk.
@@ -101,6 +121,9 @@ public class SingleSeriesCompactionExecutor {
List<ChunkMetadata> chunkMetadataList = readerListPair.right;
for (ChunkMetadata chunkMetadata : chunkMetadataList) {
Chunk currentChunk = reader.readMemChunk(chunkMetadata);
+ if (this.chunkWriter == null) {
+ constructChunkWriterFromReadChunk(currentChunk);
+ }
CompactionMetricsManager.recordReadInfo(
currentChunk.getHeader().getSerializedSize() +
currentChunk.getHeader().getDataSize());
@@ -135,6 +158,17 @@ public class SingleSeriesCompactionExecutor {
targetResource.updateEndTime(device, maxEndTimestamp);
}
+ private void constructChunkWriterFromReadChunk(Chunk chunk) {
+ ChunkHeader chunkHeader = chunk.getHeader();
+ this.schema =
+ new MeasurementSchema(
+ series.getMeasurement(),
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType());
+ this.chunkWriter = new ChunkWriterImpl(this.schema);
+ }
+
private long getChunkSize(Chunk chunk) {
return chunk.getHeader().getSerializedSize() +
chunk.getHeader().getDataSize();
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
index e8ca02bbc7..2a3d50d9ff 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadChunkCompactionPerformer.java
@@ -21,16 +21,12 @@ package
org.apache.iotdb.db.engine.compaction.performer.impl;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.engine.compaction.inner.utils.AlignedSeriesCompactionExecutor;
import
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import
org.apache.iotdb.db.engine.compaction.inner.utils.SingleSeriesCompactionExecutor;
import org.apache.iotdb.db.engine.compaction.performer.ISeqCompactionPerformer;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -141,20 +137,8 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
// dead-loop.
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>>
readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
- measurementSchema =
- IDTableManager.getInstance().getSeriesSchema(device,
p.getMeasurement());
- } else {
- measurementSchema = IoTDB.schemaProcessor.getSeriesSchema(p);
- }
- } catch (PathNotExistException e) {
- LOGGER.info("A deleted path is skipped: {}", e.getMessage());
- continue;
- }
SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries =
- new SingleSeriesCompactionExecutor(
- p, measurementSchema, readerAndChunkMetadataList, writer,
targetResource);
+ new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList,
writer, targetResource);
compactionExecutorOfCurrentTimeSeries.execute();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
index 3dc0c5ab11..499868e80b 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/performer/impl/ReadPointCompactionPerformer.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import
org.apache.iotdb.db.engine.compaction.cross.rewrite.task.ReadPointPerformerSubTask;
import
org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
@@ -32,23 +33,27 @@ import
org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.InnerSpaceCompactionWriter;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.QueryUtils;
+import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.reader.IBatchReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
@@ -57,7 +62,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,6 +79,7 @@ public class ReadPointCompactionPerformer
private List<TsFileResource> unseqFiles = Collections.emptyList();
private static final int subTaskNum =
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
+ private Map<TsFileResource, TsFileSequenceReader> readerCacheMap = new
HashMap<>();
private List<TsFileResource> targetFiles = Collections.emptyList();
@@ -127,6 +135,7 @@ public class ReadPointCompactionPerformer
updateDeviceStartTimeAndEndTime(targetFiles, compactionWriter);
updatePlanIndexes(targetFiles, seqFiles, unseqFiles);
} finally {
+ clearReaderCache();
QueryResourceManager.getInstance().endQuery(queryId);
}
}
@@ -146,19 +155,8 @@ public class ReadPointCompactionPerformer
MultiTsFileDeviceIterator.AlignedMeasurementIterator
alignedMeasurementIterator =
deviceIterator.iterateAlignedSeries(device);
Set<String> allMeasurements =
alignedMeasurementIterator.getAllMeasurements();
- List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
- for (String measurement : allMeasurements) {
- try {
- if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
-
measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device,
measurement));
- } else {
- measurementSchemas.add(
- IoTDB.schemaProcessor.getSeriesSchema(new PartialPath(device,
measurement)));
- }
- } catch (PathNotExistException e) {
- LOGGER.info("A deleted path is skipped: {}", e.getMessage());
- }
- }
+ Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device,
allMeasurements);
+ List<IMeasurementSchema> measurementSchemas = new
ArrayList<>(schemaMap.values());
if (measurementSchemas.isEmpty()) {
return;
}
@@ -192,11 +190,12 @@ public class ReadPointCompactionPerformer
AbstractCompactionWriter compactionWriter,
QueryContext queryContext,
QueryDataSource queryDataSource)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, IllegalPathException {
MultiTsFileDeviceIterator.MeasurementIterator measurementIterator =
deviceIterator.iterateNotAlignedSeries(device, false);
Set<String> allMeasurements = measurementIterator.getAllMeasurements();
int subTaskNums = Math.min(allMeasurements.size(), subTaskNum);
+ Map<String, MeasurementSchema> schemaMap = getMeasurementSchema(device,
allMeasurements);
// assign all measurements to different sub tasks
Set<String>[] measurementsForEachSubTask = new HashSet[subTaskNums];
@@ -221,6 +220,7 @@ public class ReadPointCompactionPerformer
queryContext,
queryDataSource,
compactionWriter,
+ schemaMap,
i)));
}
@@ -237,6 +237,76 @@ public class ReadPointCompactionPerformer
compactionWriter.endChunkGroup();
}
+ private Map<String, MeasurementSchema> getMeasurementSchema(
+ String device, Set<String> measurements) throws IllegalPathException,
IOException {
+ HashMap<String, MeasurementSchema> schemaMap = new HashMap<>();
+ List<TsFileResource> allResources = new LinkedList<>(seqFiles);
+ allResources.addAll(unseqFiles);
+ // sort the tsfile by version, so that we can iterate the tsfile from the
newest to oldest
+ allResources.sort(
+ (o1, o2) -> {
+ try {
+ TsFileNameGenerator.TsFileName n1 =
+ TsFileNameGenerator.getTsFileName(o1.getTsFile().getName());
+ TsFileNameGenerator.TsFileName n2 =
+ TsFileNameGenerator.getTsFileName(o2.getTsFile().getName());
+ return (int) (n2.getVersion() - n1.getVersion());
+ } catch (IOException e) {
+ return 0;
+ }
+ });
+ for (String measurement : measurements) {
+ for (TsFileResource tsFileResource : allResources) {
+ if (!tsFileResource.mayContainsDevice(device)) {
+ continue;
+ }
+ MeasurementSchema schema =
+ getMeasurementSchemaFromReader(
+ tsFileResource,
+ readerCacheMap.computeIfAbsent(
+ tsFileResource,
+ x -> {
+ try {
+
FileReaderManager.getInstance().increaseFileReaderReference(x, true);
+ return
FileReaderManager.getInstance().get(x.getTsFilePath(), true);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to construct sequence reader for %s",
tsFileResource));
+ }
+ }),
+ device,
+ measurement);
+ if (schema != null) {
+ schemaMap.put(measurement, schema);
+ break;
+ }
+ }
+ }
+ return schemaMap;
+ }
+
+ private MeasurementSchema getMeasurementSchemaFromReader(
+ TsFileResource resource, TsFileSequenceReader reader, String device,
String measurement)
+ throws IllegalPathException, IOException {
+ List<ChunkMetadata> chunkMetadata =
+ reader.getChunkMetadataList(new PartialPath(device, measurement));
+ if (chunkMetadata.size() > 0) {
+ chunkMetadata.get(0).setFilePath(resource.getTsFilePath());
+ Chunk chunk = ChunkCache.getInstance().get(chunkMetadata.get(0));
+ ChunkHeader header = chunk.getHeader();
+ return new MeasurementSchema(
+ measurement, header.getDataType(), header.getEncodingType(),
header.getCompressionType());
+ }
+ return null;
+ }
+
+ private void clearReaderCache() throws IOException {
+ for (TsFileResource resource : readerCacheMap.keySet()) {
+ FileReaderManager.getInstance().decreaseFileReaderReference(resource,
true);
+ }
+ }
+
private static void updateDeviceStartTimeAndEndTime(
List<TsFileResource> targetResources, AbstractCompactionWriter
compactionWriter) {
List<TsFileIOWriter> targetFileWriters =
compactionWriter.getFileIOWriter();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
index 9c1ca40f7d..df68f4eb07 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManagerTest.java
@@ -40,8 +40,11 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.fail;
+
public class CompactionTaskManagerTest extends InnerCompactionTest {
static final Logger logger =
LoggerFactory.getLogger(CompactionTaskManagerTest.class);
File tempSGDir;
@@ -120,7 +123,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
logger.warn("{}", manager.getRunningCompactionTaskList());
}
if (waitingTime > MAX_WAITING_TIME) {
- Assert.fail();
+ fail();
}
}
}
@@ -179,7 +182,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
logger.warn("{}",
CompactionTaskManager.getInstance().getRunningCompactionTaskList());
}
if (waitingTime > MAX_WAITING_TIME) {
- Assert.fail();
+ fail();
}
}
for (TsFileResource resource : seqResources) {
@@ -232,7 +235,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
logger.warn("{}", manager.getRunningCompactionTaskList());
}
if (waitingTime > MAX_WAITING_TIME) {
- Assert.fail();
+ fail();
}
}
}
@@ -278,7 +281,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
logger.warn("{}", manager.getRunningCompactionTaskList());
}
if (waitingTime > MAX_WAITING_TIME) {
- Assert.fail();
+ fail();
}
}
}
@@ -313,6 +316,7 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
public void testRewriteCrossCompactionFileStatus() throws Exception {
TsFileManager tsFileManager =
new TsFileManager("root.compactionTest", "0",
tempSGDir.getAbsolutePath());
+ seqResources = seqResources.subList(1, 5);
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
CrossSpaceCompactionTask task =
@@ -340,7 +344,14 @@ public class CompactionTaskManagerTest extends
InnerCompactionTest {
}
CompactionTaskManager.getInstance().submitTaskFromTaskQueue();
- CompactionTaskManager.getInstance().waitAllCompactionFinish();
+ long waitingTime = 0;
+ while (!task.isTaskFinished()) {
+ TimeUnit.MILLISECONDS.sleep(200);
+ waitingTime += 200;
+ if (waitingTime > 10_000) {
+ fail();
+ }
+ }
for (TsFileResource resource : seqResources) {
Assert.assertFalse(resource.isCompactionCandidate());
}
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
index 403ddcc51e..e0419f721b 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/InnerUnseqCompactionTest.java
@@ -110,11 +110,13 @@ public class InnerUnseqCompactionTest {
Collections.emptyMap());
}
Thread.currentThread().setName("pool-1-IoTDB-Compaction-1");
+ EnvironmentUtils.envSetUp();
}
@After
public void tearDown() throws IOException, StorageEngineException {
new CompactionConfigRestorer().restoreCompactionConfig();
+ EnvironmentUtils.cleanEnv();
CompactionClearUtils.clearAllCompactionFiles();
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
index a88a6e57d5..fe9f9ef987 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/inner/ReadChunkCompactionPerformerNoAlignedTest.java
@@ -483,7 +483,6 @@ public class ReadChunkCompactionPerformerNoAlignedTest {
for (String path : fullPathSetWithDeleted) {
chunkPagePointsNumMerged.put(path, chunkPointsArray);
}
- chunkPagePointsNumMerged.put(deletedPath, null);
CompactionCheckerUtils.checkChunkAndPage(chunkPagePointsNumMerged,
targetResource);
Map<PartialPath, List<TimeValuePair>> compactedData =
CompactionCheckerUtils.getDataByQuery(