This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-1022-v2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7771a59d5319dc240c60e6672ce476b4f413640c Author: SteveYurongSu <[email protected]> AuthorDate: Mon May 17 21:48:04 2021 +0800 fix udf tests --- .../iotdb/db/qp/logical/crud/SelectOperator.java | 12 +- .../db/qp/physical/crud/AlignByDevicePlan.java | 6 + .../iotdb/db/qp/physical/crud/LastQueryPlan.java | 24 ++++ .../iotdb/db/qp/physical/crud/QueryPlan.java | 14 +++ .../db/qp/physical/crud/RawDataQueryPlan.java | 67 +++++++++++ .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 70 ++++++++++-- .../iotdb/db/qp/strategy/LogicalChecker.java | 16 +++ .../iotdb/db/qp/strategy/PhysicalGenerator.java | 125 ++------------------- .../iotdb/db/query/expression/Expression.java | 4 + .../iotdb/db/query/expression/ResultColumn.java | 8 ++ .../query/expression/binary/BinaryExpression.java | 7 ++ .../query/expression/unary/FunctionExpression.java | 8 ++ .../db/query/expression/unary/MinusExpression.java | 6 + .../query/expression/unary/TimeSeriesOperand.java | 6 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 9 +- .../db/integration/IoTDBUDTFHybridQueryIT.java | 3 +- 16 files changed, 250 insertions(+), 135 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java index 12d51ab..14d2c24 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectOperator.java @@ -96,14 +96,10 @@ public final class SelectOperator extends Operator { pathsCache = new ArrayList<>(); for (ResultColumn resultColumn : resultColumns) { Expression expression = resultColumn.getExpression(); - if (expression instanceof TimeSeriesOperand) { - pathsCache.add(((TimeSeriesOperand) resultColumn.getExpression()).getPath()); - } else { - TimeSeriesOperand timeSeriesOperand = - (TimeSeriesOperand) - ((FunctionExpression) resultColumn.getExpression()).getExpressions().get(0); - pathsCache.add(timeSeriesOperand.getPath()); - } + pathsCache.add( + expression instanceof TimeSeriesOperand + ? ((TimeSeriesOperand) expression).getPath() + : null); } } return pathsCache; diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java index be6ed98..3f79a19 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.crud; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; @@ -53,6 +54,11 @@ public class AlignByDevicePlan extends QueryPlan { super(); } + @Override + public void deduplicate(PhysicalGenerator physicalGenerator) { + // do nothing + } + public void setMeasurements(List<String> measurements) { this.measurements = measurements; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java index 56fdd1c..553feeb 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/LastQueryPlan.java @@ -19,14 +19,22 @@ package org.apache.iotdb.db.qp.physical.crud; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.expression.IExpression; import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGt; import org.apache.iotdb.tsfile.read.filter.TimeFilter.TimeGtEq; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import java.util.HashSet; +import java.util.Set; + public class LastQueryPlan extends RawDataQueryPlan { public LastQueryPlan() { @@ -35,6 +43,22 @@ public class LastQueryPlan extends RawDataQueryPlan { } @Override + public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException { + Set<String> columnForReaderSet = new HashSet<>(); + for (int i = 0; i < paths.size(); i++) { + PartialPath path = paths.get(i); + String column = getColumnForReaderFromPath(path, i); + if (!columnForReaderSet.contains(column)) { + TSDataType seriesType = dataTypes.get(i); + addDeduplicatedPaths(path); + addDeduplicatedDataTypes(seriesType); + columnForReaderSet.add(column); + } + } + transformPaths(IoTDB.metaManager); + } + + @Override public void setExpression(IExpression expression) throws QueryProcessException { if (isValidExpression(expression)) { super.setExpression(expression); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index 2de90a5..e2db5a2 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.qp.physical.crud; import org.apache.iotdb.db.exception.metadata.IllegalPathException; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; +import org.apache.iotdb.db.query.expression.ResultColumn; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.HashMap; @@ -31,6 +34,7 @@ import java.util.Map; public abstract class QueryPlan extends PhysicalPlan { + protected List<ResultColumn> resultColumns = null; protected List<PartialPath> paths = null; protected List<TSDataType> dataTypes = null; private boolean alignByTime = true; // for disable align sql @@ -55,6 +59,8 @@ public abstract class QueryPlan extends PhysicalPlan { super(isQuery, operatorType); } + public abstract void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException; + @Override public List<PartialPath> getPaths() { return paths; @@ -150,4 +156,12 @@ public abstract class QueryPlan extends PhysicalPlan { public void setVectorPathToIndex(Map<String, Integer> vectorPathToIndex) { this.vectorPathToIndex = vectorPathToIndex; } + + public List<ResultColumn> getResultColumns() { + return resultColumns; + } + + public void setResultColumns(List<ResultColumn> resultColumns) { + this.resultColumns = resultColumns; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java index 6dc4d60..74b6b25 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java @@ -24,12 +24,16 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; +import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.expression.IExpression; +import org.apache.iotdb.tsfile.utils.Pair; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -54,6 +58,49 @@ public class RawDataQueryPlan extends QueryPlan { super(isQuery, operatorType); } + @Override + public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException { + // sort paths by device, to accelerate the metadata read process + List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>(); + for (int i = 0; i < paths.size(); i++) { + indexedPaths.add(new Pair<>(paths.get(i), i)); + } + indexedPaths.sort(Comparator.comparing(pair -> pair.left)); + + Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); + Set<String> columnForReaderSet = new HashSet<>(); + Set<String> columnForDisplaySet = new HashSet<>(); + + for (Pair<PartialPath, Integer> indexedPath : indexedPaths) { + PartialPath originalPath = indexedPath.left; + Integer originalIndex = indexedPath.right; + + String columnForReader = getColumnForReaderFromPath(originalPath, originalIndex); + if (!columnForReaderSet.contains(columnForReader)) { + addDeduplicatedPaths(originalPath); + addDeduplicatedDataTypes(dataTypes.get(originalIndex)); + pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size()); + if (this instanceof AggregationPlan) { + ((AggregationPlan) this) + .addDeduplicatedAggregations(getAggregations().get(originalIndex)); + } + columnForReaderSet.add(columnForReader); + } + + String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex); + if (!columnForDisplaySet.contains(columnForDisplay)) { + addPathToIndex(columnForDisplay, getPathToIndex().size()); + columnForDisplaySet.add(columnForDisplay); + } + } + + if (!isRawQuery()) { + transformPaths(IoTDB.metaManager); + } else { + transformVectorPaths(physicalGenerator, columnForDisplaySet); + } + } + public IExpression getExpression() { return expression; } @@ -132,6 +179,26 @@ public class RawDataQueryPlan extends QueryPlan { } } + public void transformVectorPaths( + PhysicalGenerator physicalGenerator, Set<String> columnForDisplaySet) + throws MetadataException { + Pair<List<PartialPath>, Map<String, Integer>> pair = + physicalGenerator.getSeriesSchema(getDeduplicatedPaths()); + + List<PartialPath> vectorizedDeduplicatedPaths = pair.left; + List<TSDataType> vectorizedDeduplicatedDataTypes = + new ArrayList<>(physicalGenerator.getSeriesTypes(vectorizedDeduplicatedPaths)); + setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths); + setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes); + + Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right; + Map<String, Integer> pathToIndex = new HashMap<>(); + for (String columnForDisplay : columnForDisplaySet) { + pathToIndex.put(columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay)); + } + setVectorPathToIndex(pathToIndex); + } + public List<PartialPath> getDeduplicatedVectorPaths() { return deduplicatedVectorPaths; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java index 6eecb73..390bded 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java @@ -19,20 +19,30 @@ package org.apache.iotdb.db.qp.physical.crud; +import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; +import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; +import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.ResultColumn; import org.apache.iotdb.db.query.expression.unary.FunctionExpression; +import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager; import org.apache.iotdb.db.query.udf.service.UDFRegistrationService; +import org.apache.iotdb.db.service.IoTDB; +import org.apache.iotdb.tsfile.utils.Pair; import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { @@ -53,15 +63,62 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { } @Override + public void deduplicate(PhysicalGenerator physicalGenerator) throws MetadataException { + // sort paths by device, to accelerate the metadata read process + List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>(); + for (int i = 0; i < resultColumns.size(); i++) { + for (PartialPath path : resultColumns.get(i).collectPaths()) { + indexedPaths.add(new Pair<>(path, i)); + } + } + indexedPaths.sort(Comparator.comparing(pair -> pair.left)); + + Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); + Set<String> columnForReaderSet = new HashSet<>(); + Set<String> columnForDisplaySet = new HashSet<>(); + + for (Pair<PartialPath, Integer> indexedPath : indexedPaths) { + PartialPath originalPath = indexedPath.left; + Integer originalIndex = indexedPath.right; + + boolean isUdf = + !(resultColumns.get(originalIndex).getExpression() instanceof TimeSeriesOperand); + + String columnForReader = getColumnForReaderFromPath(originalPath, originalIndex); + if (!columnForReaderSet.contains(columnForReader)) { + addDeduplicatedPaths(originalPath); + addDeduplicatedDataTypes( + isUdf ? IoTDB.metaManager.getSeriesType(originalPath) : dataTypes.get(originalIndex)); + pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size()); + columnForReaderSet.add(columnForReader); + } + + String columnForDisplay = getColumnForDisplay(columnForReader, originalIndex); + if (!columnForDisplaySet.contains(columnForDisplay)) { + addPathToIndex(columnForDisplay, getPathToIndex().size()); + if (isUdf) { + addUdfOutputColumn(columnForDisplay); + } else { + addRawQueryOutputColumn(columnForDisplay); + } + columnForDisplaySet.add(columnForDisplay); + } + } + + setPathNameToReaderIndex(pathNameToReaderIndex); + } + + @Override public void constructUdfExecutors(List<ResultColumn> resultColumns) { for (int i = 0; i < resultColumns.size(); ++i) { - FunctionExpression expression = (FunctionExpression) resultColumns.get(i).getExpression(); - if (expression == null) { + Expression expression = resultColumns.get(i).getExpression(); + if (!(expression instanceof FunctionExpression)) { continue; } String columnName = expression.toString(); - columnName2Executor.computeIfAbsent(columnName, k -> new UDTFExecutor(expression, zoneId)); + columnName2Executor.computeIfAbsent( + columnName, k -> new UDTFExecutor((FunctionExpression) expression, zoneId)); originalOutputColumnIndex2Executor.put(i, columnName2Executor.get(columnName)); } } @@ -131,10 +188,9 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { @Override public String getColumnForDisplay(String columnForReader, int pathIndex) { - if (paths.get(pathIndex) == null) { - return this.getExecutorByOriginalOutputColumnIndex(pathIndex).getExpression().toString(); - } - return columnForReader; + return !(resultColumns.get(pathIndex).getExpression() instanceof TimeSeriesOperand) + ? getExecutorByOriginalOutputColumnIndex(pathIndex).getExpression().toString() + : columnForReader; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java index cec031c..f55fe7c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java @@ -47,16 +47,32 @@ public class LogicalChecker { private static void checkSelectOperator(QueryOperator queryOperator) throws LogicalOperatorException { + checkLast(queryOperator); checkAggregation(queryOperator); checkAlignByDevice(queryOperator); } + private static void checkLast(QueryOperator queryOperator) throws LogicalOperatorException { + SelectOperator selectOperator = queryOperator.getSelectOperator(); + if (!selectOperator.isLastQuery()) { + return; + } + + for (ResultColumn resultColumn : selectOperator.getResultColumns()) { + Expression expression = resultColumn.getExpression(); + if (!(expression instanceof TimeSeriesOperand)) { + throw new LogicalOperatorException("Last queries can only be applied on raw time series."); + } + } + } + private static void checkAggregation(QueryOperator queryOperator) throws LogicalOperatorException { SelectOperator selectOperator = queryOperator.getSelectOperator(); if (!selectOperator.hasAggregationFunction()) { return; } + for (ResultColumn resultColumn : selectOperator.getResultColumns()) { Expression expression = resultColumn.getExpression(); if (expression instanceof TimeSeriesOperand) { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 71dc3da..9ea5d3a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -122,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan; import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan; import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan; import org.apache.iotdb.db.qp.physical.sys.TracingPlan; -import org.apache.iotdb.db.query.expression.unary.FunctionExpression; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -131,7 +130,6 @@ import org.apache.iotdb.tsfile.utils.Pair; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -451,7 +449,7 @@ public class PhysicalGenerator { } } - protected List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException { + public List<TSDataType> getSeriesTypes(List<PartialPath> paths) throws MetadataException { return SchemaUtils.getSeriesTypesByPaths(paths); } @@ -592,8 +590,15 @@ public class PhysicalGenerator { } return queryPlan; } + + queryPlan.setResultColumns(queryOperator.getSelectOperator().getResultColumns()); + try { - deduplicate(queryPlan); + List<PartialPath> paths = queryPlan.getPaths(); + List<TSDataType> dataTypes = getSeriesTypes(paths); + queryPlan.setDataTypes(dataTypes); + + queryPlan.deduplicate(this); } catch (MetadataException e) { throw new QueryProcessException(e); } @@ -847,117 +852,7 @@ public class PhysicalGenerator { basicOperator.setSinglePath(concatPath); } - @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private void deduplicate(QueryPlan queryPlan) throws MetadataException { - // generate dataType first - List<PartialPath> paths = queryPlan.getPaths(); - List<TSDataType> dataTypes = getSeriesTypes(paths); - queryPlan.setDataTypes(dataTypes); - - // deduplicate from here - if (queryPlan instanceof AlignByDevicePlan) { - return; - } - - RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan; - Set<String> columnForReaderSet = new HashSet<>(); - // if it's a last query, no need to sort by device - if (queryPlan instanceof LastQueryPlan) { - for (int i = 0; i < paths.size(); i++) { - PartialPath path = paths.get(i); - String column = queryPlan.getColumnForReaderFromPath(path, i); - if (!columnForReaderSet.contains(column)) { - TSDataType seriesType = dataTypes.get(i); - rawDataQueryPlan.addDeduplicatedPaths(path); - rawDataQueryPlan.addDeduplicatedDataTypes(seriesType); - columnForReaderSet.add(column); - } - } - ((LastQueryPlan) queryPlan).transformPaths(IoTDB.metaManager); - return; - } - - // sort path by device - List<Pair<PartialPath, Integer>> indexedPaths = new ArrayList<>(); - for (int i = 0; i < paths.size(); i++) { - PartialPath path = paths.get(i); - if (path != null) { // non-udf - indexedPaths.add(new Pair<>(paths.get(i), i)); - } else { // udf - FunctionExpression functionExpression = - (FunctionExpression) - ((UDTFPlan) queryPlan).getExecutorByOriginalOutputColumnIndex(i).getExpression(); - for (PartialPath udfPath : functionExpression.getPaths()) { - indexedPaths.add(new Pair<>(udfPath, i)); - } - } - } - indexedPaths.sort(Comparator.comparing(pair -> pair.left)); - - Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); - Set<String> columnForDisplaySet = new HashSet<>(); - for (Pair<PartialPath, Integer> indexedPath : indexedPaths) { - PartialPath originalPath = indexedPath.left; - Integer originalIndex = indexedPath.right; - - String columnForReader = queryPlan.getColumnForReaderFromPath(originalPath, originalIndex); - boolean isUdf = queryPlan instanceof UDTFPlan && paths.get(originalIndex) == null; - - if (!columnForReaderSet.contains(columnForReader)) { - rawDataQueryPlan.addDeduplicatedPaths(originalPath); - rawDataQueryPlan.addDeduplicatedDataTypes( - isUdf ? IoTDB.metaManager.getSeriesType(originalPath) : dataTypes.get(originalIndex)); - pathNameToReaderIndex.put(columnForReader, pathNameToReaderIndex.size()); - if (queryPlan instanceof AggregationPlan) { - ((AggregationPlan) queryPlan) - .addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex)); - } - columnForReaderSet.add(columnForReader); - } - - String columnForDisplay = queryPlan.getColumnForDisplay(columnForReader, originalIndex); - - if (!columnForDisplaySet.contains(columnForDisplay)) { - queryPlan.addPathToIndex(columnForDisplay, queryPlan.getPathToIndex().size()); - if (queryPlan instanceof UDTFPlan) { - if (isUdf) { - ((UDTFPlan) queryPlan).addUdfOutputColumn(columnForDisplay); - } else { - ((UDTFPlan) queryPlan).addRawQueryOutputColumn(columnForDisplay); - } - } - columnForDisplaySet.add(columnForDisplay); - } - } - if (queryPlan instanceof UDTFPlan) { - ((UDTFPlan) queryPlan).setPathNameToReaderIndex(pathNameToReaderIndex); - return; - } - - if (!rawDataQueryPlan.isRawQuery()) { - rawDataQueryPlan.transformPaths(IoTDB.metaManager); - } else { - // support vector - List<PartialPath> deduplicatedPaths = rawDataQueryPlan.getDeduplicatedPaths(); - Pair<List<PartialPath>, Map<String, Integer>> pair = getSeriesSchema(deduplicatedPaths); - - List<PartialPath> vectorizedDeduplicatedPaths = pair.left; - List<TSDataType> vectorizedDeduplicatedDataTypes = - new ArrayList<>(getSeriesTypes(vectorizedDeduplicatedPaths)); - rawDataQueryPlan.setDeduplicatedVectorPaths(vectorizedDeduplicatedPaths); - rawDataQueryPlan.setDeduplicatedVectorDataTypes(vectorizedDeduplicatedDataTypes); - - Map<String, Integer> columnForDisplayToQueryDataSetIndex = pair.right; - Map<String, Integer> pathToIndex = new HashMap<>(); - for (String columnForDisplay : columnForDisplaySet) { - pathToIndex.put( - columnForDisplay, columnForDisplayToQueryDataSetIndex.get(columnForDisplay)); - } - queryPlan.setVectorPathToIndex(pathToIndex); - } - } - - protected Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths) + public Pair<List<PartialPath>, Map<String, Integer>> getSeriesSchema(List<PartialPath> paths) throws MetadataException { return IoTDB.metaManager.getSeriesSchemas(paths); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java index 94f888e..3429787 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.qp.utils.WildcardsRemover; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.List; +import java.util.Set; public abstract class Expression { @@ -40,6 +41,7 @@ public abstract class Expression { return isTimeSeriesGeneratingFunctionExpression; } + // TODO: implement this method public abstract TSDataType dataType() throws MetadataException; public abstract void concat(List<PartialPath> prefixPaths, List<Expression> resultExpressions); @@ -47,4 +49,6 @@ public abstract class Expression { public abstract void removeWildcards( WildcardsRemover wildcardsRemover, List<Expression> resultExpressions) throws LogicalOptimizeException; + + public abstract void collectPaths(Set<PartialPath> pathSet); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java index dd4a149..93c2d27 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java @@ -24,7 +24,9 @@ import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.utils.WildcardsRemover; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; public class ResultColumn { @@ -58,6 +60,12 @@ public class ResultColumn { } } + public Set<PartialPath> collectPaths() { + Set<PartialPath> pathSet = new HashSet<>(); + expression.collectPaths(pathSet); + return pathSet; + } + public Expression getExpression() { return expression; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java index 6f83c60..3091007 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java @@ -27,6 +27,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.ArrayList; import java.util.List; +import java.util.Set; public abstract class BinaryExpression extends Expression { @@ -102,6 +103,12 @@ public abstract class BinaryExpression extends Expression { } @Override + public void collectPaths(Set<PartialPath> pathSet) { + leftExpression.collectPaths(pathSet); + rightExpression.collectPaths(pathSet); + } + + @Override public final String toString() { return String.format( "%s %s %s", leftExpression.toString(), operator(), rightExpression.toString()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java index de337a0..b05a6ec 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java @@ -35,6 +35,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; public class FunctionExpression extends Expression { @@ -129,6 +130,13 @@ public class FunctionExpression extends Expression { } } + @Override + public void collectPaths(Set<PartialPath> pathSet) { + for (Expression expression : expressions) { + expression.collectPaths(pathSet); + } + } + public List<TSDataType> getDataTypes() throws MetadataException { if (dataTypes == null) { dataTypes = new ArrayList<>(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java index 9ad761d..00518fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/MinusExpression.java @@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.ArrayList; import java.util.List; +import java.util.Set; public class MinusExpression extends Expression { @@ -66,6 +67,11 @@ public class MinusExpression extends Expression { } @Override + public void collectPaths(Set<PartialPath> pathSet) { + expression.collectPaths(pathSet); + } + + @Override public String toString() { return "-" + expression.toString(); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java index d2ba984..3886abc 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.util.List; +import java.util.Set; public class TimeSeriesOperand extends Expression { @@ -74,6 +75,11 @@ public class TimeSeriesOperand extends Expression { } @Override + public void collectPaths(Set<PartialPath> pathSet) { + pathSet.add(path); + } + + @Override public String toString() { return path.toString(); } diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 0637aa2..2e7b8c9 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -82,6 +82,8 @@ import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet; import org.apache.iotdb.db.query.dataset.UDTFDataSet; +import org.apache.iotdb.db.query.expression.ResultColumn; +import org.apache.iotdb.db.query.expression.unary.FunctionExpression; import org.apache.iotdb.db.tools.watermark.GroupedLSBWatermarkEncoder; import org.apache.iotdb.db.tools.watermark.WatermarkEncoder; import org.apache.iotdb.db.utils.QueryDataSetUtils; @@ -992,8 +994,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { private void getWideQueryHeaders( QueryPlan plan, List<String> respColumns, List<String> columnTypes) throws TException, MetadataException { - // Restore column header of aggregate to func(column_name), only - // support single aggregate function for now + List<ResultColumn> resultColumns = plan.getResultColumns(); List<PartialPath> paths = plan.getPaths(); List<TSDataType> seriesTypes = new ArrayList<>(); switch (plan.getOperatorType()) { @@ -1040,11 +1041,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { UDTFPlan udtfPlan = (UDTFPlan) plan; for (int i = 0; i < paths.size(); i++) { respColumns.add( - paths.get(i) != null + !(resultColumns.get(i).getExpression() instanceof FunctionExpression) ? paths.get(i).getFullPath() : udtfPlan.getExecutorByOriginalOutputColumnIndex(i).getExpression().toString()); seriesTypes.add( - paths.get(i) != null + !(resultColumns.get(i).getExpression() instanceof FunctionExpression) ? udtfPlan.getDataTypes().get(i) : udtfPlan .getExecutorByOriginalOutputColumnIndex(i) diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java index 4ab33dd..0710b24 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFHybridQueryIT.java @@ -155,7 +155,8 @@ public class IoTDBUDTFHybridQueryIT { statement.executeQuery(sql); fail(); } catch (SQLException throwable) { - assertTrue(throwable.getMessage().contains("parsing SQL to physical plan")); + assertTrue( + throwable.getMessage().contains("Last queries can only be applied on raw time series.")); } }
