This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch mergeLastQueryScanNodeOfSameDevice-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a5a3120e4baea55058d6e964b43b2c1233a0f7cc Author: shuwenwei <[email protected]> AuthorDate: Wed Jul 2 17:51:26 2025 +0800 merge LastQueryScanNode of same device --- .../db/queryengine/common/MPPQueryContext.java | 29 +++ .../execution/MemoryEstimationHelper.java | 28 +++ .../execution/fragment/DataNodeQueryContext.java | 21 +- .../last/AbstractUpdateLastCacheOperator.java | 25 ++- .../last/AlignedUpdateLastCacheOperator.java | 11 +- .../AlignedUpdateViewPathLastCacheOperator.java | 6 +- .../operator/process/last/LastQueryOperator.java | 1 + .../process/last/LastQuerySortOperator.java | 10 +- .../process/last/UpdateLastCacheOperator.java | 28 ++- .../db/queryengine/plan/analyze/Analysis.java | 33 ++- .../queryengine/plan/analyze/AnalyzeVisitor.java | 72 ++++-- .../queryengine/plan/analyze/ExpressionUtils.java | 14 +- .../memory/StatementMemorySourceVisitor.java | 3 +- .../plan/planner/LogicalPlanBuilder.java | 150 +++++-------- .../plan/planner/OperatorTreeGenerator.java | 246 ++++++++++----------- .../plan/planner/SubPlanTypeExtractor.java | 6 - .../planner/distribution/ExchangeNodeAdder.java | 15 +- .../planner/distribution/NodeGroupContext.java | 12 + .../SimpleFragmentParallelPlanner.java | 12 +- .../plan/planner/distribution/SourceRewriter.java | 152 ++++++++++--- .../plan/planner/plan/node/PlanGraphPrinter.java | 18 +- .../plan/planner/plan/node/PlanNodeType.java | 14 +- .../plan/planner/plan/node/PlanVisitor.java | 5 - .../plan/node/process/MultiChildProcessNode.java | 4 + .../plan/node/process/last/LastQueryNode.java | 141 ++++++++++-- .../plan/node/source/AlignedLastQueryScanNode.java | 245 -------------------- .../plan/node/source/LastQueryScanNode.java | 193 +++++++++++++--- .../execution/operator/LastQueryOperatorTest.java | 6 +- .../operator/LastQuerySortOperatorTest.java | 6 +- .../plan/planner/distribution/LastQueryTest.java | 22 +- .../logical/DataQueryLogicalPlannerTest.java | 86 +++---- .../node/source/LastQueryScanNodeSerdeTest.java | 70 ++++++ .../iotdb/commons/partition/DataPartition.java | 15 +- .../apache/iotdb/commons/partition/Partition.java | 2 +- .../org/apache/iotdb/commons/path/PartialPath.java | 4 + 35 files changed, 982 insertions(+), 723 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 1621327f9f7..63970762de7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -81,6 +81,13 @@ public class MPPQueryContext { // constructing some Expression and PlanNode. private final MemoryReservationManager memoryReservationManager; + private static final int minSizeToUseSampledTimeseriesOperandMemCost = 100; + private double avgTimeseriesOperandMemCost = 0; + private int numsOfSampledTimeseriesOperand = 0; + // When there is no view in a last query and no device exists in multiple regions, + // the updateScanNum process in distributed planning can be skipped. + private boolean needUpdateScanNumForLastQuery = false; + private boolean userQuery = false; public MPPQueryContext(QueryId queryId) { @@ -331,8 +338,30 @@ public class MPPQueryContext { this.memoryReservationManager.releaseMemoryCumulatively(bytes); } + public boolean useSampledAvgTimeseriesOperandMemCost() { + return numsOfSampledTimeseriesOperand >= minSizeToUseSampledTimeseriesOperandMemCost; + } + + public long getAvgTimeseriesOperandMemCost() { + return (long) avgTimeseriesOperandMemCost; + } + + public void calculateAvgTimeseriesOperandMemCost(long current) { + numsOfSampledTimeseriesOperand++; + avgTimeseriesOperandMemCost += + (current - avgTimeseriesOperandMemCost) / numsOfSampledTimeseriesOperand; + } + // endregion + public boolean needUpdateScanNumForLastQuery() { + return needUpdateScanNumForLastQuery; + } + + public void setNeedUpdateScanNumForLastQuery(boolean needUpdateScanNumForLastQuery) { + this.needUpdateScanNumForLastQuery = needUpdateScanNumForLastQuery; + } + public boolean isUserQuery() { return userQuery; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java index a18e2dbc58b..9da6b85e9f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java @@ -28,7 +28,9 @@ import org.apache.tsfile.utils.RamUsageEstimator; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class MemoryEstimationHelper { @@ -41,6 +43,11 @@ public class MemoryEstimationHelper { private static final long MEASUREMENT_PATH_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(AlignedPath.class); + private static final long ARRAY_LIST_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ArrayList.class); + private static final long INTEGER_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(Integer.class); + private MemoryEstimationHelper() { // hide the constructor } @@ -86,4 +93,25 @@ public class MemoryEstimationHelper { } return totalSize; } + + // This method should only be called if the content in the current PartialPath comes from other + // structures whose memory cost have already been calculated. + public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) { + if (partialPath == null) { + return 0; + } + return PARTIAL_PATH_INSTANCE_SIZE + RamUsageEstimator.shallowSizeOf(partialPath.getNodes()); + } + + public static long getEstimatedSizeOfIntegerArrayList(List<Integer> integerArrayList) { + if (integerArrayList == null) { + return 0L; + } + long size = ARRAY_LIST_INSTANCE_SIZE; + size += + (long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + + (long) integerArrayList.size() * (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF; + size += INTEGER_INSTANCE_SIZE * integerArrayList.size(); + return RamUsageEstimator.alignObjectSize(size); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java index 881ff9dc8a2..ffa3ead32e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DataNodeQueryContext.java @@ -26,8 +26,8 @@ import org.apache.tsfile.utils.Pair; import javax.annotation.concurrent.GuardedBy; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -43,7 +43,7 @@ public class DataNodeQueryContext { private final ReentrantLock lock = new ReentrantLock(); public DataNodeQueryContext(int dataNodeFINum) { - this.uncachedPathToSeriesScanInfo = new HashMap<>(); + this.uncachedPathToSeriesScanInfo = new ConcurrentHashMap<>(); this.dataNodeFINum = new AtomicInteger(dataNodeFINum); } @@ -59,15 +59,24 @@ public class DataNodeQueryContext { return uncachedPathToSeriesScanInfo.get(path); } + public Map<PartialPath, Pair<AtomicInteger, TimeValuePair>> getUncachedPathToSeriesScanInfo() { + return uncachedPathToSeriesScanInfo; + } + public int decreaseDataNodeFINum() { return dataNodeFINum.decrementAndGet(); } - public void lock() { - lock.lock(); + public void lock(boolean isDeviceInMultiRegion) { + // When a device exists in only one region, there will be no intermediate state. + if (isDeviceInMultiRegion) { + lock.lock(); + } } - public void unLock() { - lock.unlock(); + public void unLock(boolean isDeviceInMultiRegion) { + if (isDeviceInMultiRegion) { + lock.unlock(); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java index b4ce8943ea9..30d305bcec7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java @@ -61,12 +61,15 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator protected String databaseName; + protected boolean deviceInMultiRegion; + protected AbstractUpdateLastCacheOperator( - OperatorContext operatorContext, - Operator child, - DataNodeSchemaCache dataNodeSchemaCache, - boolean needUpdateCache, - boolean needUpdateNullEntry) { + final OperatorContext operatorContext, + final Operator child, + final DataNodeSchemaCache dataNodeSchemaCache, + final boolean needUpdateCache, + final boolean needUpdateNullEntry, + final boolean deviceInMultiRegion) { this.operatorContext = operatorContext; this.child = child; this.lastCache = dataNodeSchemaCache; @@ -75,6 +78,7 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(1); this.dataNodeQueryContext = operatorContext.getDriverContext().getFragmentInstanceContext().getDataNodeQueryContext(); + this.deviceInMultiRegion = deviceInMultiRegion; } @Override @@ -103,8 +107,8 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator return; } try { - dataNodeQueryContext.lock(); - Pair<AtomicInteger, TimeValuePair> seriesScanInfo = + dataNodeQueryContext.lock(deviceInMultiRegion); + final Pair<AtomicInteger, TimeValuePair> seriesScanInfo = dataNodeQueryContext.getSeriesScanInfo(fullPath); // may enter this case when use TTL @@ -112,6 +116,11 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator return; } + if (!deviceInMultiRegion) { + lastCache.updateLastCache( + getDatabaseName(), fullPath, new TimeValuePair(time, value), false, Long.MIN_VALUE); + return; + } // update cache in DataNodeQueryContext if (seriesScanInfo.right == null || time > seriesScanInfo.right.getTimestamp()) { seriesScanInfo.right = new TimeValuePair(time, value); @@ -122,7 +131,7 @@ public abstract class AbstractUpdateLastCacheOperator implements ProcessOperator getDatabaseName(), fullPath, seriesScanInfo.right, false, Long.MIN_VALUE); } } finally { - dataNodeQueryContext.unLock(); + dataNodeQueryContext.unLock(deviceInMultiRegion); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java index c3f5fff6f78..85c3134c429 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateLastCacheOperator.java @@ -47,8 +47,15 @@ public class AlignedUpdateLastCacheOperator extends AbstractUpdateLastCacheOpera AlignedPath seriesPath, DataNodeSchemaCache dataNodeSchemaCache, boolean needUpdateCache, - boolean needUpdateNullEntry) { - super(operatorContext, child, dataNodeSchemaCache, needUpdateCache, needUpdateNullEntry); + boolean needUpdateNullEntry, + boolean deviceInMultiRegion) { + super( + operatorContext, + child, + dataNodeSchemaCache, + needUpdateCache, + needUpdateNullEntry, + deviceInMultiRegion); this.seriesPath = seriesPath; this.devicePath = seriesPath.getDevicePath(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java index 9a8a309b2ec..3fc9f0412bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java @@ -41,14 +41,16 @@ public class AlignedUpdateViewPathLastCacheOperator extends AlignedUpdateLastCac DataNodeSchemaCache dataNodeSchemaCache, boolean needUpdateCache, boolean needUpdateNullEntry, - String outputViewPath) { + String outputViewPath, + boolean deviceInMultiRegion) { super( operatorContext, child, seriesPath, dataNodeSchemaCache, needUpdateCache, - needUpdateNullEntry); + needUpdateNullEntry, + deviceInMultiRegion); checkArgument(seriesPath.getMeasurementList().size() == 1); this.outputViewPath = outputViewPath; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java index 9f50dcb6e7d..dd41bbd7afa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQueryOperator.java @@ -116,6 +116,7 @@ public class LastQueryOperator implements ProcessOperator { return null; } else if (!tsBlock.isEmpty()) { LastQueryUtil.appendLastValue(tsBlockBuilder, tsBlock); + return null; } } else { children.get(currentIndex).close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java index 0d102073802..e40cb1ad131 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/LastQuerySortOperator.java @@ -153,7 +153,8 @@ public class LastQuerySortOperator implements ProcessOperator { while (keepGoing(start, maxRuntime, endIndex)) { - if (prepareData()) { + prepareData(); + if (previousTsBlock == null) { return null; } @@ -179,21 +180,18 @@ public class LastQuerySortOperator implements ProcessOperator { && !tsBlockBuilder.isFull(); } - private boolean prepareData() throws Exception { + private void prepareData() throws Exception { if (previousTsBlock == null || previousTsBlock.getPositionCount() <= previousTsBlockIndex) { if (children.get(currentIndex).hasNextWithTimer()) { previousTsBlock = children.get(currentIndex).nextWithTimer(); previousTsBlockIndex = 0; - if (previousTsBlock == null) { - return true; - } + return; } else { children.get(currentIndex).close(); children.set(currentIndex, null); } currentIndex++; } - return false; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java index d49e89c19ef..d55f1d9fd4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java @@ -53,7 +53,33 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator { DataNodeSchemaCache dataNodeSchemaCache, boolean needUpdateCache, boolean isNeedUpdateNullEntry) { - super(operatorContext, child, dataNodeSchemaCache, needUpdateCache, isNeedUpdateNullEntry); + this( + operatorContext, + child, + fullPath, + dataType, + dataNodeSchemaCache, + needUpdateCache, + isNeedUpdateNullEntry, + true); + } + + public UpdateLastCacheOperator( + OperatorContext operatorContext, + Operator child, + MeasurementPath fullPath, + TSDataType dataType, + DataNodeSchemaCache dataNodeSchemaCache, + boolean needUpdateCache, + boolean isNeedUpdateNullEntry, + boolean deviceInMultiRegion) { + super( + operatorContext, + child, + dataNodeSchemaCache, + needUpdateCache, + isNeedUpdateNullEntry, + deviceInMultiRegion); this.fullPath = fullPath; this.dataType = dataType.name(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java index 93e386b9bd5..c3f31fa8a3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java @@ -111,6 +111,10 @@ public class Analysis implements IAnalysis { // map from device name to series/aggregation under this device private Set<Expression> sourceExpressions; + // In order to perform some optimization, when the source expression is + // not used later, nothing will be placed in this structure. + private boolean shouldHaveSourceExpression; + // input expressions of aggregations to be calculated private Set<Expression> sourceTransformExpressions = new HashSet<>(); @@ -231,7 +235,9 @@ public class Analysis implements IAnalysis { // Key: non-writable view expression, Value: corresponding source expressions private Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap; - private Set<Expression> lastQueryBaseExpressions; + private Map<String, Map<String, Expression>> lastQueryOutputPathToSourceExpressionMap; + + private Set<String> deviceExistViewSet; // header of result dataset private DatasetHeader respDatasetHeader; @@ -610,6 +616,14 @@ public class Analysis implements IAnalysis { this.sourceExpressions = sourceExpressions; } + public void setShouldHaveSourceExpression(boolean shouldHaveSourceExpression) { + this.shouldHaveSourceExpression = shouldHaveSourceExpression; + } + + public boolean shouldHaveSourceExpression() { + return shouldHaveSourceExpression; + } + public Set<Expression> getSourceTransformExpressions() { return sourceTransformExpressions; } @@ -875,12 +889,21 @@ public class Analysis implements IAnalysis { this.timeseriesOrderingForLastQuery = timeseriesOrderingForLastQuery; } - public Set<Expression> getLastQueryBaseExpressions() { - return this.lastQueryBaseExpressions; + public Map<String, Map<String, Expression>> getLastQueryOutputPathToSourceExpressionMap() { + return lastQueryOutputPathToSourceExpressionMap; + } + + public void setLastQueryOutputPathToSourceExpressionMap( + Map<String, Map<String, Expression>> lastQueryOutputPathToSourceExpressionMap) { + this.lastQueryOutputPathToSourceExpressionMap = lastQueryOutputPathToSourceExpressionMap; + } + + public Set<String> getDeviceExistViewSet() { + return deviceExistViewSet; } - public void setLastQueryBaseExpressions(Set<Expression> lastQueryBaseExpressions) { - this.lastQueryBaseExpressions = lastQueryBaseExpressions; + public void setDeviceExistViewSet(Set<String> deviceExistViewSet) { + this.deviceExistViewSet = deviceExistViewSet; } public Map<Expression, List<Expression>> getLastQueryNonWritableViewSourceExpressionMap() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index b4f163838d9..7f22e4bd145 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -186,6 +186,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -601,48 +602,82 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) { selectExpressions.add(resultColumn.getExpression()); } - analyzeLastSource(analysis, selectExpressions, schemaTree, context); - analysis.setRespDatasetHeader(DatasetHeaderFactory.getLastQueryHeader()); - // fetch partition information - analyzeDataPartition(analysis, queryStatement, schemaTree, context); - - return analysis; + return analyzeLastSourceAndDataPartition(analysis, selectExpressions, schemaTree, context); } - private void analyzeLastSource( + private Analysis analyzeLastSourceAndDataPartition( Analysis analysis, List<Expression> selectExpressions, ISchemaTree schemaTree, MPPQueryContext context) { - Set<Expression> sourceExpressions = new LinkedHashSet<>(); - Set<Expression> lastQueryBaseExpressions = new LinkedHashSet<>(); + + // For fetch data partition + Set<String> allDeviceSet = new HashSet<>(); + + // For LogicalPlan + Set<String> deviceExistViewSet = new HashSet<>(); + Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new LinkedHashMap<>(); Map<Expression, List<Expression>> lastQueryNonWritableViewSourceExpressionMap = null; + Ordering timeseriesOrdering = analysis.getTimeseriesOrderingForLastQuery(); + for (Expression selectExpression : selectExpressions) { for (Expression lastQuerySourceExpression : bindSchemaForExpression(selectExpression, schemaTree, context)) { if (lastQuerySourceExpression instanceof TimeSeriesOperand) { - lastQueryBaseExpressions.add(lastQuerySourceExpression); - sourceExpressions.add(lastQuerySourceExpression); - } else { - if (lastQueryNonWritableViewSourceExpressionMap == null) { - lastQueryNonWritableViewSourceExpressionMap = new HashMap<>(); + TimeSeriesOperand timeSeriesOperand = (TimeSeriesOperand) lastQuerySourceExpression; + MeasurementPath outputPath = + (MeasurementPath) + (timeSeriesOperand.isViewExpression() + ? timeSeriesOperand.getViewPath() + : timeSeriesOperand.getPath()); + String outputDevice = + ExpressionAnalyzer.getDeviceNameInSourceExpression(timeSeriesOperand); + outputPathToSourceExpressionMap + .computeIfAbsent( + outputDevice, + k -> + timeseriesOrdering != null + ? new TreeMap<>(timeseriesOrdering.getStringComparator()) + : new LinkedHashMap<>()) + .put(outputPath.getMeasurement(), timeSeriesOperand); + + if (timeSeriesOperand.isViewExpression()) { + deviceExistViewSet.add(outputDevice); } + } else { + lastQueryNonWritableViewSourceExpressionMap = + lastQueryNonWritableViewSourceExpressionMap == null + ? new HashMap<>() + : lastQueryNonWritableViewSourceExpressionMap; List<Expression> sourceExpressionsOfNonWritableView = searchSourceExpressions(lastQuerySourceExpression); lastQueryNonWritableViewSourceExpressionMap.putIfAbsent( lastQuerySourceExpression, sourceExpressionsOfNonWritableView); - sourceExpressions.addAll(sourceExpressionsOfNonWritableView); + for (Expression expression : sourceExpressionsOfNonWritableView) { + allDeviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression)); + } } } } + if (allDeviceSet.isEmpty()) { + allDeviceSet = outputPathToSourceExpressionMap.keySet(); + } else { + allDeviceSet.addAll(outputPathToSourceExpressionMap.keySet()); + } - analysis.setSourceExpressions(sourceExpressions); - analysis.setLastQueryBaseExpressions(lastQueryBaseExpressions); + analysis.setShouldHaveSourceExpression(!allDeviceSet.isEmpty()); + analysis.setLastQueryOutputPathToSourceExpressionMap(outputPathToSourceExpressionMap); + analysis.setDeviceExistViewSet( + deviceExistViewSet.isEmpty() ? Collections.emptySet() : deviceExistViewSet); analysis.setLastQueryNonWritableViewSourceExpressionMap( lastQueryNonWritableViewSourceExpressionMap); + + DataPartition dataPartition = fetchDataPartitionByDevices(allDeviceSet, schemaTree, context); + analysis.setDataPartitionInfo(dataPartition); + return analysis; } private void updateSchemaTreeByViews( @@ -3185,13 +3220,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> updateSchemaTreeByViews(analysis, schemaTree, context); logger.debug("[EndFetchSchema]]"); - analyzeLastSource( + analyzeLastSourceAndDataPartition( analysis, Collections.singletonList( new TimeSeriesOperand(showTimeSeriesStatement.getPathPattern())), schemaTree, context); - analyzeDataPartition(analysis, new QueryStatement(), schemaTree, context); } analysis.setRespDatasetHeader(DatasetHeaderFactory.getShowTimeSeriesHeader()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java index fc9d3a0aa51..a27e323e1e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java @@ -69,9 +69,17 @@ public class ExpressionUtils { final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); for (PartialPath actualPath : actualPaths) { - resultExpressions.add( - reserveMemoryForExpression( - queryContext, reconstructTimeSeriesOperand(rawExpression, actualPath))); + Expression expression = reconstructTimeSeriesOperand(rawExpression, actualPath); + long memCost; + if (queryContext.useSampledAvgTimeseriesOperandMemCost()) { + memCost = queryContext.getAvgTimeseriesOperandMemCost(); + } else { + memCost = expression.ramBytesUsed(); + queryContext.calculateAvgTimeseriesOperandMemCost(memCost); + } + queryContext.reserveMemoryForFrontEnd(memCost); + + resultExpressions.add(expression); } return resultExpressions; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java index 7a37902dbb9..cb456d3a30e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/memory/StatementMemorySourceVisitor.java @@ -69,7 +69,8 @@ public class StatementMemorySourceVisitor } private boolean sourceNotExist(StatementMemorySourceContext context) { - return (context.getAnalysis().getSourceExpressions() == null + return !context.getAnalysis().shouldHaveSourceExpression() + && (context.getAnalysis().getSourceExpressions() == null || context.getAnalysis().getSourceExpressions().isEmpty()) && (context.getAnalysis().getDeviceToSourceExpressions() == null || context.getAnalysis().getDeviceToSourceExpressions().isEmpty()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 4c8eb7f1e39..91b29a9cf5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -81,10 +81,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.ShowQueriesNode; @@ -109,12 +107,12 @@ import org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGene import org.apache.commons.lang3.Validate; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -122,7 +120,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; @@ -233,41 +230,24 @@ public class LogicalPlanBuilder { } public LogicalPlanBuilder planLast(Analysis analysis, Ordering timeseriesOrdering) { - Set<String> deviceAlignedSet = new HashSet<>(); - Set<String> deviceExistViewSet = new HashSet<>(); // <Device, <Measurement, Expression>> - Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = new LinkedHashMap<>(); - - for (Expression sourceExpression : analysis.getLastQueryBaseExpressions()) { - MeasurementPath outputPath = - (MeasurementPath) - (sourceExpression.isViewExpression() - ? sourceExpression.getViewPath() - : ((TimeSeriesOperand) sourceExpression).getPath()); - String outputDevice = outputPath.getDevice(); - outputPathToSourceExpressionMap - .computeIfAbsent( - outputDevice, - k -> - timeseriesOrdering != null - ? new TreeMap<>(timeseriesOrdering.getStringComparator()) - : new LinkedHashMap<>()) - .put(outputPath.getMeasurement(), sourceExpression); - if (outputPath.isUnderAlignedEntity()) { - deviceAlignedSet.add(outputDevice); - } - if (sourceExpression.isViewExpression()) { - deviceExistViewSet.add(outputDevice); - } - } + Map<String, Map<String, Expression>> outputPathToSourceExpressionMap = + analysis.getLastQueryOutputPathToSourceExpressionMap(); + Set<String> deviceExistViewSet = analysis.getDeviceExistViewSet(); - List<PlanNode> sourceNodeList = new ArrayList<>(); + LastQueryNode lastQueryNode = + new LastQueryNode( + context.getQueryId().genPlanNodeId(), + timeseriesOrdering, + analysis.getLastQueryNonWritableViewSourceExpressionMap() != null); for (Map.Entry<String, Map<String, Expression>> deviceMeasurementExpressionEntry : outputPathToSourceExpressionMap.entrySet()) { - String outputDevice = deviceMeasurementExpressionEntry.getKey(); + String deviceId = deviceMeasurementExpressionEntry.getKey(); Map<String, Expression> measurementToExpressionsOfDevice = deviceMeasurementExpressionEntry.getValue(); - if (deviceExistViewSet.contains(outputDevice)) { + + boolean deviceExistView = deviceExistViewSet.contains(deviceId); + if (deviceExistView) { // exist view for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = @@ -277,90 +257,64 @@ public class LogicalPlanBuilder { ? sourceExpression.getViewPath().getFullPath() : null; - if (selectedPath.isUnderAlignedEntity()) { // aligned series - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), - new AlignedPath(selectedPath), - outputViewPath))); - } else { // non-aligned series - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, outputViewPath))); - } + PartialPath devicePath = selectedPath.getDevicePath(); + devicePath.setIDeviceID(deviceId); + long memCost = + lastQueryNode.addDeviceLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + devicePath, + selectedPath.isUnderAlignedEntity(), + Collections.singletonList(selectedPath.getMeasurementSchema()), + outputViewPath); + this.context.reserveMemoryForFrontEnd(memCost); } } else { - if (deviceAlignedSet.contains(outputDevice)) { - // aligned series - List<MeasurementPath> measurementPaths = - measurementToExpressionsOfDevice.values().stream() - .map(expression -> (MeasurementPath) ((TimeSeriesOperand) expression).getPath()) - .collect(Collectors.toList()); - AlignedPath alignedPath = new AlignedPath(measurementPaths.get(0).getDevicePath()); - for (MeasurementPath measurementPath : measurementPaths) { - alignedPath.addMeasurement(measurementPath); - } - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), alignedPath, null))); - } else { - // non-aligned series - for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { - MeasurementPath selectedPath = - (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); - sourceNodeList.add( - reserveMemoryForSeriesSourceNode( - new LastQueryScanNode( - context.getQueryId().genPlanNodeId(), selectedPath, null))); - } + boolean aligned = false; + List<IMeasurementSchema> measurementSchemas = + new ArrayList<>(measurementToExpressionsOfDevice.size()); + PartialPath devicePath = null; + for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { + MeasurementPath selectedPath = + (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); + aligned = selectedPath.isUnderAlignedEntity(); + devicePath = devicePath == null ? selectedPath.getDevicePath() : devicePath; + measurementSchemas.add(selectedPath.getMeasurementSchema()); } + // DeviceId is needed in the distribution plan stage + devicePath.setIDeviceID(deviceId); + long memCost = + lastQueryNode.addDeviceLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + devicePath, + aligned, + measurementSchemas, + null); + this.context.reserveMemoryForFrontEnd(memCost); } } + this.context.reserveMemoryForFrontEnd(lastQueryNode.getMemorySizeOfSharedStructures()); - processLastQueryTransformNode(analysis, sourceNodeList); - - if (timeseriesOrdering != null) { - sourceNodeList.sort( - Comparator.comparing( - child -> { - String sortKey = ""; - if (child instanceof LastQueryScanNode) { - sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); - } else if (child instanceof AlignedLastQueryScanNode) { - sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort(); - } else if (child instanceof LastQueryTransformNode) { - sortKey = ((LastQueryTransformNode) child).getOutputSymbolForSort(); - } - return sortKey; - })); - if (timeseriesOrdering.equals(Ordering.DESC)) { - Collections.reverse(sourceNodeList); - } - } + processLastQueryTransformNode(analysis, lastQueryNode); - this.root = - new LastQueryNode( - context.getQueryId().genPlanNodeId(), - sourceNodeList, - timeseriesOrdering, - analysis.getLastQueryNonWritableViewSourceExpressionMap() != null); + lastQueryNode.sort(); + this.root = lastQueryNode; ColumnHeaderConstant.lastQueryColumnHeaders.forEach( columnHeader -> context .getTypeProvider() .setType(columnHeader.getColumnName(), columnHeader.getColumnType())); + // After planning is completed, this map is no longer needed + lastQueryNode.clearMeasurementSchema2IdxMap(); return this; } - private void processLastQueryTransformNode(Analysis analysis, List<PlanNode> sourceNodeList) { + private void processLastQueryTransformNode(Analysis analysis, LastQueryNode lastQueryNode) { if (analysis.getLastQueryNonWritableViewSourceExpressionMap() == null) { return; } + context.setNeedUpdateScanNumForLastQuery(true); for (Map.Entry<Expression, List<Expression>> entry : analysis.getLastQueryNonWritableViewSourceExpressionMap().entrySet()) { @@ -398,7 +352,7 @@ public class LogicalPlanBuilder { planBuilder.getRoot(), expression.getViewPath().getFullPath(), analysis.getType(expression).toString()); - sourceNodeList.add(transformNode); + lastQueryNode.addChild(transformNode); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 30d778c079d..706b22e2efc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -221,7 +221,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; @@ -278,6 +277,7 @@ import org.apache.tsfile.read.filter.operator.TimeFilterOperators.TimeGtEq; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TimeDuration; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2754,129 +2754,86 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP ((SchemaDriverContext) (context.getDriverContext())).getSchemaRegion()); } - @Override - public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) { - PartialPath seriesPath = node.getSeriesPath().transformToPartialPath(); - TimeValuePair timeValuePair = null; - context.dataNodeQueryContext.lock(); - try { - if (!context.dataNodeQueryContext.unCached(seriesPath)) { - timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(seriesPath); - if (timeValuePair == null) { - context.dataNodeQueryContext.addUnCachePath(seriesPath, node.getDataNodeSeriesScanNum()); - } - } - } finally { - context.dataNodeQueryContext.unLock(); - } - - if (timeValuePair == null) { // last value is not cached - return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); - } else if (timeValuePair.getValue() == null) { // there is no data for this time series - return null; - } else if (!LastQueryUtil.satisfyFilter( - updateFilterUsingTTL( - context.getGlobalTimeFilter(), - DataNodeTTLCache.getInstance().getTTL(seriesPath.getDevicePath().getNodes())), - timeValuePair)) { // cached last value is not satisfied - - if (!isFilterGtOrGe(context.getGlobalTimeFilter())) { - // time filter is not > or >=, we still need to read from disk - return createUpdateLastCacheOperator(node, context, node.getSeriesPath()); - } else { // otherwise, we just ignore it and return null - return null; - } - } else { // cached last value is satisfied, put it into LastCacheScanOperator - context.addCachedLastValue(timeValuePair, node.outputPathSymbol()); - return null; - } - } - - private boolean isFilterGtOrGe(Filter filter) { + public static boolean isFilterGtOrGe(Filter filter) { return filter instanceof TimeGt || filter instanceof TimeGtEq; } private UpdateLastCacheOperator createUpdateLastCacheOperator( - LastQueryScanNode node, LocalExecutionPlanContext context, MeasurementPath fullPath) { - SeriesAggregationScanOperator lastQueryScan = createLastQueryScanOperator(node, context); - if (node.getOutputViewPath() == null) { - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - UpdateLastCacheOperator.class.getSimpleName()); - return new UpdateLastCacheOperator( - operatorContext, - lastQueryScan, - fullPath, - node.getSeriesPath().getSeriesType(), - DATA_NODE_SCHEMA_CACHE, - context.isNeedUpdateLastCache(), - context.isNeedUpdateNullEntry()); - } else { - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - UpdateViewPathLastCacheOperator.class.getSimpleName()); - return new UpdateViewPathLastCacheOperator( - operatorContext, - lastQueryScan, - fullPath, - node.getSeriesPath().getSeriesType(), - DATA_NODE_SCHEMA_CACHE, - context.isNeedUpdateLastCache(), - context.isNeedUpdateNullEntry(), - node.getOutputViewPath()); - } + final LastQueryScanNode node, final LocalExecutionPlanContext context, final int idx) { + IMeasurementSchema measurementSchema = node.getMeasurementSchema(idx); + final SeriesAggregationScanOperator lastQueryScan = + createLastQueryScanOperator(node, context, measurementSchema); + MeasurementPath fullPath = new MeasurementPath(node.getDevicePath(), measurementSchema); + final OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + UpdateLastCacheOperator.class.getSimpleName()); + + return Objects.isNull(node.getOutputViewPath()) + ? new UpdateLastCacheOperator( + operatorContext, + lastQueryScan, + fullPath, + measurementSchema.getType(), + DATA_NODE_SCHEMA_CACHE, + context.isNeedUpdateLastCache(), + context.isNeedUpdateNullEntry()) + : new UpdateViewPathLastCacheOperator( + operatorContext, + lastQueryScan, + fullPath, + measurementSchema.getType(), + DATA_NODE_SCHEMA_CACHE, + context.isNeedUpdateLastCache(), + context.isNeedUpdateNullEntry(), + node.getOutputViewPath()); } private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator( - AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) { - AlignedSeriesAggregationScanOperator lastQueryScan = - createLastQueryScanOperator(node, unCachedPath, context); + final String outputViewPath, + final PlanNodeId planNodeId, + final AlignedPath unCachedPath, + final LocalExecutionPlanContext context, + final boolean deviceInMultiRegion) { + final AlignedSeriesAggregationScanOperator lastQueryScan = + createLastQueryScanOperator(planNodeId, unCachedPath, context); - if (node.getOutputViewPath() == null) { - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AlignedUpdateLastCacheOperator.class.getSimpleName()); - return new AlignedUpdateLastCacheOperator( - operatorContext, - lastQueryScan, - unCachedPath, - DATA_NODE_SCHEMA_CACHE, - context.isNeedUpdateLastCache(), - context.isNeedUpdateNullEntry()); - } else { - OperatorContext operatorContext = - context - .getDriverContext() - .addOperatorContext( - context.getNextOperatorId(), - node.getPlanNodeId(), - AlignedUpdateViewPathLastCacheOperator.class.getSimpleName()); - return new AlignedUpdateViewPathLastCacheOperator( - operatorContext, - lastQueryScan, - unCachedPath, - DATA_NODE_SCHEMA_CACHE, - context.isNeedUpdateLastCache(), - context.isNeedUpdateNullEntry(), - node.getOutputViewPath()); - } + final OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + planNodeId, + AlignedUpdateLastCacheOperator.class.getSimpleName()); + + return Objects.isNull(outputViewPath) + ? new AlignedUpdateLastCacheOperator( + operatorContext, + lastQueryScan, + unCachedPath, + DATA_NODE_SCHEMA_CACHE, + context.isNeedUpdateLastCache(), + context.isNeedUpdateNullEntry(), + deviceInMultiRegion) + : new AlignedUpdateViewPathLastCacheOperator( + operatorContext, + lastQueryScan, + unCachedPath, + DATA_NODE_SCHEMA_CACHE, + context.isNeedUpdateLastCache(), + context.isNeedUpdateNullEntry(), + outputViewPath, + deviceInMultiRegion); } private SeriesAggregationScanOperator createLastQueryScanOperator( - LastQueryScanNode node, LocalExecutionPlanContext context) { - MeasurementPath seriesPath = node.getSeriesPath(); + LastQueryScanNode node, + LocalExecutionPlanContext context, + IMeasurementSchema measurementSchema) { + MeasurementPath seriesPath = new MeasurementPath(node.getDevicePath(), measurementSchema); OperatorContext operatorContext = context .getDriverContext() @@ -2915,7 +2872,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } private AlignedSeriesAggregationScanOperator createLastQueryScanOperator( - AlignedLastQueryScanNode node, AlignedPath unCachedPath, LocalExecutionPlanContext context) { + PlanNodeId planNodeId, AlignedPath unCachedPath, LocalExecutionPlanContext context) { // last_time, last_value List<Aggregator> aggregators = new ArrayList<>(); boolean canUseStatistics = true; @@ -2939,11 +2896,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP .getDriverContext() .addOperatorContext( context.getNextOperatorId(), - node.getPlanNodeId(), + planNodeId, AlignedSeriesAggregationScanOperator.class.getSimpleName()); AlignedSeriesAggregationScanOperator seriesAggregationScanOperator = new AlignedSeriesAggregationScanOperator( - node.getPlanNodeId(), + planNodeId, unCachedPath, Ordering.DESC, scanOptionsBuilder.build(), @@ -2960,18 +2917,21 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } @Override - public Operator visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, LocalExecutionPlanContext context) { - AlignedPath alignedPath = node.getSeriesPath(); - PartialPath devicePath = alignedPath.getDevicePath(); - // get series under aligned entity that has not been cached + public Operator visitLastQueryScan(LastQueryScanNode node, LocalExecutionPlanContext context) { + final PartialPath devicePath = node.getDevicePath(); + List<Integer> idxOfMeasurementSchemas = node.getIdxOfMeasurementSchemas(); List<Integer> unCachedMeasurementIndexes = new ArrayList<>(); - List<String> measurementList = alignedPath.getMeasurementList(); - for (int i = 0; i < measurementList.size(); i++) { - PartialPath measurementPath = devicePath.concatNode(measurementList.get(i)); + Filter filter = + updateFilterUsingTTL( + context.getGlobalTimeFilter(), + DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes())); + for (int i = 0; i < idxOfMeasurementSchemas.size(); i++) { + IMeasurementSchema measurementSchema = node.getMeasurementSchema(i); + final PartialPath measurementPath = + devicePath.concatNode(measurementSchema.getMeasurementId()); TimeValuePair timeValuePair = null; + context.dataNodeQueryContext.lock(node.isDeviceInMultiRegion()); try { - context.dataNodeQueryContext.lock(); if (!context.dataNodeQueryContext.unCached(measurementPath)) { timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(measurementPath); if (timeValuePair == null) { @@ -2980,18 +2940,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } } } finally { - context.dataNodeQueryContext.unLock(); + context.dataNodeQueryContext.unLock(node.isDeviceInMultiRegion()); } if (timeValuePair == null) { // last value is not cached unCachedMeasurementIndexes.add(i); } else if (timeValuePair.getValue() == null) { // there is no data for this time series, just ignore - } else if (!LastQueryUtil.satisfyFilter( - updateFilterUsingTTL( - context.getGlobalTimeFilter(), - DataNodeTTLCache.getInstance().getTTL(devicePath.getNodes())), - timeValuePair)) { // cached last value is not satisfied + } else if (!LastQueryUtil.satisfyFilter(filter, timeValuePair)) { + // cached last value is not satisfied if (!isFilterGtOrGe(context.getGlobalTimeFilter())) { // time filter is not > or >=, we still need to read from disk @@ -3007,12 +2964,33 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } if (unCachedMeasurementIndexes.isEmpty()) { return null; + } + if (node.isAligned()) { + AlignedPath unCachedPath = new AlignedPath(node.getDevicePath()); + for (int i : unCachedMeasurementIndexes) { + IMeasurementSchema measurementSchema = node.getMeasurementSchema(i); + unCachedPath.addMeasurement(measurementSchema.getMeasurementId(), measurementSchema); + } + return createAlignedUpdateLastCacheOperator( + node.getOutputViewPath(), + node.getPlanNodeId(), + unCachedPath, + context, + node.isDeviceInMultiRegion()); } else { - AlignedPath unCachedPath = new AlignedPath(alignedPath.getDevicePath()); + List<Operator> operators = new ArrayList<>(unCachedMeasurementIndexes.size()); for (int i : unCachedMeasurementIndexes) { - unCachedPath.addMeasurement(measurementList.get(i), alignedPath.getSchemaList().get(i)); + Operator operator = createUpdateLastCacheOperator(node, context, i); + operators.add(operator); } - return createAlignedUpdateLastCacheOperator(node, unCachedPath, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + LastQueryCollectOperator.class.getSimpleName()); + return new LastQueryCollectOperator(operatorContext, operators); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java index 34d48ea8a72..5016a9f1df2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; @@ -134,11 +133,6 @@ public class SubPlanTypeExtractor { return null; } - @Override - public Void visitAlignedLastQueryScan(AlignedLastQueryScanNode node, Void context) { - return null; - } - @Override public Void visitLastQuery(LastQueryNode node, Void context) { if (node.isContainsLastTransformNode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index e5983b6a18c..f390d90c842 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -61,7 +61,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; @@ -195,12 +194,6 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processNoChildSourceNode(node, context); } - @Override - public PlanNode visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, NodeGroupContext context) { - return processNoChildSourceNode(node, context); - } - @Override public PlanNode visitSeriesAggregationScan( SeriesAggregationScanNode node, NodeGroupContext context) { @@ -263,7 +256,13 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { @Override public PlanNode visitLastQuery(LastQueryNode node, NodeGroupContext context) { - return processMultiChildNode(node, context); + // At this point, there should only be LastSeriesSourceNode in LastQueryNode, and all of them + // have been grouped in the rewriteSource stage by region. + context.putNodeDistribution( + node.getPlanNodeId(), + new NodeDistribution( + NodeDistributionType.SAME_WITH_ALL_CHILDREN, node.getRegionReplicaSetByFirstChild())); + return node; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java index 2cdd071c931..5f01e646b06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeGroupContext.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; @@ -63,6 +64,17 @@ public class NodeGroupContext { } private void countRegionOfSourceNodes(PlanNode root, Map<TRegionReplicaSet, Long> result) { + if (root instanceof LastQueryNode) { + // At this point, there should only be LastSeriesSourceNode in LastQueryNode, and all of them + // have been grouped in the rewriteSource stage by region. + TRegionReplicaSet regionReplicaSet = ((LastQueryNode) root).getRegionReplicaSetByFirstChild(); + if (regionReplicaSet != DataPartition.NOT_ASSIGNED) { + result.compute( + regionReplicaSet, + (region, count) -> (count == null) ? 1 : count + root.getChildren().size()); + } + return; + } root.getChildren().forEach(child -> countRegionOfSourceNodes(child, result)); if (root instanceof SourceNode) { TRegionReplicaSet regionReplicaSet = ((SourceNode) root).getRegionReplicaSet(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java index d9e91787ad4..e0a193b63f1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java @@ -38,7 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement; import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement; @@ -107,7 +107,8 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { // compute dataNodeSeriesScanNum in LastQueryScanNode if (analysis.getStatement() instanceof QueryStatement - && ((QueryStatement) analysis.getStatement()).isLastQuery()) { + && ((QueryStatement) analysis.getStatement()).isLastQuery() + && queryContext.needUpdateScanNumForLastQuery()) { final Map<Path, AtomicInteger> pathSumMap = new HashMap<>(); dataNodeFIMap .values() @@ -123,8 +124,11 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner { } private void updateScanNum(PlanNode planNode, Map<Path, AtomicInteger> pathSumMap) { - if (planNode instanceof LastSeriesSourceNode) { - LastSeriesSourceNode lastSeriesSourceNode = (LastSeriesSourceNode) planNode; + if (planNode instanceof LastQueryScanNode) { + LastQueryScanNode lastSeriesSourceNode = (LastQueryScanNode) planNode; + if (!lastSeriesSourceNode.isDeviceInMultiRegion()) { + return; + } pathSumMap.merge( lastSeriesSourceNode.getSeriesPath(), lastSeriesSourceNode.getDataNodeSeriesScanNum(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 92fad2006bb..1724d2d639d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -20,11 +20,14 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; @@ -61,7 +64,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.Inner import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; @@ -83,7 +85,10 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.iotdb.db.utils.constant.SqlConstant; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.utils.Binary; import java.util.ArrayList; import java.util.Arrays; @@ -773,14 +778,6 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> return processRawSeriesScan(node, context, mergeNode); } - @Override - public List<PlanNode> visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, DistributionPlanContext context) { - LastQueryNode mergeNode = - new LastQueryNode(context.queryContext.getQueryId().genPlanNodeId(), null, false); - return processRawSeriesScan(node, context, mergeNode); - } - private List<PlanNode> processRegionScan(RegionScanNode node, DistributionPlanContext context) { List<PlanNode> planNodeList = splitRegionScanNodeByRegion(node, context); if (planNodeList.size() == 1) { @@ -975,8 +972,14 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> // For last query, we need to keep every FI's root node is LastQueryMergeNode. So we // force every region group have a parent node even if there is only 1 child for it. context.setForceAddParent(); - PlanNode root = processRawMultiChildNode(node, context, false); - if (context.queryMultiRegion) { + boolean isLastQueryWithTransformNode = node.isContainsLastTransformNode(); + PlanNode root = processRawMultiChildNode(node, context, false, isLastQueryWithTransformNode); + // For LastQueryNode, we force the LastQueryTransformNode to be split from the new cloned + // LastQueryNode for some subsequent optimizations. In the case of multiple regions, we do not + // need to do anything to achieve this. The judgement of 'isLastQueryWithTransformNode' here + // is only for the case where the query involves only a single region. See this document for + // details(https://docs.google.com/document/d/1w_weCIr39htOUbkHk2ffGVz2-kqBfdvLSZ2EblJaHMo). + if (context.queryMultiRegion || isLastQueryWithTransformNode) { PlanNode newRoot = genLastQueryRootNode(node, context); // add sort op for each if we add LastQueryMergeNode as root if (newRoot instanceof LastQueryMergeNode && !node.needOrderByTimeseries()) { @@ -990,9 +993,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> } private void addSortForEachLastQueryNode(PlanNode root, Ordering timeseriesOrdering) { - if (root instanceof LastQueryNode - && (root.getChildren().get(0) instanceof LastQueryScanNode - || root.getChildren().get(0) instanceof AlignedLastQueryScanNode)) { + if (root instanceof LastQueryNode && (root.getChildren().get(0) instanceof LastQueryScanNode)) { LastQueryNode lastQueryNode = (LastQueryNode) root; lastQueryNode.setTimeseriesOrdering(timeseriesOrdering); // sort children node @@ -1004,8 +1005,6 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> String sortKey = ""; if (child instanceof LastQueryScanNode) { sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); - } else if (child instanceof AlignedLastQueryScanNode) { - sortKey = ((AlignedLastQueryScanNode) child).getOutputSymbolForSort(); } return sortKey; })) @@ -1014,11 +1013,18 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> .getChildren() .forEach( child -> { - if (child instanceof AlignedLastQueryScanNode) { - // sort the measurements of AlignedPath for LastQueryMergeOperator - ((AlignedLastQueryScanNode) child) - .getSeriesPath() - .sortMeasurement(Comparator.naturalOrder()); + if (child instanceof LastQueryScanNode) { + // sort the measurements for LastQueryMergeOperator + LastQueryScanNode node = (LastQueryScanNode) child; + ((LastQueryScanNode) child) + .getIdxOfMeasurementSchemas() + .sort( + Comparator.comparing( + idx -> + new Binary( + node.getMeasurementSchema(idx).getMeasurementId(), + TSFileConfig.STRING_CHARSET), + Comparator.naturalOrder())); } }); } else { @@ -1223,12 +1229,15 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> if (containsAggregationSource(node)) { return planAggregationWithTimeJoin(node, context); } - return Collections.singletonList(processRawMultiChildNode(node, context, true)); + return Collections.singletonList(processRawMultiChildNode(node, context, true, false)); } // Only `visitFullOuterTimeJoin` and `visitLastQuery` invoke this method private PlanNode processRawMultiChildNode( - MultiChildProcessNode node, DistributionPlanContext context, boolean isTimeJoin) { + MultiChildProcessNode node, + DistributionPlanContext context, + boolean isTimeJoin, + boolean isLastQueryWithTransformNode) { MultiChildProcessNode root = (MultiChildProcessNode) node.clone(); Map<TRegionReplicaSet, List<SourceNode>> sourceGroup = groupBySourceNodes(node, context); @@ -1248,21 +1257,27 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> // If `forceAddParent` is true, we should not create new MultiChildNode as the parent, either. // At last, we can use the parameter `addParent` to judge whether to create new // MultiChildNode. + // For LastQueryNode, we force the LastQueryTransformNode to be split from the new cloned + // LastQueryNode for some subsequent optimizations. In the case of multiple regions, we do not + // need to do anything to achieve this. The judgement of 'isLastQueryWithTransformNode' here + // is only for the case where the query involves only a single region. See this document for + // details(https://docs.google.com/document/d/1w_weCIr39htOUbkHk2ffGVz2-kqBfdvLSZ2EblJaHMo). boolean appendToRootDirectly = - sourceGroup.size() == 1 || (!addParent && !context.isForceAddParent()); + !isLastQueryWithTransformNode + && (sourceGroup.size() == 1 || (!addParent && !context.isForceAddParent())); if (appendToRootDirectly) { // In non-last query, this code can be reached at most once // And we set region as MainFragmentLocatedRegion, the others Region should transfer data to // this region context.queryContext.setMainFragmentLocatedRegion(region); - seriesScanNodes.forEach(root::addChild); + root.addChildren(seriesScanNodes); addParent = true; } else { // We clone a TimeJoinNode from root to make the params to be consistent. // But we need to assign a new ID to it MultiChildProcessNode parentOfGroup = (MultiChildProcessNode) root.clone(); parentOfGroup.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); - seriesScanNodes.forEach(parentOfGroup::addChild); + parentOfGroup.addChildren(seriesScanNodes); root.addChild(parentOfGroup); } } @@ -1274,7 +1289,7 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> // SeriesScanNode or SeriesAggregateScanNode // So this branch should not be touched. List<PlanNode> children = visit(child, context); - children.forEach(root::addChild); + root.addChildren(children); } } return root; @@ -1285,26 +1300,38 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> // Step 1: Get all source nodes. For the node which is not source, add it as the child of // current TimeJoinNode List<SourceNode> sources = new ArrayList<>(); + Map<String, Map<Integer, List<TRegionReplicaSet>>> cachedRegionReplicas = new HashMap<>(); for (PlanNode child : node.getChildren()) { if (child instanceof SeriesSourceNode) { // If the child is SeriesScanNode, we need to check whether this node should be seperated // into several splits. SeriesSourceNode sourceNode = (SeriesSourceNode) child; List<TRegionReplicaSet> dataDistribution = - analysis.getPartitionInfo( - sourceNode.getPartitionPath(), context.getPartitionTimeFilter()); - if (dataDistribution.size() > 1) { + getDeviceReplicaSets( + sourceNode.getPartitionPath().getDevice(), + context.getPartitionTimeFilter(), + cachedRegionReplicas); + boolean deviceInMultiRegion = dataDistribution.size() > 1; + if (deviceInMultiRegion) { // If there is some series which is distributed in multi DataRegions context.setOneSeriesInMultiRegion(true); } // If the size of dataDistribution is N, this SeriesScanNode should be seperated into N // SeriesScanNode. for (TRegionReplicaSet dataRegion : dataDistribution) { - SeriesSourceNode split = (SeriesSourceNode) sourceNode.clone(); + SeriesSourceNode split = + (SeriesSourceNode) (deviceInMultiRegion ? sourceNode.clone() : sourceNode); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); + if (split instanceof LastQueryScanNode) { + ((LastQueryScanNode) split).setDeviceInMultiRegion(deviceInMultiRegion); + } sources.add(split); } + + if (deviceInMultiRegion) { + context.getQueryContext().setNeedUpdateScanNumForLastQuery(true); + } } } @@ -1318,6 +1345,69 @@ public class SourceRewriter extends BaseSourceRewriter<DistributionPlanContext> return sourceGroup; } + private List<TRegionReplicaSet> getDeviceReplicaSets( + String deviceID, + Filter timeFilter, + Map<String, Map<Integer, List<TRegionReplicaSet>>> cache) { + DataPartition dataPartition = analysis.getDataPartitionInfo(); + Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + dataPartitionMap = dataPartition.getDataPartitionMap(); + + String db = null; + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> seriesPartitionMap = + null; + for (Map.Entry< + String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + entry : dataPartitionMap.entrySet()) { + if (PathUtils.isStartWith(deviceID, entry.getKey())) { + db = entry.getKey(); + seriesPartitionMap = entry.getValue(); + break; + } + } + if (seriesPartitionMap == null) { + return Collections.singletonList(NOT_ASSIGNED); + } + + Map<Integer, List<TRegionReplicaSet>> slot2ReplicasMap = + cache.computeIfAbsent(db, k -> new HashMap<>()); + TSeriesPartitionSlot tSeriesPartitionSlot = dataPartition.calculateDeviceGroupId(deviceID); + + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> + finalSeriesPartitionMap = seriesPartitionMap; + return slot2ReplicasMap.computeIfAbsent( + tSeriesPartitionSlot.slotId, + k -> + getDataRegionReplicaSetWithTimeFilter( + finalSeriesPartitionMap, tSeriesPartitionSlot, timeFilter)); + } + + public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( + Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> + seriesPartitionMap, + TSeriesPartitionSlot tSeriesPartitionSlot, + Filter timeFilter) { + Map<TTimePartitionSlot, List<TRegionReplicaSet>> regionReplicaSetMap = + seriesPartitionMap.getOrDefault(tSeriesPartitionSlot, Collections.emptyMap()); + if (regionReplicaSetMap.isEmpty()) { + return Collections.singletonList(NOT_ASSIGNED); + } + List<TRegionReplicaSet> replicaSets = new ArrayList<>(); + Set<TRegionReplicaSet> uniqueValues = new HashSet<>(); + for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> entry : + regionReplicaSetMap.entrySet()) { + if (!TimePartitionUtils.satisfyPartitionStartTime(timeFilter, entry.getKey().startTime)) { + continue; + } + for (TRegionReplicaSet tRegionReplicaSet : entry.getValue()) { + if (uniqueValues.add(tRegionReplicaSet)) { + replicaSets.add(tRegionReplicaSet); + } + } + } + return replicaSets; + } + private boolean containsAggregationSource(FullOuterTimeJoinNode node) { for (PlanNode child : node.getChildren()) { if (child instanceof SeriesAggregationScanNode diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 0fda7777792..b8fefee7201 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -56,7 +56,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; @@ -517,23 +516,10 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter public List<String> visitLastQueryScan(LastQueryScanNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("LastQueryScan-%s", node.getPlanNodeId().getId())); - boxValue.add(String.format("Series: %s", node.getSeriesPath())); - if (StringUtil.isNotBlank(node.getOutputViewPath())) { - boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath())); - } - boxValue.add(printRegion(node.getRegionReplicaSet())); - return render(node, boxValue, context); - } - - @Override - public List<String> visitAlignedLastQueryScan( - AlignedLastQueryScanNode node, GraphContext context) { - List<String> boxValue = new ArrayList<>(); - boxValue.add(String.format("AlignedLastQueryScan-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("Aligned: %s", node.isAligned())); boxValue.add( String.format( - "Series: %s%s", - node.getSeriesPath().getDevice(), node.getSeriesPath().getMeasurementList())); + "Series: %s%s", node.getDevicePath().getIDeviceID(), node.getMeasurementSchemas())); if (StringUtil.isNotBlank(node.getOutputViewPath())) { boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 092f3601c57..6033233abbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -96,7 +96,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; @@ -165,8 +164,10 @@ public enum PlanNodeType { NODE_MANAGEMENT_MEMORY_MERGE((short) 42), DELETE_DATA((short) 44), DELETE_TIME_SERIES((short) 45), - LAST_QUERY_SCAN((short) 46), - ALIGNED_LAST_QUERY_SCAN((short) 47), + @Deprecated + DEPRECATED_LAST_QUERY_SCAN((short) 46), + @Deprecated + DEPRECATED_ALIGNED_LAST_QUERY_SCAN((short) 47), LAST_QUERY((short) 48), LAST_QUERY_MERGE((short) 49), LAST_QUERY_COLLECT((short) 50), @@ -224,6 +225,8 @@ public enum PlanNodeType { DEVICE_SCHEMA_FETCH_SCAN((short) 96), CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR((short) 97), + + LAST_QUERY_SCAN((short) 98), ; public static final int BYTES = Short.BYTES; @@ -374,9 +377,8 @@ public enum PlanNodeType { case 45: return DeleteTimeSeriesNode.deserialize(buffer); case 46: - return LastQueryScanNode.deserialize(buffer); case 47: - return AlignedLastQueryScanNode.deserialize(buffer); + throw new UnsupportedOperationException("This LastQueryScanNode is deprecated"); case 48: return LastQueryNode.deserialize(buffer); case 49: @@ -478,6 +480,8 @@ public enum PlanNodeType { case 97: throw new UnsupportedOperationException( "You should never see ContinuousSameSearchIndexSeparatorNode in this function, because ContinuousSameSearchIndexSeparatorNode should never be used in network transmission."); + case 98: + return LastQueryScanNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 92261d7ad47..f018817ffe6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -96,7 +96,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryTransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.DeviceRegionScanNode; @@ -163,10 +162,6 @@ public abstract class PlanVisitor<R, C> { return visitSourceNode(node, context); } - public R visitAlignedLastQueryScan(AlignedLastQueryScanNode node, C context) { - return visitSourceNode(node, context); - } - public R visitRegionScan(RegionScanNode node, C context) { return visitSourceNode(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java index 02d8508e8fe..b3bd064c910 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/MultiChildProcessNode.java @@ -54,6 +54,10 @@ public abstract class MultiChildProcessNode extends ProcessNode { this.children.add(child); } + public void addChildren(List<? extends PlanNode> children) { + this.children.addAll(children); + } + @Override public int allowedChildCount() { return CHILD_COUNT_NO_LIMIT; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java index 5d0589e40d4..4ce3f29750c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java @@ -18,21 +18,34 @@ */ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.VectorMeasurementSchema; import javax.annotation.Nullable; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; @@ -46,31 +59,84 @@ public class LastQueryNode extends MultiChildProcessNode { // if children contains LastTransformNode, this variable is only used in distribute plan private boolean containsLastTransformNode; + // After Logical planning is completed, this map is no longer needed and it will be set to null + private Map<IMeasurementSchema, Integer> measurementSchema2IdxMap; + // All LastSeriesSourceNode share this structure + private final List<IMeasurementSchema> globalMeasurementSchemaList; + public LastQueryNode( PlanNodeId id, @Nullable Ordering timeseriesOrdering, boolean containsLastTransformNode) { super(id); this.timeseriesOrdering = timeseriesOrdering; this.containsLastTransformNode = containsLastTransformNode; + this.measurementSchema2IdxMap = new HashMap<>(); + this.globalMeasurementSchemaList = new ArrayList<>(); } public LastQueryNode( PlanNodeId id, - List<PlanNode> children, @Nullable Ordering timeseriesOrdering, - boolean containsLastTransformNode) { - super(id, children); + boolean containsLastTransformNode, + List<IMeasurementSchema> globalMeasurementSchemaList) { + super(id); this.timeseriesOrdering = timeseriesOrdering; this.containsLastTransformNode = containsLastTransformNode; + this.globalMeasurementSchemaList = globalMeasurementSchemaList; } - @Override - public List<PlanNode> getChildren() { - return children; + public long addDeviceLastQueryScanNode( + PlanNodeId id, + PartialPath devicePath, + boolean aligned, + List<IMeasurementSchema> measurementSchemas, + String outputViewPath) { + List<Integer> idxList = new ArrayList<>(measurementSchemas.size()); + for (IMeasurementSchema measurementSchema : measurementSchemas) { + int idx = + measurementSchema2IdxMap.computeIfAbsent( + measurementSchema, + key -> { + this.globalMeasurementSchemaList.add(key); + return globalMeasurementSchemaList.size() - 1; + }); + idxList.add(idx); + } + LastQueryScanNode scanNode = + new LastQueryScanNode( + id, devicePath, aligned, idxList, outputViewPath, globalMeasurementSchemaList); + children.add(scanNode); + return scanNode.ramBytesUsed(); } - @Override - public void addChild(PlanNode child) { - children.add(child); + public void sort() { + if (timeseriesOrdering == null) { + return; + } + children.sort( + Comparator.comparing( + child -> { + String sortKey = ""; + if (child instanceof LastQueryScanNode) { + sortKey = ((LastQueryScanNode) child).getOutputSymbolForSort(); + } else if (child instanceof LastQueryTransformNode) { + sortKey = ((LastQueryTransformNode) child).getOutputSymbolForSort(); + } + return sortKey; + })); + if (timeseriesOrdering.equals(Ordering.DESC)) { + Collections.reverse(children); + } + } + + public void clearMeasurementSchema2IdxMap() { + this.measurementSchema2IdxMap = null; + } + + public long getMemorySizeOfSharedStructures() { + // MeasurementSchema comes from path, memory has been calculated before + return RamUsageEstimator.alignObjectSize( + RamUsageEstimator.shallowSizeOf(globalMeasurementSchemaList) + + RamUsageEstimator.sizeOfObjectArray(globalMeasurementSchemaList.size())); } @Override @@ -80,12 +146,11 @@ public class LastQueryNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new LastQueryNode(getPlanNodeId(), timeseriesOrdering, containsLastTransformNode); - } - - @Override - public int allowedChildCount() { - return CHILD_COUNT_NO_LIMIT; + return new LastQueryNode( + getPlanNodeId(), + timeseriesOrdering, + containsLastTransformNode, + globalMeasurementSchemaList); } @Override @@ -135,6 +200,15 @@ public class LastQueryNode extends MultiChildProcessNode { ReadWriteIOUtils.write((byte) 1, byteBuffer); ReadWriteIOUtils.write(timeseriesOrdering.ordinal(), byteBuffer); } + ReadWriteIOUtils.write(globalMeasurementSchemaList.size(), byteBuffer); + for (IMeasurementSchema measurementSchema : globalMeasurementSchemaList) { + if (measurementSchema instanceof MeasurementSchema) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else if (measurementSchema instanceof VectorMeasurementSchema) { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + } + measurementSchema.serializeTo(byteBuffer); + } } @Override @@ -146,6 +220,15 @@ public class LastQueryNode extends MultiChildProcessNode { ReadWriteIOUtils.write((byte) 1, stream); ReadWriteIOUtils.write(timeseriesOrdering.ordinal(), stream); } + ReadWriteIOUtils.write(globalMeasurementSchemaList.size(), stream); + for (IMeasurementSchema measurementSchema : globalMeasurementSchemaList) { + if (measurementSchema instanceof MeasurementSchema) { + ReadWriteIOUtils.write((byte) 0, stream); + } else if (measurementSchema instanceof VectorMeasurementSchema) { + ReadWriteIOUtils.write((byte) 1, stream); + } + measurementSchema.serializeTo(stream); + } } public static LastQueryNode deserialize(ByteBuffer byteBuffer) { @@ -154,8 +237,18 @@ public class LastQueryNode extends MultiChildProcessNode { if (needOrderByTimeseries == 1) { timeseriesOrdering = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; } + int measurementSize = ReadWriteIOUtils.readInt(byteBuffer); + List<IMeasurementSchema> measurementSchemas = new ArrayList<>(measurementSize); + for (int i = 0; i < measurementSize; i++) { + byte type = ReadWriteIOUtils.readByte(byteBuffer); + if (type == 0) { + measurementSchemas.add(MeasurementSchema.deserializeFrom(byteBuffer)); + } else if (type == 1) { + measurementSchemas.add(VectorMeasurementSchema.deserializeFrom(byteBuffer)); + } + } PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new LastQueryNode(planNodeId, timeseriesOrdering, false); + return new LastQueryNode(planNodeId, timeseriesOrdering, false, measurementSchemas); } @Override @@ -163,6 +256,15 @@ public class LastQueryNode extends MultiChildProcessNode { this.children = children; } + @Override + public void addChild(PlanNode child) { + if (child instanceof LastQueryScanNode) { + LastQueryScanNode childNode = (LastQueryScanNode) child; + childNode.setGlobalMeasurementSchemaList(globalMeasurementSchemaList); + } + super.addChild(child); + } + public Ordering getTimeseriesOrdering() { return timeseriesOrdering; } @@ -182,4 +284,11 @@ public class LastQueryNode extends MultiChildProcessNode { public boolean needOrderByTimeseries() { return timeseriesOrdering != null; } + + // Before calling this method, you need to ensure that the current LastQueryNode + // has been divided according to RegionReplicaSet. + public TRegionReplicaSet getRegionReplicaSetByFirstChild() { + SourceNode planNode = (SourceNode) children.get(0); + return planNode.getRegionReplicaSet(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java deleted file mode 100644 index c00e7ed81ec..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedLastQueryScanNode.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; - -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.commons.path.PathDeserializeUtil; -import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; - -import com.google.common.collect.ImmutableList; -import org.apache.tsfile.utils.RamUsageEstimator; -import org.apache.tsfile.utils.ReadWriteIOUtils; -import org.eclipse.jetty.util.StringUtil; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; - -public class AlignedLastQueryScanNode extends LastSeriesSourceNode { - private static final long INSTANCE_SIZE = - RamUsageEstimator.shallowSizeOfInstance(AlignedLastQueryScanNode.class); - - // The path of the target series which will be scanned. - private final AlignedPath seriesPath; - - // The id of DataRegion where the node will run - private TRegionReplicaSet regionReplicaSet; - - private final String outputViewPath; - - public AlignedLastQueryScanNode(PlanNodeId id, AlignedPath seriesPath, String outputViewPath) { - super(id, new AtomicInteger(1)); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - } - - public AlignedLastQueryScanNode( - PlanNodeId id, - AlignedPath seriesPath, - AtomicInteger dataNodeSeriesScanNum, - String outputViewPath, - TRegionReplicaSet regionReplicaSet) { - super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - this.regionReplicaSet = regionReplicaSet; - } - - public AlignedLastQueryScanNode( - PlanNodeId id, - AlignedPath seriesPath, - AtomicInteger dataNodeSeriesScanNum, - String outputViewPath) { - super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; - this.outputViewPath = outputViewPath; - } - - public String getOutputViewPath() { - return outputViewPath; - } - - @Override - public void open() throws Exception { - // Do nothing - } - - @Override - public TRegionReplicaSet getRegionReplicaSet() { - return regionReplicaSet; - } - - @Override - public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { - this.regionReplicaSet = regionReplicaSet; - } - - @Override - public void close() throws Exception { - // Do nothing - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(); - } - - @Override - public void addChild(PlanNode child) { - throw new UnsupportedOperationException("no child is allowed for SeriesScanNode"); - } - - @Override - public PlanNodeType getType() { - return PlanNodeType.ALIGNED_LAST_QUERY_SCAN; - } - - @Override - public PlanNode clone() { - return new AlignedLastQueryScanNode( - getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(), outputViewPath, regionReplicaSet); - } - - @Override - public int allowedChildCount() { - return NO_CHILD_ALLOWED; - } - - @Override - public List<String> getOutputColumnNames() { - return LAST_QUERY_HEADER_COLUMNS; - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitAlignedLastQueryScan(this, context); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - AlignedLastQueryScanNode that = (AlignedLastQueryScanNode) o; - return Objects.equals(seriesPath, that.seriesPath) - && Objects.equals(outputViewPath, that.outputViewPath) - && Objects.equals(regionReplicaSet, that.regionReplicaSet); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), seriesPath, outputViewPath, regionReplicaSet); - } - - @Override - public String toString() { - if (StringUtil.isNotBlank(outputViewPath)) { - return String.format( - "AlignedLastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s, DataRegion: %s]", - this.getPlanNodeId(), - this.getSeriesPath().getFormattedString(), - this.getOutputViewPath(), - PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); - } else { - return String.format( - "AlignedLastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]", - this.getPlanNodeId(), - this.getSeriesPath().getFormattedString(), - PlanNodeUtil.printRegionReplicaSet(this.getRegionReplicaSet())); - } - } - - @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(byteBuffer); - seriesPath.serialize(byteBuffer); - ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer); - ReadWriteIOUtils.write(outputViewPath == null, byteBuffer); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, byteBuffer); - } - } - - @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.ALIGNED_LAST_QUERY_SCAN.serialize(stream); - seriesPath.serialize(stream); - ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream); - ReadWriteIOUtils.write(outputViewPath == null, stream); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, stream); - } - } - - public static AlignedLastQueryScanNode deserialize(ByteBuffer byteBuffer) { - AlignedPath partialPath = (AlignedPath) PathDeserializeUtil.deserialize(byteBuffer); - int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer); - boolean isNull = ReadWriteIOUtils.readBool(byteBuffer); - String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer); - PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new AlignedLastQueryScanNode( - planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum), outputPathSymbol); - } - - public AlignedPath getSeriesPath() { - return seriesPath; - } - - public String getOutputSymbolForSort() { - if (outputViewPath != null) { - return outputViewPath; - } - if (seriesPath.getMeasurementList().size() > 1) { - return seriesPath.getDevice(); - } - return seriesPath.transformToPartialPath().getFullPath(); - } - - @Override - public PartialPath getPartitionPath() { - return getSeriesPath(); - } - - @Override - public long ramBytesUsed() { - return INSTANCE_SIZE - + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) - + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) - + RamUsageEstimator.sizeOf(outputViewPath); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index 8d4f8291a62..d7908e6b759 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -16,10 +16,10 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; @@ -33,14 +33,17 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.eclipse.jetty.util.StringUtil; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; public class LastQueryScanNode extends LastSeriesSourceNode { @@ -53,40 +56,85 @@ public class LastQueryScanNode extends LastSeriesSourceNode { ColumnHeaderConstant.VALUE, ColumnHeaderConstant.DATATYPE); - // The path of the target series which will be scanned. - private final MeasurementPath seriesPath; + private final PartialPath devicePath; + private final boolean aligned; + private final List<Integer> indexOfMeasurementSchemas; + // This structure does not need to be serialized or deserialized. + // It will be set when the current Node is added to the child by the upper LastQueryNode. + private List<IMeasurementSchema> globalMeasurementSchemaList; private final String outputViewPath; // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; + private boolean deviceInMultiRegion = false; - public LastQueryScanNode(PlanNodeId id, MeasurementPath seriesPath, String outputViewPath) { + public LastQueryScanNode( + PlanNodeId id, + PartialPath devicePath, + boolean aligned, + List<Integer> indexOfMeasurementSchemas, + String outputViewPath, + List<IMeasurementSchema> globalMeasurementSchemaList) { super(id, new AtomicInteger(1)); - this.seriesPath = seriesPath; + this.aligned = aligned; + this.devicePath = devicePath; + this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; this.outputViewPath = outputViewPath; + this.globalMeasurementSchemaList = globalMeasurementSchemaList; } public LastQueryScanNode( PlanNodeId id, - MeasurementPath seriesPath, + PartialPath devicePath, + boolean aligned, + List<Integer> indexOfMeasurementSchemas, AtomicInteger dataNodeSeriesScanNum, String outputViewPath) { + this( + id, + devicePath, + aligned, + indexOfMeasurementSchemas, + dataNodeSeriesScanNum, + outputViewPath, + null); + } + + public LastQueryScanNode( + PlanNodeId id, + PartialPath devicePath, + boolean aligned, + List<Integer> indexOfMeasurementSchemas, + AtomicInteger dataNodeSeriesScanNum, + String outputViewPath, + List<IMeasurementSchema> globalMeasurementSchemaList) { super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; + this.aligned = aligned; + this.devicePath = devicePath; + this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; this.outputViewPath = outputViewPath; + this.globalMeasurementSchemaList = globalMeasurementSchemaList; } public LastQueryScanNode( PlanNodeId id, - MeasurementPath seriesPath, + PartialPath devicePath, + boolean aligned, + List<Integer> indexOfMeasurementSchemas, AtomicInteger dataNodeSeriesScanNum, String outputViewPath, - TRegionReplicaSet regionReplicaSet) { + TRegionReplicaSet regionReplicaSet, + boolean deviceInMultiRegion, + List<IMeasurementSchema> globalMeasurementSchemaList) { super(id, dataNodeSeriesScanNum); - this.seriesPath = seriesPath; + this.devicePath = devicePath; + this.aligned = aligned; + this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; this.outputViewPath = outputViewPath; this.regionReplicaSet = regionReplicaSet; + this.deviceInMultiRegion = deviceInMultiRegion; + this.globalMeasurementSchemaList = globalMeasurementSchemaList; } @Override @@ -102,8 +150,12 @@ public class LastQueryScanNode extends LastSeriesSourceNode { this.regionReplicaSet = regionReplicaSet; } - public MeasurementPath getSeriesPath() { - return seriesPath; + public PartialPath getSeriesPath() { + return devicePath; + } + + public boolean isAligned() { + return this.aligned; } public String getOutputViewPath() { @@ -114,7 +166,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { if (outputViewPath != null) { return outputViewPath; } - return seriesPath.getFullPath(); + return devicePath.toString(); } @Override @@ -138,7 +190,15 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override public PlanNode clone() { return new LastQueryScanNode( - getPlanNodeId(), seriesPath, getDataNodeSeriesScanNum(), outputViewPath, regionReplicaSet); + getPlanNodeId(), + devicePath, + aligned, + indexOfMeasurementSchemas, + getDataNodeSeriesScanNum(), + outputViewPath, + regionReplicaSet, + deviceInMultiRegion, + globalMeasurementSchemaList); } @Override @@ -162,30 +222,42 @@ public class LastQueryScanNode extends LastSeriesSourceNode { if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; LastQueryScanNode that = (LastQueryScanNode) o; - return Objects.equals(seriesPath, that.seriesPath) + return Objects.equals(devicePath, that.devicePath) + && Objects.equals(aligned, that.aligned) + && Objects.equals(indexOfMeasurementSchemas, that.indexOfMeasurementSchemas) && Objects.equals(outputViewPath, that.outputViewPath) && Objects.equals(regionReplicaSet, that.regionReplicaSet); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), seriesPath, outputViewPath, regionReplicaSet); + return Objects.hash( + super.hashCode(), + devicePath, + aligned, + indexOfMeasurementSchemas, + outputViewPath, + regionReplicaSet); } @Override public String toString() { if (StringUtil.isNotBlank(outputViewPath)) { return String.format( - "LastQueryScanNode-%s:[SeriesPath: %s, ViewPath: %s, DataRegion: %s]", + "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, ViewPath: %s, DataRegion: %s]", this.getPlanNodeId(), - this.getSeriesPath(), + this.getDevicePath(), + this.aligned, + this.getMeasurementSchemas(), this.getOutputViewPath(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } else { return String.format( - "LastQueryScanNode-%s:[SeriesPath: %s, DataRegion: %s]", + "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, DataRegion: %s]", this.getPlanNodeId(), - this.getSeriesPath(), + this.getDevicePath(), + this.aligned, + this.getMeasurementSchemas(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } } @@ -193,53 +265,106 @@ public class LastQueryScanNode extends LastSeriesSourceNode { @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.LAST_QUERY_SCAN.serialize(byteBuffer); - seriesPath.serialize(byteBuffer); + devicePath.serialize(byteBuffer); + ReadWriteIOUtils.write(aligned, byteBuffer); + ReadWriteIOUtils.write(indexOfMeasurementSchemas.size(), byteBuffer); + for (Integer measurementSchema : indexOfMeasurementSchemas) { + ReadWriteIOUtils.write(measurementSchema, byteBuffer); + } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer); ReadWriteIOUtils.write(outputViewPath == null, byteBuffer); if (outputViewPath != null) { ReadWriteIOUtils.write(outputViewPath, byteBuffer); } + ReadWriteIOUtils.write(deviceInMultiRegion, byteBuffer); } @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.LAST_QUERY_SCAN.serialize(stream); - seriesPath.serialize(stream); + devicePath.serialize(stream); + ReadWriteIOUtils.write(aligned, stream); + ReadWriteIOUtils.write(indexOfMeasurementSchemas.size(), stream); + for (Integer measurementSchema : indexOfMeasurementSchemas) { + ReadWriteIOUtils.write(measurementSchema, stream); + } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream); ReadWriteIOUtils.write(outputViewPath == null, stream); if (outputViewPath != null) { ReadWriteIOUtils.write(outputViewPath, stream); } + ReadWriteIOUtils.write(deviceInMultiRegion, stream); } public static LastQueryScanNode deserialize(ByteBuffer byteBuffer) { - MeasurementPath partialPath = (MeasurementPath) PathDeserializeUtil.deserialize(byteBuffer); + PartialPath devicePath = (PartialPath) PathDeserializeUtil.deserialize(byteBuffer); + boolean aligned = ReadWriteIOUtils.readBool(byteBuffer); + int measurementSize = ReadWriteIOUtils.readInt(byteBuffer); + List<Integer> measurementSchemas = new ArrayList<>(measurementSize); + for (int i = 0; i < measurementSize; i++) { + measurementSchemas.add(ReadWriteIOUtils.readInt(byteBuffer)); + } + int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer); boolean isNull = ReadWriteIOUtils.readBool(byteBuffer); String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer); + boolean deviceInMultiRegion = ReadWriteIOUtils.readBool(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new LastQueryScanNode( - planNodeId, partialPath, new AtomicInteger(dataNodeSeriesScanNum), outputPathSymbol); + planNodeId, + devicePath, + aligned, + measurementSchemas, + new AtomicInteger(dataNodeSeriesScanNum), + outputPathSymbol, + null, + deviceInMultiRegion, + null); } - @Override - public PartialPath getPartitionPath() { - return getSeriesPath(); + public void setGlobalMeasurementSchemaList(List<IMeasurementSchema> globalMeasurementSchemaList) { + this.globalMeasurementSchemaList = globalMeasurementSchemaList; } - public String outputPathSymbol() { - if (outputViewPath == null) { - return seriesPath.getFullPath(); - } else { - return outputViewPath; - } + public IMeasurementSchema getMeasurementSchema(int idx) { + int globalIdx = indexOfMeasurementSchemas.get(idx); + return globalMeasurementSchemaList.get(globalIdx); + } + + public PartialPath getDevicePath() { + return this.devicePath; + } + + public boolean isDeviceInMultiRegion() { + return deviceInMultiRegion; + } + + public void setDeviceInMultiRegion(boolean deviceInMultiRegion) { + this.deviceInMultiRegion = deviceInMultiRegion; + } + + public List<Integer> getIdxOfMeasurementSchemas() { + return indexOfMeasurementSchemas; + } + + public List<IMeasurementSchema> getMeasurementSchemas() { + return indexOfMeasurementSchemas.stream() + .map(globalMeasurementSchemaList::get) + .collect(Collectors.toList()); + } + + @Override + public PartialPath getPartitionPath() { + return devicePath; } @Override public long ramBytesUsed() { return INSTANCE_SIZE + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(id) - + MemoryEstimationHelper.getEstimatedSizeOfPartialPath(seriesPath) + // The memory of each String has been calculated before + + MemoryEstimationHelper.getEstimatedSizeOfCopiedPartialPath(devicePath) + + MemoryEstimationHelper.getEstimatedSizeOfIntegerArrayList(indexOfMeasurementSchemas) + RamUsageEstimator.sizeOf(outputViewPath); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java index 23139279d58..b69471a2c7c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQueryOperatorTest.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; @@ -55,6 +54,7 @@ import org.junit.Test; import java.io.IOException; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -191,7 +191,7 @@ public class LastQueryOperatorTest { LastQueryOperator lastQueryOperator = new LastQueryOperator( driverContext.getOperatorContexts().get(4), - ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2), + Arrays.asList(updateLastCacheOperator1, updateLastCacheOperator2), LastQueryUtil.createTsBlockBuilder()); int count = 0; @@ -328,7 +328,7 @@ public class LastQueryOperatorTest { LastQueryOperator lastQueryOperator = new LastQueryOperator( driverContext.getOperatorContexts().get(4), - ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2), + Arrays.asList(updateLastCacheOperator1, updateLastCacheOperator2), builder); int count = 0; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java index e7c66a30970..da1ac9bf3ac 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/LastQuerySortOperatorTest.java @@ -40,7 +40,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; @@ -55,6 +54,7 @@ import org.junit.Test; import java.io.IOException; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -192,7 +192,7 @@ public class LastQuerySortOperatorTest { new LastQuerySortOperator( driverContext.getOperatorContexts().get(4), LastQueryUtil.createTsBlockBuilder().build(), - ImmutableList.of(updateLastCacheOperator1, updateLastCacheOperator2), + Arrays.asList(updateLastCacheOperator1, updateLastCacheOperator2), Comparator.naturalOrder()); int count = 0; @@ -329,7 +329,7 @@ public class LastQuerySortOperatorTest { new LastQuerySortOperator( driverContext.getOperatorContexts().get(4), builder.build(), - ImmutableList.of(updateLastCacheOperator2, updateLastCacheOperator1), + Arrays.asList(updateLastCacheOperator2, updateLastCacheOperator1), Comparator.reverseOrder()); int count = 0; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java index f0c930d0d00..e1893916d9e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.distribution; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -32,13 +31,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNo import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -194,21 +190,17 @@ public class LastQueryTest { private LogicalQueryPlan constructLastQuery(List<String> paths, MPPQueryContext context) throws IllegalPathException { - List<PlanNode> sourceNodeList = new ArrayList<>(); + LastQueryNode root = new LastQueryNode(context.getQueryId().genPlanNodeId(), null, false); for (String path : paths) { MeasurementPath selectPath = new MeasurementPath(path); - if (selectPath.isUnderAlignedEntity()) { - sourceNodeList.add( - new AlignedLastQueryScanNode( - context.getQueryId().genPlanNodeId(), new AlignedPath(selectPath), null)); - } else { - sourceNodeList.add( - new LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath, null)); - } + root.addDeviceLastQueryScanNode( + context.getQueryId().genPlanNodeId(), + selectPath.getDevicePath(), + selectPath.isUnderAlignedEntity(), + Collections.singletonList(selectPath.getMeasurementSchema()), + null); } - PlanNode root = - new LastQueryNode(context.getQueryId().genPlanNodeId(), sourceNodeList, null, false); return new LogicalQueryPlan(context, root); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java index 993b15562ea..2e51898a1f4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java @@ -39,9 +39,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.RawDataAgg import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedLastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; @@ -52,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.junit.Assert; import org.junit.Test; @@ -76,31 +75,37 @@ public class DataQueryLogicalPlannerTest { // fake initResultNodeContext() queryId.genPlanNodeId(); - LastQueryScanNode d1s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s1"), null); - LastQueryScanNode d1s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s2"), null); - LastQueryScanNode d1s3 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3"), null); - LastQueryScanNode d2s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s1"), null); - LastQueryScanNode d2s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s2"), null); - LastQueryScanNode d2s4 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d2.s4"), null); - AlignedLastQueryScanNode d2a = - new AlignedLastQueryScanNode( - queryId.genPlanNodeId(), (AlignedPath) schemaMap.get("root.sg.d2.a"), null); - - List<PlanNode> sourceNodeList = Arrays.asList(d1s1, d1s2, d1s3, d2a, d2s1, d2s2, d2s4); - LastQueryNode lastQueryNode = - new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList, Ordering.ASC, false); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s2")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s3")).getMeasurementSchema()); + MeasurementPath d1s1Path = (MeasurementPath) schemaMap.get("root.sg.d1.s1"); + LastQueryNode lastQueryNode = new LastQueryNode(queryId.genPlanNodeId(), Ordering.ASC, false); + + lastQueryNode.addDeviceLastQueryScanNode( + queryId.genPlanNodeId(), + d1s1Path.getDevicePath(), + d1s1Path.isUnderAlignedEntity(), + measurementSchemas, + null); + + measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d2.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d2.s2")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d2.s4")).getMeasurementSchema()); + MeasurementPath d2s1Path = (MeasurementPath) schemaMap.get("root.sg.d2.s1"); + lastQueryNode.addDeviceLastQueryScanNode( + queryId.genPlanNodeId(), + d2s1Path.getDevicePath(), + d2s1Path.isUnderAlignedEntity(), + measurementSchemas, + null); + + AlignedPath aPath = (AlignedPath) schemaMap.get("root.sg.d2.a"); + lastQueryNode.addDeviceLastQueryScanNode( + queryId.genPlanNodeId(), aPath.getDevicePath(), true, aPath.getSchemaList(), null); PlanNode actualPlan = parseSQLToPlanNode(sql); Assert.assertEquals(actualPlan, lastQueryNode); @@ -114,19 +119,20 @@ public class DataQueryLogicalPlannerTest { // fake initResultNodeContext() queryId.genPlanNodeId(); - LastQueryScanNode d1s3 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s3"), null); - LastQueryScanNode d1s1 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s1"), null); - LastQueryScanNode d1s2 = - new LastQueryScanNode( - queryId.genPlanNodeId(), (MeasurementPath) schemaMap.get("root.sg.d1.s2"), null); - - List<PlanNode> sourceNodeList = Arrays.asList(d1s3, d1s1, d1s2); - LastQueryNode lastQueryNode = - new LastQueryNode(queryId.genPlanNodeId(), sourceNodeList, null, false); + LastQueryNode lastQueryNode = new LastQueryNode(queryId.genPlanNodeId(), null, false); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + ((MeasurementPath) schemaMap.get("root.sg.d1.s3")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s1")).getMeasurementSchema(), + ((MeasurementPath) schemaMap.get("root.sg.d1.s2")).getMeasurementSchema()); + MeasurementPath s3Path = (MeasurementPath) schemaMap.get("root.sg.d1.s3"); + lastQueryNode.addDeviceLastQueryScanNode( + queryId.genPlanNodeId(), + s3Path.getDevicePath(), + s3Path.isUnderAlignedEntity(), + measurementSchemas, + null); + SortNode sortNode = new SortNode( queryId.genPlanNodeId(), diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java new file mode 100644 index 00000000000..270411f3f51 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/LastQueryScanNodeSerdeTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.node.source; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.planner.node.PlanNodeDeserializeHelper; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastQueryScanNode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + +public class LastQueryScanNodeSerdeTest { + @Test + public void test() throws IllegalPathException { + LastQueryScanNode node = + new LastQueryScanNode( + new PlanNodeId("test"), + new PartialPath("root.test.d1"), + true, + Arrays.asList(0, 1), + null, + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s0", TSDataType.BOOLEAN))); + ByteBuffer byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + + node = + new LastQueryScanNode( + new PlanNodeId("test"), + new PartialPath("root.test.d1"), + false, + Arrays.asList(0, 1), + null, + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s0", TSDataType.BOOLEAN))); + byteBuffer = ByteBuffer.allocate(2048); + node.serialize(byteBuffer); + byteBuffer.flip(); + assertEquals(PlanNodeDeserializeHelper.deserialize(byteBuffer), node); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 4acf680443b..89931701072 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -118,14 +118,17 @@ public class DataPartition extends Partition { } public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter( - String deviceName, Filter timeFilter) { - String storageGroup = getStorageGroupByDevice(deviceName); - TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); - if (!dataPartitionMap.containsKey(storageGroup) - || !dataPartitionMap.get(storageGroup).containsKey(seriesPartitionSlot)) { + final String deviceName, final Filter timeFilter) { + final String storageGroup = getStorageGroupByDevice(deviceName); + final TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceName); + Map<TTimePartitionSlot, List<TRegionReplicaSet>> regionReplicaSetMap = + dataPartitionMap + .getOrDefault(storageGroup, Collections.emptyMap()) + .getOrDefault(seriesPartitionSlot, Collections.emptyMap()); + if (regionReplicaSetMap.isEmpty()) { return Collections.singletonList(NOT_ASSIGNED); } - return dataPartitionMap.get(storageGroup).get(seriesPartitionSlot).entrySet().stream() + return regionReplicaSetMap.entrySet().stream() .filter( entry -> TimePartitionUtils.satisfyPartitionStartTime(timeFilter, entry.getKey().startTime)) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java index 25b71eb718d..29c92599f24 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java @@ -39,7 +39,7 @@ public abstract class Partition { seriesSlotExecutorName, seriesPartitionSlotNum); } - protected TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) { + public TSeriesPartitionSlot calculateDeviceGroupId(String deviceName) { return executor.getSeriesPartitionSlot(deviceName); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index ceec1f17c4e..43c266331ea 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -758,6 +758,10 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { return nodes[0]; } + public void setIDeviceID(String deviceID) { + this.device = deviceID; + } + @Override public String getDevice() { if (device != null) {
