This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch new_vector in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 314c3abe62cb545607eef8b524ff394c1212c7cd Author: JackieTien97 <[email protected]> AuthorDate: Wed Nov 10 10:38:56 2021 +0800 stage 1 --- .../org/apache/iotdb/db/engine/StorageEngine.java | 10 +-- .../db/engine/storagegroup/TsFileProcessor.java | 18 +---- .../db/engine/storagegroup/TsFileResource.java | 19 +---- .../apache/iotdb/db/metadata/path/AlignedPath.java | 28 ++----- .../iotdb/db/metadata/path/MeasurementPath.java | 26 +++--- .../apache/iotdb/db/metadata/path/PartialPath.java | 9 +-- .../db/qp/physical/crud/RawDataQueryPlan.java | 12 --- .../iotdb/db/query/context/QueryContext.java | 19 ++++- .../db/query/control/QueryResourceManager.java | 9 ++- .../dataset/RawQueryDataSetWithValueFilter.java | 13 ++- .../db/query/executor/AggregationExecutor.java | 4 + .../iotdb/db/query/executor/QueryRouter.java | 6 +- .../query/reader/chunk/DiskAlignedChunkLoader.java | 67 +++++++++++++++ .../db/query/reader/chunk/DiskChunkLoader.java | 12 +++ .../db/query/reader/chunk/MemChunkLoader.java | 8 +- ...er.java => DiskAlignedChunkMetadataLoader.java} | 94 +++++++++------------- .../chunk/metadata/DiskChunkMetadataLoader.java | 70 +++++++--------- ...der.java => MemAlignedChunkMetadataLoader.java} | 24 +++--- .../chunk/metadata/MemChunkMetadataLoader.java | 15 ++-- .../query/reader/series/AlignedSeriesReader.java | 4 +- .../reader/series/SeriesReaderByTimestamp.java | 4 +- .../query/timegenerator/ServerTimeGenerator.java | 27 +++++-- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 64 +++++---------- .../java/org/apache/iotdb/db/utils/QueryUtils.java | 69 ++++++++++++++++ .../storagegroup/StorageGroupProcessorTest.java | 1 - .../engine/storagegroup/TsFileProcessorTest.java | 18 ----- .../tsfile/file/metadata/AlignedChunkMetadata.java | 69 +++++++--------- .../file/metadata/AlignedTimeSeriesMetadata.java | 62 +++++++------- .../tsfile/file/metadata/ITimeSeriesMetadata.java | 1 + .../tsfile/file/metadata/TimeseriesMetadata.java | 2 +- .../apache/iotdb/tsfile/read/common/BatchData.java | 15 ++-- .../read/controller/CachedChunkLoaderImpl.java | 18 +++++ .../iotdb/tsfile/read/controller/IChunkLoader.java | 5 ++ .../read/controller/IChunkMetadataLoader.java | 2 +- .../tsfile/read/reader/page/TimePageReader.java | 3 + .../iotdb/tsfile/read/ReadOnlyTsFileTest.java | 2 +- .../TsFileGeneratorForSeriesReaderByTimestamp.java | 22 ++--- .../org/apache/iotdb/tsfile/utils/RecordUtils.java | 12 +-- 38 files changed, 475 insertions(+), 388 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 6106912..192ea08 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -64,7 +64,7 @@ import org.apache.iotdb.db.utils.UpgradeUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; -import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.FilePathUtils; import org.apache.iotdb.tsfile.utils.Pair; @@ -764,15 +764,11 @@ public class StorageEngine implements IService { /** query data. */ public QueryDataSource query( - SingleSeriesExpression seriesExpression, - QueryContext context, - QueryFileManager filePathsManager) + PartialPath fullPath, Filter filter, QueryContext context, QueryFileManager filePathsManager) throws StorageEngineException, QueryProcessException { - PartialPath fullPath = (PartialPath) seriesExpression.getSeriesPath(); PartialPath deviceId = fullPath.getDevicePath(); StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId); - return storageGroupProcessor.query( - fullPath, context, filePathsManager, seriesExpression.getFilter()); + return storageGroupProcessor.query(fullPath, context, filePathsManager, filter); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index ef9fb4a..2489839 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.engine.memtable.IMemTable; import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; -import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack; import org.apache.iotdb.db.exception.TsFileProcessorException; @@ -49,7 +48,6 @@ import org.apache.iotdb.db.rescon.MemTableManager; import org.apache.iotdb.db.rescon.PrimitiveArrayManager; import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.utils.MemUtils; -import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.db.writelog.WALFlushListener; @@ -59,7 +57,6 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.utils.Binary; @@ -1301,21 +1298,10 @@ public class TsFileProcessor { } } - ModificationFile modificationFile = tsFileResource.getModFile(); - List<Modification> modifications = context.getPathModifications(modificationFile, fullPath); - - List<IChunkMetadata> chunkMetadataList = - fullPath - .getMeasurementSchema() - .getVisibleMetadataListFromWriter(writer, fullPath.getDevice()); - - QueryUtils.modifyChunkMetaData(chunkMetadataList, modifications); - chunkMetadataList.removeIf(context::chunkNotSatisfy); - // get in memory data - if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) { + if (!readOnlyMemChunks.isEmpty()) { tsfileResourcesForQuery.add( - fullPath.createTsFileResource(readOnlyMemChunks, chunkMetadataList, tsFileResource)); + fullPath.createTsFileResource(readOnlyMemChunks, tsFileResource)); } } catch (QueryProcessException e) { logger.error( diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index c973886..21c46cf 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.exception.PartitionViolationException; import org.apache.iotdb.db.service.UpgradeSevice; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; @@ -52,7 +51,6 @@ import java.io.OutputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -110,12 +108,6 @@ public class TsFileResource { private boolean isSeq; - /** - * Chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query - * process. - */ - private List<IChunkMetadata> chunkMetadataList; - /** Mem chunk data. Only be set in a temporal TsFileResource in a query process. */ private List<ReadOnlyMemChunk> readOnlyMemChunk; @@ -163,7 +155,6 @@ public class TsFileResource { this.closed = other.closed; this.deleted = other.deleted; this.isMerging = other.isMerging; - this.chunkMetadataList = other.chunkMetadataList; this.readOnlyMemChunk = other.readOnlyMemChunk; this.tsFileLock = other.tsFileLock; this.fsFactory = other.fsFactory; @@ -191,14 +182,11 @@ public class TsFileResource { /** unsealed TsFile, for query */ public TsFileResource( - List<ReadOnlyMemChunk> readOnlyMemChunk, - List<IChunkMetadata> chunkMetadataList, - TsFileResource originTsFileResource) + List<ReadOnlyMemChunk> readOnlyMemChunk, TsFileResource originTsFileResource) throws IOException { this.file = originTsFileResource.file; this.timeIndex = originTsFileResource.timeIndex; this.timeIndexType = originTsFileResource.timeIndexType; - this.chunkMetadataList = chunkMetadataList; this.readOnlyMemChunk = readOnlyMemChunk; this.originTsFileResource = originTsFileResource; this.version = originTsFileResource.version; @@ -317,10 +305,6 @@ public class TsFileResource { return fsFactory.getFile(file + RESOURCE_SUFFIX).exists(); } - public List<IChunkMetadata> getChunkMetadataList() { - return new ArrayList<>(chunkMetadataList); - } - public List<ReadOnlyMemChunk> getReadOnlyMemChunk() { return readOnlyMemChunk; } @@ -391,7 +375,6 @@ public class TsFileResource { modFile = null; } processor = null; - chunkMetadataList = null; timeIndex.close(); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java index bf34e4c..57262cb 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/AlignedPath.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; @@ -177,7 +176,7 @@ public class AlignedPath extends PartialPath { } public TSDataType getSeriesType() { - return getMeasurementSchema().getType(); + return TSDataType.VECTOR; } @Override @@ -257,14 +256,10 @@ public class AlignedPath extends PartialPath { @Override public TsFileResource createTsFileResource( - List<ReadOnlyMemChunk> readOnlyMemChunk, - List<IChunkMetadata> chunkMetadataList, - TsFileResource originTsFileResource) + List<ReadOnlyMemChunk> readOnlyMemChunk, TsFileResource originTsFileResource) throws IOException { - TsFileResource tsFileResource = - new TsFileResource(readOnlyMemChunk, chunkMetadataList, originTsFileResource); - tsFileResource.setTimeSeriesMetadata( - generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList)); + TsFileResource tsFileResource = new TsFileResource(readOnlyMemChunk, originTsFileResource); + tsFileResource.setTimeSeriesMetadata(generateTimeSeriesMetadata(readOnlyMemChunk)); return tsFileResource; } @@ -273,8 +268,7 @@ public class AlignedPath extends PartialPath { * have chunkMetadata, but query will use these, so we need to generate it for them. */ private AlignedTimeSeriesMetadata generateTimeSeriesMetadata( - List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) - throws IOException { + List<ReadOnlyMemChunk> readOnlyMemChunk) throws IOException { TimeseriesMetadata timeTimeSeriesMetadata = new TimeseriesMetadata(); timeTimeSeriesMetadata.setOffsetOfChunkMetaDataList(-1); timeTimeSeriesMetadata.setDataSizeOfChunkMetaDataList(-1); @@ -296,18 +290,6 @@ public class AlignedPath extends PartialPath { valueTimeSeriesMetadataList.add(valueMetadata); } - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetadata; - timeStatistics.mergeStatistics(alignedChunkMetadata.getTimeChunkMetadata().getStatistics()); - for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { - valueTimeSeriesMetadataList - .get(i) - .getStatistics() - .mergeStatistics( - alignedChunkMetadata.getValueChunkMetadataList().get(i).getStatistics()); - } - } - for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { if (!memChunk.isEmpty()) { AlignedChunkMetadata alignedChunkMetadata = diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java index 05188ed..840b8c2 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java @@ -30,7 +30,6 @@ import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.query.reader.series.SeriesReader; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.db.utils.datastructure.TVList; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -117,6 +116,14 @@ public class MeasurementPath extends PartialPath { return result; } + /** + * if isUnderAlignedEntity is true, return an AlignedPath with only one sub sensor otherwise, + * return itself + */ + public PartialPath transformToExactPath() { + return isUnderAlignedEntity ? new AlignedPath(this) : this; + } + public SeriesReader createSeriesReader( Set<String> allSensors, TSDataType dataType, @@ -162,14 +169,10 @@ public class MeasurementPath extends PartialPath { @Override public TsFileResource createTsFileResource( - List<ReadOnlyMemChunk> readOnlyMemChunk, - List<IChunkMetadata> chunkMetadataList, - TsFileResource originTsFileResource) + List<ReadOnlyMemChunk> readOnlyMemChunk, TsFileResource originTsFileResource) throws IOException { - TsFileResource tsFileResource = - new TsFileResource(readOnlyMemChunk, chunkMetadataList, originTsFileResource); - tsFileResource.setTimeSeriesMetadata( - generateTimeSeriesMetadata(readOnlyMemChunk, chunkMetadataList)); + TsFileResource tsFileResource = new TsFileResource(readOnlyMemChunk, originTsFileResource); + tsFileResource.setTimeSeriesMetadata(generateTimeSeriesMetadata(readOnlyMemChunk)); return tsFileResource; } @@ -177,8 +180,7 @@ public class MeasurementPath extends PartialPath { * Because the unclosed tsfile don't have TimeSeriesMetadata and memtables in the memory don't * have chunkMetadata, but query will use these, so we need to generate it for them. */ - private TimeseriesMetadata generateTimeSeriesMetadata( - List<ReadOnlyMemChunk> readOnlyMemChunk, List<IChunkMetadata> chunkMetadataList) + private TimeseriesMetadata generateTimeSeriesMetadata(List<ReadOnlyMemChunk> readOnlyMemChunk) throws IOException { TimeseriesMetadata timeSeriesMetadata = new TimeseriesMetadata(); timeSeriesMetadata.setMeasurementId(measurementSchema.getMeasurementId()); @@ -188,10 +190,6 @@ public class MeasurementPath extends PartialPath { Statistics<? extends Serializable> seriesStatistics = Statistics.getStatsByType(timeSeriesMetadata.getTSDataType()); - // flush chunkMetadataList one by one - for (IChunkMetadata chunkMetadata : chunkMetadataList) { - seriesStatistics.mergeStatistics(chunkMetadata.getStatistics()); - } for (ReadOnlyMemChunk memChunk : readOnlyMemChunk) { if (!memChunk.isEmpty()) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java index 9ff86d6..47d348e 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/PartialPath.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.query.reader.series.SeriesReader; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.common.TimeRange; @@ -309,8 +308,8 @@ public class PartialPath extends Path implements Comparable<Path> { throw new MetadataException("This path doesn't represent a measurement"); } - public TSDataType getSeriesType() throws MetadataException { - throw new MetadataException("This path doesn't represent a measurement"); + public TSDataType getSeriesType() throws UnsupportedOperationException { + throw new UnsupportedOperationException("This path doesn't represent a measurement"); } @Override @@ -398,9 +397,7 @@ public class PartialPath extends Path implements Comparable<Path> { } public TsFileResource createTsFileResource( - List<ReadOnlyMemChunk> readOnlyMemChunk, - List<IChunkMetadata> chunkMetadataList, - TsFileResource originTsFileResource) + List<ReadOnlyMemChunk> readOnlyMemChunk, TsFileResource originTsFileResource) throws IOException { throw new UnsupportedOperationException("Should call exact sub class!"); } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java index 5658caf..5bd1bdf 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java @@ -136,10 +136,6 @@ public class RawDataQueryPlan extends QueryPlan { this.deduplicatedPaths = deduplicatedPaths; } - public void setDeduplicatedPaths(List<PartialPath> deduplicatedPaths) { - this.deduplicatedPaths = deduplicatedPaths; - } - public List<TSDataType> getDeduplicatedDataTypes() { return deduplicatedDataTypes; } @@ -181,18 +177,10 @@ public class RawDataQueryPlan extends QueryPlan { setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes); } - public List<PartialPath> getDeduplicatedVectorPaths() { - return deduplicatedVectorPaths; - } - public void setDeduplicatedVectorPaths(List<PartialPath> deduplicatedVectorPaths) { this.deduplicatedVectorPaths = deduplicatedVectorPaths; } - public List<TSDataType> getDeduplicatedVectorDataTypes() { - return deduplicatedVectorDataTypes; - } - public void setDeduplicatedVectorDataTypes(List<TSDataType> deduplicatedVectorDataTypes) { this.deduplicatedVectorDataTypes = deduplicatedVectorDataTypes; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java index e7cec1d..df1b008 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java +++ b/server/src/main/java/org/apache/iotdb/db/query/context/QueryContext.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.context; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.control.QueryTimeManager; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; @@ -38,13 +39,14 @@ public class QueryContext { * The outer key is the path of a ModificationFile, the inner key in the name of a timeseries and * the value is the Modifications of a timeseries in this file. */ - private Map<String, Map<String, List<Modification>>> filePathModCache = new ConcurrentHashMap<>(); + private final Map<String, Map<String, List<Modification>>> filePathModCache = + new ConcurrentHashMap<>(); /** * The key is the path of a ModificationFile and the value is all Modifications in this file. We * use this field because each call of Modification.getModifications() return a copy of the * Modifications, and we do not want it to create multiple copies within a query. */ - private Map<String, List<Modification>> fileModCache = new HashMap<>(); + private final Map<String, List<Modification>> fileModCache = new HashMap<>(); private long queryId; @@ -111,6 +113,19 @@ public class QueryContext { }); } + /** + * Find the modifications of all aligned 'paths' in 'modFile'. If they are not in the cache, read + * them from 'modFile' and put then into the cache. + */ + public List<List<Modification>> getPathModifications(ModificationFile modFile, AlignedPath path) { + int n = path.getMeasurementList().size(); + List<List<Modification>> ans = new ArrayList<>(n); + for (int i = 0; i < n; i++) { + ans.add(getPathModifications(modFile, path.getPathWithMeasurement(i))); + } + return ans; + } + public long getQueryId() { return queryId; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index 416e6a2..31d4351 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.tracing.TracingManager; import org.apache.iotdb.db.query.externalsort.serialize.IExternalSortFileDeserializer; import org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService; -import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import java.io.IOException; @@ -84,14 +83,16 @@ public class QueryResourceManager { externalSortFileMap.computeIfAbsent(queryId, x -> new ArrayList<>()).add(deserializer); } + /** + * @param selectedPath MeasurementPath or AlignedPath, even if it contains only one sub sensor of + * an aligned device, it should be AlignedPath instead of MeasurementPath + */ public QueryDataSource getQueryDataSource( PartialPath selectedPath, QueryContext context, Filter filter) throws StorageEngineException, QueryProcessException { - SingleSeriesExpression singleSeriesExpression = - new SingleSeriesExpression(selectedPath, filter); QueryDataSource queryDataSource = - StorageEngine.getInstance().query(singleSeriesExpression, context, filePathsManager); + StorageEngine.getInstance().query(selectedPath, filter, context, filePathsManager); // for tracing: calculate the distinct number of seq and unseq tsfiles if (context.isEnableTracing()) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java index d4dd6b3..fdbd5c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java @@ -36,7 +36,7 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI private final List<IReaderByTimestamp> seriesReaderByTimestampList; private final List<Boolean> cached; - private List<RowRecord> cachedRowRecords = new ArrayList<>(); + private final List<RowRecord> cachedRowRecords = new ArrayList<>(); /** Used for UDF. */ private List<Object[]> cachedRowInObjects = new ArrayList<>(); @@ -121,11 +121,18 @@ public class RawQueryDataSetWithValueFilter extends QueryDataSet implements UDFI if (results[j] == null) { rowRecords[j].addField(null); } else { - hasField[j] = true; if (dataTypes.get(i) == TSDataType.VECTOR) { TsPrimitiveType[] result = (TsPrimitiveType[]) results[j]; - rowRecords[j].addField(result[0].getValue(), result[0].getDataType()); + for (TsPrimitiveType value : result) { + if (value == null) { + rowRecords[j].addField(null); + } else { + hasField[j] = true; + rowRecords[j].addField(value.getValue(), value.getDataType()); + } + } } else { + hasField[j] = true; rowRecords[j].addField(results[j], dataTypes.get(i)); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 4fead00..07f26c6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -458,6 +458,10 @@ public class AggregationExecutor { int remainingToCalculate, Statistics statistics) throws QueryProcessException { + // some aligned paths' statistics may be null + if (statistics == null) { + return remainingToCalculate; + } int newRemainingToCalculate = remainingToCalculate; for (int i = 0; i < aggregateResultList.size(); i++) { if (!isCalculatedArray[i]) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java index 7a7acb4..82660c7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java @@ -83,6 +83,9 @@ public class QueryRouter implements IQueryRouter { } queryPlan.setExpression(optimizedExpression); + // group the vector partial paths for raw query after optimize the expression + // because path in expressions should not be grouped + queryPlan.transformToVector(); RawDataQueryExecutor rawDataQueryExecutor = getRawDataQueryExecutor(queryPlan); if (!queryPlan.isAlignByTime()) { @@ -101,9 +104,6 @@ public class QueryRouter implements IQueryRouter { return new EmptyDataSet(); } } - - // Currently, we only group the vector partial paths for raw query without value filter - queryPlan.transformToVector(); return rawDataQueryExecutor.executeWithoutValueFilter(context); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java new file mode 100644 index 0000000..3487682 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskAlignedChunkLoader.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.reader.chunk; + +import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.controller.IChunkLoader; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DiskAlignedChunkLoader implements IChunkLoader { + + private final boolean debug; + + public DiskAlignedChunkLoader(boolean debug) { + this.debug = debug; + } + + @Override + public Chunk loadChunk(ChunkMetadata chunkMetaData) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException {} + + @Override + public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) + throws IOException { + AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData; + Chunk timeChunk = + ChunkCache.getInstance() + .get((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata(), debug); + List<Chunk> valueChunkList = new ArrayList<>(); + for (IChunkMetadata valueChunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { + valueChunkList.add( + valueChunkMetadata == null + ? null + : ChunkCache.getInstance().get((ChunkMetadata) valueChunkMetadata, debug)); + } + return new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java index 15ace5c..2f926a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/DiskChunkLoader.java @@ -21,8 +21,12 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.cache.ChunkCache; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; @@ -44,4 +48,12 @@ public class DiskChunkLoader implements IChunkLoader { public void close() { // do nothing } + + @Override + public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) + throws IOException { + Chunk chunk = ChunkCache.getInstance().get((ChunkMetadata) chunkMetaData, debug); + chunk.setFromOldFile(chunkMetaData.isFromOldTsFile()); + return new ChunkReader(chunk, timeFilter); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java index d5cb2d8..eff76f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkLoader.java @@ -21,8 +21,11 @@ package org.apache.iotdb.db.query.reader.chunk; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; /** To read one chunk from memory, and only used in iotdb server module */ public class MemChunkLoader implements IChunkLoader { @@ -43,7 +46,8 @@ public class MemChunkLoader implements IChunkLoader { // no resources need to close } - public ReadOnlyMemChunk getChunk() { - return chunk; + @Override + public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) { + return new MemChunkReader(chunk, timeFilter); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java similarity index 63% copy from server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java copy to server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java index b6ff733..de2c652 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskAlignedChunkMetadataLoader.java @@ -20,33 +20,35 @@ package org.apache.iotdb.db.query.reader.chunk.metadata; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.metadata.path.PartialPath; +import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.reader.chunk.DiskChunkLoader; +import org.apache.iotdb.db.query.reader.chunk.DiskAlignedChunkLoader; import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; -public class DiskChunkMetadataLoader implements IChunkMetadataLoader { +public class DiskAlignedChunkMetadataLoader implements IChunkMetadataLoader { private final TsFileResource resource; - private final PartialPath seriesPath; + private final AlignedPath seriesPath; private final QueryContext context; // time filter or value filter, only used to check time range private final Filter filter; private static final Logger DEBUG_LOGGER = LoggerFactory.getLogger("QUERY_DEBUG"); - public DiskChunkMetadataLoader( - TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter filter) { + public DiskAlignedChunkMetadataLoader( + TsFileResource resource, AlignedPath seriesPath, QueryContext context, Filter filter) { this.resource = resource; this.seriesPath = seriesPath; this.context = context; @@ -54,49 +56,12 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { } @Override - public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) { + public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { + List<AlignedChunkMetadata> alignedChunkMetadataList = + ((AlignedTimeSeriesMetadata) timeSeriesMetadata).getChunkMetadataList(); - List<IChunkMetadata> chunkMetadataList = - ((TimeseriesMetadata) timeseriesMetadata).getChunkMetadataList(); - - setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context); - - /* - * remove not satisfied ChunkMetaData - */ - chunkMetadataList.removeIf( - chunkMetaData -> - (filter != null - && !filter.satisfyStartEndTime( - chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) - || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); - - // For chunkMetadata from old TsFile, do not set version - for (IChunkMetadata metadata : chunkMetadataList) { - if (!metadata.isFromOldTsFile()) { - metadata.setVersion(resource.getVersion()); - } - } - - if (context.isDebug()) { - DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); - chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - return chunkMetadataList; - } - - @Override - public boolean isMemChunkMetadataLoader() { - return false; - } - - public static void setDiskChunkLoader( - List<IChunkMetadata> chunkMetadataList, - TsFileResource resource, - PartialPath seriesPath, - QueryContext context) { - List<Modification> pathModifications = + // get all sub sensors' modifications + List<List<Modification>> pathModifications = context.getPathModifications(resource.getModFile(), seriesPath); if (context.isDebug()) { @@ -107,24 +72,43 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { pathModifications.forEach(c -> DEBUG_LOGGER.info(c.toString())); } - if (!pathModifications.isEmpty()) { - QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications); - } + // remove ChunkMetadata that have been deleted + QueryUtils.modifyAlignedChunkMetaData(alignedChunkMetadataList, pathModifications); if (context.isDebug()) { DEBUG_LOGGER.info("After modification Chunk meta data list is: "); - chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); } + // remove not satisfied ChunkMetaData + alignedChunkMetadataList.removeIf( + alignedChunkMetaData -> + (filter != null + && !filter.satisfyStartEndTime( + alignedChunkMetaData.getStartTime(), alignedChunkMetaData.getEndTime())) + || alignedChunkMetaData.getStartTime() > alignedChunkMetaData.getEndTime()); + // it is ok, even if it is not thread safe, because the cost of creating a DiskChunkLoader is // very cheap. - chunkMetadataList.forEach( + alignedChunkMetadataList.forEach( chunkMetadata -> { if (chunkMetadata.needSetChunkLoader()) { chunkMetadata.setFilePath(resource.getTsFilePath()); chunkMetadata.setClosed(resource.isClosed()); - chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); + chunkMetadata.setChunkLoader(new DiskAlignedChunkLoader(context.isDebug())); } }); + + if (context.isDebug()) { + DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); + alignedChunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } + + return new ArrayList<>(alignedChunkMetadataList); + } + + @Override + public boolean isMemChunkMetadataLoader() { + return false; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java index b6ff733..0cc30e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/DiskChunkMetadataLoader.java @@ -54,48 +54,11 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { } @Override - public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) { + public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { List<IChunkMetadata> chunkMetadataList = - ((TimeseriesMetadata) timeseriesMetadata).getChunkMetadataList(); + ((TimeseriesMetadata) timeSeriesMetadata).getChunkMetadataList(); - setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context); - - /* - * remove not satisfied ChunkMetaData - */ - chunkMetadataList.removeIf( - chunkMetaData -> - (filter != null - && !filter.satisfyStartEndTime( - chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) - || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); - - // For chunkMetadata from old TsFile, do not set version - for (IChunkMetadata metadata : chunkMetadataList) { - if (!metadata.isFromOldTsFile()) { - metadata.setVersion(resource.getVersion()); - } - } - - if (context.isDebug()) { - DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); - chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); - } - - return chunkMetadataList; - } - - @Override - public boolean isMemChunkMetadataLoader() { - return false; - } - - public static void setDiskChunkLoader( - List<IChunkMetadata> chunkMetadataList, - TsFileResource resource, - PartialPath seriesPath, - QueryContext context) { List<Modification> pathModifications = context.getPathModifications(resource.getModFile(), seriesPath); @@ -126,5 +89,34 @@ public class DiskChunkMetadataLoader implements IChunkMetadataLoader { chunkMetadata.setChunkLoader(new DiskChunkLoader(context.isDebug())); } }); + + /* + * remove not satisfied ChunkMetaData + */ + chunkMetadataList.removeIf( + chunkMetaData -> + (filter != null + && !filter.satisfyStartEndTime( + chunkMetaData.getStartTime(), chunkMetaData.getEndTime())) + || chunkMetaData.getStartTime() > chunkMetaData.getEndTime()); + + // For chunkMetadata from old TsFile, do not set version + for (IChunkMetadata metadata : chunkMetadataList) { + if (!metadata.isFromOldTsFile()) { + metadata.setVersion(resource.getVersion()); + } + } + + if (context.isDebug()) { + DEBUG_LOGGER.info("After removed by filter Chunk meta data list is: "); + chunkMetadataList.forEach(c -> DEBUG_LOGGER.info(c.toString())); + } + + return chunkMetadataList; + } + + @Override + public boolean isMemChunkMetadataLoader() { + return false; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java similarity index 78% copy from server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java copy to server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java index 950dbf8..16ee5ab 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemAlignedChunkMetadataLoader.java @@ -27,16 +27,17 @@ import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import java.util.ArrayList; import java.util.List; -public class MemChunkMetadataLoader implements IChunkMetadataLoader { +public class MemAlignedChunkMetadataLoader implements IChunkMetadataLoader { - private TsFileResource resource; - private PartialPath seriesPath; - private QueryContext context; - private Filter timeFilter; + private final TsFileResource resource; + private final PartialPath seriesPath; + private final QueryContext context; + private final Filter timeFilter; - public MemChunkMetadataLoader( + public MemAlignedChunkMetadataLoader( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter timeFilter) { this.resource = resource; this.seriesPath = seriesPath; @@ -44,12 +45,13 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader { this.timeFilter = timeFilter; } + // TODO current implementation is same as MemChunkMetadataLoader, I think we need to move the + // processing of modification for ReadOnlyMemChunk from TSP to this class + // There is no need to set IChunkLoader for it, because the MemChunkLoader has already been set + // while creating ReadOnlyMemChunk @Override - public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) { - List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(); - - DiskChunkMetadataLoader.setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context); - + public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(); if (memChunks != null) { for (ReadOnlyMemChunk readOnlyMemChunk : memChunks) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java index 950dbf8..9affdf0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/metadata/MemChunkMetadataLoader.java @@ -27,14 +27,15 @@ import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.read.controller.IChunkMetadataLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import java.util.ArrayList; import java.util.List; public class MemChunkMetadataLoader implements IChunkMetadataLoader { - private TsFileResource resource; - private PartialPath seriesPath; - private QueryContext context; - private Filter timeFilter; + private final TsFileResource resource; + private final PartialPath seriesPath; + private final QueryContext context; + private final Filter timeFilter; public MemChunkMetadataLoader( TsFileResource resource, PartialPath seriesPath, QueryContext context, Filter timeFilter) { @@ -45,10 +46,8 @@ public class MemChunkMetadataLoader implements IChunkMetadataLoader { } @Override - public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) { - List<IChunkMetadata> chunkMetadataList = resource.getChunkMetadataList(); - - DiskChunkMetadataLoader.setDiskChunkLoader(chunkMetadataList, resource, seriesPath, context); + public List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) { + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); List<ReadOnlyMemChunk> memChunks = resource.getReadOnlyMemChunk(); if (memChunks != null) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesReader.java index cebc792..402d78f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/AlignedSeriesReader.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.filter.TsFileFilter; import org.apache.iotdb.db.utils.FileLoaderUtils; import org.apache.iotdb.db.utils.TestOnly; -import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.basic.Filter; @@ -82,7 +82,7 @@ public class AlignedSeriesReader extends SeriesReader { } @Override - protected ITimeSeriesMetadata loadTimeSeriesMetadata( + protected AlignedTimeSeriesMetadata loadTimeSeriesMetadata( TsFileResource resource, PartialPath seriesPath, QueryContext context, diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java index 70506e9..bb154a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestamp.java @@ -33,9 +33,9 @@ import java.util.Set; public class SeriesReaderByTimestamp implements IReaderByTimestamp { - private SeriesReader seriesReader; + private final SeriesReader seriesReader; private BatchData batchData; - private boolean ascending; + private final boolean ascending; public SeriesReaderByTimestamp( PartialPath seriesPath, diff --git a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java index 547217c..1933e2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/query/timegenerator/ServerTimeGenerator.java @@ -22,12 +22,12 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.ExpressionType; import org.apache.iotdb.tsfile.read.expression.IBinaryExpression; @@ -72,7 +72,7 @@ public class ServerTimeGenerator extends TimeGenerator { public void serverConstructNode(IExpression expression) throws IOException, StorageEngineException { List<PartialPath> pathList = new ArrayList<>(); - getPartialPathFromExpression(expression, pathList); + getAndTransformPartialPathFromExpression(expression, pathList); List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(pathList); try { operatorNode = construct(expression); @@ -81,12 +81,24 @@ public class ServerTimeGenerator extends TimeGenerator { } } - private void getPartialPathFromExpression(IExpression expression, List<PartialPath> pathList) { + /** + * collect PartialPath from Expression and transform MeasurementPath whose isUnderAlignedEntity is + * true to AlignedPath + */ + private void getAndTransformPartialPathFromExpression( + IExpression expression, List<PartialPath> pathList) { if (expression.getType() == ExpressionType.SERIES) { - pathList.add((PartialPath) ((SingleSeriesExpression) expression).getSeriesPath()); + SingleSeriesExpression seriesExpression = (SingleSeriesExpression) expression; + MeasurementPath measurementPath = (MeasurementPath) seriesExpression.getSeriesPath(); + pathList.add(measurementPath.getDevicePath()); + // change the MeasurementPath to AlignedPath if the MeasurementPath's isUnderAlignedEntity == + // true + seriesExpression.setSeriesPath(measurementPath.transformToExactPath()); } else { - getPartialPathFromExpression(((IBinaryExpression) expression).getLeft(), pathList); - getPartialPathFromExpression(((IBinaryExpression) expression).getRight(), pathList); + getAndTransformPartialPathFromExpression( + ((IBinaryExpression) expression).getLeft(), pathList); + getAndTransformPartialPathFromExpression( + ((IBinaryExpression) expression).getRight(), pathList); } } @@ -95,10 +107,9 @@ public class ServerTimeGenerator extends TimeGenerator { throws IOException { Filter valueFilter = expression.getFilter(); PartialPath path = (PartialPath) expression.getSeriesPath(); - TSDataType dataType; + TSDataType dataType = path.getSeriesType(); QueryDataSource queryDataSource; try { - dataType = IoTDB.metaManager.getSeriesType(path); queryDataSource = QueryResourceManager.getInstance().getQueryDataSource(path, context, valueFilter); // update valueFilter by TTL diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index b5f8695..30caccf 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -26,25 +26,20 @@ import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; -import org.apache.iotdb.db.query.reader.chunk.MemChunkLoader; -import org.apache.iotdb.db.query.reader.chunk.MemChunkReader; +import org.apache.iotdb.db.query.reader.chunk.metadata.DiskAlignedChunkMetadataLoader; import org.apache.iotdb.db.query.reader.chunk.metadata.DiskChunkMetadataLoader; +import org.apache.iotdb.db.query.reader.chunk.metadata.MemAlignedChunkMetadataLoader; import org.apache.iotdb.db.query.reader.chunk.metadata.MemChunkMetadataLoader; -import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.AlignedTimeSeriesMetadata; -import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; -import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IChunkReader; import org.apache.iotdb.tsfile.read.reader.IPageReader; -import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; -import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; import java.util.ArrayList; @@ -94,7 +89,7 @@ public class FileLoaderUtils { * @param filter any filter, only used to check time range */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - public static ITimeSeriesMetadata loadTimeSeriesMetadata( + public static TimeseriesMetadata loadTimeSeriesMetadata( TsFileResource resource, PartialPath seriesPath, QueryContext context, @@ -103,7 +98,7 @@ public class FileLoaderUtils { throws IOException { // common path - ITimeSeriesMetadata timeSeriesMetadata; + TimeseriesMetadata timeSeriesMetadata; // If the tsfile is closed, we need to load from tsfile if (resource.isClosed()) { if (!resource.getTsFile().exists()) { @@ -123,7 +118,7 @@ public class FileLoaderUtils { new DiskChunkMetadataLoader(resource, seriesPath, context, filter)); } } else { // if the tsfile is unclosed, we just get it directly from TsFileResource - timeSeriesMetadata = resource.getTimeSeriesMetadata(); + timeSeriesMetadata = (TimeseriesMetadata) resource.getTimeSeriesMetadata(); if (timeSeriesMetadata != null) { timeSeriesMetadata.setChunkMetadataLoader( new MemChunkMetadataLoader(resource, seriesPath, context, filter)); @@ -177,8 +172,6 @@ public class FileLoaderUtils { TimeseriesMetadata timeColumn = cache.get(new TimeSeriesMetadataCacheKey(filePath, deviceId, ""), allSensors, isDebug); if (timeColumn != null) { - timeColumn.setChunkMetadataLoader( - new DiskChunkMetadataLoader(resource, vectorPath, context, filter)); List<TimeseriesMetadata> valueTimeSeriesMetadataList = new ArrayList<>(valueMeasurementList.size()); // if all the queried aligned sensors does not exist, we will return null @@ -189,32 +182,25 @@ public class FileLoaderUtils { new TimeSeriesMetadataCacheKey(filePath, deviceId, valueMeasurement), allSensors, isDebug); - if (valueColumn != null) { - valueColumn.setChunkMetadataLoader( - new DiskChunkMetadataLoader(resource, vectorPath, context, filter)); - exist = true; - } + exist = (exist || (valueColumn != null)); valueTimeSeriesMetadataList.add(valueColumn); } if (exist) { alignedTimeSeriesMetadata = new AlignedTimeSeriesMetadata(timeColumn, valueTimeSeriesMetadataList); + alignedTimeSeriesMetadata.setChunkMetadataLoader( + new DiskAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); } } } else { // if the tsfile is unclosed, we just get it directly from TsFileResource alignedTimeSeriesMetadata = (AlignedTimeSeriesMetadata) resource.getTimeSeriesMetadata(); if (alignedTimeSeriesMetadata != null) { alignedTimeSeriesMetadata.setChunkMetadataLoader( - new MemChunkMetadataLoader(resource, vectorPath, context, filter)); + new MemAlignedChunkMetadataLoader(resource, vectorPath, context, filter)); } } - // TODO Modification should be applied to each aligned sensor instead of only applying to time - // column if (alignedTimeSeriesMetadata != null) { - List<Modification> pathModifications = - context.getPathModifications(resource.getModFile(), vectorPath); - alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(!pathModifications.isEmpty()); if (alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getStartTime() > alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime()) { return null; @@ -225,14 +211,21 @@ public class FileLoaderUtils { alignedTimeSeriesMetadata.getTimeseriesMetadata().getStatistics().getEndTime())) { return null; } + + // set modifications to each aligned path List<TimeseriesMetadata> valueTimeSeriesMetadataList = alignedTimeSeriesMetadata.getValueTimeseriesMetadataList(); + boolean modified = false; for (int i = 0; i < valueTimeSeriesMetadataList.size(); i++) { - pathModifications = - context.getPathModifications( - resource.getModFile(), vectorPath.getPathWithMeasurement(i)); - valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); + if (valueTimeSeriesMetadataList.get(i) != null) { + List<Modification> pathModifications = + context.getPathModifications( + resource.getModFile(), vectorPath.getPathWithMeasurement(i)); + valueTimeSeriesMetadataList.get(i).setModified(!pathModifications.isEmpty()); + modified = (modified || !pathModifications.isEmpty()); + } } + alignedTimeSeriesMetadata.getTimeseriesMetadata().setModified(modified); } return alignedTimeSeriesMetadata; } @@ -258,23 +251,8 @@ public class FileLoaderUtils { if (chunkMetaData == null) { throw new IOException("Can't init null chunkMeta"); } - IChunkReader chunkReader; IChunkLoader chunkLoader = chunkMetaData.getChunkLoader(); - if (chunkLoader instanceof MemChunkLoader) { - MemChunkLoader memChunkLoader = (MemChunkLoader) chunkLoader; - chunkReader = new MemChunkReader(memChunkLoader.getChunk(), timeFilter); - } else { - if (chunkMetaData instanceof ChunkMetadata) { - Chunk chunk = chunkLoader.loadChunk((ChunkMetadata) chunkMetaData); - chunk.setFromOldFile(chunkMetaData.isFromOldTsFile()); - chunkReader = new ChunkReader(chunk, timeFilter); - } else { - AlignedChunkMetadata alignedChunkMetadata = (AlignedChunkMetadata) chunkMetaData; - Chunk timeChunk = alignedChunkMetadata.getTimeChunk(); - List<Chunk> valueChunkList = alignedChunkMetadata.getValueChunkList(); - chunkReader = new AlignedChunkReader(timeChunk, valueChunkList, timeFilter); - } - } + IChunkReader chunkReader = chunkLoader.getChunkReader(chunkMetaData, timeFilter); return chunkReader.loadPageReaderList(); } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java index bd82deb..2feaa2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryUtils.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.TimeRange; @@ -86,6 +87,74 @@ public class QueryUtils { }); } + public static void modifyAlignedChunkMetaData( + List<AlignedChunkMetadata> chunkMetaData, List<List<Modification>> modifications) { + for (AlignedChunkMetadata metaData : chunkMetaData) { + List<IChunkMetadata> valueChunkMetadataList = metaData.getValueChunkMetadataList(); + // deal with each sub sensor + for (int i = 0; i < valueChunkMetadataList.size(); i++) { + IChunkMetadata v = valueChunkMetadataList.get(i); + if (v != null) { + List<Modification> modificationList = modifications.get(i); + for (Modification modification : modificationList) { + // The case modification.getFileOffset() == metaData.getOffsetOfChunkHeader() + // is not supposed to exist as getFileOffset() is offset containing full chunk, + // while getOffsetOfChunkHeader() returns the chunk header offset + if (modification.getFileOffset() > v.getOffsetOfChunkHeader()) { + doModifyChunkMetaData(modification, v); + } + } + } + } + } + // if all sub sensors' chunk metadata are deleted, then remove the aligned chunk metadata + // otherwise, set the deleted chunk metadata of some sensors to null + chunkMetaData.removeIf( + alignedChunkMetadata -> { + // the whole aligned path need to be removed, only set to be true if all the sub sensors + // are deleted + boolean removed = true; + // the whole aligned path is modified, set to be true if any sub sensor is modified + boolean modified = false; + List<IChunkMetadata> valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + for (int i = 0; i < valueChunkMetadataList.size(); i++) { + IChunkMetadata valueChunkMetadata = valueChunkMetadataList.get(i); + if (valueChunkMetadata == null) { + continue; + } + // current sub sensor's chunk metadata is completely removed + boolean currentRemoved = false; + if (valueChunkMetadata.getDeleteIntervalList() != null) { + for (TimeRange range : valueChunkMetadata.getDeleteIntervalList()) { + if (range.contains( + valueChunkMetadata.getStartTime(), valueChunkMetadata.getEndTime())) { + valueChunkMetadataList.set(i, null); + currentRemoved = true; + break; + } else { + if (!valueChunkMetadata.isModified() + && range.overlaps( + new TimeRange( + valueChunkMetadata.getStartTime(), + valueChunkMetadata.getEndTime()))) { + valueChunkMetadata.setModified(true); + modified = true; + } + } + } + } + // current sub sensor's chunk metadata is not completely removed, + // so the whole aligned path don't need to be removed from list + if (!currentRemoved) { + removed = false; + } + } + alignedChunkMetadata.setModified(modified); + return removed; + }); + } + private static void doModifyChunkMetaData(Modification modification, IChunkMetadata metaData) { if (modification instanceof Deletion) { Deletion deletion = (Deletion) modification; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 07c0b97..f5fec46 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -142,7 +142,6 @@ public class StorageGroupProcessorTest { } Assert.assertEquals(1, tsfileResourcesForQuery.size()); - Assert.assertEquals(0, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); List<ReadOnlyMemChunk> memChunks = tsfileResourcesForQuery.get(0).getReadOnlyMemChunk(); long time = 16; for (ReadOnlyMemChunk memChunk : memChunks) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 2625175..7009b70 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -143,12 +143,6 @@ public class TsFileProcessorTest { tsfileResourcesForQuery.clear(); processor.query(fullPath, context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); - assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); - assertEquals( - measurementId, - tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getMeasurementUid()); - assertEquals( - dataType, tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getDataType()); processor.syncClose(); } @@ -207,12 +201,6 @@ public class TsFileProcessorTest { tsfileResourcesForQuery.clear(); processor.query(fullPath, context, tsfileResourcesForQuery); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); - assertEquals(1, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); - assertEquals( - measurementId, - tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getMeasurementUid()); - assertEquals( - dataType, tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getDataType()); RestorableTsFileIOWriter tsFileIOWriter = processor.getWriter(); Map<String, List<ChunkMetadata>> chunkMetaDataListInChunkGroups = @@ -286,12 +274,6 @@ public class TsFileProcessorTest { processor.query(fullPath, context, tsfileResourcesForQuery); assertFalse(tsfileResourcesForQuery.isEmpty()); assertTrue(tsfileResourcesForQuery.get(0).getReadOnlyMemChunk().isEmpty()); - assertEquals(10, tsfileResourcesForQuery.get(0).getChunkMetadataList().size()); - assertEquals( - measurementId, - tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getMeasurementUid()); - assertEquals( - dataType, tsfileResourcesForQuery.get(0).getChunkMetadataList().get(0).getDataType()); processor.syncClose(); } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java index 47330ef..d2379ff 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedChunkMetadata.java @@ -20,13 +20,10 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; -import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; import java.util.List; public class AlignedChunkMetadata implements IChunkMetadata { @@ -36,6 +33,9 @@ public class AlignedChunkMetadata implements IChunkMetadata { // ChunkMetadata for all subSensors in the vector private final List<IChunkMetadata> valueChunkMetadataList; + /** ChunkLoader of metadata, used to create IChunkReader */ + private IChunkLoader chunkLoader; + public AlignedChunkMetadata( IChunkMetadata timeChunkMetadata, List<IChunkMetadata> valueChunkMetadataList) { this.timeChunkMetadata = timeChunkMetadata; @@ -44,13 +44,14 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public Statistics getStatistics() { - return valueChunkMetadataList.size() == 1 + return valueChunkMetadataList.size() == 1 && valueChunkMetadataList.get(0) != null ? valueChunkMetadataList.get(0).getStatistics() : timeChunkMetadata.getStatistics(); } public Statistics getStatistics(int index) { - return valueChunkMetadataList.get(index).getStatistics(); + IChunkMetadata v = valueChunkMetadataList.get(index); + return v == null ? null : v.getStatistics(); } @Override @@ -61,6 +62,11 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public void setModified(boolean modified) { timeChunkMetadata.setModified(modified); + for (IChunkMetadata v : valueChunkMetadataList) { + if (v != null) { + v.setModified(modified); + } + } } @Override @@ -71,6 +77,11 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public void setSeq(boolean seq) { timeChunkMetadata.setSeq(seq); + for (IChunkMetadata v : valueChunkMetadataList) { + if (v != null) { + v.setSeq(seq); + } + } } @Override @@ -81,6 +92,11 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public void setVersion(long version) { timeChunkMetadata.setVersion(version); + for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) { + if (valueChunkMetadata != null) { + valueChunkMetadata.setVersion(version); + } + } } @Override @@ -110,31 +126,21 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public boolean needSetChunkLoader() { - if (timeChunkMetadata.needSetChunkLoader()) { - return true; - } else { - for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - if (chunkMetadata.needSetChunkLoader()) { - return true; - } - } - } - return false; + return chunkLoader == null; } @Override public void setChunkLoader(IChunkLoader chunkLoader) { - timeChunkMetadata.setChunkLoader(chunkLoader); - for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - chunkMetadata.setChunkLoader(chunkLoader); - } + this.chunkLoader = chunkLoader; } @Override public void setFilePath(String filePath) { timeChunkMetadata.setFilePath(filePath); for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - chunkMetadata.setFilePath(filePath); + if (chunkMetadata != null) { + chunkMetadata.setFilePath(filePath); + } } } @@ -142,7 +148,9 @@ public class AlignedChunkMetadata implements IChunkMetadata { public void setClosed(boolean closed) { timeChunkMetadata.setClosed(closed); for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - chunkMetadata.setClosed(closed); + if (chunkMetadata != null) { + chunkMetadata.setClosed(closed); + } } } @@ -158,12 +166,12 @@ public class AlignedChunkMetadata implements IChunkMetadata { @Override public void insertIntoSortedDeletions(long startTime, long endTime) { - timeChunkMetadata.insertIntoSortedDeletions(startTime, endTime); + throw new UnsupportedOperationException(); } @Override public List<TimeRange> getDeleteIntervalList() { - return timeChunkMetadata.getDeleteIntervalList(); + throw new UnsupportedOperationException(); } @Override @@ -176,21 +184,6 @@ public class AlignedChunkMetadata implements IChunkMetadata { return 0; } - public Chunk getTimeChunk() throws IOException { - return timeChunkMetadata.getChunkLoader().loadChunk((ChunkMetadata) timeChunkMetadata); - } - - public List<Chunk> getValueChunkList() throws IOException { - List<Chunk> valueChunkList = new ArrayList<>(); - for (IChunkMetadata chunkMetadata : valueChunkMetadataList) { - valueChunkList.add( - chunkMetadata == null - ? null - : chunkMetadata.getChunkLoader().loadChunk((ChunkMetadata) chunkMetadata)); - } - return valueChunkList; - } - public IChunkMetadata getTimeChunkMetadata() { return timeChunkMetadata; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java index 74649a3..850481d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/AlignedTimeSeriesMetadata.java @@ -32,6 +32,8 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { // TimeSeriesMetadata for all subSensors in the vector private final List<TimeseriesMetadata> valueTimeseriesMetadataList; + private IChunkMetadataLoader chunkMetadataLoader; + public AlignedTimeSeriesMetadata( TimeseriesMetadata timeseriesMetadata, List<TimeseriesMetadata> valueTimeseriesMetadataList) { this.timeseriesMetadata = timeseriesMetadata; @@ -44,13 +46,14 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { */ @Override public Statistics getStatistics() { - return valueTimeseriesMetadataList.size() == 1 + return valueTimeseriesMetadataList.size() == 1 && valueTimeseriesMetadataList.get(0) != null ? valueTimeseriesMetadataList.get(0).getStatistics() : timeseriesMetadata.getStatistics(); } public Statistics getStatistics(int index) { - return valueTimeseriesMetadataList.get(index).getStatistics(); + TimeseriesMetadata v = valueTimeseriesMetadataList.get(index); + return v == null ? null : v.getStatistics(); } @Override @@ -62,7 +65,9 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { public void setModified(boolean modified) { timeseriesMetadata.setModified(modified); for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { - subSensor.setModified(modified); + if (subSensor != null) { + subSensor.setModified(modified); + } } } @@ -75,7 +80,9 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { public void setSeq(boolean seq) { timeseriesMetadata.setSeq(seq); for (TimeseriesMetadata subSensor : valueTimeseriesMetadataList) { - subSensor.setSeq(seq); + if (subSensor != null) { + subSensor.setSeq(seq); + } } } @@ -89,37 +96,36 @@ public class AlignedTimeSeriesMetadata implements ITimeSeriesMetadata { */ @Override public List<IChunkMetadata> loadChunkMetadataList() throws IOException { - if (timeseriesMetadata.getChunkMetadataLoader().isMemChunkMetadataLoader()) { - return timeseriesMetadata.loadChunkMetadataList(); - } else { - List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.loadChunkMetadataList(); - List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); - for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { - valueChunkMetadataList.add(metadata == null ? null : metadata.loadChunkMetadataList()); - } + return chunkMetadataLoader.loadChunkMetadataList(this); + } + + public List<AlignedChunkMetadata> getChunkMetadataList() { + List<IChunkMetadata> timeChunkMetadata = timeseriesMetadata.getChunkMetadataList(); + List<List<IChunkMetadata>> valueChunkMetadataList = new ArrayList<>(); + for (TimeseriesMetadata metadata : valueTimeseriesMetadataList) { + valueChunkMetadataList.add(metadata == null ? null : metadata.getChunkMetadataList()); + } - List<IChunkMetadata> res = new ArrayList<>(); - - for (int i = 0; i < timeChunkMetadata.size(); i++) { - List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); - // only at least one sensor exits, we add the AlignedChunkMetadata to the list - boolean exits = false; - for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) { - IChunkMetadata v = chunkMetadata == null ? null : chunkMetadata.get(i); - exits = (exits || v != null); - chunkMetadataList.add(v); - } - if (exits) { - res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); - } + List<AlignedChunkMetadata> res = new ArrayList<>(); + for (int i = 0; i < timeChunkMetadata.size(); i++) { + List<IChunkMetadata> chunkMetadataList = new ArrayList<>(); + // only at least one sensor exits, we add the AlignedChunkMetadata to the list + boolean exits = false; + for (List<IChunkMetadata> chunkMetadata : valueChunkMetadataList) { + IChunkMetadata v = chunkMetadata == null ? null : chunkMetadata.get(i); + exits = (exits || v != null); + chunkMetadataList.add(v); + } + if (exits) { + res.add(new AlignedChunkMetadata(timeChunkMetadata.get(i), chunkMetadataList)); } - return res; } + return res; } @Override public void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader) { - timeseriesMetadata.setChunkMetadataLoader(chunkMetadataLoader); + this.chunkMetadataLoader = chunkMetadataLoader; } public List<TimeseriesMetadata> getValueTimeseriesMetadataList() { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java index 3a773b1..e7b8548 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ITimeSeriesMetadata.java @@ -36,6 +36,7 @@ public interface ITimeSeriesMetadata { void setSeq(boolean seq); + /** @return the result has already been filtered by modification files */ List<IChunkMetadata> loadChunkMetadataList() throws IOException; void setChunkMetadataLoader(IChunkMetadataLoader chunkMetadataLoader); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java index 156d220..f674c90 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/TimeseriesMetadata.java @@ -61,7 +61,7 @@ public class TimeseriesMetadata implements Accountable, ITimeSeriesMetadata { // modified is true when there are modifications of the series, or from unseq file private boolean modified; - protected IChunkMetadataLoader chunkMetadataLoader; + private IChunkMetadataLoader chunkMetadataLoader; private long ramSize; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java index 4cb8f17..314aa36 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/BatchData.java @@ -764,12 +764,17 @@ public class BatchData { } @Override - public Object currentValue() { - if (dataType == TSDataType.VECTOR) { - return getVector()[subIndex].getValue(); - } else { - return null; + public boolean hasNext() { + while (BatchData.this.hasCurrent() && currentValue() == null) { + super.next(); } + return BatchData.this.hasCurrent(); + } + + @Override + public Object currentValue() { + TsPrimitiveType v = getVector()[subIndex]; + return v == null ? null : v.getValue(); } } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java index 40ce4a3..4668d4d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/CachedChunkLoaderImpl.java @@ -20,8 +20,12 @@ package org.apache.iotdb.tsfile.read.controller; import org.apache.iotdb.tsfile.common.cache.LRUCache; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; +import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; import java.io.IOException; @@ -71,4 +75,18 @@ public class CachedChunkLoaderImpl implements IChunkLoader { public void close() throws IOException { reader.close(); } + + @Override + public IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) + throws IOException { + chunkMetaData.setFilePath(reader.getFileName()); + Chunk chunk = chunkCache.get((ChunkMetadata) chunkMetaData); + return new ChunkReader( + new Chunk( + chunk.getHeader(), + chunk.getData().duplicate(), + chunkMetaData.getDeleteIntervalList(), + chunkMetaData.getStatistics()), + timeFilter); + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkLoader.java index 76aa4b3..27554c2 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkLoader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkLoader.java @@ -19,7 +19,10 @@ package org.apache.iotdb.tsfile.read.controller; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.iotdb.tsfile.read.reader.IChunkReader; import java.io.IOException; @@ -30,4 +33,6 @@ public interface IChunkLoader { /** close the file reader. */ void close() throws IOException; + + IChunkReader getChunkReader(IChunkMetadata chunkMetaData, Filter timeFilter) throws IOException; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java index 71371a4..441e36d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/IChunkMetadataLoader.java @@ -27,7 +27,7 @@ import java.util.List; public interface IChunkMetadataLoader { /** read all chunk metadata of one time series in one file. */ - List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeseriesMetadata) + List<IChunkMetadata> loadChunkMetadataList(ITimeSeriesMetadata timeSeriesMetadata) throws IOException; boolean isMemChunkMetadataLoader(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java index 574fd9d..a616a96 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/TimePageReader.java @@ -62,6 +62,9 @@ public class TimePageReader { return timeBatch; } + /** + * In case that we use sequence read, and the page doesn't have statistics, so we won't know time array's length at first + */ public long[] getNextTimeBatch() throws IOException { if (pageHeader.getStatistics() != null) { return nexTimeBatch(); diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFileTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFileTest.java index 8688e6b..9a6bc1b 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFileTest.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/ReadOnlyTsFileTest.java @@ -74,7 +74,7 @@ public class ReadOnlyTsFileTest { Path path = new Path("t", "id"); tsFileWriter.registerTimeseries( - path, + new Path(path.getDevice()), new UnaryMeasurementSchema("id", TSDataType.INT32, TSEncoding.PLAIN, CompressionType.LZ4)); for (int i = 0; i < 11000000; i++) { diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileGeneratorForSeriesReaderByTimestamp.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileGeneratorForSeriesReaderByTimestamp.java index f523878..63a746d 100755 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileGeneratorForSeriesReaderByTimestamp.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/read/query/timegenerator/TsFileGeneratorForSeriesReaderByTimestamp.java @@ -194,52 +194,52 @@ public class TsFileGeneratorForSeriesReaderByTimestamp { TSFileConfig conf = TSFileDescriptor.getInstance().getConfig(); schema = new Schema(); schema.registerTimeseries( - new Path("d1", "s1"), + new Path("d1"), new UnaryMeasurementSchema( "s1", TSDataType.INT32, TSEncoding.valueOf(conf.getValueEncoder()))); schema.registerTimeseries( - new Path("d1", "s2"), + new Path("d1"), new UnaryMeasurementSchema( "s2", TSDataType.INT64, TSEncoding.valueOf(conf.getValueEncoder()), CompressionType.UNCOMPRESSED)); schema.registerTimeseries( - new Path("d1", "s3"), + new Path("d1"), new UnaryMeasurementSchema( "s3", TSDataType.INT64, TSEncoding.valueOf(conf.getValueEncoder()), CompressionType.SNAPPY)); schema.registerTimeseries( - new Path("d1", "s4"), new UnaryMeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN)); + new Path("d1"), new UnaryMeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN)); schema.registerTimeseries( - new Path("d1", "s5"), + new Path("d1"), new UnaryMeasurementSchema("s5", TSDataType.BOOLEAN, TSEncoding.PLAIN)); schema.registerTimeseries( - new Path("d1", "s6"), new UnaryMeasurementSchema("s6", TSDataType.FLOAT, TSEncoding.RLE)); + new Path("d1"), new UnaryMeasurementSchema("s6", TSDataType.FLOAT, TSEncoding.RLE)); schema.registerTimeseries( - new Path("d1", "s7"), new UnaryMeasurementSchema("s7", TSDataType.DOUBLE, TSEncoding.RLE)); + new Path("d1"), new UnaryMeasurementSchema("s7", TSDataType.DOUBLE, TSEncoding.RLE)); schema.registerTimeseries( - new Path("d2", "s1"), + new Path("d2"), new UnaryMeasurementSchema( "s1", TSDataType.INT32, TSEncoding.valueOf(conf.getValueEncoder()))); schema.registerTimeseries( - new Path("d2", "s2"), + new Path("d2"), new UnaryMeasurementSchema( "s2", TSDataType.INT64, TSEncoding.valueOf(conf.getValueEncoder()), CompressionType.UNCOMPRESSED)); schema.registerTimeseries( - new Path("d2", "s3"), + new Path("d2"), new UnaryMeasurementSchema( "s3", TSDataType.INT64, TSEncoding.valueOf(conf.getValueEncoder()), CompressionType.SNAPPY)); schema.registerTimeseries( - new Path("d2", "s4"), new UnaryMeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN)); + new Path("d2"), new UnaryMeasurementSchema("s4", TSDataType.TEXT, TSEncoding.PLAIN)); } public static void writeToFile(Schema schema) throws IOException, WriteProcessException { diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java index eb4d228..ee1436f 100644 --- a/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java +++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/utils/RecordUtils.java @@ -54,7 +54,7 @@ public class RecordUtils { String deviceId = items[0].trim(); long timestamp; try { - timestamp = Long.valueOf(items[1].trim()); + timestamp = Long.parseLong(items[1].trim()); } catch (NumberFormatException e) { LOG.warn("given timestamp is illegal:{}", str); // return a TSRecord without any data points @@ -82,19 +82,19 @@ public class RecordUtils { try { switch (type) { case INT32: - ret.addTuple(new IntDataPoint(measurementId, Integer.valueOf(value))); + ret.addTuple(new IntDataPoint(measurementId, Integer.parseInt(value))); break; case INT64: - ret.addTuple(new LongDataPoint(measurementId, Long.valueOf(value))); + ret.addTuple(new LongDataPoint(measurementId, Long.parseLong(value))); break; case FLOAT: - ret.addTuple(new FloatDataPoint(measurementId, Float.valueOf(value))); + ret.addTuple(new FloatDataPoint(measurementId, Float.parseFloat(value))); break; case DOUBLE: - ret.addTuple(new DoubleDataPoint(measurementId, Double.valueOf(value))); + ret.addTuple(new DoubleDataPoint(measurementId, Double.parseDouble(value))); break; case BOOLEAN: - ret.addTuple(new BooleanDataPoint(measurementId, Boolean.valueOf(value))); + ret.addTuple(new BooleanDataPoint(measurementId, Boolean.parseBoolean(value))); break; case TEXT: ret.addTuple(new StringDataPoint(measurementId, Binary.valueOf(items[i + 1])));
