This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch vectorByAlima in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ec155f2ba783bfe9d043dd620569a07034018345 Author: Alima777 <[email protected]> AuthorDate: Tue Mar 16 17:37:20 2021 +0800 optimize the structure of deduplicate() --- .../iotdb/db/qp/physical/crud/AggregationPlan.java | 7 ++ .../iotdb/db/qp/physical/crud/FillQueryPlan.java | 8 ++ .../iotdb/db/qp/physical/crud/QueryPlan.java | 3 +- .../iotdb/db/qp/strategy/PhysicalGenerator.java | 35 +------ .../db/query/control/QueryResourceManager.java | 8 +- .../org/apache/iotdb/db/service/TSServiceImpl.java | 105 +++++++++++---------- 6 files changed, 81 insertions(+), 85 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java index 95e12da..b91b6db 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java @@ -102,4 +102,11 @@ public class AggregationPlan extends RawDataQueryPlan { } return levelAggPaths; } + + public void setAlignByTime(boolean align) throws QueryProcessException { + if (!align) { + throw new QueryProcessException( + getOperatorType().name() + " doesn't support disable align clause."); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java index a952e7b..5bb95dd 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/FillQueryPlan.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.qp.physical.crud; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.query.executor.fill.IFill; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -49,4 +50,11 @@ public class FillQueryPlan extends 
RawDataQueryPlan { public void setFillType(Map<TSDataType, IFill> fillType) { this.fillType = fillType; } + + public void setAlignByTime(boolean align) throws QueryProcessException { + if (!align) { + throw new QueryProcessException( + getOperatorType().name() + " doesn't support disable align clause."); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java index eeccb57..5dd756a 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.qp.physical.crud; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; @@ -91,7 +92,7 @@ public abstract class QueryPlan extends PhysicalPlan { return alignByTime; } - public void setAlignByTime(boolean align) { + public void setAlignByTime(boolean align) throws QueryProcessException { alignByTime = align; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 8332a88..1431ca0 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.LogicalOperatorException; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; -import org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.exception.query.QueryProcessException; import 
org.apache.iotdb.db.exception.runtime.SQLParserException; import org.apache.iotdb.db.metadata.PartialPath; @@ -123,7 +122,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTriggersPlan; import org.apache.iotdb.db.qp.physical.sys.StartTriggerPlan; import org.apache.iotdb.db.qp.physical.sys.StopTriggerPlan; import org.apache.iotdb.db.qp.physical.sys.TracingPlan; -import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.udf.core.context.UDFContext; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.FilePathUtils; @@ -515,7 +513,7 @@ public class PhysicalGenerator { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private PhysicalPlan transformQuery(QueryOperator queryOperator, int fetchSize) throws QueryProcessException { - QueryPlan queryPlan = null; + QueryPlan queryPlan; if (queryOperator.hasAggregation()) { queryPlan = new AggPhysicalPlanRule().transform(queryOperator, fetchSize); @@ -570,7 +568,7 @@ public class PhysicalGenerator { return queryPlan; } try { - deduplicate(queryPlan, fetchSize); + deduplicate(queryPlan); } catch (MetadataException e) { throw new QueryProcessException(e); } @@ -738,13 +736,6 @@ public class PhysicalGenerator { measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset); } - int maxDeduplicatedPathNum = - QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize); - - if (measurements.size() > maxDeduplicatedPathNum) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size()); - } - // assigns to alignByDevicePlan alignByDevicePlan.setMeasurements(measurements); alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap); @@ -833,8 +824,7 @@ public class PhysicalGenerator { } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning - private void deduplicate(QueryPlan queryPlan, int fetchSize) - throws MetadataException, PathNumOverLimitException { + private void 
deduplicate(QueryPlan queryPlan) throws MetadataException { // generate dataType first List<PartialPath> paths = queryPlan.getPaths(); List<TSDataType> dataTypes = getSeriesTypes(paths); @@ -845,19 +835,6 @@ public class PhysicalGenerator { return; } - if (queryPlan instanceof GroupByTimePlan) { - GroupByTimePlan plan = (GroupByTimePlan) queryPlan; - // the actual row number of group by query should be calculated from startTime, endTime and - // interval. - long interval = (plan.getEndTime() - plan.getStartTime()) / plan.getInterval(); - if (interval > 0) { - fetchSize = Math.min((int) (interval), fetchSize); - } - } else if (queryPlan instanceof AggregationPlan) { - // the actual row number of aggregation query is 1 - fetchSize = 1; - } - RawDataQueryPlan rawDataQueryPlan = (RawDataQueryPlan) queryPlan; Set<String> columnForReaderSet = new HashSet<>(); // if it's a last query, no need to sort by device @@ -896,11 +873,8 @@ public class PhysicalGenerator { } indexedPaths.sort(Comparator.comparing(pair -> pair.left)); - int maxDeduplicatedPathNum = - QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize); Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); Set<String> columnForDisplaySet = new HashSet<>(); - for (Pair<PartialPath, Integer> indexedPath : indexedPaths) { PartialPath originalPath = indexedPath.left; Integer originalIndex = indexedPath.right; @@ -929,9 +903,6 @@ public class PhysicalGenerator { .addDeduplicatedAggregations(queryPlan.getAggregations().get(originalIndex)); } columnForReaderSet.add(columnForReader); - if (maxDeduplicatedPathNum < columnForReaderSet.size()) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum, columnForReaderSet.size()); - } } String columnForDisplay = diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java index c766f34..7c6aaa7 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.query.context.QueryContext; @@ -104,7 +105,12 @@ public class QueryResourceManager { } /** Register a new query. When a query request is created firstly, this method must be invoked. */ - public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) { + public long assignQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) + throws PathNumOverLimitException { + int maxDeduplicatedPathNum = getMaxDeduplicatedPathNum(fetchSize); + if (deduplicatedPathNum >= maxDeduplicatedPathNum) { + throw new PathNumOverLimitException(maxDeduplicatedPathNum, deduplicatedPathNum); + } long queryId = queryIdAtom.incrementAndGet(); if (isDataQuery) { filePathsManager.addQueryId(queryId); diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 02f0621..a928c4b 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; +import 
org.apache.iotdb.db.exception.query.PathNumOverLimitException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException; import org.apache.iotdb.db.exception.runtime.SQLParserException; @@ -129,6 +130,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; +import org.apache.iotdb.tsfile.utils.Pair; import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.thrift.TException; @@ -617,45 +619,11 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { long queryId = -1; try { - // In case users forget to set this field in query, use the default value - fetchSize = fetchSize == 0 ? DEFAULT_FETCH_SIZE : fetchSize; - - if (plan instanceof QueryPlan && !((QueryPlan) plan).isAlignByTime()) { - OperatorType operatorType = plan.getOperatorType(); - if (operatorType == OperatorType.AGGREGATION - || operatorType == OperatorType.FILL - || operatorType == OperatorType.GROUPBYTIME) { - throw new QueryProcessException( - operatorType.name() + " doesn't support disable align clause."); - } - } - if (plan.getOperatorType() == OperatorType.AGGREGATION) { - // the actual row number of aggregation query is 1 - fetchSize = 1; - } - - if (plan instanceof GroupByTimePlan) { - fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize); - } - - // get deduplicated path num - int deduplicatedPathNum = -1; - if (plan instanceof AlignByDevicePlan) { - deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size(); - } else if (plan instanceof LastQueryPlan) { - // dataset of last query consists of three column: time column + value column = 1 - // deduplicatedPathNum - // and we assume that the memory which sensor name takes equals to 1 
deduplicatedPathNum - deduplicatedPathNum = 2; - // last query's actual row number should be the minimum between the number of series and - // fetchSize - fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize); - } else if (plan instanceof RawDataQueryPlan) { - deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size(); - } - + // pair.left = fetchSize, pair.right = deduplicatedNum + Pair<Integer, Integer> p = getMemoryParametersFromPhysicalPlan(plan, fetchSize); + fetchSize = p.left; // generate the queryId for the operation - queryId = generateQueryId(true, fetchSize, deduplicatedPathNum); + queryId = generateQueryId(true, fetchSize, p.right); // register query info to queryTimeManager if (!(plan instanceof ShowQueryProcesslistPlan)) { queryTimeManager.registerQuery(queryId, startTime, statement, timeout); @@ -744,20 +712,49 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } + /** + * get fetchSize and deduplicatedPathNum that are used for memory estimation + * + * @return Pair<fetchSize, deduplicatedPathNum> + */ + private Pair<Integer, Integer> getMemoryParametersFromPhysicalPlan( + PhysicalPlan plan, int fetchSizeBefore) { + // In case users forget to set this field in query, use the default value + int fetchSize = fetchSizeBefore == 0 ? 
DEFAULT_FETCH_SIZE : fetchSizeBefore; + int deduplicatedPathNum = -1; + if (plan instanceof GroupByTimePlan) { + fetchSize = Math.min(getFetchSizeForGroupByTimePlan((GroupByTimePlan) plan), fetchSize); + } else if (plan.getOperatorType() == OperatorType.AGGREGATION) { + // the actual row number of aggregation query is 1 + fetchSize = 1; + } + if (plan instanceof AlignByDevicePlan) { + deduplicatedPathNum = ((AlignByDevicePlan) plan).getMeasurements().size(); + } else if (plan instanceof LastQueryPlan) { + // dataset of last query consists of three column: time column + value column = 1 + // deduplicatedPathNum + // and we assume that the memory which sensor name takes equals to 1 deduplicatedPathNum + deduplicatedPathNum = 2; + // last query's actual row number should be the minimum between the number of series and + // fetchSize + fetchSize = Math.min(((LastQueryPlan) plan).getDeduplicatedPaths().size(), fetchSize); + } else if (plan instanceof RawDataQueryPlan) { + deduplicatedPathNum = ((RawDataQueryPlan) plan).getDeduplicatedPaths().size(); + } + return new Pair<>(fetchSize, deduplicatedPathNum); + } + /* calculate fetch size for group by time plan */ - private int getFetchSizeForGroupByTimePlan(GroupByTimePlan groupByTimePlan) { - int rows = - (int) - ((groupByTimePlan.getEndTime() - groupByTimePlan.getStartTime()) - / groupByTimePlan.getInterval()); + private int getFetchSizeForGroupByTimePlan(GroupByTimePlan plan) { + int rows = (int) ((plan.getEndTime() - plan.getStartTime()) / plan.getInterval()); // rows gets 0 is caused by: the end time - the start time < the time interval. - if (rows == 0 && groupByTimePlan.isIntervalByMonth()) { + if (rows == 0 && plan.isIntervalByMonth()) { Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(groupByTimePlan.getStartTime()); - calendar.add(Calendar.MONTH, (int) (groupByTimePlan.getInterval() / MS_TO_MONTH)); - rows = calendar.getTimeInMillis() <= groupByTimePlan.getEndTime() ? 
1 : 0; + calendar.setTimeInMillis(plan.getStartTime()); + calendar.add(Calendar.MONTH, (int) (plan.getInterval() / MS_TO_MONTH)); + rows = calendar.getTimeInMillis() <= plan.getEndTime() ? 1 : 0; } return rows; } @@ -1058,7 +1055,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { } } - private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId) { + private TSExecuteStatementResp executeUpdateStatement(PhysicalPlan plan, long sessionId) + throws QueryProcessException { TSStatus status = checkAuthority(plan, sessionId); if (status != null) { return new TSExecuteStatementResp(status); @@ -1858,9 +1856,14 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR); } - private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) { - return QueryResourceManager.getInstance() - .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum); + private long generateQueryId(boolean isDataQuery, int fetchSize, int deduplicatedPathNum) + throws QueryProcessException { + try { + return QueryResourceManager.getInstance() + .assignQueryId(isDataQuery, fetchSize, deduplicatedPathNum); + } catch (PathNumOverLimitException e) { + throw new QueryProcessException(e); + } } protected List<TSDataType> getSeriesTypesByPaths(
