This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch lastAlias in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8344511a1c205df922d2809513e805ba6716779e Author: Weihao Li <[email protected]> AuthorDate: Thu Jan 29 08:55:11 2026 +0800 draft Signed-off-by: Weihao Li <[email protected]> --- .../AlignedUpdateViewPathLastCacheOperator.java | 22 +++-- .../process/last/UpdateLastCacheOperator.java | 2 +- .../last/UpdateViewPathLastCacheOperator.java | 23 ++++- .../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +- .../plan/planner/LogicalPlanBuilder.java | 37 ++++++- .../plan/planner/OperatorTreeGenerator.java | 22 +++-- .../plan/planner/plan/node/PlanGraphPrinter.java | 8 +- .../plan/node/process/last/LastQueryNode.java | 6 +- .../plan/node/source/LastQueryScanNode.java | 106 +++++++++++++++------ .../plan/planner/distribution/LastQueryTest.java | 3 + .../logical/DataQueryLogicalPlannerTest.java | 11 ++- .../planner/node/source/SourceNodeSerdeTest.java | 2 + 12 files changed, 185 insertions(+), 63 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java index 77902a10e37..3046e80e231 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AlignedUpdateViewPathLastCacheOperator.java @@ -28,11 +28,14 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.Tr import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.TsPrimitiveType; +import java.util.List; + import static com.google.common.base.Preconditions.checkArgument; public class AlignedUpdateViewPathLastCacheOperator extends AlignedUpdateLastCacheOperator { - private final String outputViewPath; + private final List<String> outputPaths; + private int outputPathIndex = 0; public AlignedUpdateViewPathLastCacheOperator( OperatorContext operatorContext, @@ -41,7 +44,7 @@ public class AlignedUpdateViewPathLastCacheOperator extends AlignedUpdateLastCac TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager, boolean needUpdateCache, boolean needUpdateNullEntry, - String outputViewPath, + List<String> outputPaths, boolean deviceInMultiRegion) { super( operatorContext, @@ -51,19 +54,26 @@ public class AlignedUpdateViewPathLastCacheOperator extends AlignedUpdateLastCac needUpdateCache, needUpdateNullEntry, deviceInMultiRegion); - checkArgument(seriesPath.getMeasurementList().size() == 1); - this.outputViewPath = outputViewPath; + checkArgument(outputPaths != null, "outputPaths shouldn't be null"); + this.outputPaths = outputPaths; } @Override protected void appendLastValueToTsBlockBuilder( long lastTime, TsPrimitiveType lastValue, MeasurementPath measurementPath, String type) { + String outputPath = outputPaths.get(outputPathIndex); LastQueryUtil.appendLastValueRespectBlob( - tsBlockBuilder, lastTime, outputViewPath, lastValue, type); + tsBlockBuilder, + lastTime, + outputPath == null ? measurementPath.getFullPath() : outputPath, + lastValue, + type); + outputPathIndex++; } @Override public long ramBytesUsed() { - return super.ramBytesUsed() + RamUsageEstimator.sizeOf(outputViewPath); + return super.ramBytesUsed() + + outputPaths.stream().mapToLong(path -> RamUsageEstimator.sizeOf(path)).sum(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java index 36f71f03d55..a1b0a7fd8dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateLastCacheOperator.java @@ -40,7 +40,7 @@ public class UpdateLastCacheOperator extends AbstractUpdateLastCacheOperator { // fullPath for queried time series // It should be exact PartialPath, neither MeasurementPath nor AlignedPath, because lastCache only // accept PartialPath - private final MeasurementPath fullPath; + protected final MeasurementPath fullPath; // type for queried time series protected final String dataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateViewPathLastCacheOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateViewPathLastCacheOperator.java index 84f8dd40b12..8ecb151fc10 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateViewPathLastCacheOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/UpdateViewPathLastCacheOperator.java @@ -28,9 +28,14 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.TsPrimitiveType; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; + public class UpdateViewPathLastCacheOperator extends UpdateLastCacheOperator { - private final String outputViewPath; + private final List<String> outputPaths; + private int outputPathIndex = 0; public UpdateViewPathLastCacheOperator( OperatorContext operatorContext, @@ -40,7 +45,7 @@ public class UpdateViewPathLastCacheOperator extends UpdateLastCacheOperator { TreeDeviceSchemaCacheManager treeDeviceSchemaCacheManager, boolean needUpdateCache, boolean needUpdateNullEntry, - String outputViewPath) { + List<String> outputPaths) { super( operatorContext, child, @@ -49,17 +54,25 @@ public class UpdateViewPathLastCacheOperator extends UpdateLastCacheOperator { treeDeviceSchemaCacheManager, needUpdateCache, needUpdateNullEntry); - this.outputViewPath = outputViewPath; + checkArgument(outputPaths != null, "outputPaths shouldn't be null"); + this.outputPaths = outputPaths; } @Override protected void appendLastValueToTsBlockBuilder(long lastTime, TsPrimitiveType lastValue) { + String outputPath = outputPaths.get(outputPathIndex); LastQueryUtil.appendLastValueRespectBlob( - tsBlockBuilder, lastTime, outputViewPath, lastValue, dataType); + tsBlockBuilder, + lastTime, + outputPath == null ? fullPath.getFullPath() : outputPath, + lastValue, + dataType); + outputPathIndex++; } @Override public long ramBytesUsed() { - return super.ramBytesUsed() + RamUsageEstimator.sizeOf(outputViewPath); + return super.ramBytesUsed() + + outputPaths.stream().mapToLong(path -> RamUsageEstimator.sizeOf(path)).sum(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 32bc088ff6e..f0f5d419f38 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -597,7 +597,11 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> timeseriesOrdering != null ? new TreeMap<>(timeseriesOrdering.getStringComparator()) : new LinkedHashMap<>()) - .put(outputPath.getMeasurement(), timeSeriesOperand); + .put( + outputPath.isMeasurementAliasExists() + ? outputPath.getMeasurementAlias() + : outputPath.getMeasurement(), + timeSeriesOperand); } else { lastQueryNonWritableViewSourceExpressionMap = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 5414b5ac020..6bc348e530c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -253,12 +253,18 @@ public class LogicalPlanBuilder { for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); - String outputViewPath = + String outputPath = sourceExpression.isViewExpression() ? sourceExpression.getViewPath().getFullPath() : null; - TSDataType outputViewPathType = - outputViewPath == null ? null : selectedPath.getSeriesType(); + TSDataType outputViewPathType = outputPath == null ? null : selectedPath.getSeriesType(); + if (selectedPath.isMeasurementAliasExists()) { + outputPath = + selectedPath + .getDevicePath() + .concatAsMeasurementPath(selectedPath.getMeasurementAlias()) + .toString(); + } PartialPath devicePath = selectedPath.getDevicePath(); // For expression with view path, we do not use the deviceId in Map.Entry because it is a @@ -270,7 +276,8 @@ public class LogicalPlanBuilder { devicePath, selectedPath.isUnderAlignedEntity(), Collections.singletonList(selectedPath.getMeasurementSchema()), - outputViewPath, + Collections.singletonList(outputPath), + outputViewPathType != null, outputViewPathType); this.context.reserveMemoryForFrontEnd(memCost); } @@ -279,12 +286,31 @@ public class LogicalPlanBuilder { List<IMeasurementSchema> measurementSchemas = new ArrayList<>(measurementToExpressionsOfDevice.size()); PartialPath devicePath = null; + List<String> outputPaths = null; + int i = 0; for (Expression sourceExpression : measurementToExpressionsOfDevice.values()) { MeasurementPath selectedPath = (MeasurementPath) ((TimeSeriesOperand) sourceExpression).getPath(); aligned = selectedPath.isUnderAlignedEntity(); devicePath = devicePath == null ? selectedPath.getDevicePath() : devicePath; measurementSchemas.add(selectedPath.getMeasurementSchema()); + + // series has alias and use alias to SELECT + if (selectedPath.isMeasurementAliasExists()) { + if (outputPaths == null) { + // fill null as default value + outputPaths = + new ArrayList<>( + Collections.nCopies(measurementToExpressionsOfDevice.size(), null)); + } + outputPaths.add( + i, + selectedPath + .getDevicePath() + .concatAsMeasurementPath(selectedPath.getMeasurementAlias()) + .toString()); + } + i++; } // DeviceId is needed in the distribution plan stage devicePath.setIDeviceID(deviceId); @@ -294,7 +320,8 @@ public class LogicalPlanBuilder { devicePath, aligned, measurementSchemas, - null, + outputPaths, + false, null); this.context.reserveMemoryForFrontEnd(memCost); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index f869ce59d47..cde2d4c825f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -2812,7 +2812,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP fullPath); } - return Objects.isNull(node.getOutputViewPath()) + return Objects.isNull(node.getOutputPaths()) ? new UpdateLastCacheOperator( operatorContext, lastQueryScan, @@ -2829,11 +2829,11 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP DATA_NODE_SCHEMA_CACHE, isNeedUpdateLastCache, context.isNeedUpdateNullEntry(), - node.getOutputViewPath()); + node.getOutputPaths()); } private AlignedUpdateLastCacheOperator createAlignedUpdateLastCacheOperator( - final String outputViewPath, + final List<String> outputPaths, final PlanNodeId planNodeId, final AlignedPath unCachedPath, final LocalExecutionPlanContext context, @@ -2870,7 +2870,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } } - return Objects.isNull(outputViewPath) + return Objects.isNull(outputPaths) ? new AlignedUpdateLastCacheOperator( operatorContext, lastQueryScan, @@ -2886,7 +2886,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP DATA_NODE_SCHEMA_CACHE, isNeedUpdateLastCache, context.isNeedUpdateNullEntry(), - outputViewPath, + outputPaths, deviceInMultiRegion); } @@ -2987,6 +2987,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP updateFilterUsingTTL( context.getGlobalTimeFilter(), DataNodeTTLCache.getInstance().getTTLForTree(devicePath.getNodes())); + boolean hasOutputPath = node.getOutputPaths() != null; for (int i = 0; i < idxOfMeasurementSchemas.size(); i++) { IMeasurementSchema measurementSchema = node.getMeasurementSchema(i); final MeasurementPath measurementPath = @@ -3017,9 +3018,14 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP unCachedMeasurementIndexes.add(i); } } else { // cached last value is satisfied, put it into LastCacheScanOperator - if (node.getOutputViewPath() != null) { + String outputPath = hasOutputPath ? node.getOutputPaths().get(i) : null; + if (outputPath != null) { context.addCachedLastValue( - timeValuePair, node.getOutputViewPath(), node.getOutputViewPathType()); + timeValuePair, + node.getOutputPaths().get(i), + node.isOutputPathForView() + ? node.getOutputViewPathType() + : measurementSchema.getType()); } else { context.addCachedLastValue( timeValuePair, measurementPath.getFullPath(), measurementSchema.getType()); @@ -3036,7 +3042,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP unCachedPath.addMeasurement(measurementSchema.getMeasurementName(), measurementSchema); } return createAlignedUpdateLastCacheOperator( - node.getOutputViewPath(), + node.getOutputPaths(), node.getPlanNodeId(), unCachedPath, context, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 1b35ffe00f5..8b753c72f08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -95,7 +95,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValuesNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; import com.google.common.base.Joiner; -import org.apache.tsfile.external.commons.lang3.StringUtils; import org.apache.tsfile.external.commons.lang3.Validate; import org.apache.tsfile.utils.Pair; @@ -558,9 +557,10 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter boxValue.add( String.format( "Series: %s%s", node.getDevicePath().getIDeviceID(), node.getMeasurementSchemas())); - if (StringUtils.isNotBlank(node.getOutputViewPath())) { - boxValue.add(String.format("ViewPath: %s", node.getOutputViewPath())); - } + // TODO + /*if (StringUtils.isNotBlank(node.getOutputPaths())) { + boxValue.add(String.format("ViewPath: %s", node.getOutputPaths())); + }*/ boxValue.add(printRegion(node.getRegionReplicaSet())); return render(node, boxValue, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java index cbae6e9abd9..e6a71dc197b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/last/LastQueryNode.java @@ -90,7 +90,8 @@ public class LastQueryNode extends MultiChildProcessNode { PartialPath devicePath, boolean aligned, List<IMeasurementSchema> measurementSchemas, - String outputViewPath, + List<String> outputPaths, + boolean isOutputPathForView, TSDataType outputViewPathType) { List<Integer> idxList = new ArrayList<>(measurementSchemas.size()); for (IMeasurementSchema measurementSchema : measurementSchemas) { @@ -109,7 +110,8 @@ public class LastQueryNode extends MultiChildProcessNode { devicePath, aligned, idxList, - outputViewPath, + outputPaths, + isOutputPathForView, outputViewPathType, globalMeasurementSchemaList); children.add(scanNode); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java index 0387e2b31a2..d6e0967ae51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastQueryScanNode.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import com.google.common.collect.ImmutableList; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.external.commons.lang3.StringUtils; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -46,6 +45,8 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import static org.glassfish.jersey.internal.guava.Preconditions.checkState; + public class LastQueryScanNode extends LastSeriesSourceNode { private static final long INSTANCE_SIZE = @@ -64,7 +65,10 @@ public class LastQueryScanNode extends LastSeriesSourceNode { // It will be set when the current Node is added to the child by the upper LastQueryNode. private List<IMeasurementSchema> globalMeasurementSchemaList; - private final String outputViewPath; + // Store alias of paths or viewPath in this field. + private final List<String> outputPaths; + // Indicate if there is viewPath stored in outputPaths. + private final boolean isOutputPathForView; private final TSDataType outputViewPathType; // The id of DataRegion where the node will run @@ -76,16 +80,18 @@ public class LastQueryScanNode extends LastSeriesSourceNode { PartialPath devicePath, boolean aligned, List<Integer> indexOfMeasurementSchemas, - String outputViewPath, + List<String> outputPaths, + boolean isOutputPathForView, TSDataType outputViewPathType, List<IMeasurementSchema> globalMeasurementSchemaList) { super(id, new AtomicInteger(1)); this.aligned = aligned; this.devicePath = devicePath; this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; - this.outputViewPath = outputViewPath; + this.outputPaths = outputPaths; this.outputViewPathType = outputViewPathType; this.globalMeasurementSchemaList = globalMeasurementSchemaList; + this.isOutputPathForView = isOutputPathForView; } public LastQueryScanNode( @@ -94,7 +100,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { boolean aligned, List<Integer> indexOfMeasurementSchemas, AtomicInteger dataNodeSeriesScanNum, - String outputViewPath, + List<String> outputPaths, + boolean isOutputPathForView, TSDataType outputViewPathType) { this( id, @@ -102,7 +109,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { aligned, indexOfMeasurementSchemas, dataNodeSeriesScanNum, - outputViewPath, + outputPaths, + isOutputPathForView, outputViewPathType, null); } @@ -113,14 +121,16 @@ public class LastQueryScanNode extends LastSeriesSourceNode { boolean aligned, List<Integer> indexOfMeasurementSchemas, AtomicInteger dataNodeSeriesScanNum, - String outputViewPath, + List<String> outputPaths, + boolean isOutputPathForView, TSDataType outputViewPathType, List<IMeasurementSchema> globalMeasurementSchemaList) { super(id, dataNodeSeriesScanNum); this.aligned = aligned; this.devicePath = devicePath; this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; - this.outputViewPath = outputViewPath; + this.outputPaths = outputPaths; + this.isOutputPathForView = isOutputPathForView; this.outputViewPathType = outputViewPathType; this.globalMeasurementSchemaList = globalMeasurementSchemaList; } @@ -131,7 +141,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { boolean aligned, List<Integer> indexOfMeasurementSchemas, AtomicInteger dataNodeSeriesScanNum, - String outputViewPath, + List<String> outputPaths, + boolean isOutputPathForView, TSDataType outputViewPathType, TRegionReplicaSet regionReplicaSet, boolean deviceInMultiRegion, @@ -140,7 +151,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { this.devicePath = devicePath; this.aligned = aligned; this.indexOfMeasurementSchemas = indexOfMeasurementSchemas; - this.outputViewPath = outputViewPath; + this.outputPaths = outputPaths; + this.isOutputPathForView = isOutputPathForView; this.outputViewPathType = outputViewPathType; this.regionReplicaSet = regionReplicaSet; this.deviceInMultiRegion = deviceInMultiRegion; @@ -168,17 +180,24 @@ public class LastQueryScanNode extends LastSeriesSourceNode { return this.aligned; } - public String getOutputViewPath() { - return outputViewPath; + public List<String> getOutputPaths() { + return outputPaths; } public TSDataType getOutputViewPathType() { return outputViewPathType; } + public boolean isOutputPathForView() { + return isOutputPathForView; + } + public String getOutputSymbolForSort() { - if (outputViewPath != null) { - return outputViewPath; + if (isOutputPathForView) { + checkState( + outputPaths != null && outputPaths.size() == 1, + "LastQueryScanNode outputPaths size should be 1 when it's a viewPath"); + return outputPaths.get(0); } return devicePath.toString(); } @@ -209,7 +228,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { aligned, indexOfMeasurementSchemas, getDataNodeSeriesScanNum(), - outputViewPath, + outputPaths, + isOutputPathForView, outputViewPathType, regionReplicaSet, deviceInMultiRegion, @@ -240,7 +260,7 @@ public class LastQueryScanNode extends LastSeriesSourceNode { return Objects.equals(devicePath, that.devicePath) && Objects.equals(aligned, that.aligned) && Objects.equals(indexOfMeasurementSchemas, that.indexOfMeasurementSchemas) - && Objects.equals(outputViewPath, that.outputViewPath) + && Objects.equals(outputPaths, that.outputPaths) && Objects.equals(outputViewPathType, that.outputViewPathType) && Objects.equals(regionReplicaSet, that.regionReplicaSet); } @@ -252,20 +272,20 @@ public class LastQueryScanNode extends LastSeriesSourceNode { devicePath, aligned, indexOfMeasurementSchemas, - outputViewPath, + outputPaths, regionReplicaSet); } @Override public String toString() { - if (StringUtils.isNotBlank(outputViewPath)) { + if (outputPaths != null) { return String.format( - "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, ViewPath: %s, DataRegion: %s]", + "LastQueryScanNode-%s:[Device: %s, Aligned: %s, Measurements: %s, OutputPaths: %s, DataRegion: %s]", this.getPlanNodeId(), this.getDevicePath(), this.aligned, this.getMeasurementSchemas(), - this.getOutputViewPath(), + this.getOutputPaths(), PlanNodeUtil.printRegionReplicaSet(getRegionReplicaSet())); } else { return String.format( @@ -288,9 +308,16 @@ public class LastQueryScanNode extends LastSeriesSourceNode { ReadWriteIOUtils.write(measurementSchema, byteBuffer); } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), byteBuffer); - ReadWriteIOUtils.write(outputViewPath == null, byteBuffer); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, byteBuffer); + ReadWriteIOUtils.write(outputPaths == null, byteBuffer); + if (outputPaths != null) { + int size = outputPaths.size(); + ReadWriteIOUtils.write(size, byteBuffer); + for (int i = 0; i < size; i++) { + ReadWriteIOUtils.write(outputPaths.get(i), byteBuffer); + } + } + ReadWriteIOUtils.write(isOutputPathForView, byteBuffer); + if (isOutputPathForView) { ReadWriteIOUtils.write(outputViewPathType, byteBuffer); } ReadWriteIOUtils.write(deviceInMultiRegion, byteBuffer); @@ -306,9 +333,16 @@ public class LastQueryScanNode extends LastSeriesSourceNode { ReadWriteIOUtils.write(measurementSchema, stream); } ReadWriteIOUtils.write(getDataNodeSeriesScanNum().get(), stream); - ReadWriteIOUtils.write(outputViewPath == null, stream); - if (outputViewPath != null) { - ReadWriteIOUtils.write(outputViewPath, stream); + ReadWriteIOUtils.write(outputPaths == null, stream); + if (outputPaths != null) { + int size = outputPaths.size(); + ReadWriteIOUtils.write(size, stream); + for (int i = 0; i < size; i++) { + ReadWriteIOUtils.write(outputPaths.get(i), stream); + } + } + ReadWriteIOUtils.write(isOutputPathForView, stream); + if (isOutputPathForView) { ReadWriteIOUtils.write(outputViewPathType, stream); } ReadWriteIOUtils.write(deviceInMultiRegion, stream); @@ -325,8 +359,17 @@ public class LastQueryScanNode extends LastSeriesSourceNode { int dataNodeSeriesScanNum = ReadWriteIOUtils.readInt(byteBuffer); boolean isNull = ReadWriteIOUtils.readBool(byteBuffer); - String outputPathSymbol = isNull ? null : ReadWriteIOUtils.readString(byteBuffer); - TSDataType dataType = isNull ? null : ReadWriteIOUtils.readDataType(byteBuffer); + List<String> outputPaths = null; + if (!isNull) { + int size = ReadWriteIOUtils.readInt(byteBuffer); + outputPaths = new ArrayList<>(size); + while (size > 0) { + outputPaths.add(ReadWriteIOUtils.readString(byteBuffer)); + size--; + } + } + boolean isOutputPathForView = ReadWriteIOUtils.readBool(byteBuffer); + TSDataType dataType = isOutputPathForView ? null : ReadWriteIOUtils.readDataType(byteBuffer); boolean deviceInMultiRegion = ReadWriteIOUtils.readBool(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); return new LastQueryScanNode( @@ -335,7 +378,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { aligned, measurementSchemas, new AtomicInteger(dataNodeSeriesScanNum), - outputPathSymbol, + outputPaths, + isOutputPathForView, dataType, null, deviceInMultiRegion, @@ -389,6 +433,8 @@ public class LastQueryScanNode extends LastSeriesSourceNode { // The memory of each String has been calculated before + MemoryEstimationHelper.getEstimatedSizeOfCopiedPartialPath(devicePath) + MemoryEstimationHelper.getEstimatedSizeOfIntegerArrayList(indexOfMeasurementSchemas) - + RamUsageEstimator.sizeOf(outputViewPath); + + (outputPaths == null + ? 0L + : outputPaths.stream().mapToLong(path -> RamUsageEstimator.sizeOf(path)).sum()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java index b077b38eb3e..203ccf3f330 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/LastQueryTest.java @@ -60,6 +60,7 @@ public class LastQueryTest { new MeasurementSchema("s1", TSDataType.BOOLEAN), new MeasurementSchema("s2", TSDataType.INT32)), null, + false, null); lastQueryNode.addDeviceLastQueryScanNode( new PlanNodeId("test_last_query_scan2"), @@ -67,6 +68,7 @@ public class LastQueryTest { false, Collections.singletonList(new MeasurementSchema("s0", TSDataType.BOOLEAN)), null, + false, null); Analysis analysis = Util.constructAnalysis(); @@ -256,6 +258,7 @@ public class LastQueryTest { selectPath.isUnderAlignedEntity(), Collections.singletonList(selectPath.getMeasurementSchema()), null, + false, null); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java index 2298b6ab631..34cbe60bb07 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/logical/DataQueryLogicalPlannerTest.java @@ -90,6 +90,7 @@ public class DataQueryLogicalPlannerTest { d1s1Path.isUnderAlignedEntity(), measurementSchemas, null, + false, null); measurementSchemas = @@ -104,11 +105,18 @@ public class DataQueryLogicalPlannerTest { d2s1Path.isUnderAlignedEntity(), measurementSchemas, null, + false, null); AlignedPath aPath = (AlignedPath) schemaMap.get("root.sg.d2.a"); lastQueryNode.addDeviceLastQueryScanNode( - queryId.genPlanNodeId(), aPath.getDevicePath(), true, aPath.getSchemaList(), null, null); + queryId.genPlanNodeId(), + aPath.getDevicePath(), + true, + aPath.getSchemaList(), + null, + false, + null); PlanNode actualPlan = parseSQLToPlanNode(sql); Assert.assertEquals(actualPlan, lastQueryNode); @@ -135,6 +143,7 @@ public class DataQueryLogicalPlannerTest { s3Path.isUnderAlignedEntity(), measurementSchemas, null, + false, null); SortNode sortNode = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java index 068b2bbc4b6..1b68a32c4c5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/source/SourceNodeSerdeTest.java @@ -45,6 +45,7 @@ public class SourceNodeSerdeTest { true, Arrays.asList(0, 1), null, + false, null, Arrays.asList( new MeasurementSchema("s1", TSDataType.INT32), @@ -61,6 +62,7 @@ public class SourceNodeSerdeTest { false, Arrays.asList(0, 1), null, + false, null, Arrays.asList( new MeasurementSchema("s1", TSDataType.INT32),
