This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch optimizeDDP in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b90b203dbccde29b808e8f6b74064adcf486162b Merge: e4a953b 5540272 Author: Alima777 <[email protected]> AuthorDate: Tue Mar 16 19:13:02 2021 +0800 merge master and fix conflict docs/SystemDesign/StorageEngine/WAL.md | 6 +- docs/zh/SystemDesign/StorageEngine/WAL.md | 7 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../db/engine/merge/manage/MergeResource.java | 27 ++ .../iotdb/db/engine/merge/task/MergeFileTask.java | 26 +- .../db/engine/merge/task/MergeMultiChunkTask.java | 4 + .../db/engine/storagegroup/TsFileResource.java | 10 + .../storagegroup/timeindex/DeviceTimeIndex.java | 12 + .../storagegroup/timeindex/FileTimeIndex.java | 12 + .../engine/storagegroup/timeindex/ITimeIndex.java | 16 + .../org/apache/iotdb/db/metadata/mnode/MNode.java | 31 ++ .../iotdb/db/qp/strategy/PhysicalGenerator.java | 391 +++++++++++---------- .../iotdb/db/engine/merge/MergeTaskTest.java | 55 +++ .../apache/iotdb/db/metadata/mnode/MNodeTest.java | 27 ++ 14 files changed, 432 insertions(+), 194 deletions(-) diff --cc server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java index 93f2af2,8332a88..c99f5dc --- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/PhysicalGenerator.java @@@ -723,6 -582,188 +581,181 @@@ public class PhysicalGenerator return queryPlan; } + @SuppressWarnings("squid:S3777") // Suppress high Cognitive Complexity warning + private QueryPlan getAlignQueryPlan( + QueryOperator queryOperator, int fetchSize, QueryPlan queryPlan) + throws QueryProcessException { + // below is the core realization of ALIGN_BY_DEVICE sql logic + AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan(); + if (queryPlan instanceof GroupByTimePlan) { + alignByDevicePlan.setGroupByTimePlan((GroupByTimePlan) queryPlan); + } else if (queryPlan instanceof FillQueryPlan) { + alignByDevicePlan.setFillQueryPlan((FillQueryPlan) queryPlan); + } else if (queryPlan instanceof AggregationPlan) { + if (((AggregationPlan) queryPlan).getLevel() >= 0) { + throw new QueryProcessException("group by level does not support align by device now."); + } + alignByDevicePlan.setAggregationPlan((AggregationPlan) queryPlan); + } + + List<PartialPath> prefixPaths = queryOperator.getFromOperator().getPrefixPaths(); + // remove stars in fromPaths and get deviceId with deduplication + List<PartialPath> devices = this.removeStarsInDeviceWithUnique(prefixPaths); + List<PartialPath> suffixPaths = queryOperator.getSelectOperator().getSuffixPaths(); + List<String> originAggregations = queryOperator.getSelectOperator().getAggregations(); + + // to record result measurement columns + List<String> measurements = new ArrayList<>(); + Map<String, String> measurementAliasMap = new HashMap<>(); + // to check the same measurement of different devices having the same datatype + // record the data type of each column of result set + Map<String, TSDataType> columnDataTypeMap = new HashMap<>(); + Map<String, MeasurementType> measurementTypeMap = new HashMap<>(); + + // to record the real type of the corresponding measurement + Map<String, TSDataType> measurementDataTypeMap = new HashMap<>(); + List<PartialPath> paths = new ArrayList<>(); + + for (int i = 0; i < suffixPaths.size(); i++) { // per suffix in SELECT + PartialPath suffixPath = suffixPaths.get(i); + + // to record measurements in the loop of a suffix path + Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>(); + + // if const measurement + if (suffixPath.getMeasurement().startsWith("'")) { + measurements.add(suffixPath.getMeasurement()); + measurementTypeMap.put(suffixPath.getMeasurement(), MeasurementType.Constant); + continue; + } + + for (PartialPath device : devices) { // per device in FROM after deduplication + + PartialPath fullPath = device.concatPath(suffixPath); + try { + // remove stars in SELECT to get actual paths + List<PartialPath> actualPaths = getMatchedTimeseries(fullPath); + if (suffixPath.isTsAliasExists()) { + if (actualPaths.size() == 1) { + String columnName = actualPaths.get(0).getMeasurement(); + if (originAggregations != null && !originAggregations.isEmpty()) { + measurementAliasMap.put( + originAggregations.get(i) + "(" + columnName + ")", suffixPath.getTsAlias()); + } else { + measurementAliasMap.put(columnName, suffixPath.getTsAlias()); + } + } else if (actualPaths.size() >= 2) { + throw new QueryProcessException( + "alias '" + + suffixPath.getTsAlias() + + "' can only be matched with one time series"); + } + } + + // for actual non exist path + if (originAggregations != null && actualPaths.isEmpty() && originAggregations.isEmpty()) { + String nonExistMeasurement = fullPath.getMeasurement(); + if (measurementSetOfGivenSuffix.add(nonExistMeasurement) + && measurementTypeMap.get(nonExistMeasurement) != MeasurementType.Exist) { + measurementTypeMap.put(fullPath.getMeasurement(), MeasurementType.NonExist); + } + } + + // Get data types with and without aggregate functions (actual time series) respectively + // Data type with aggregation function `columnDataTypes` is used for: + // 1. Data type consistency check 2. Header calculation, output result set + // The actual data type of the time series `measurementDataTypes` is used for + // the actual query in the AlignByDeviceDataSet + String aggregation = + originAggregations != null && !originAggregations.isEmpty() + ? originAggregations.get(i) + : null; + + Pair<List<TSDataType>, List<TSDataType>> pair = getSeriesTypes(actualPaths, aggregation); + List<TSDataType> columnDataTypes = pair.left; + List<TSDataType> measurementDataTypes = pair.right; + for (int pathIdx = 0; pathIdx < actualPaths.size(); pathIdx++) { + PartialPath path = new PartialPath(actualPaths.get(pathIdx).getNodes()); + + // check datatype consistency + // a example of inconsistency: select s0 from root.sg1.d1, root.sg1.d2 align by + // device, + // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT. + String measurementChecked; + if (originAggregations != null && !originAggregations.isEmpty()) { + measurementChecked = originAggregations.get(i) + "(" + path.getMeasurement() + ")"; + } else { + measurementChecked = path.getMeasurement(); + } + TSDataType columnDataType = columnDataTypes.get(pathIdx); + if (columnDataTypeMap.containsKey(measurementChecked)) { + if (!columnDataType.equals(columnDataTypeMap.get(measurementChecked))) { + throw new QueryProcessException( + "The data types of the same measurement column should be the same across " + + "devices in ALIGN_BY_DEVICE sql. For more details please refer to the " + + "SQL document."); + } + } else { + columnDataTypeMap.put(measurementChecked, columnDataType); + measurementDataTypeMap.put(measurementChecked, measurementDataTypes.get(pathIdx)); + } + + // This step indicates that the measurement exists under the device and is correct, + // First, update measurementSetOfGivenSuffix which is distinct + // Then if this measurement is recognized as NonExist before,update it to Exist + if (measurementSetOfGivenSuffix.add(measurementChecked) + || measurementTypeMap.get(measurementChecked) != MeasurementType.Exist) { + measurementTypeMap.put(measurementChecked, MeasurementType.Exist); + } + + // update paths + paths.add(path); + } + + } catch (MetadataException e) { + throw new LogicalOptimizeException( + String.format( + "Error when getting all paths of a full path: %s", fullPath.getFullPath()) + + e.getMessage()); + } + } + + // update measurements + // Note that in the loop of a suffix path, set is used. + // And across the loops of suffix paths, list is used. + // e.g. select *,s1 from root.sg.d0, root.sg.d1 + // for suffix *, measurementSetOfGivenSuffix = {s1,s2,s3} + // for suffix s1, measurementSetOfGivenSuffix = {s1} + // therefore the final measurements is [s1,s2,s3,s1]. + measurements.addAll(measurementSetOfGivenSuffix); + } + + // slimit trim on the measurementColumnList + if (queryOperator.hasSlimit()) { + int seriesSlimit = queryOperator.getSeriesLimit(); + int seriesOffset = queryOperator.getSeriesOffset(); + measurements = slimitTrimColumn(measurements, seriesSlimit, seriesOffset); + } + - int maxDeduplicatedPathNum = - QueryResourceManager.getInstance().getMaxDeduplicatedPathNum(fetchSize); - - if (measurements.size() > maxDeduplicatedPathNum) { - throw new PathNumOverLimitException(maxDeduplicatedPathNum, measurements.size()); - } - + // assigns to alignByDevicePlan + alignByDevicePlan.setMeasurements(measurements); + alignByDevicePlan.setMeasurementAliasMap(measurementAliasMap); + alignByDevicePlan.setDevices(devices); + alignByDevicePlan.setColumnDataTypeMap(columnDataTypeMap); + alignByDevicePlan.setMeasurementTypeMap(measurementTypeMap); + alignByDevicePlan.setMeasurementDataTypeMap(measurementDataTypeMap); + alignByDevicePlan.setPaths(paths); + + // get deviceToFilterMap + FilterOperator filterOperator = queryOperator.getFilterOperator(); + if (filterOperator != null) { + alignByDevicePlan.setDeviceToFilterMap(concatFilterByDevice(devices, filterOperator)); + } + + queryPlan = alignByDevicePlan; + return queryPlan; + } + // e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to // [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10, // root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
