This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch readTsFileTableFunction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0026afbb8de1245e9d359b035d9c133aba8e99a7 Author: shuwenwei <[email protected]> AuthorDate: Fri May 29 18:18:18 2026 +0800 scan --- .../execution/driver/DataDriverContext.java | 16 ++ .../fragment/FragmentInstanceContext.java | 54 ++++ .../AbstractDeviceTableScanOperator.java | 78 ++++++ .../relational/AbstractTableScanOperator.java | 72 ++--- .../relational/ExternalTsFileSeriesScanUtil.java | 80 ++++++ .../relational/MultiTsFileResourceIterator.java | 203 ++++++++++++++ .../OrderedExternalTsFileTableScanOperator.java | 298 +++++++++++++++++++++ .../source/relational/TableScanOperator.java | 2 +- .../TreeAlignedDeviceViewScanOperator.java | 2 +- .../UnorderedExternalTsFileTableScanOperator.java | 211 +++++++++++++++ .../planner/DataNodeTableOperatorGenerator.java | 102 ++++++- .../plan/planner/LocalExecutionPlanner.java | 16 ++ .../plan/relational/planner/RelationPlanner.java | 2 +- .../distribute/TableDistributedPlanGenerator.java | 64 +++++ .../iterative/rule/PruneTableScanColumns.java | 15 ++ .../planner/node/ExternalTsFileScanNode.java | 46 ++++ .../optimizations/TransformSortToStreamSort.java | 29 +- .../optimizations/UnaliasSymbolReferences.java | 2 + .../dataregion/read/QueryDataSourceType.java | 3 +- .../relational/tvf/ReadTsFileTableFunction.java | 40 ++- 20 files changed, 1266 insertions(+), 69 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java index f895859d1ad..a2892b6fec0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriverContext.java @@ -41,6 +41,7 @@ public class DataDriverContext extends DriverContext { private List<IFullPath> paths; private QueryDataSourceType queryDataSourceType = null; 
private Map<IDeviceID, DeviceContext> deviceIDToContext; + private List<String> externalTsFilePaths; // it will be set to null, after QueryDataSource being inited private List<DataSourceOperator> sourceOperators; @@ -49,6 +50,7 @@ public class DataDriverContext extends DriverContext { this.paths = new ArrayList<>(); this.sourceOperators = new ArrayList<>(); this.deviceIDToContext = null; + this.externalTsFilePaths = null; } public DataDriverContext(DataDriverContext parentContext, int pipelineId) { @@ -56,6 +58,7 @@ public class DataDriverContext extends DriverContext { this.paths = new ArrayList<>(); this.sourceOperators = new ArrayList<>(); this.deviceIDToContext = null; + this.externalTsFilePaths = null; } public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) { @@ -71,6 +74,19 @@ public class DataDriverContext extends DriverContext { deviceIDToContext = null; } + public void setExternalTsFilePaths(List<String> externalTsFilePaths) { + this.externalTsFilePaths = externalTsFilePaths; + } + + public List<String> getExternalTsFilePaths() { + return externalTsFilePaths; + } + + public void clearExternalTsFilePaths() { + // friendly for gc + externalTsFilePaths = null; + } + public void addPath(IFullPath path) { this.paths.add(path); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 2a0373cf6fd..a57b99ca18c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -56,6 +56,8 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; @@ -69,6 +71,8 @@ import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; import java.time.ZoneId; import java.time.format.DateTimeParseException; import java.util.ArrayList; @@ -117,6 +121,8 @@ public class FragmentInstanceContext extends QueryContext { // Used for region scan, relating methods are to be added. private Map<IDeviceID, DeviceContext> devicePathsToContext; + private List<String> externalTsFilePaths; + // Shared by all scan operators in this fragment instance to avoid memory problem protected IQueryDataSource sharedQueryDataSource; @@ -612,6 +618,10 @@ public class FragmentInstanceContext extends QueryContext { this.devicePathsToContext = devicePathsToContext; } + public void setExternalTsFilePaths(List<String> externalTsFilePaths) { + this.externalTsFilePaths = externalTsFilePaths; + } + public MemoryReservationManager getMemoryReservationContext() { return memoryReservationManager; } @@ -779,6 +789,43 @@ public class FragmentInstanceContext extends QueryContext { } } + public boolean initExternalTsFileQueryDataSource(List<String> externalTsFilePaths) + throws QueryProcessException { + long startTime = System.nanoTime(); + try { + if (externalTsFilePaths == null || externalTsFilePaths.isEmpty()) { + this.sharedQueryDataSource = EMPTY_QUERY_DATA_SOURCE; + return true; + } + + List<TsFileResource> externalTsFileResources = new 
ArrayList<>(externalTsFilePaths.size()); + for (String externalTsFilePath : externalTsFilePaths) { + TsFileResource resource = + new TsFileResource(new File(externalTsFilePath), TsFileResourceStatus.NORMAL); + if (resource.resourceFileExists()) { + try { + resource.deserialize(); + } catch (IOException e) { + throw new QueryProcessException( + "Failed to deserialize external TsFile resource: " + + externalTsFilePath + + ", " + + e.getMessage()); + } + } else { + resource.setTimeIndex(new FileTimeIndex(Long.MIN_VALUE, Long.MAX_VALUE)); + } + externalTsFileResources.add(resource); + } + + this.sharedQueryDataSource = + new QueryDataSource(externalTsFileResources, Collections.emptyList()); + return true; + } finally { + addInitQueryDataSourceCost(System.nanoTime() - startTime); + } + } + public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProcessException { if (sharedQueryDataSource == null) { switch (queryDataSourceType) { @@ -804,6 +851,13 @@ public class FragmentInstanceContext extends QueryContext { return getUnfinishedQueryDataSource(); } break; + case EXTERNAL_TSFILE_SCAN: + if (initExternalTsFileQueryDataSource(externalTsFilePaths)) { + externalTsFilePaths = null; + } else { + return getUnfinishedQueryDataSource(); + } + break; default: throw new QueryProcessException( "Unsupported query data source type: " + queryDataSourceType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java new file mode 100644 index 00000000000..e4b119291ae --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractDeviceTableScanOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; + +import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; + +public abstract class AbstractDeviceTableScanOperator extends AbstractTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AbstractDeviceTableScanOperator.class); + + protected List<DeviceEntry> deviceEntries; + protected int deviceCount; + protected int currentDeviceIndex; + + protected AbstractDeviceTableScanOperator(AbstractTableScanOperatorParameter parameter) { + super(parameter); + this.deviceEntries = parameter.deviceEntries; + this.deviceCount = parameter.deviceEntries.size(); + this.currentDeviceIndex = 0; + this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount)); + recordCurrentDeviceIndex(); + constructAlignedSeriesScanUtil(); + } + + @Override + protected boolean hasCurrentDeviceEntry() { + return currentDeviceIndex < deviceCount; + } + + @Override 
+ protected DeviceEntry getCurrentDeviceEntry() { + return deviceEntries.get(currentDeviceIndex); + } + + @Override + protected boolean advanceDeviceEntry() { + currentDeviceIndex++; + return hasCurrentDeviceEntry(); + } + + @Override + protected void recordCurrentDeviceIndex() { + this.operatorContext.recordSpecifiedInfo( + CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + } + + @Override + public long ramBytesUsed() { + return super.ramBytesUsed() + + INSTANCE_SIZE + - AbstractTableScanOperator.INSTANCE_SIZE + + RamUsageEstimator.sizeOfCollection(deviceEntries); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java index 52a21628f2a..45481e95f11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; -import org.apache.iotdb.calc.plan.planner.CommonOperatorUtils; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; @@ -51,30 +50,25 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder; -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter.DEVICE_NUMBER; public abstract class 
AbstractTableScanOperator extends AbstractSeriesScanOperator { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class); + protected static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(AbstractTableScanOperator.class); private final List<ColumnSchema> columnSchemas; private final int[] columnsIndexArray; - private final List<DeviceEntry> deviceEntries; + protected final Ordering scanOrder; + protected final SeriesScanOptions seriesScanOptions; - private final int deviceCount; + protected final List<String> measurementColumnNames; - private final Ordering scanOrder; - private final SeriesScanOptions seriesScanOptions; + protected final Set<String> allSensors; - private final List<String> measurementColumnNames; + protected final List<IMeasurementSchema> measurementSchemas; - private final Set<String> allSensors; - - private final List<IMeasurementSchema> measurementSchemas; - - private final List<TSDataType> measurementColumnTSDataTypes; + protected final List<TSDataType> measurementColumnTSDataTypes; private TsBlockBuilder measurementDataBuilder; @@ -84,17 +78,11 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat private QueryDataSource queryDataSource; - private int currentDeviceIndex; - public AbstractTableScanOperator(AbstractTableScanOperatorParameter parameter) { this.sourceId = parameter.sourceId; this.operatorContext = parameter.context; - this.operatorContext.recordSpecifiedInfo( - DEVICE_NUMBER, Integer.toString(parameter.deviceEntries.size())); this.columnSchemas = parameter.columnSchemas; this.columnsIndexArray = parameter.columnsIndexArray; - this.deviceEntries = parameter.deviceEntries; - this.deviceCount = parameter.deviceEntries.size(); this.scanOrder = parameter.scanOrder; this.seriesScanOptions = parameter.seriesScanOptions; this.measurementColumnNames = parameter.measurementColumnNames; @@ -104,18 +92,12 @@ public abstract class 
AbstractTableScanOperator extends AbstractSeriesScanOperat parameter.measurementSchemas.stream() .map(IMeasurementSchema::getType) .collect(Collectors.toList()); - this.currentDeviceIndex = 0; - this.operatorContext.recordSpecifiedInfo( - CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(0)); - // allSensors include time and all field columns this.maxReturnSize = Math.min( maxReturnSize, allSensors.size() * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); this.maxTsBlockLineNum = parameter.maxTsBlockLineNum; - - constructAlignedSeriesScanUtil(); } @Override @@ -160,8 +142,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat if (measurementDataBuilder.isEmpty() && measurementDataBlock == null && currentDeviceNoMoreData) { - currentDeviceIndex++; - prepareForNextDevice(); + moveToNextDevice(); } } catch (IOException e) { @@ -200,13 +181,13 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat } private void constructResultTsBlock() { - DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex); + DeviceEntry currentDeviceEntry = getCurrentDeviceEntry(); this.resultTsBlock = MeasurementToTableViewAdaptorUtils.toTableBlock( measurementDataBlock, columnsIndexArray, columnSchemas, - deviceEntries.get(currentDeviceIndex), + currentDeviceEntry, idColumnIndex -> getNthIdColumnValue(currentDeviceEntry, idColumnIndex)); } @@ -220,7 +201,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat @Override public boolean isFinished() throws Exception { return (retainedTsBlock == null) - && (currentDeviceIndex >= deviceCount || seriesScanOptions.limitConsumedUp()); + && (!hasCurrentDeviceEntry() || seriesScanOptions.limitConsumedUp()); } @Override @@ -252,31 +233,37 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat this.measurementDataBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); } - private void 
prepareForNextDevice() { - if (currentDeviceIndex < deviceCount) { + private void moveToNextDevice() { + if (advanceDeviceEntry()) { // construct AlignedSeriesScanUtil for next device constructAlignedSeriesScanUtil(); // reset QueryDataSource queryDataSource.reset(); this.seriesScanUtil.initQueryDataSource(queryDataSource); - this.operatorContext.recordSpecifiedInfo( - CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + recordCurrentDeviceIndex(); } } - private void constructAlignedSeriesScanUtil() { - if (this.deviceEntries.isEmpty()) { + protected abstract boolean hasCurrentDeviceEntry(); + + protected abstract DeviceEntry getCurrentDeviceEntry(); + + protected abstract boolean advanceDeviceEntry(); + + protected abstract void recordCurrentDeviceIndex(); + + protected void constructAlignedSeriesScanUtil() { + if (!hasCurrentDeviceEntry()) { // no need to construct SeriesScanUtil, hasNext will return false return; } - if (this.deviceEntries.get(this.currentDeviceIndex) == null) { - throw new IllegalStateException( - "Device entries of index " + this.currentDeviceIndex + " in TableScanOperator is empty"); + if (getCurrentDeviceEntry() == null) { + throw new IllegalStateException("Current device entry in TableScanOperator is empty"); } - DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex); + DeviceEntry deviceEntry = getCurrentDeviceEntry(); AlignedFullPath alignedPath = constructAlignedPath(deviceEntry, measurementColumnNames, measurementSchemas, allSensors); this.seriesScanUtil = @@ -304,8 +291,7 @@ public abstract class AbstractTableScanOperator extends AbstractSeriesScanOperat + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext) + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId) - + (resultTsBlockBuilder == null ? 
0 : resultTsBlockBuilder.getRetainedSizeInBytes()) - + RamUsageEstimator.sizeOfCollection(deviceEntries); + + (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes()); } public static class AbstractTableScanOperatorParameter { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java new file mode 100644 index 00000000000..c568a2768c9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; + +import java.io.IOException; +import java.util.List; + +public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { + + private final ExternalTsFileMetadataLoader metadataLoader; + + public ExternalTsFileSeriesScanUtil( + AlignedFullPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + boolean queryAllSensors, + List<TSDataType> givenDataTypes, + ExternalTsFileMetadataLoader metadataLoader) { + super(seriesPath, scanOrder, scanOptions, context, queryAllSensors, givenDataTypes); + this.metadataLoader = metadataLoader; + } + + public ExternalTsFileSeriesScanUtil( + AlignedFullPath seriesPath, + Ordering scanOrder, + SeriesScanOptions scanOptions, + FragmentInstanceContext context, + boolean queryAllSensors, + List<TSDataType> givenDataTypes, + MultiTsFileResourceIterator resourceIterator) { + this( + seriesPath, + scanOrder, + scanOptions, + context, + queryAllSensors, + givenDataTypes, + resourceIterator::loadTimeSeriesMetadata); + } + + @Override + protected AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, boolean isSeq) throws IOException { + return metadataLoader.loadTimeSeriesMetadata(resource, (AlignedFullPath) seriesPath); + } + + @FunctionalInterface + public interface 
ExternalTsFileMetadataLoader { + AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, AlignedFullPath alignedFullPath) throws IOException; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java new file mode 100644 index 00000000000..3748f16c1cd --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.LazyTsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongConsumer; + +public class MultiTsFileResourceIterator { + + private final String tableName; + private final FragmentInstanceContext fragmentInstanceContext; + private final SeriesScanOptions seriesScanOptions; + private final Map<TsFileResource, TsFileResourceDeviceIterator> deviceIteratorMap = + new HashMap<>(); + + private IDeviceID currentDevice; + + public MultiTsFileResourceIterator( + String tableName, + List<TsFileResource> seqResources, + List<TsFileResource> unseqResources, + Map<TsFileResource, TsFileSequenceReader> resourceReaderMap, + FragmentInstanceContext fragmentInstanceContext, + SeriesScanOptions seriesScanOptions) { + this.tableName = tableName; + this.fragmentInstanceContext = fragmentInstanceContext; + this.seriesScanOptions = seriesScanOptions; + initDeviceIterators(seqResources, resourceReaderMap); + initDeviceIterators(unseqResources, resourceReaderMap); + } + + private void initDeviceIterators( + List<TsFileResource> resources, Map<TsFileResource, TsFileSequenceReader> resourceReaderMap) { + for (TsFileResource resource : resources) { + try { + TsFileSequenceReader reader = 
resourceReaderMap.get(resource); + if (reader == null) { + throw new IllegalArgumentException( + "Missing external TsFile reader: " + resource.getTsFilePath()); + } + deviceIteratorMap.put(resource, new TsFileResourceDeviceIterator(resource, reader)); + } catch (IOException e) { + throw new RuntimeException( + "Failed to create device iterator for external TsFile: " + resource.getTsFilePath(), e); + } + } + } + + public boolean hasNextDevice() { + for (TsFileResourceDeviceIterator iterator : deviceIteratorMap.values()) { + if (iterator.hasNextDevice() + || (iterator.getCurrentDevice() != null + && !iterator.getCurrentDevice().equals(currentDevice))) { + return true; + } + } + return false; + } + + public IDeviceID nextDevice() { + IDeviceID nextDevice = null; + List<TsFileResource> exhaustedResources = new ArrayList<>(); + for (Map.Entry<TsFileResource, TsFileResourceDeviceIterator> entry : + deviceIteratorMap.entrySet()) { + TsFileResource resource = entry.getKey(); + TsFileResourceDeviceIterator iterator = entry.getValue(); + if (iterator.getCurrentDevice() == null + || iterator.getCurrentDevice().equals(currentDevice)) { + if (iterator.hasNextDevice()) { + if (iterator.nextDevice() == null) { + exhaustedResources.add(resource); + continue; + } + } else { + exhaustedResources.add(resource); + continue; + } + } + if (nextDevice == null || nextDevice.compareTo(iterator.getCurrentDevice()) > 0) { + nextDevice = iterator.getCurrentDevice(); + } + } + for (TsFileResource resource : exhaustedResources) { + deviceIteratorMap.remove(resource); + } + currentDevice = nextDevice; + return currentDevice; + } + + public IDeviceID getCurrentDevice() { + return currentDevice; + } + + public AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, AlignedFullPath alignedPath) throws IOException { + TsFileResourceDeviceIterator iterator = deviceIteratorMap.get(resource); + if (iterator == null + || currentDevice == null + || 
!currentDevice.equals(iterator.getCurrentDevice())) { + return null; + } + return iterator.loadTimeSeriesMetadata(alignedPath); + } + + public long[] getCurrentDeviceMeasurementNodeOffset(TsFileResource resource) { + TsFileResourceDeviceIterator iterator = deviceIteratorMap.get(resource); + if (iterator == null + || currentDevice == null + || !currentDevice.equals(iterator.getCurrentDevice())) { + return null; + } + return iterator.getCurrentDeviceMeasurementNodeOffset(); + } + + private boolean isDeviceMatched(IDeviceID deviceID) { + return tableName.equalsIgnoreCase(deviceID.getTableName()); + } + + private class TsFileResourceDeviceIterator { + + private final TsFileResource resource; + private final LazyTsFileDeviceIterator deviceIterator; + private IDeviceID currentDevice; + + private TsFileResourceDeviceIterator(TsFileResource resource, TsFileSequenceReader reader) + throws IOException { + this.resource = resource; + LongConsumer ioSizeRecorder = + fragmentInstanceContext.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize() + ::addAndGet; + this.deviceIterator = new LazyTsFileDeviceIterator(reader, tableName, ioSizeRecorder); + } + + private boolean hasNextDevice() { + return deviceIterator.hasNext(); + } + + private IDeviceID nextDevice() { + while (deviceIterator.hasNext()) { + IDeviceID nextDevice = deviceIterator.next(); + if (isDeviceMatched(nextDevice)) { + currentDevice = nextDevice; + return currentDevice; + } + } + currentDevice = null; + return null; + } + + private IDeviceID getCurrentDevice() { + return currentDevice; + } + + private long[] getCurrentDeviceMeasurementNodeOffset() { + return deviceIterator.getCurrentDeviceMeasurementNodeOffset(); + } + + private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata(AlignedFullPath alignedPath) + throws IOException { + // TODO: Pass getCurrentDeviceMeasurementNodeOffset() to FileLoaderUtils after this branch + // supports offset-based metadata loading. 
+ return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + resource, + alignedPath, + fragmentInstanceContext, + seriesScanOptions.getGlobalTimeFilter(), + resource.isSeq(), + fragmentInstanceContext.isIgnoreAllNullRows()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java new file mode 100644 index 00000000000..3c0ff7a087a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; +import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.file.metadata.AbstractAlignedTimeSeriesMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TsFileDeviceIterator; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING; + +public class OrderedExternalTsFileTableScanOperator extends AbstractTableScanOperator { + private static final long INSTANCE_SIZE = + 
RamUsageEstimator.shallowSizeOfInstance(OrderedExternalTsFileTableScanOperator.class); + + private final String tableName; + private final Map<Symbol, ColumnSchema> assignments; + private final OrderingScheme pushedOrderingScheme; + private final Map<TsFileResource, Map<IDeviceID, long[]>> deviceMeasurementNodeOffsetMap = + new HashMap<>(); + + private DeviceEntry currentDeviceEntry; + private int currentDeviceIndex; + private List<DeviceEntry> sortedDeviceEntries = new ArrayList<>(); + + public OrderedExternalTsFileTableScanOperator( + AbstractTableScanOperatorParameter parameter, + String tableName, + Map<Symbol, ColumnSchema> assignments, + OrderingScheme pushedOrderingScheme) { + super(parameter); + this.tableName = tableName; + this.assignments = assignments; + this.pushedOrderingScheme = pushedOrderingScheme; + this.currentDeviceIndex = 0; + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + int segmentOffset = + deviceEntry.getDeviceID().segmentNum() > 0 + && tableName.equalsIgnoreCase((String) deviceEntry.getNthSegment(0)) + ? 1 + : 0; + Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset); + return segment == null ? null : (String) segment; + } + + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + super.initQueryDataSource(dataSource); + sortedDeviceEntries = collectSortedDeviceEntries((QueryDataSource) dataSource); + currentDeviceEntry = sortedDeviceEntries.isEmpty() ? 
null : sortedDeviceEntries.get(0); + recordCurrentDeviceIndex(); + constructAlignedSeriesScanUtil(); + if (seriesScanUtil != null) { + seriesScanUtil.initQueryDataSource((QueryDataSource) dataSource); + } + } + + private List<DeviceEntry> collectSortedDeviceEntries(QueryDataSource queryDataSource) { + List<ExternalTsFileDeviceInfo> deviceInfos = collectDeviceInfos(queryDataSource); + deviceInfos.sort(createDeviceInfoComparator()); + + List<DeviceEntry> deviceEntries = new ArrayList<>(deviceInfos.size()); + Set<IDeviceID> visitedDevices = new LinkedHashSet<>(); + for (ExternalTsFileDeviceInfo deviceInfo : deviceInfos) { + deviceMeasurementNodeOffsetMap + .computeIfAbsent(deviceInfo.resource, ignored -> new HashMap<>()) + .put(deviceInfo.deviceID, deviceInfo.deviceMeasurementNodeOffset); + if (visitedDevices.add(deviceInfo.deviceID)) { + deviceEntries.add(new AlignedDeviceEntry(deviceInfo.deviceID, new Binary[0])); + } + } + return deviceEntries; + } + + private List<ExternalTsFileDeviceInfo> collectDeviceInfos(QueryDataSource queryDataSource) { + List<ExternalTsFileDeviceInfo> deviceInfos = new ArrayList<>(); + for (TsFileResource resource : getAllResources(queryDataSource)) { + collectDeviceInfos(resource, deviceInfos); + } + return deviceInfos; + } + + private void collectDeviceInfos( + TsFileResource resource, List<ExternalTsFileDeviceInfo> deviceInfos) { + try (TsFileSequenceReader reader = new TsFileSequenceReader(resource.getTsFilePath())) { + TsFileDeviceIterator deviceIterator = + reader.getTableDevicesIteratorWithIsAligned(tableName, contextValue -> {}); + while (deviceIterator.hasNext()) { + Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.next(); + IDeviceID deviceID = deviceInfo.left; + if (!isDeviceMatched(deviceID)) { + continue; + } + deviceInfos.add( + new ExternalTsFileDeviceInfo( + deviceID, resource, deviceIterator.getCurrentDeviceMeasurementNodeOffset())); + } + } catch (IOException e) { + throw new RuntimeException( + "Failed to collect 
devices from external TsFile: " + resource.getTsFilePath(), e); + } + } + + private List<TsFileResource> getAllResources(QueryDataSource queryDataSource) { + List<TsFileResource> resources = + new ArrayList<>( + queryDataSource.getSeqResources().size() + queryDataSource.getUnseqResources().size()); + resources.addAll(queryDataSource.getSeqResources()); + resources.addAll(queryDataSource.getUnseqResources()); + return resources; + } + + private boolean isDeviceMatched(IDeviceID deviceID) { + return tableName.equalsIgnoreCase(deviceID.getTableName()); + } + + private Comparator<ExternalTsFileDeviceInfo> createDeviceInfoComparator() { + Comparator<ExternalTsFileDeviceInfo> comparator = null; + for (Symbol symbol : pushedOrderingScheme.getOrderBy()) { + if (TableScanNode.isTimeColumn(symbol, assignments)) { + continue; + } + int tagIndex = getTagIndex(symbol); + final int deviceSegmentIndex = tagIndex + 1; + Comparator<String> valueComparator = + pushedOrderingScheme.getOrdering(symbol).isNullsFirst() + ? Comparator.nullsFirst(Comparator.naturalOrder()) + : Comparator.nullsLast(Comparator.naturalOrder()); + Comparator<ExternalTsFileDeviceInfo> currentComparator = + Comparator.comparing( + deviceInfo -> getDeviceSegment(deviceInfo.deviceID, deviceSegmentIndex), + valueComparator); + if (!pushedOrderingScheme.getOrdering(symbol).isAscending()) { + currentComparator = currentComparator.reversed(); + } + comparator = + comparator == null ? currentComparator : comparator.thenComparing(currentComparator); + } + return comparator == null + ? Comparator.comparing(deviceInfo -> deviceInfo.deviceID) + : comparator.thenComparing(deviceInfo -> deviceInfo.deviceID); + } + + private String getDeviceSegment(IDeviceID deviceID, int deviceSegmentIndex) { + return deviceSegmentIndex < deviceID.segmentNum() + ? 
(String) deviceID.segment(deviceSegmentIndex) + : null; + } + + private int getTagIndex(Symbol symbol) { + int tagIndex = 0; + for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) { + if (entry.getValue().getColumnCategory() != TsTableColumnCategory.TAG) { + continue; + } + if (entry.getKey().equals(symbol)) { + return tagIndex; + } + tagIndex++; + } + throw new IllegalArgumentException("Unexpected external TsFile ordering symbol: " + symbol); + } + + @Override + protected boolean hasCurrentDeviceEntry() { + return currentDeviceEntry != null; + } + + @Override + protected DeviceEntry getCurrentDeviceEntry() { + return currentDeviceEntry; + } + + @Override + protected boolean advanceDeviceEntry() { + currentDeviceIndex++; + currentDeviceEntry = + currentDeviceIndex < sortedDeviceEntries.size() + ? sortedDeviceEntries.get(currentDeviceIndex) + : null; + return currentDeviceEntry != null; + } + + @Override + protected void recordCurrentDeviceIndex() { + operatorContext.recordSpecifiedInfo( + CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + } + + @Override + protected void constructAlignedSeriesScanUtil() { + if (!hasCurrentDeviceEntry()) { + return; + } + + DeviceEntry deviceEntry = getCurrentDeviceEntry(); + if (deviceEntry == null) { + throw new IllegalStateException("Current device entry in TableScanOperator is empty"); + } + + this.seriesScanUtil = + new ExternalTsFileSeriesScanUtil( + constructAlignedPath( + deviceEntry, measurementColumnNames, measurementSchemas, allSensors), + scanOrder, + seriesScanOptions, + ((OperatorContext) operatorContext).getInstanceContext(), + true, + measurementColumnTSDataTypes, + this::loadTimeSeriesMetadata); + } + + private AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( + TsFileResource resource, AlignedFullPath alignedPath) throws IOException { + Optional<long[]> deviceMeasurementNodeOffset = + Optional.ofNullable(deviceMeasurementNodeOffsetMap) + .map(map -> map.get(resource)) + 
.map(map -> map.get(alignedPath.getDeviceId())); + // TODO: Use deviceMeasurementNodeOffset after FileLoaderUtils supports offset-based metadata + // loading in this branch. + return FileLoaderUtils.loadAlignedTimeSeriesMetadata( + resource, + alignedPath, + ((OperatorContext) operatorContext).getInstanceContext(), + seriesScanOptions.getGlobalTimeFilter(), + resource.isSeq(), + ((OperatorContext) operatorContext).getInstanceContext().isIgnoreAllNullRows()); + } + + @Override + public long ramBytesUsed() { + return super.ramBytesUsed() + + INSTANCE_SIZE + - AbstractTableScanOperator.INSTANCE_SIZE + + RamUsageEstimator.sizeOfMap(deviceMeasurementNodeOffsetMap) + + RamUsageEstimator.sizeOfCollection(sortedDeviceEntries); + } + + private static class ExternalTsFileDeviceInfo { + private final IDeviceID deviceID; + private final TsFileResource resource; + private final long[] deviceMeasurementNodeOffset; + + private ExternalTsFileDeviceInfo( + IDeviceID deviceID, TsFileResource resource, long[] deviceMeasurementNodeOffset) { + this.deviceID = deviceID; + this.resource = resource; + this.deviceMeasurementNodeOffset = deviceMeasurementNodeOffset; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java index 11f10ef3dae..a50e978b377 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; -public class TableScanOperator extends AbstractTableScanOperator { +public class 
TableScanOperator extends AbstractDeviceTableScanOperator { public TableScanOperator(AbstractTableScanOperatorParameter parameter) { super(parameter); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java index 03d0e998e71..12feaccc42f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeAlignedDeviceViewScanOperator.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.tsfile.file.metadata.IDeviceID; -public class TreeAlignedDeviceViewScanOperator extends AbstractTableScanOperator { +public class TreeAlignedDeviceViewScanOperator extends AbstractDeviceTableScanOperator { private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java new file mode 100644 index 00000000000..518a2fc0102 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.EncryptDBUtils; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.calc.plan.planner.CommonOperatorUtils.CURRENT_DEVICE_INDEX_STRING; + +public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(UnorderedExternalTsFileTableScanOperator.class); + + private final String tableName; + + private MultiTsFileResourceIterator deviceIterator; + private 
Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = Collections.emptyMap(); + private DeviceEntry currentDeviceEntry; + private int currentDeviceIndex; + + public UnorderedExternalTsFileTableScanOperator( + AbstractTableScanOperatorParameter parameter, String tableName) { + super(parameter); + this.tableName = tableName; + this.currentDeviceIndex = 0; + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + int segmentOffset = + deviceEntry.getDeviceID().segmentNum() > 0 + && tableName.equalsIgnoreCase((String) deviceEntry.getNthSegment(0)) + ? 1 + : 0; + Object segment = deviceEntry.getNthSegment(idColumnIndex + segmentOffset); + return segment == null ? null : (String) segment; + } + + @Override + public void initQueryDataSource(IQueryDataSource dataSource) { + super.initQueryDataSource(dataSource); + + QueryDataSource queryDataSource = (QueryDataSource) dataSource; + initDeviceIterator(queryDataSource); + currentDeviceEntry = nextDeviceEntry(); + recordCurrentDeviceIndex(); + constructAlignedSeriesScanUtil(); + if (seriesScanUtil != null) { + seriesScanUtil.initQueryDataSource(queryDataSource); + } + } + + private void initDeviceIterator(QueryDataSource queryDataSource) { + resourceReaderMap = createResourceReaderMap(getAllResources(queryDataSource)); + deviceIterator = + new MultiTsFileResourceIterator( + tableName, + queryDataSource.getSeqResources(), + queryDataSource.getUnseqResources(), + resourceReaderMap, + ((OperatorContext) operatorContext).getInstanceContext(), + seriesScanOptions); + } + + private Map<TsFileResource, TsFileSequenceReader> createResourceReaderMap( + List<TsFileResource> resources) { + Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>(resources.size()); + for (TsFileResource resource : resources) { + try { + readerMap.put( + resource, + new TsFileSequenceReader( + resource.getTsFilePath(), + ((OperatorContext) operatorContext) + .getInstanceContext() + 
.getQueryStatistics() + .getLoadTimeSeriesMetadataActualIOSize() + ::addAndGet, + EncryptDBUtils.getFirstEncryptParamFromTSFilePath(resource.getTsFilePath()))); + } catch (IOException e) { + closeResourceReaders(readerMap); + throw new RuntimeException( + "Failed to open external TsFile reader: " + resource.getTsFilePath(), e); + } + } + return readerMap; + } + + private List<TsFileResource> getAllResources(QueryDataSource queryDataSource) { + List<TsFileResource> resources = + new ArrayList<>( + queryDataSource.getSeqResources().size() + queryDataSource.getUnseqResources().size()); + resources.addAll(queryDataSource.getSeqResources()); + resources.addAll(queryDataSource.getUnseqResources()); + return resources; + } + + private DeviceEntry nextDeviceEntry() { + if (deviceIterator == null || !deviceIterator.hasNextDevice()) { + return null; + } + IDeviceID nextDevice = deviceIterator.nextDevice(); + return nextDevice == null ? null : new AlignedDeviceEntry(nextDevice, new Binary[0]); + } + + @Override + protected boolean hasCurrentDeviceEntry() { + return currentDeviceEntry != null; + } + + @Override + protected DeviceEntry getCurrentDeviceEntry() { + return currentDeviceEntry; + } + + @Override + protected boolean advanceDeviceEntry() { + currentDeviceIndex++; + currentDeviceEntry = nextDeviceEntry(); + return currentDeviceEntry != null; + } + + @Override + protected void recordCurrentDeviceIndex() { + operatorContext.recordSpecifiedInfo( + CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex)); + } + + @Override + protected void constructAlignedSeriesScanUtil() { + if (!hasCurrentDeviceEntry()) { + return; + } + + DeviceEntry deviceEntry = getCurrentDeviceEntry(); + if (deviceEntry == null) { + throw new IllegalStateException("Current device entry in TableScanOperator is empty"); + } + + this.seriesScanUtil = + new ExternalTsFileSeriesScanUtil( + constructAlignedPath( + deviceEntry, measurementColumnNames, measurementSchemas, allSensors), + scanOrder, 
+ seriesScanOptions, + ((OperatorContext) operatorContext).getInstanceContext(), + true, + measurementColumnTSDataTypes, + deviceIterator); + } + + @Override + public void close() throws Exception { + closeResourceReaders(resourceReaderMap); + resourceReaderMap = Collections.emptyMap(); + deviceIterator = null; + super.close(); + } + + @Override + public long ramBytesUsed() { + return super.ramBytesUsed() + + INSTANCE_SIZE + - AbstractTableScanOperator.INSTANCE_SIZE + + RamUsageEstimator.sizeOfMap(resourceReaderMap); + } + + private void closeResourceReaders(Map<TsFileResource, TsFileSequenceReader> readerMap) { + for (TsFileSequenceReader reader : readerMap.values()) { + try { + reader.close(); + } catch (IOException ignored) { + // ignore close failure + } + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index 81bbfa60d13..e24528ce5c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; @@ -97,11 +98,13 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Defa import org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.OrderedExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeNonAlignedDeviceViewAggregationScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.UnorderedExternalTsFileTableScanOperator; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.CountSchemaMergeNode; @@ -136,6 +139,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.IDeviceSchemaInfo; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -445,7 +449,8 @@ public class 
DataNodeTableOperatorGenerator "PushDownOffset should not be set when isPushLimitToEachDevice is true."); } CommonTableScanOperatorParameters commonParameter = - new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap, true); + new CommonTableScanOperatorParameters( + node, fieldColumnsRenameMap, true, node.getTagAndAttributeIndexMap()); List<IMeasurementSchema> measurementSchemas = commonParameter.measurementSchemas; List<Symbol> measurementSchemaIndex2Symbols = commonParameter.measurementSchemaIndex2Symbol; List<String> measurementColumnNames = commonParameter.measurementColumnNames; @@ -855,9 +860,10 @@ public class DataNodeTableOperatorGenerator int idx; private CommonTableScanOperatorParameters( - DeviceTableScanNode node, + TableScanNode node, Map<String, String> fieldColumnsRenameMap, - boolean keepNonOutputMeasurementColumns) { + boolean keepNonOutputMeasurementColumns, + Map<Symbol, Integer> tagAndAttributeColumnsIndexMap) { outputColumnNames = node.getOutputSymbols(); int outputColumnCount = keepNonOutputMeasurementColumns ? 
node.getAssignments().size() : outputColumnNames.size(); @@ -865,7 +871,7 @@ public class DataNodeTableOperatorGenerator symbolInputs = new ArrayList<>(outputColumnCount); columnsIndexArray = new int[outputColumnCount]; columnSchemaMap = node.getAssignments(); - tagAndAttributeColumnsIndexMap = node.getTagAndAttributeIndexMap(); + this.tagAndAttributeColumnsIndexMap = tagAndAttributeColumnsIndexMap; measurementColumnNames = new ArrayList<>(); measurementColumnsIndexMap = new HashMap<>(); measurementSchemas = new ArrayList<>(); @@ -1022,7 +1028,8 @@ public class DataNodeTableOperatorGenerator long viewTTL) { CommonTableScanOperatorParameters commonParameter = - new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap, false); + new CommonTableScanOperatorParameters( + node, fieldColumnsRenameMap, false, node.getTagAndAttributeIndexMap()); List<IMeasurementSchema> measurementSchemas = commonParameter.measurementSchemas; List<String> measurementColumnNames = commonParameter.measurementColumnNames; List<ColumnSchema> columnSchemas = commonParameter.columnSchemas; @@ -1120,8 +1127,89 @@ public class DataNodeTableOperatorGenerator @Override public Operator visitExternalTsFileScan( ExternalTsFileScanNode node, LocalExecutionPlanContext context) { - throw new UnsupportedOperationException( - "ExternalTsFileScanNode physical operator is not implemented yet"); + AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = + constructExternalTsFileTableScanOperatorParameter(node, context); + + AbstractTableScanOperator externalTsFileTableScanOperator = + node.getPushedOrderingScheme().isPresent() + ? 
new OrderedExternalTsFileTableScanOperator( + parameter, + node.getQualifiedObjectName().getObjectName(), + node.getAssignments(), + node.getPushedOrderingScheme().get()) + : new UnorderedExternalTsFileTableScanOperator( + parameter, node.getQualifiedObjectName().getObjectName()); + + context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); + + DataDriverContext dataDriverContext = (DataDriverContext) context.getDriverContext(); + dataDriverContext.addSourceOperator(externalTsFileTableScanOperator); + dataDriverContext.setExternalTsFilePaths(node.getTsFilePaths()); + dataDriverContext.setQueryDataSourceType(QueryDataSourceType.EXTERNAL_TSFILE_SCAN); + dataDriverContext.setInputDriver(true); + + return externalTsFileTableScanOperator; + } + + private AbstractTableScanOperator.AbstractTableScanOperatorParameter + constructExternalTsFileTableScanOperatorParameter( + ExternalTsFileScanNode node, LocalExecutionPlanContext context) { + CommonTableScanOperatorParameters commonParameter = + new CommonTableScanOperatorParameters( + node, Collections.emptyMap(), false, buildTagAndAttributeColumnsIndexMap(node)); + SeriesScanOptions seriesScanOptions = + buildSeriesScanOptions( + context, + commonParameter.columnSchemaMap, + commonParameter.measurementColumnNames, + commonParameter.measurementColumnsIndexMap, + commonParameter.timeColumnName, + Optional.empty(), + node.getPushDownLimit(), + node.getPushDownOffset(), + false, + node.getPushDownPredicate()); + + OperatorContext operatorContext = + addOperatorContext( + context, + node.getPlanNodeId(), + node.getPushedOrderingScheme().isPresent() + ? 
OrderedExternalTsFileTableScanOperator.class.getSimpleName() + : UnorderedExternalTsFileTableScanOperator.class.getSimpleName()); + + Set<String> allSensors = new HashSet<>(commonParameter.measurementColumnNames); + // for time column + allSensors.add(""); + + return new AbstractTableScanOperator.AbstractTableScanOperatorParameter( + allSensors, + operatorContext, + node.getPlanNodeId(), + commonParameter.columnSchemas, + commonParameter.columnsIndexArray, + Collections.emptyList(), + node.getScanOrder(), + seriesScanOptions, + commonParameter.measurementColumnNames, + commonParameter.measurementSchemas, + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()); + } + + private static Map<Symbol, Integer> buildTagAndAttributeColumnsIndexMap(TableScanNode node) { + Map<Symbol, Integer> tagAndAttributeColumnsIndexMap = new HashMap<>(); + int index = 0; + for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { + switch (entry.getValue().getColumnCategory()) { + case TAG: + case ATTRIBUTE: + tagAndAttributeColumnsIndexMap.put(entry.getKey(), index++); + break; + default: + break; + } + } + return tagAndAttributeColumnsIndexMap; } private SeriesScanOptions.Builder getSeriesScanOptionsBuilder( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 95f28052c1f..f05c49b466d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -124,6 +124,7 @@ public class LocalExecutionPlanner { instanceContext.setSourcePaths(collectSourcePaths(context)); instanceContext.setDevicePathsToContext(collectDevicePathsToContext(context)); + 
instanceContext.setExternalTsFilePaths(collectExternalTsFilePaths(context));
     instanceContext.setQueryDataSourceType(
         getQueryDataSourceType((DataDriverContext) context.getDriverContext()));
 
@@ -264,6 +265,21 @@ public class LocalExecutionPlanner {
     return sourcePaths;
   }
 
+  /**
+   * Gathers the external TsFile paths recorded on every pipeline's {@link DataDriverContext} and
+   * clears them from each driver context once copied.
+   *
+   * @return all collected paths (empty if no pipeline recorded any)
+   */
+  private List<String> collectExternalTsFilePaths(LocalExecutionPlanContext context) {
+    List<String> externalTsFilePaths = new ArrayList<>();
+    context
+        .getPipelineDriverFactories()
+        .forEach(
+            pipeline -> {
+              DataDriverContext dataDriverContext = (DataDriverContext) pipeline.getDriverContext();
+              if (dataDriverContext.getExternalTsFilePaths() != null) {
+                externalTsFilePaths.addAll(dataDriverContext.getExternalTsFilePaths());
+              }
+              dataDriverContext.clearExternalTsFilePaths();
+            });
+    return externalTsFilePaths;
+  }
+
   public synchronized boolean forceAllocateFreeMemoryForOperators(long memoryInBytes) {
     // TODO @spricoder: consider a better way
     if (OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes() - memoryInBytes
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 18e2c2d8cb7..f857775cc45 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -1617,7 +1617,7 @@ public class RelationPlanner implements AstVisitor<RelationPlan, Void> {
               field.getName().orElse(null),
               field.getType(),
               field.isHidden(),
-              field.getColumnCategory()));
+              handle.getOutputColumnCategories().get(i)));
     }
 
     List<Symbol> outputSymbols = outputSymbolsBuilder.build();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 709db77a38e..6c54af74445 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -735,6 +735,9 @@ public class TableDistributedPlanGenerator
         new TRegionReplicaSet(
             null, ImmutableList.of(DataNodeEndPoints.getLocalDataNodeLocation())));
     context.mostUsedRegion = node.getRegionReplicaSet();
+    if (context.hasSortProperty) {
+      processExternalTsFileSortProperty(node, context);
+    }
     return Collections.singletonList(node);
   }
 
@@ -1935,6 +1938,67 @@ public class TableDistributedPlanGenerator
     }
   }
 
+  /**
+   * Pushes the longest prefix of the expected ordering that the external TsFile scan can produce
+   * into the scan node. That prefix is a run of TAG columns, optionally ended by the time column; a
+   * DESC time key also switches the node's scan order to DESC.
+   *
+   * <p>The pushed ordering is registered in {@code nodeOrderingMap} only when it covers every
+   * expected ORDER BY key. That means either all keys are TAG columns, or the keys are TAG columns
+   * followed by time and {@code isOrderByAllIdsAndTime} accepts the TAG prefix.
+   */
+  private void processExternalTsFileSortProperty(
+      final ExternalTsFileScanNode externalTsFileScanNode, final PlanContext context) {
+    final OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme;
+    final List<Symbol> newOrderingSymbols = new ArrayList<>();
+    final List<SortOrder> newSortOrders = new ArrayList<>();
+    boolean lastIsTimeRelated = false;
+
+    for (final Symbol symbol : expectedOrderingScheme.getOrderBy()) {
+      if (externalTsFileScanNode.isTimeColumn(symbol)) {
+        if (!expectedOrderingScheme.getOrdering(symbol).isAscending()) {
+          externalTsFileScanNode.setScanOrder(Ordering.DESC);
+        }
+        newOrderingSymbols.add(symbol);
+        newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
+        lastIsTimeRelated = true;
+        // Nothing after the time key can be satisfied by the scan.
+        break;
+      }
+
+      // Stop at the first key that is not a TAG column of this scan.
+      final ColumnSchema columnSchema = externalTsFileScanNode.getAssignments().get(symbol);
+      if (columnSchema == null || columnSchema.getColumnCategory() != TsTableColumnCategory.TAG) {
+        break;
+      }
+
+      newOrderingSymbols.add(symbol);
+      newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
+    }
+
+    if (newOrderingSymbols.isEmpty()) {
+      return;
+    }
+
+    OrderingScheme pushedOrderingScheme =
+        new OrderingScheme(
+            newOrderingSymbols,
+            IntStream.range(0, newOrderingSymbols.size())
+                .boxed()
+                .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get)));
+    externalTsFileScanNode.setPushedOrderingScheme(pushedOrderingScheme);
+
+    if (lastIsTimeRelated) {
+      // TAGs + time: the output is fully ordered only if the TAG prefix identifies each device.
+      if (newOrderingSymbols.size() > 1
+          && newOrderingSymbols.size() == expectedOrderingScheme.getOrderBy().size()
+          && isOrderByAllIdsAndTime(
+              analysis.getTableColumnSchema(externalTsFileScanNode.getQualifiedObjectName()),
+              externalTsFileScanNode.getAssignments(),
+              new OrderingScheme(
+                  newOrderingSymbols.subList(0, newOrderingSymbols.size() - 1),
+                  IntStream.range(0, newOrderingSymbols.size() - 1)
+                      .boxed()
+                      .collect(Collectors.toMap(newOrderingSymbols::get, newSortOrders::get))),
+              newOrderingSymbols.size() - 2)) {
+        nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), pushedOrderingScheme);
+      }
+      return;
+    }
+
+    // TAG-only ordering: satisfied when every expected key was pushed down.
+    if (newOrderingSymbols.size() == expectedOrderingScheme.getOrderBy().size()) {
+      nodeOrderingMap.put(externalTsFileScanNode.getPlanNodeId(), pushedOrderingScheme);
+    }
+  }
+
   private Optional<IDeviceID.TreeDeviceIdColumnValueExtractor>
       createTreeDeviceIdColumnValueExtractor(DeviceTableScanNode node) {
     if (node instanceof TreeDeviceViewScanNode
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
index f0969147fa5..2c8654da468 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
 
@@ -128,6 +129,20 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode>
                 deviceTableScanNode.isPushLimitToEachDevice(),
                 deviceTableScanNode.containsNonAlignedDevice()));
       }
+    } else if (node instanceof ExternalTsFileScanNode) {
+      // Rebuild the node with the pruned outputs/assignments, keeping every other property.
+      // NOTE(review): the external TsFile scan operators derive a TAG column's device-ID segment
+      // from its position among the TAG columns in the assignments. Pruning an unreferenced TAG
+      // may therefore shift the remaining tags onto the wrong segment; confirm before relying on
+      // this branch.
+      ExternalTsFileScanNode externalTsFileScanNode = (ExternalTsFileScanNode) node;
+      return Optional.of(
+          new ExternalTsFileScanNode(
+              externalTsFileScanNode.getPlanNodeId(),
+              externalTsFileScanNode.getQualifiedObjectName(),
+              newOutputs,
+              newAssignments,
+              externalTsFileScanNode.getPushDownPredicate(),
+              externalTsFileScanNode.getPushDownLimit(),
+              externalTsFileScanNode.getPushDownOffset(),
+              externalTsFileScanNode.getScanOrder(),
+              externalTsFileScanNode.getPushedOrderingScheme().orElse(null),
+              externalTsFileScanNode.getTsFilePaths()));
     } else if (node instanceof InformationSchemaTableScanNode) {
       // For the convenience of process in execution stage, column-prune for
       // InformationSchemaTableScanNode is
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
index 7e2d4850b56..ed39c7b4ed0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -25,9 +25,11 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -38,9 +40,12 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; public class ExternalTsFileScanNode extends TableScanNode { private List<String> tsFilePaths; + private Ordering scanOrder = Ordering.ASC; + private OrderingScheme pushedOrderingScheme; protected ExternalTsFileScanNode() {} @@ -62,6 +67,8 @@ public class ExternalTsFileScanNode extends TableScanNode { Expression pushDownPredicate, long pushDownLimit, long pushDownOffset, + Ordering scanOrder, + OrderingScheme pushedOrderingScheme, List<String> tsFilePaths) { super( id, @@ -71,6 +78,8 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset); + this.scanOrder = scanOrder; + this.pushedOrderingScheme = pushedOrderingScheme; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); } @@ -89,6 +98,8 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset, + scanOrder, + pushedOrderingScheme, 
tsFilePaths); } @@ -96,10 +107,33 @@ public class ExternalTsFileScanNode extends TableScanNode { return tsFilePaths; } + public Ordering getScanOrder() { + return scanOrder; + } + + public void setScanOrder(Ordering scanOrder) { + this.scanOrder = scanOrder; + } + + public Optional<OrderingScheme> getPushedOrderingScheme() { + return Optional.ofNullable(pushedOrderingScheme); + } + + public void setPushedOrderingScheme(OrderingScheme pushedOrderingScheme) { + this.pushedOrderingScheme = pushedOrderingScheme; + } + @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(byteBuffer); TableScanNode.serializeMemberVariables(this, byteBuffer, true); + ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer); + if (pushedOrderingScheme == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + pushedOrderingScheme.serialize(byteBuffer); + } serializeTsFilePaths(byteBuffer); } @@ -107,6 +141,13 @@ public class ExternalTsFileScanNode extends TableScanNode { protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(stream); TableScanNode.serializeMemberVariables(this, stream, true); + ReadWriteIOUtils.write(scanOrder.ordinal(), stream); + if (pushedOrderingScheme == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + pushedOrderingScheme.serialize(stream); + } serializeTsFilePaths(stream); } @@ -128,6 +169,11 @@ public class ExternalTsFileScanNode extends TableScanNode { ExternalTsFileScanNode node = new ExternalTsFileScanNode(); TableScanNode.deserializeMemberVariables(byteBuffer, node, true); + node.scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + if (ReadWriteIOUtils.readBool(byteBuffer)) { + node.pushedOrderingScheme = OrderingScheme.deserialize(byteBuffer); + } + int size = ReadWriteIOUtils.readInt(byteBuffer); 
List<String> tsFilePaths = new ArrayList<>(size); while (size-- > 0) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java index 4b4866aa1fa..e6f5962f2e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; @@ -35,11 +36,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import java.util.Map; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode.isTimeColumn; +import static org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode.isTimeColumn; /** * <b>Optimization phase:</b> 
Logical plan planning. @@ -91,9 +93,13 @@ public class TransformSortToStreamSort implements PlanOptimizer { } context.setCanTransform(false); - DeviceTableScanNode deviceTableScanNode = context.getTableScanNode(); + TableScanNode tableScanNode = context.getTableScanNode(); + if (tableScanNode == null) { + node.setChild(child); + return node; + } Map<Symbol, ColumnSchema> tableColumnSchema = - analysis.getTableColumnSchema(deviceTableScanNode.getQualifiedObjectName()); + analysis.getTableColumnSchema(tableScanNode.getQualifiedObjectName()); OrderingScheme orderingScheme = node.getOrderingScheme(); int streamSortIndex = -1; @@ -110,10 +116,7 @@ public class TransformSortToStreamSort implements PlanOptimizer { if (streamSortIndex >= 0) { boolean orderByAllIdsAndTime = isOrderByAllIdsAndTime( - tableColumnSchema, - deviceTableScanNode.getAssignments(), - orderingScheme, - streamSortIndex); + tableColumnSchema, tableScanNode.getAssignments(), orderingScheme, streamSortIndex); return new StreamSortNode( queryContext.getQueryId().genPlanNodeId(), @@ -144,6 +147,12 @@ public class TransformSortToStreamSort implements PlanOptimizer { return node; } + @Override + public PlanNode visitExternalTsFileScan(ExternalTsFileScanNode node, Context context) { + context.setTableScanNode(node); + return node; + } + @Override public PlanNode visitInformationSchemaTableScan( InformationSchemaTableScanNode node, Context context) { @@ -200,15 +209,15 @@ public class TransformSortToStreamSort implements PlanOptimizer { } private static class Context { - private DeviceTableScanNode tableScanNode; + private TableScanNode tableScanNode; private boolean canTransform = true; - public DeviceTableScanNode getTableScanNode() { + public TableScanNode getTableScanNode() { return tableScanNode; } - public void setTableScanNode(DeviceTableScanNode tableScanNode) { + public void setTableScanNode(TableScanNode tableScanNode) { this.tableScanNode = tableScanNode; } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 62f88860aed..3dd10fa0361 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -296,6 +296,8 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getPushDownPredicate() == null ? null : mapper.map(node.getPushDownPredicate()), node.getPushDownLimit(), node.getPushDownOffset(), + node.getScanOrder(), + node.getPushedOrderingScheme().map(mapper::map).orElse(null), node.getTsFilePaths()), mapping); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java index d80ce27ad7a..1214fd2b89c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSourceType.java @@ -21,5 +21,6 @@ package org.apache.iotdb.db.storageengine.dataregion.read; public enum QueryDataSourceType { SERIES_SCAN, DEVICE_REGION_SCAN, - TIME_SERIES_REGION_SCAN + TIME_SERIES_REGION_SCAN, + EXTERNAL_TSFILE_SCAN } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java index 91641c75a46..82c4133b339 100644 --- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/ReadTsFileTableFunction.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; @@ -99,6 +100,9 @@ public class ReadTsFileTableFunction implements TableFunction { schemaCollection.tsFiles.stream() .map(File::getAbsolutePath) .collect(Collectors.toList()), + schemaCollection.mergedTableSchema.getColumnTypes().stream() + .map(TsTableColumnCategory::fromTsFileColumnCategory) + .collect(Collectors.toList()), outputSchema); return TableFunctionAnalysis.builder().properColumnSchema(outputSchema).handle(handle).build(); @@ -352,13 +356,22 @@ public class ReadTsFileTableFunction implements TableFunction { private List<String> tsFilePaths; private List<String> outputColumnNames; private List<Type> outputColumnTypes; + private List<TsTableColumnCategory> outputColumnCategories; public ReadTsFileTableFunctionHandle() { - this("", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + this( + "", + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()); } public ReadTsFileTableFunctionHandle( - String tableName, List<String> tsFilePaths, DescribedSchema outputSchema) { + String tableName, + List<String> tsFilePaths, + List<TsTableColumnCategory> outputColumnCategories, + DescribedSchema outputSchema) { this( tableName, tsFilePaths, @@ -367,21 +380,28 @@ public class ReadTsFileTableFunction implements TableFunction { .collect(Collectors.toList()), outputSchema.getFields().stream() 
.map(DescribedSchema.Field::getType) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + outputColumnCategories); } private ReadTsFileTableFunctionHandle( String tableName, List<String> tsFilePaths, List<String> outputColumnNames, - List<Type> outputColumnTypes) { + List<Type> outputColumnTypes, + List<TsTableColumnCategory> outputColumnCategories) { if (outputColumnNames.size() != outputColumnTypes.size()) { throw new IllegalArgumentException("Output column names and types size mismatch"); } + if (outputColumnNames.size() != outputColumnCategories.size()) { + throw new IllegalArgumentException("Output column names and categories size mismatch"); + } this.tableName = tableName; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); this.outputColumnNames = Collections.unmodifiableList(new ArrayList<>(outputColumnNames)); this.outputColumnTypes = Collections.unmodifiableList(new ArrayList<>(outputColumnTypes)); + this.outputColumnCategories = + Collections.unmodifiableList(new ArrayList<>(outputColumnCategories)); } public String getTableName() { @@ -400,6 +420,10 @@ public class ReadTsFileTableFunction implements TableFunction { return outputColumnTypes; } + public List<TsTableColumnCategory> getOutputColumnCategories() { + return outputColumnCategories; + } + @Override public byte[] serialize() { ByteBuffer buffer = ByteBuffer.allocate(calculateSerializeSize()); @@ -410,6 +434,7 @@ public class ReadTsFileTableFunction implements TableFunction { for (int i = 0; i < outputColumnNames.size(); i++) { writeString(buffer, outputColumnNames.get(i)); buffer.put(outputColumnTypes.get(i).getType()); + buffer.put(outputColumnCategories.get(i).getCategory()); } return buffer.array(); } @@ -427,12 +452,15 @@ public class ReadTsFileTableFunction implements TableFunction { size = buffer.getInt(); List<String> columnNames = new ArrayList<>(size); List<Type> columnTypes = new ArrayList<>(size); + List<TsTableColumnCategory> columnCategories 
= new ArrayList<>(size); for (int i = 0; i < size; i++) { columnNames.add(readString(buffer)); columnTypes.add(Type.valueOf(buffer.get())); + columnCategories.add(TsTableColumnCategory.deserialize(buffer.get())); } outputColumnNames = Collections.unmodifiableList(columnNames); outputColumnTypes = Collections.unmodifiableList(columnTypes); + outputColumnCategories = Collections.unmodifiableList(columnCategories); } @Override @@ -447,6 +475,8 @@ public class ReadTsFileTableFunction implements TableFunction { + outputColumnNames + ", outputColumnTypes=" + outputColumnTypes + + ", outputColumnCategories=" + + outputColumnCategories + '}'; } @@ -458,7 +488,7 @@ public class ReadTsFileTableFunction implements TableFunction { } size += Integer.BYTES; for (String columnName : outputColumnNames) { - size += Integer.BYTES + columnName.getBytes(StandardCharsets.UTF_8).length + Byte.BYTES; + size += Integer.BYTES + columnName.getBytes(StandardCharsets.UTF_8).length + 2 * Byte.BYTES; } return size; }
