This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch readTsFileTableFunction in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 48d86c920af5ad5aec943ca7fe9ae2a6dacd24c0 Author: shuwenwei <[email protected]> AuthorDate: Mon Jun 1 11:22:56 2026 +0800 push down tag predicate --- .../fragment/FragmentInstanceContext.java | 2 +- .../execution/operator/source/SeriesScanUtil.java | 46 +++++++------ .../ExternalTsFileDeviceFilterVisitor.java | 50 ++++++++++++++ .../relational/ExternalTsFileSeriesScanUtil.java | 6 ++ .../relational/MultiTsFileResourceIterator.java | 12 +++- .../OrderedExternalTsFileTableScanOperator.java | 16 ++++- .../UnorderedExternalTsFileTableScanOperator.java | 8 ++- .../planner/DataNodeTableOperatorGenerator.java | 36 +++++++++- .../iterative/rule/PruneTableScanColumns.java | 2 + .../planner/node/ExternalTsFileScanNode.java | 80 ++++++++++++++++++++++ .../optimizations/PushPredicateIntoTableScan.java | 23 ++++++- .../optimizations/UnaliasSymbolReferences.java | 2 + 12 files changed, 251 insertions(+), 32 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a57b99ca18c..67b36a6d23d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -819,7 +819,7 @@ public class FragmentInstanceContext extends QueryContext { } this.sharedQueryDataSource = - new QueryDataSource(externalTsFileResources, Collections.emptyList()); + new QueryDataSource(Collections.emptyList(), externalTsFileResources); return true; } finally { addInitQueryDataSourceCost(System.nanoTime() - startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 6e6eb76905b..0591e1234c3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -208,27 +208,7 @@ public class SeriesScanUtil implements Accountable { dataSource.fillOrderIndexes(deviceID, orderUtils.getAscending()); this.dataSource = dataSource; - // updated filter concerning TTL - // IgnoreAllNullRows is false indicating that the current query is a table model query. - // In most cases, We can use this condition to determine from which model to obtain the ttl - // of the current device. However, it should be noted that for tree model data queried using - // table view, ttl also needs to be obtained from the tree model. - if (context.isIgnoreAllNullRows() || scanOptions.isTableViewForTreeModel()) { - if (deviceID != EMPTY_DEVICE_ID) { - long ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID); - scanOptions.setTTLForTreeDevice(ttl); - } - } else { - if (scanOptions.timeFilterNeedUpdatedByTtl()) { - String databaseName = dataSource.getDatabaseName(); - long ttl = - databaseName == null - ? Long.MAX_VALUE - : DataNodeTTLCache.getInstance() - .getTTLForTable(databaseName, deviceID.getTableName()); - scanOptions.setTTLForTableDevice(ttl); - } - } + updateFilterUsingTTL(dataSource); // init file index orderUtils.setCurSeqFileIndex(dataSource); @@ -254,6 +234,30 @@ public class SeriesScanUtil implements Accountable { } } + protected void updateFilterUsingTTL(QueryDataSource dataSource) { + // updated filter concerning TTL + // IgnoreAllNullRows is false indicating that the current query is a table model query. + // In most cases, We can use this condition to determine from which model to obtain the ttl + // of the current device. However, it should be noted that for tree model data queried using + // table view, ttl also needs to be obtained from the tree model. + if (context.isIgnoreAllNullRows() || scanOptions.isTableViewForTreeModel()) { + if (deviceID != EMPTY_DEVICE_ID) { + long ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID); + scanOptions.setTTLForTreeDevice(ttl); + } + } else { + if (scanOptions.timeFilterNeedUpdatedByTtl()) { + String databaseName = dataSource.getDatabaseName(); + long ttl = + databaseName == null + ? Long.MAX_VALUE + : DataNodeTTLCache.getInstance() + .getTTLForTable(databaseName, deviceID.getTableName()); + scanOptions.setTTLForTableDevice(ttl); + } + } + } + protected PriorityMergeReader getPriorityMergeReader() { return new PriorityMergeReader(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileDeviceFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileDeviceFilterVisitor.java new file mode 100644 index 00000000000..005491d2b6e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileDeviceFilterVisitor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.commons.schema.filter.SchemaFilter; +import org.apache.iotdb.commons.schema.filter.SchemaFilterVisitor; +import org.apache.iotdb.commons.schema.filter.impl.StringValueFilterVisitor; +import org.apache.iotdb.commons.schema.filter.impl.singlechild.AttributeFilter; +import org.apache.iotdb.commons.schema.filter.impl.singlechild.TagFilter; + +import org.apache.tsfile.file.metadata.IDeviceID; + +public class ExternalTsFileDeviceFilterVisitor extends SchemaFilterVisitor<IDeviceID> { + + @Override + protected Boolean visitNode(final SchemaFilter filter, final IDeviceID deviceID) { + throw new UnsupportedOperationException( + "The schema filter type " + filter.getSchemaFilterType() + " is not supported"); + } + + @Override + public Boolean visitTagFilter(final TagFilter filter, final IDeviceID deviceID) { + final int index = filter.getIndex() + 1; + final String value = index < deviceID.segmentNum() ? (String) deviceID.segment(index) : null; + return filter.getChild().accept(StringValueFilterVisitor.getInstance(), value); + } + + @Override + public Boolean visitAttributeFilter(final AttributeFilter filter, final IDeviceID deviceID) { + throw new UnsupportedOperationException( + "Attribute filter is not supported for external TsFile device filtering"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java index c568a2768c9..f8e7766ffbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.enums.TSDataType; @@ -72,6 +73,11 @@ public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil { return metadataLoader.loadTimeSeriesMetadata(resource, (AlignedFullPath) seriesPath); } + @Override + protected void updateFilterUsingTTL(QueryDataSource dataSource) { + // External TsFiles are not managed by IoTDB metadata, so no table/tree TTL applies here. + } + @FunctionalInterface public interface ExternalTsFileMetadataLoader { AbstractAlignedTimeSeriesMetadata loadTimeSeriesMetadata( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java index 3748f16c1cd..67bddc4facb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MultiTsFileResourceIterator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; @@ -42,6 +43,9 @@ public class MultiTsFileResourceIterator { private final String tableName; private final FragmentInstanceContext fragmentInstanceContext; private final SeriesScanOptions seriesScanOptions; + private final SchemaFilter deviceFilter; + private final ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = + new ExternalTsFileDeviceFilterVisitor(); private final Map<TsFileResource, TsFileResourceDeviceIterator> deviceIteratorMap = new HashMap<>(); @@ -53,10 +57,12 @@ public class MultiTsFileResourceIterator { List<TsFileResource> unseqResources, Map<TsFileResource, TsFileSequenceReader> resourceReaderMap, FragmentInstanceContext fragmentInstanceContext, - SeriesScanOptions seriesScanOptions) { + SeriesScanOptions seriesScanOptions, + SchemaFilter deviceFilter) { this.tableName = tableName; this.fragmentInstanceContext = fragmentInstanceContext; this.seriesScanOptions = seriesScanOptions; + this.deviceFilter = deviceFilter; initDeviceIterators(seqResources, resourceReaderMap); initDeviceIterators(unseqResources, resourceReaderMap); } @@ -145,7 +151,9 @@ public class MultiTsFileResourceIterator { } private boolean isDeviceMatched(IDeviceID deviceID) { - return tableName.equalsIgnoreCase(deviceID.getTableName()); + return tableName.equalsIgnoreCase(deviceID.getTableName()) + && (deviceFilter == null + || Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID))); } private class TsFileResourceDeviceIterator { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java index 3c0ff7a087a..1476ed889aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/OrderedExternalTsFileTableScanOperator.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.TableScanNode import org.apache.iotdb.commons.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.source.FileLoaderUtils; @@ -60,6 +61,9 @@ public class OrderedExternalTsFileTableScanOperator extends AbstractTableScanOpe private final String tableName; private final Map<Symbol, ColumnSchema> assignments; private final OrderingScheme pushedOrderingScheme; + private final SchemaFilter deviceFilter; + private final ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = + new ExternalTsFileDeviceFilterVisitor(); private final Map<TsFileResource, Map<IDeviceID, long[]>> deviceMeasurementNodeOffsetMap = new HashMap<>(); @@ -71,11 +75,13 @@ public class OrderedExternalTsFileTableScanOperator extends AbstractTableScanOpe AbstractTableScanOperatorParameter parameter, String tableName, Map<Symbol, ColumnSchema> assignments, - OrderingScheme pushedOrderingScheme) { + OrderingScheme pushedOrderingScheme, + SchemaFilter deviceFilter) { super(parameter); this.tableName = tableName; this.assignments = assignments; this.pushedOrderingScheme = pushedOrderingScheme; + this.deviceFilter = deviceFilter; this.currentDeviceIndex = 0; } @@ -138,6 +144,9 @@ public class OrderedExternalTsFileTableScanOperator extends AbstractTableScanOpe if (!isDeviceMatched(deviceID)) { continue; } + if (!isDeviceFilterMatched(deviceID)) { + continue; + } deviceInfos.add( new ExternalTsFileDeviceInfo( deviceID, resource, deviceIterator.getCurrentDeviceMeasurementNodeOffset())); @@ -161,6 +170,11 @@ public class OrderedExternalTsFileTableScanOperator extends AbstractTableScanOpe return tableName.equalsIgnoreCase(deviceID.getTableName()); } + private boolean isDeviceFilterMatched(IDeviceID deviceID) { + return deviceFilter == null + || Boolean.TRUE.equals(deviceFilter.accept(deviceFilterVisitor, deviceID)); + } + private Comparator<ExternalTsFileDeviceInfo> createDeviceInfoComparator() { Comparator<ExternalTsFileDeviceInfo> comparator = null; for (Symbol symbol : pushedOrderingScheme.getOrderBy()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java index 518a2fc0102..b1eb554a3ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/UnorderedExternalTsFileTableScanOperator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; @@ -46,6 +47,7 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO RamUsageEstimator.shallowSizeOfInstance(UnorderedExternalTsFileTableScanOperator.class); private final String tableName; + private final SchemaFilter deviceFilter; private MultiTsFileResourceIterator deviceIterator; private Map<TsFileResource, TsFileSequenceReader> resourceReaderMap = Collections.emptyMap(); @@ -53,9 +55,10 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO private int currentDeviceIndex; public UnorderedExternalTsFileTableScanOperator( - AbstractTableScanOperatorParameter parameter, String tableName) { + AbstractTableScanOperatorParameter parameter, String tableName, SchemaFilter deviceFilter) { super(parameter); this.tableName = tableName; + this.deviceFilter = deviceFilter; this.currentDeviceIndex = 0; } @@ -93,7 +96,8 @@ public class UnorderedExternalTsFileTableScanOperator extends AbstractTableScanO queryDataSource.getUnseqResources(), resourceReaderMap, ((OperatorContext) operatorContext).getInstanceContext(), - seriesScanOptions); + seriesScanOptions, + deviceFilter); } private Map<TsFileResource, TsFileSequenceReader> createResourceReaderMap( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java index e24528ce5c7..3f678f7abdd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/DataNodeTableOperatorGenerator.java @@ -51,7 +51,9 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.FunctionCall import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral; import org.apache.iotdb.commons.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; +import org.apache.iotdb.commons.schema.filter.SchemaFilter; import org.apache.iotdb.commons.schema.table.TsTable; +import org.apache.iotdb.commons.schema.table.column.TagColumnSchema; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -111,6 +113,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.read.Coun import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedDeviceEntry; @@ -1129,6 +1132,7 @@ public class DataNodeTableOperatorGenerator ExternalTsFileScanNode node, LocalExecutionPlanContext context) { AbstractTableScanOperator.AbstractTableScanOperatorParameter parameter = constructExternalTsFileTableScanOperatorParameter(node, context); + SchemaFilter deviceFilter = constructExternalTsFileDeviceFilter(node); AbstractTableScanOperator externalTsFileTableScanOperator = node.getPushedOrderingScheme().isPresent() @@ -1136,9 +1140,10 @@ public class DataNodeTableOperatorGenerator parameter, node.getQualifiedObjectName().getObjectName(), node.getAssignments(), - node.getPushedOrderingScheme().get()) + node.getPushedOrderingScheme().get(), + deviceFilter) : new UnorderedExternalTsFileTableScanOperator( - parameter, node.getQualifiedObjectName().getObjectName()); + parameter, node.getQualifiedObjectName().getObjectName(), deviceFilter); context.getInstanceContext().collectTable(node.getQualifiedObjectName().getObjectName()); @@ -1151,6 +1156,31 @@ public class DataNodeTableOperatorGenerator return externalTsFileTableScanOperator; } + private SchemaFilter constructExternalTsFileDeviceFilter(ExternalTsFileScanNode node) { + if (!node.getTagPredicate().isPresent()) { + return null; + } + TsTable table = new TsTable(node.getQualifiedObjectName().getObjectName()); + for (Map.Entry<Symbol, ColumnSchema> entry : node.getAssignments().entrySet()) { + ColumnSchema columnSchema = entry.getValue(); + if (columnSchema.getColumnCategory() == TsTableColumnCategory.TAG) { + table.addColumnSchema( + new TagColumnSchema(entry.getKey().getName(), getTSDataType(columnSchema.getType()))); + } + } + SchemaFilter deviceFilter = + node.getTagPredicate() + .get() + .accept( + new ConvertSchemaPredicateToFilterVisitor(), + new ConvertSchemaPredicateToFilterVisitor.Context(table)); + if (deviceFilter == null) { + throw new UnsupportedOperationException( + "Unsupported external TsFile device filter: " + node.getTagPredicate().get()); + } + return deviceFilter; + } + private AbstractTableScanOperator.AbstractTableScanOperatorParameter constructExternalTsFileTableScanOperatorParameter( ExternalTsFileScanNode node, LocalExecutionPlanContext context) { @@ -1164,7 +1194,7 @@ public class DataNodeTableOperatorGenerator commonParameter.measurementColumnNames, commonParameter.measurementColumnsIndexMap, commonParameter.timeColumnName, - Optional.empty(), + node.getTimePredicate(), node.getPushDownLimit(), node.getPushDownOffset(), false, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java index 2c8654da468..97dc0672b55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableScanColumns.java @@ -140,6 +140,8 @@ public class PruneTableScanColumns extends ProjectOffPushDownRule<TableScanNode> externalTsFileScanNode.getPushDownPredicate(), externalTsFileScanNode.getPushDownLimit(), externalTsFileScanNode.getPushDownOffset(), + externalTsFileScanNode.getTagPredicate().orElse(null), + externalTsFileScanNode.getTimePredicate().orElse(null), externalTsFileScanNode.getScanOrder(), externalTsFileScanNode.getPushedOrderingScheme().orElse(null), externalTsFileScanNode.getTsFilePaths())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java index ed39c7b4ed0..e60cebfdf29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java @@ -44,6 +44,8 @@ import java.util.Optional; public class ExternalTsFileScanNode extends TableScanNode { private List<String> tsFilePaths; + private Expression tagPredicate; + private Expression timePredicate; private Ordering scanOrder = Ordering.ASC; private OrderingScheme pushedOrderingScheme; @@ -70,6 +72,34 @@ public class ExternalTsFileScanNode extends TableScanNode { Ordering scanOrder, OrderingScheme pushedOrderingScheme, List<String> tsFilePaths) { + this( + id, + qualifiedObjectName, + outputSymbols, + assignments, + pushDownPredicate, + pushDownLimit, + pushDownOffset, + null, + null, + scanOrder, + pushedOrderingScheme, + tsFilePaths); + } + + public ExternalTsFileScanNode( + PlanNodeId id, + QualifiedObjectName qualifiedObjectName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + Expression pushDownPredicate, + long pushDownLimit, + long pushDownOffset, + Expression tagPredicate, + Expression timePredicate, + Ordering scanOrder, + OrderingScheme pushedOrderingScheme, + List<String> tsFilePaths) { super( id, qualifiedObjectName, @@ -78,6 +108,8 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset); + this.tagPredicate = tagPredicate; + this.timePredicate = timePredicate; this.scanOrder = scanOrder; this.pushedOrderingScheme = pushedOrderingScheme; this.tsFilePaths = Collections.unmodifiableList(new ArrayList<>(tsFilePaths)); @@ -98,6 +130,8 @@ public class ExternalTsFileScanNode extends TableScanNode { pushDownPredicate, pushDownLimit, pushDownOffset, + tagPredicate, + timePredicate, scanOrder, pushedOrderingScheme, tsFilePaths); @@ -107,6 +141,22 @@ public class ExternalTsFileScanNode extends TableScanNode { return tsFilePaths; } + public Optional<Expression> getTagPredicate() { + return Optional.ofNullable(tagPredicate); + } + + public void setTagPredicate(Expression tagPredicate) { + this.tagPredicate = tagPredicate; + } + + public Optional<Expression> getTimePredicate() { + return Optional.ofNullable(timePredicate); + } + + public void setTimePredicate(Expression timePredicate) { + this.timePredicate = timePredicate; + } + public Ordering getScanOrder() { return scanOrder; } @@ -127,6 +177,8 @@ public class ExternalTsFileScanNode extends TableScanNode { protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(byteBuffer); TableScanNode.serializeMemberVariables(this, byteBuffer, true); + serializePredicate(tagPredicate, byteBuffer); + serializePredicate(timePredicate, byteBuffer); ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer); if (pushedOrderingScheme == null) { ReadWriteIOUtils.write(false, byteBuffer); @@ -141,6 +193,8 @@ public class ExternalTsFileScanNode extends TableScanNode { protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.EXTERNAL_TSFILE_SCAN_NODE.serialize(stream); TableScanNode.serializeMemberVariables(this, stream, true); + serializePredicate(tagPredicate, stream); + serializePredicate(timePredicate, stream); ReadWriteIOUtils.write(scanOrder.ordinal(), stream); if (pushedOrderingScheme == null) { ReadWriteIOUtils.write(false, stream); @@ -165,10 +219,36 @@ public class ExternalTsFileScanNode extends TableScanNode { } } + private void serializePredicate(Expression predicate, ByteBuffer byteBuffer) { + if (predicate == null) { + ReadWriteIOUtils.write(false, byteBuffer); + } else { + ReadWriteIOUtils.write(true, byteBuffer); + Expression.serialize(predicate, byteBuffer); + } + } + + private void serializePredicate(Expression predicate, DataOutputStream stream) + throws IOException { + if (predicate == null) { + ReadWriteIOUtils.write(false, stream); + } else { + ReadWriteIOUtils.write(true, stream); + Expression.serialize(predicate, stream); + } + } + public static ExternalTsFileScanNode deserialize(ByteBuffer byteBuffer) { ExternalTsFileScanNode node = new ExternalTsFileScanNode(); TableScanNode.deserializeMemberVariables(byteBuffer, node, true); + if (ReadWriteIOUtils.readBool(byteBuffer)) { + node.tagPredicate = Expression.deserialize(byteBuffer); + } + if (ReadWriteIOUtils.readBool(byteBuffer)) { + node.timePredicate = Expression.deserialize(byteBuffer); + } + node.scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; if (ReadWriteIOUtils.readBool(byteBuffer)) { node.pushedOrderingScheme = OrderingScheme.deserialize(byteBuffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index aff3a144584..361e2d540bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -72,6 +72,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ReplaceSymbolInExpression; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExternalTsFileScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; @@ -471,12 +472,22 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { return combineFilterAndScan(tableScanNode, context.inheritedPredicate); } + @Override + public PlanNode visitExternalTsFileScan( + ExternalTsFileScanNode tableScanNode, RewriteContext context) { + if (TRUE_LITERAL.equals(context.inheritedPredicate)) { + return tableScanNode; + } + + return combineFilterAndScan(tableScanNode, context.inheritedPredicate); + } + public PlanNode combineFilterAndScan(TableScanNode tableScanNode, Expression predicate) { SplitExpression splitExpression = tableScanNode instanceof InformationSchemaTableScanNode ? splitPredicateForInformationSchemaTable( (InformationSchemaTableScanNode) tableScanNode, predicate) - : splitPredicate((DeviceTableScanNode) tableScanNode, predicate); + : splitPredicate(tableScanNode, predicate); // exist expressions can push down to scan operator if (!splitExpression.getExpressionsCanPushDown().isEmpty()) { @@ -492,6 +503,8 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { Boolean hasValueFilter = resultPair.getRight(); if (tableScanNode instanceof DeviceTableScanNode && resultPair.left != null) { ((DeviceTableScanNode) tableScanNode).setTimePredicate(resultPair.left); + } else if (tableScanNode instanceof ExternalTsFileScanNode && resultPair.left != null) { + ((ExternalTsFileScanNode) tableScanNode).setTimePredicate(resultPair.left); } if (Boolean.TRUE.equals(hasValueFilter)) { if (pushDownPredicate instanceof LogicalExpression @@ -510,6 +523,12 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { if (tableScanNode instanceof DeviceTableScanNode) { getDeviceEntriesWithDataPartitions( (DeviceTableScanNode) tableScanNode, splitExpression.getMetadataExpressions()); + } else if (tableScanNode instanceof ExternalTsFileScanNode) { + ((ExternalTsFileScanNode) tableScanNode) + .setTagPredicate( + splitExpression.getMetadataExpressions().isEmpty() + ? null + : combineConjuncts(splitExpression.getMetadataExpressions())); } // exist expressions can not push down to scan operator @@ -621,7 +640,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { Collections.emptyList(), expressionsCanPushDown, expressionsCannotPushDown, null); } - private SplitExpression splitPredicate(DeviceTableScanNode node, Expression predicate) { + private SplitExpression splitPredicate(TableScanNode node, Expression predicate) { Set<String> idOrAttributeColumnNames = new HashSet<>(node.getAssignments().size()); Set<String> timeOrMeasurementColumnNames = new HashSet<>(node.getAssignments().size()); String timeColumnName = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 3dd10fa0361..389a91f300b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -296,6 +296,8 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getPushDownPredicate() == null ? null : mapper.map(node.getPushDownPredicate()), node.getPushDownLimit(), node.getPushDownOffset(), + node.getTagPredicate().map(mapper::map).orElse(null), + node.getTimePredicate().map(mapper::map).orElse(null), node.getScanOrder(), node.getPushedOrderingScheme().map(mapper::map).orElse(null), node.getTsFilePaths()),
