This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch cacheMetadataIndexNodeOffsetsForQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 057971adea9383aa4940e41c633bb5e688e326ba Author: shuwenwei <[email protected]> AuthorDate: Wed Jul 23 15:54:14 2025 +0800 cache device metadata index node offsets for query --- .../fragment/DeviceMetadataIndexEntryCache.java | 113 +++++++++++++++++++++ .../fragment/FragmentInstanceContext.java | 7 ++ .../source/AbstractDataSourceOperator.java | 18 +++- .../operator/source/AlignedSeriesScanOperator.java | 2 +- .../operator/source/AlignedSeriesScanUtil.java | 1 + .../execution/operator/source/FileLoaderUtils.java | 13 ++- .../execution/operator/source/SeriesScanUtil.java | 9 +- .../relational/AbstractAggTableScanOperator.java | 11 +- .../relational/AbstractTableScanOperator.java | 9 +- .../relational/DeviceIteratorScanOperator.java | 5 + .../plan/planner/OperatorTreeGenerator.java | 24 +++++ .../plan/planner/TableOperatorGenerator.java | 4 + .../buffer/TimeSeriesMetadataCache.java | 38 ++++++- .../tablemodel/CompactionWithAllNullRowsTest.java | 42 ++++++++ 14 files changed, 286 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java new file mode 100644 index 00000000000..a0c71c40e6e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class DeviceMetadataIndexEntryCache { + private TreeMap<IDeviceID, Integer> deviceIndexMap; + private final Map<String, long[]> deviceMetadataIndexNodeOffsetsCache = new HashMap<>(); + private List<IDeviceID> sortedDevices; + private int[] deviceIdxArr; + + public void addDevices(AbstractDataSourceOperator operator, List<DeviceEntry> deviceEntries) { + deviceIndexMap = deviceIndexMap == null ? new TreeMap<>(IDeviceID::compareTo) : deviceIndexMap; + int[] operatorDeviceIndexArr = new int[deviceEntries.size()]; + for (int i = 0; i < deviceEntries.size(); i++) { + int idx = + deviceIndexMap.computeIfAbsent( + deviceEntries.get(i).getDeviceID(), k -> deviceIndexMap.size()); + operatorDeviceIndexArr[i] = idx; + } + operator.setDeviceIndexArr(operatorDeviceIndexArr); + } + + public void addDevice(AbstractDataSourceOperator operator, IDeviceID deviceID) { + deviceIndexMap = deviceIndexMap == null ? new TreeMap<>() : deviceIndexMap; + int idx = deviceIndexMap.computeIfAbsent(deviceID, k -> deviceIndexMap.size()); + operator.setDeviceIndexArr(new int[] {idx}); + } + + public Pair<long[], Boolean> getCachedDeviceMetadataIndexNodeOffset( + int deviceIndex, String filePath) throws IOException { + // cache is disabled + if (deviceIndex < 0) { + return new Pair<>(null, true); + } + // not in cache + long[] resourceCache = loadOffsetsToCache(filePath); + if (resourceCache == null) { + return new Pair<>(null, true); + } + int indexAfterSort = deviceIdxArr[deviceIndex]; + long startOffset = resourceCache[2 * indexAfterSort]; + // the device does not exist in the file + if (startOffset < 0) { + return new Pair<>(null, false); + } + long endOffset = resourceCache[2 * indexAfterSort + 1]; + return new Pair<>(new long[] {startOffset, endOffset}, true); + } + + private long[] loadOffsetsToCache(String filePath) throws IOException { + long[] offsets = deviceMetadataIndexNodeOffsetsCache.get(filePath); + if (offsets != null) { + return offsets; + } + TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true); + IDeviceID firstDevice = getSortedDevices().get(0); + offsets = + reader.getDeviceMetadataIndexNodeOffsets( + firstDevice.isTableModel() ? firstDevice.getTableName() : "", sortedDevices, null); + deviceMetadataIndexNodeOffsetsCache.put(filePath, offsets); + return offsets; + } + + private synchronized List<IDeviceID> getSortedDevices() { + if (deviceIdxArr == null) { + sort(); + } + return sortedDevices; + } + + private void sort() { + deviceIdxArr = new int[deviceIndexMap.size()]; + sortedDevices = new ArrayList<>(deviceIndexMap.size()); + int i = 0; + for (Map.Entry<IDeviceID, Integer> entry : deviceIndexMap.entrySet()) { + sortedDevices.add(entry.getKey()); + deviceIdxArr[entry.getValue()] = i++; + } + deviceIndexMap = null; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 8fc462bbd42..0844a18b602 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -145,6 +145,9 @@ public class FragmentInstanceContext extends QueryContext { private long closedSeqFileNum = 0; private long closedUnseqFileNum = 0; + private DeviceMetadataIndexEntryCache metadataIndexEntryCache = + new DeviceMetadataIndexEntryCache(); + public static FragmentInstanceContext createFragmentInstanceContext( FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) { FragmentInstanceContext instanceContext = @@ -327,6 +330,10 @@ public class FragmentInstanceContext extends QueryContext { stateMachine.addStateChangeListener(this::updateStatsIfDone); } + public DeviceMetadataIndexEntryCache getMetadataIndexEntryCache() { + return metadataIndexEntryCache; + } + private void updateStatsIfDone(FragmentInstanceState newState) { if (newState.isDone()) { long now = System.currentTimeMillis(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java index 47fe0acbf87..a5c642f32f2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java @@ -34,12 +34,28 @@ public abstract class AbstractDataSourceOperator extends AbstractSourceOperator // Using for building result tsBlock protected TsBlockBuilder resultTsBlockBuilder; + protected int firstDeviceIndexInFI; + + protected int[] deviceIndexInFI; + @Override public void initQueryDataSource(IQueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource, getCurrentDeviceIndex()); resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); } + public int getCurrentDeviceIndex() { + return deviceIndexInFI[0]; + } + + public void setDeviceIndexArr(int[] arr) { + this.deviceIndexInFI = arr; + } + + public void setFirstDeviceIndexInFI(int firstDeviceIndexInFI) { + this.firstDeviceIndexInFI = firstDeviceIndexInFI; + } + @Override public void close() throws Exception { // do nothing diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index 1fc8014b3ea..e3995967e67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -122,7 +122,7 @@ public class AlignedSeriesScanOperator extends AbstractSeriesScanOperator { @Override public void initQueryDataSource(IQueryDataSource dataSource) { - seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource, getCurrentDeviceIndex()); resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index f7ddaee472e..ab19d263604 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -97,6 +97,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { return FileLoaderUtils.loadAlignedTimeSeriesMetadata( resource, (AlignedFullPath) seriesPath, + deviceIndexInFI, context, scanOptions.getGlobalTimeFilter(), isSeq, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java index 66f24700b55..a0c2da1a53b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java @@ -79,6 +79,7 @@ public class FileLoaderUtils { public static TimeseriesMetadata loadTimeSeriesMetadata( TsFileResource resource, NonAlignedFullPath seriesPath, + int deviceIndexInFI, FragmentInstanceContext context, Filter globalTimeFilter, Set<String> allSensors, @@ -101,6 +102,7 @@ public class FileLoaderUtils { resource.getTsFileID(), seriesPath.getDeviceId(), seriesPath.getMeasurement()), + deviceIndexInFI, allSensors, context.ignoreNotExistsDevice() || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, @@ -180,6 +182,7 @@ public class FileLoaderUtils { public static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadata( TsFileResource resource, AlignedFullPath alignedPath, + int deviceIndexInFI, FragmentInstanceContext context, Filter globalTimeFilter, boolean isSeq, @@ -193,7 +196,12 @@ public class FileLoaderUtils { if (resource.isClosed()) { alignedTimeSeriesMetadata = loadAlignedTimeSeriesMetadataFromDisk( - resource, alignedPath, context, globalTimeFilter, ignoreAllNullRows); + resource, + alignedPath, + deviceIndexInFI, + context, + globalTimeFilter, + ignoreAllNullRows); } else { // if the tsfile is unclosed, we just get it directly from TsFileResource loadFromMem = true; alignedTimeSeriesMetadata = @@ -256,6 +264,7 @@ public class FileLoaderUtils { private static AbstractAlignedTimeSeriesMetadata loadAlignedTimeSeriesMetadataFromDisk( TsFileResource resource, AlignedFullPath alignedPath, + int deviceIndexInFI, FragmentInstanceContext context, Filter globalTimeFilter, boolean ignoreAllNullRows) @@ -277,6 +286,7 @@ public class FileLoaderUtils { cache.get( filePath, new TimeSeriesMetadataCacheKey(resource.getTsFileID(), deviceId, ""), + deviceIndexInFI, allSensors, context.ignoreNotExistsDevice() || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, @@ -307,6 +317,7 @@ public class FileLoaderUtils { filePath, new TimeSeriesMetadataCacheKey( resource.getTsFileID(), deviceId, valueMeasurement), + deviceIndexInFI, allSensors, context.ignoreNotExistsDevice() || resource.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 817649869d9..342d5cd308a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -81,6 +81,7 @@ public class SeriesScanUtil implements Accountable { protected final IFullPath seriesPath; private final IDeviceID deviceID; + protected int deviceIndexInFI = -1; protected boolean isAligned = false; private final TSDataType dataType; @@ -171,12 +172,17 @@ public class SeriesScanUtil implements Accountable { versionPageReader -> orderUtils.getOrderTime(versionPageReader.getStatistics()))); } + public void initQueryDataSource(QueryDataSource dataSource) { + initQueryDataSource(dataSource, -1); + } + /** * Initialize the query data source. This method should be called <b>before any other methods</b>. * * @param dataSource the query data source */ - public void initQueryDataSource(QueryDataSource dataSource) { + public void initQueryDataSource(QueryDataSource dataSource, int currentDeviceIndexInFI) { + this.deviceIndexInFI = currentDeviceIndexInFI; dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending()); this.dataSource = dataSource; @@ -1235,6 +1241,7 @@ public class SeriesScanUtil implements Accountable { return FileLoaderUtils.loadTimeSeriesMetadata( resource, (NonAlignedFullPath) seriesPath, + deviceIndexInFI, context, scanOptions.getGlobalTimeFilter(), scanOptions.getAllSensors(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java index a59ec643b07..2012d918929 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java @@ -255,7 +255,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe // construct AlignedSeriesScanUtil for next device constructAlignedSeriesScanUtil(); queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex()); } if (currentDeviceIndex >= deviceCount) { @@ -717,7 +717,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe // construct AlignedSeriesScanUtil for next device constructAlignedSeriesScanUtil(); queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex()); } if (currentDeviceIndex >= deviceCount) { @@ -762,7 +762,7 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe @Override public void initQueryDataSource(IQueryDataSource dataSource) { this.queryDataSource = (QueryDataSource) dataSource; - this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex()); this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); } @@ -783,6 +783,11 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe : 0; } + @Override + public int getCurrentDeviceIndex() { + return deviceIndexInFI == null ? -1 : deviceIndexInFI[currentDeviceIndex]; + } + @Override public void close() throws Exception { super.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java index 151dd6f3f25..df2e18eacf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java @@ -245,7 +245,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat public void initQueryDataSource(IQueryDataSource dataSource) { this.queryDataSource = (QueryDataSource) dataSource; if (this.seriesScanUtil != null) { - this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex()); } this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); this.resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); @@ -260,7 +260,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat // reset QueryDataSource queryDataSource.reset(); - this.seriesScanUtil.initQueryDataSource(queryDataSource); + this.seriesScanUtil.initQueryDataSource(queryDataSource, getCurrentDeviceIndex()); this.operatorContext.recordSpecifiedInfo( CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); } @@ -299,6 +299,11 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas, allSensors); } + @Override + public int getCurrentDeviceIndex() { + return deviceIndexInFI[currentDeviceIndex]; + } + @Override public long ramBytesUsed() { return INSTANCE_SIZE diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java index f088e448fd7..cf0cae84602 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java @@ -180,6 +180,11 @@ public class DeviceIteratorScanOperator extends AbstractDataSourceOperator { return currentDeviceRootOperator.calculateRetainedSizeAfterCallingNext(); } + @Override + public int getCurrentDeviceIndex() { + return deviceIndexInFI[currentDeviceIndex]; + } + @Override public long ramBytesUsed() { return INSTANCE_SIZE diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 55b2defa543..bf160392881 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -394,6 +394,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP scanOptionsBuilder.build()); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(seriesScanOperator, seriesPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); @@ -499,6 +503,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP maxTsBlockLineNum); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(seriesScanOperator, seriesPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); @@ -686,6 +694,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP canUseStatistics); ((DataDriverContext) context.getDriverContext()).addSourceOperator(aggregateScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(aggregateScanOperator, seriesPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); return aggregateScanOperator; @@ -828,6 +840,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((DataDriverContext) context.getDriverContext()) .addSourceOperator(seriesAggregationScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(seriesAggregationScanOperator, seriesPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); context.getDriverContext().setInputDriver(true); return seriesAggregationScanOperator; @@ -2958,6 +2974,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP !TSDataType.BLOB.equals(seriesPath.getSeriesType())); ((DataDriverContext) context.getDriverContext()) .addSourceOperator(seriesAggregationScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(seriesAggregationScanOperator, seriesPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); return seriesAggregationScanOperator; } @@ -3003,6 +3023,10 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP canUseStatistics); ((DataDriverContext) context.getDriverContext()) .addSourceOperator(seriesAggregationScanOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevice(seriesAggregationScanOperator, unCachedPath.getDeviceId()); ((DataDriverContext) context.getDriverContext()).addPath(unCachedPath); return seriesAggregationScanOperator; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index a4fd11867dc..935266415a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -1077,6 +1077,10 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution ((DataDriverContext) context.getDriverContext()).addSourceOperator(sourceOperator); + context + .getInstanceContext() + .getMetadataIndexEntryCache() + .addDevices(sourceOperator, node.getDeviceEntries()); for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) { DeviceEntry deviceEntry = node.getDeviceEntries().get(i); if (deviceEntry == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java index f2cd55c5e7d..a0be5f0e1c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.TimeSeriesMetadataCacheMetrics; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; @@ -39,6 +40,7 @@ import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.BloomFilter; +import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,7 @@ public class TimeSeriesMetadataCache { public TimeseriesMetadata get( String filePath, TimeSeriesMetadataCacheKey key, + int deviceIndexInFI, Set<String> allSensors, boolean ignoreNotExists, boolean debug, @@ -143,9 +146,27 @@ public class TimeSeriesMetadataCache { } loadBloomFilterTime = System.nanoTime() - startTime; + Pair<long[], Boolean> pair = null; + if (queryContext instanceof FragmentInstanceContext) { + pair = + ((FragmentInstanceContext) queryContext) + .getMetadataIndexEntryCache() + .getCachedDeviceMetadataIndexNodeOffset(deviceIndexInFI, filePath); + if (!pair.right) { + if (!ignoreNotExists) { + throw new IOException( + "Device {" + key.device + "} is not in tsFileMetaData of " + filePath); + } + return null; + } + } TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata( - key.device, key.measurement, ignoreNotExists, timeSeriesMetadataIoSizeRecorder); + key.device, + pair == null ? null : pair.left, + key.measurement, + ignoreNotExists, + timeSeriesMetadataIoSizeRecorder); return (timeseriesMetadata == null || timeseriesMetadata.getStatistics().getCount() == 0) ? null : timeseriesMetadata; @@ -194,9 +215,24 @@ public class TimeSeriesMetadataCache { TsFileSequenceReader reader = FileReaderManager.getInstance() .get(filePath, true, timeSeriesMetadataIoSizeRecorder); + Pair<long[], Boolean> pair = null; + if (queryContext instanceof FragmentInstanceContext) { + pair = + ((FragmentInstanceContext) queryContext) + .getMetadataIndexEntryCache() + .getCachedDeviceMetadataIndexNodeOffset(deviceIndexInFI, filePath); + if (!pair.right) { + if (!ignoreNotExists) { + throw new IOException( + "Device {" + key.device + "} is not in tsFileMetaData of " + filePath); + } + return null; + } + } List<TimeseriesMetadata> timeSeriesMetadataList = reader.readTimeseriesMetadata( key.device, + pair == null ? null : pair.left, key.measurement, allSensors, ignoreNotExists, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java index a7c2d1cb289..804326e39b5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionWithAllNullRowsTest.java @@ -43,8 +43,10 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.write.WriteProcessException; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.TimeRange; import org.junit.After; import org.junit.Assert; @@ -54,9 +56,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; @RunWith(Parameterized.class) public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { @@ -114,6 +120,42 @@ public class CompactionWithAllNullRowsTest extends AbstractCompactionTest { } } + @Test + public void test1() throws IOException { + TsFileResource resource = + new TsFileResource( + new File( + "/Users/shuww/IdeaProjects/iotdb-versions/iotdb-master/data/datanode/data/sequence/db1/1/2892/1753182984040-1-0-0.tsfile")); + resource.deserialize(); + List<IDeviceID> sortedDevices = + resource.getDevices().stream() + .sorted(IDeviceID::compareTo) + .distinct() + .collect(Collectors.toList()); + long start = System.currentTimeMillis(); + AtomicLong atomicLong = new AtomicLong(0); + try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) { + long[] offsets = + reader.getDeviceMetadataIndexNodeOffsets( + sortedDevices.get(0).getTableName(), sortedDevices, atomicLong::addAndGet); + for (int i = 0; i < sortedDevices.size(); i++) { + MetadataIndexNode metadataIndexNode = + reader.readMetadataIndexNode(offsets[2 * i], offsets[2 * i + 1], false); + reader.getAlignedChunkMetadataByMetadataIndexNode( + sortedDevices.get(i), metadataIndexNode, true); + } + } + System.out.println(System.currentTimeMillis() - start); + System.out.println(atomicLong.get()); + start = System.currentTimeMillis(); + try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) { + for (IDeviceID device : sortedDevices) { + reader.getAlignedChunkMetadata(device, false); + } + } + System.out.println(System.currentTimeMillis() - start); + } + @Test public void testCompactionWithAllNullRows1() throws IOException { TsFileResource resource1 = createEmptyFileAndResource(true);
