This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TimeFilterForTableScan in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cfb1c9c019556037c8142a611a76fcda44210145 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jun 24 11:52:03 2024 +0800 Remove global timefilter in Analysis --- .../fragment/FragmentInstanceContext.java | 9 +++++ .../source/relational/TableScanOperator.java | 11 +------ .../plan/planner/TableOperatorGenerator.java | 38 ++++++++-------------- .../plan/planner/plan/FragmentInstance.java | 13 ++++++++ .../plan/relational/analyzer/Analysis.java | 12 ++----- .../plan/relational/planner/QueryPlanner.java | 1 - .../distribute/TableModelQueryFragmentPlanner.java | 4 --- .../relational/planner/node/TableScanNode.java | 12 +++++++ 8 files changed, 51 insertions(+), 49 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index eab61c8a2c9..2df96391ebc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -359,6 +359,15 @@ public class FragmentInstanceContext extends QueryContext { return globalTimeFilter; } + public void setTimeFilterForTableModel(Filter timeFilter) { + if (globalTimeFilter == null) { + globalTimeFilter = timeFilter; + } else { + throw new IllegalStateException( + "globalTimeFilter in FragmentInstanceContext should only be set once in Table Model!"); + } + } + public IDataRegionForQuery getDataRegion() { return dataRegion; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java index fdbb3a34275..c998203d7b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java @@ -38,7 +38,6 @@ import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.TsBlockBuilder; import org.apache.tsfile.read.common.block.column.BinaryColumn; @@ -375,16 +374,8 @@ public class TableScanOperator extends AbstractDataSourceOperator { DeviceEntry deviceEntry, List<String> measurementColumnNames, List<IMeasurementSchema> measurementSchemas) { - String[] devicePath = new String[1 + deviceEntry.getDeviceID().segmentNum()]; - devicePath[0] = "root"; - for (int i = 1; i < devicePath.length; i++) { - devicePath[i] = (String) deviceEntry.getDeviceID().segment(i - 1); - } - return new AlignedFullPath( - IDeviceID.Factory.DEFAULT_FACTORY.create(devicePath), - measurementColumnNames, - measurementSchemas); + deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 804d38fddf0..dbfac54c645 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -59,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; @@ -85,9 +86,10 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import javax.validation.constraints.NotNull; + import java.io.File; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -245,15 +247,17 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution System.arraycopy(columnsIndexArray, 0, newColumnsIndexArray, 0, outputColumnCount - 1); } - SeriesScanOptions.Builder scanOptionsBuilder = getSeriesScanOptionsBuilder(context); + SeriesScanOptions.Builder scanOptionsBuilder = + node.getTimePredicate() + .map(timePredicate -> getSeriesScanOptionsBuilder(context, timePredicate)) + .orElse(new SeriesScanOptions.Builder()); scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit()); scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset()); scanOptionsBuilder.withPushLimitToEachDevice(node.isPushLimitToEachDevice()); scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames)); Expression pushDownPredicate = node.getPushDownPredicate(); - boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate); - if (pushDownPredicate != null && predicateCanPushIntoScan) { + if (pushDownPredicate != null) { scanOptionsBuilder.withPushDownFilter( convertPredicateToFilter(pushDownPredicate, measurementColumnNames, columnSchemaMap)); } @@ -299,20 +303,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context.getDriverContext().setInputDriver(true); - if (!predicateCanPushIntoScan) { - - return constructFilterAndProjectOperator( - Optional.of(pushDownPredicate), - tableScanOperator, - node.getOutputSymbols().stream() - .filter(symbol -> !TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(symbol.getName())) - .map(Symbol::toSymbolReference) - .toArray(Expression[]::new), - tableScanOperator.getResultDataTypes(), - makeLayout(Collections.singletonList(node)), - node.getPlanNodeId(), - context); - } return tableScanOperator; } @@ -339,14 +329,14 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return outputMappings; } - private SeriesScanOptions.Builder getSeriesScanOptionsBuilder(LocalExecutionPlanContext context) { + private SeriesScanOptions.Builder getSeriesScanOptionsBuilder( + LocalExecutionPlanContext context, @NotNull Expression timePredicate) { SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); - Filter globalTimeFilter = context.getGlobalTimeFilter(); - if (globalTimeFilter != null) { - // time filter may be stateful, so we need to copy it - scanOptionsBuilder.withGlobalTimeFilter(globalTimeFilter.copy()); - } + Filter timeFilter = timePredicate.accept(new ConvertPredicateToTimeFilterVisitor(), null); + context.getDriverContext().getFragmentInstanceContext().setTimeFilterForTableModel(timeFilter); + // time filter may be stateful, so we need to copy it + scanOptionsBuilder.withGlobalTimeFilter(timeFilter.copy()); return scanOptionsBuilder; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java index d212f4ca725..7a5afed6d8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java @@ -112,6 +112,19 @@ public class FragmentInstance implements IConsensusRequest { this.isExplainAnalyze = isExplainAnalyze; } + public FragmentInstance( + PlanFragment fragment, + FragmentInstanceId id, + QueryType type, + long timeOut, + SessionInfo sessionInfo, + boolean isExplainAnalyze, + boolean isRoot) { + this(fragment, id, null, type, timeOut, sessionInfo); + this.isRoot = isRoot; + this.isExplainAnalyze = isExplainAnalyze; + } + public FragmentInstance( PlanFragment fragment, FragmentInstanceId id, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java index 749e40a3abf..065260c9899 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java @@ -154,10 +154,10 @@ public class Analysis implements IAnalysis { private final Set<NodeRef<Relation>> aliasedRelations = new LinkedHashSet<>(); - private Expression globalTableModelTimePredicate; - + // only be used in write plan and won't be used in query private DataPartition dataPartition; + // only be used in write plan and won't be used in query private SchemaPartition schemaPartition; private DatasetHeader respDatasetHeader; @@ -167,14 +167,6 @@ public class Analysis implements IAnalysis { // indicate is there a value filter private boolean hasValueFilter = false; - public Expression getGlobalTableModelTimePredicate() { - return this.globalTableModelTimePredicate; - } - - public void setGlobalTableModelTimePredicate(Expression globalTableModelTimePredicate) { - this.globalTableModelTimePredicate = globalTableModelTimePredicate; - } - public DataPartition getDataPartition() { return dataPartition; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index cc8ff98032a..033a35e17f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -255,7 +255,6 @@ public class QueryPlanner { queryContext.setGlobalTimeFilter( globalTimePredicate.accept(new ConvertPredicateToTimeFilterVisitor(), null)); } - analysis.setGlobalTableModelTimePredicate(globalTimePredicate); boolean hasValueFilter = resultPair.right; if (!hasValueFilter) { return planBuilder; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java index 2d9cf1a2b01..d77211641c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java @@ -27,13 +27,11 @@ import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; -import org.apache.iotdb.db.queryengine.plan.planner.plan.TableModelTimePredicate; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; import org.apache.tsfile.utils.Pair; @@ -96,12 +94,10 @@ public class TableModelQueryFragmentPlanner { } private void produceFragmentInstance(PlanFragment fragment) { - Expression globalTimePredicate = analysis.getGlobalTableModelTimePredicate(); FragmentInstance fragmentInstance = new FragmentInstance( fragment, fragment.getId().genFragmentInstanceId(), - globalTimePredicate == null ? null : new TableModelTimePredicate(globalTimePredicate), QueryType.READ, queryContext.getTimeOut(), queryContext.getSession(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java index a20decfe57c..80b96c48ea5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; public class TableScanNode extends SourceNode { @@ -56,6 +57,13 @@ public class TableScanNode extends SourceNode { // The default order is TIMESTAMP_ASC, which means "order by timestamp asc" private Ordering scanOrder = Ordering.ASC; + // extracted time filter expression in where clause + // case 1: where s1 > 1 and time >= 0 and time <= 10, time predicate will be time >= 0 and time <= + // 10, pushDownPredicate will be s1 > 1 + // case 2: where s1 > 1 or time < 10, time predicate will be null, pushDownPredicate will be s1 > + // 1 or time < 10 + @Nullable private Expression timePredicate; + // push down predicate for current series, could be null if it doesn't exist @Nullable private Expression pushDownPredicate; @@ -363,4 +371,8 @@ public class TableScanNode extends SourceNode { public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { this.regionReplicaSet = regionReplicaSet; } + + public Optional<Expression> getTimePredicate() { + return Optional.ofNullable(timePredicate); + } }
