This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/FixIntoOperator1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64da044e25a7fd42e527d24b7f49f1195d0d5d8d Author: Minghui Liu <[email protected]> AuthorDate: Wed Nov 30 20:53:47 2022 +0800 fix memory calculation --- .../operator/process/AbstractIntoOperator.java | 16 ++++-- .../operator/process/DeviceViewIntoOperator.java | 13 ++++- .../execution/operator/process/IntoOperator.java | 8 ++- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 66 +++++++++++++++++++++- 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index dd117e6d97..6b17fdc625 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -68,17 +68,25 @@ public abstract class AbstractIntoOperator implements ProcessOperator { private final ExecutorService writeOperationExecutor; private ListenableFuture<TSStatus> writeOperationFuture; + private final long maxRetainedSize; + private final long maxReturnSize; + public AbstractIntoOperator( OperatorContext operatorContext, Operator child, List<InsertTabletStatementGenerator> insertTabletStatementGenerators, Map<String, InputLocation> sourceColumnToInputLocationMap, - ExecutorService intoOperationExecutor) { + ExecutorService intoOperationExecutor, + long maxStatementSize, + long maxReturnSize) { this.operatorContext = operatorContext; this.child = child; this.insertTabletStatementGenerators = insertTabletStatementGenerators; this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; this.writeOperationExecutor = intoOperationExecutor; + + this.maxRetainedSize = child.calculateMaxReturnSize() + maxStatementSize; + this.maxReturnSize = maxReturnSize; } protected static List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators( @@ -260,17 +268,17 @@ public abstract class AbstractIntoOperator implements ProcessOperator { @Override public long calculateMaxPeekMemory() { - return child.calculateMaxPeekMemory(); + return maxReturnSize + maxRetainedSize + child.calculateMaxPeekMemory(); } @Override public long calculateMaxReturnSize() { - return child.calculateMaxReturnSize(); + return maxReturnSize; } @Override public long calculateRetainedSizeAfterCallingNext() { - return child.calculateRetainedSizeAfterCallingNext(); + return maxRetainedSize + child.calculateRetainedSizeAfterCallingNext(); } public static class InsertTabletStatementGenerator { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index 07cd3c0a0a..183d7d6140 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -62,8 +62,17 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { Map<String, Boolean> targetDeviceToAlignedMap, Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap, Map<String, InputLocation> sourceColumnToInputLocationMap, - ExecutorService intoOperationExecutor) { - super(operatorContext, child, null, sourceColumnToInputLocationMap, intoOperationExecutor); + ExecutorService intoOperationExecutor, + long maxStatementSize, + long maxReturnSize) { + super( + operatorContext, + child, + null, + sourceColumnToInputLocationMap, + intoOperationExecutor, + maxStatementSize, + maxReturnSize); this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap; this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap; this.targetDeviceToAlignedMap = targetDeviceToAlignedMap; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java index 5aba97e22a..89953edc69 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java @@ -50,14 +50,18 @@ public class IntoOperator extends AbstractIntoOperator { Map<String, Boolean> targetDeviceToAlignedMap, List<Pair<String, PartialPath>> sourceTargetPathPairList, Map<String, InputLocation> sourceColumnToInputLocationMap, - ExecutorService intoOperationExecutor) { + ExecutorService intoOperationExecutor, + long maxStatementSize, + long maxReturnSize) { super( operatorContext, child, constructInsertTabletStatementGenerators( targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap), sourceColumnToInputLocationMap, - intoOperationExecutor); + intoOperationExecutor, + maxStatementSize, + maxReturnSize); this.sourceTargetPathPairList = sourceTargetPathPairList; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index a6c2ebd261..a858f56833 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory; @@ -170,6 +171,7 @@ import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; import org.apache.iotdb.db.mpp.plan.statement.component.SortItem; import org.apache.iotdb.db.mpp.plan.statement.component.SortKey; import org.apache.iotdb.db.mpp.plan.statement.literal.Literal; +import org.apache.iotdb.db.mpp.statistics.StatisticsManager; import org.apache.iotdb.db.mpp.transformation.dag.column.ColumnTransformer; import org.apache.iotdb.db.mpp.transformation.dag.column.leaf.LeafColumnTransformer; import org.apache.iotdb.db.mpp.transformation.dag.udf.UDTFContext; @@ -177,6 +179,8 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.IntColumn; +import org.apache.iotdb.tsfile.read.common.block.column.LongColumn; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.operator.Gt; import org.apache.iotdb.tsfile.read.filter.operator.GtEq; @@ -1375,7 +1379,17 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP sourceColumnToInputLocationMap, context.getTypeProvider()); + int rowLimit = + IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); + long maxStatementSize = calculateStatementSizePerLine(targetPathToDataTypeMap) * rowLimit; + long maxReturnSize = + node.getChild().getOutputColumnNames().size() + * (LongColumn.SIZE_IN_BYTES_PER_POSITION + + IntColumn.SIZE_IN_BYTES_PER_POSITION + + 256 * Byte.BYTES); + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new IntoOperator( operatorContext, child, @@ -1384,7 +1398,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP intoPathDescriptor.getTargetDeviceToAlignedMap(), intoPathDescriptor.getSourceTargetPathPairList(), sourceColumnToInputLocationMap, - context.getIntoOperationExecutor()); + context.getIntoOperationExecutor(), + maxStatementSize, + maxReturnSize); } @Override @@ -1409,6 +1425,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP new HashMap<>(); Map<String, Map<PartialPath, Map<String, String>>> sourceDeviceToTargetPathMap = deviceViewIntoPathDescriptor.getSourceDeviceToTargetPathMap(); + long statementSizePerLine = 0L; for (Map.Entry<String, Map<PartialPath, Map<String, String>>> deviceEntry : sourceDeviceToTargetPathMap.entrySet()) { String sourceDevice = deviceEntry.getKey(); @@ -1424,8 +1441,19 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP deviceToTargetPathSourceInputLocationMap.put( sourceDevice, targetPathToSourceInputLocationMap); deviceToTargetPathDataTypeMap.put(sourceDevice, targetPathToDataTypeMap); + statementSizePerLine += calculateStatementSizePerLine(targetPathToDataTypeMap); } + int rowLimit = + IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit(); + long maxStatementSize = statementSizePerLine * rowLimit; + long maxReturnSize = + deviceToTargetPathDataTypeMap.size() + * (node.getChild().getOutputColumnNames().size() - 1) + * (LongColumn.SIZE_IN_BYTES_PER_POSITION + + IntColumn.SIZE_IN_BYTES_PER_POSITION + + 512 * Byte.BYTES); + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); return new DeviceViewIntoOperator( operatorContext, @@ -1435,7 +1463,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP deviceViewIntoPathDescriptor.getTargetDeviceToAlignedMap(), deviceViewIntoPathDescriptor.getDeviceToSourceTargetPathPairListMap(), sourceColumnToInputLocationMap, - context.getIntoOperationExecutor()); + context.getIntoOperationExecutor(), + maxStatementSize, + maxReturnSize); } private Map<String, InputLocation> constructSourceColumnToInputLocationMap(PlanNode node) { @@ -1469,6 +1499,38 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } } + private long calculateStatementSizePerLine( + Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap) { + long maxStatementSize = Long.BYTES; + List<TSDataType> dataTypes = + targetPathToDataTypeMap.values().stream() + .flatMap(stringTSDataTypeMap -> stringTSDataTypeMap.values().stream()) + .collect(Collectors.toList()); + for (TSDataType dataType : dataTypes) { + maxStatementSize += getValueSizePerLine(dataType); + } + return maxStatementSize; + } + + private static long getValueSizePerLine(TSDataType tsDataType) { + switch (tsDataType) { + case INT32: + return Integer.BYTES; + case INT64: + return Long.BYTES; + case FLOAT: + return Float.BYTES; + case DOUBLE: + return Double.BYTES; + case BOOLEAN: + return Byte.BYTES; + case TEXT: + return StatisticsManager.getInstance().getMaxBinarySizeInBytes(new PartialPath()); + default: + throw new UnsupportedOperationException("Unknown data type " + tsDataType); + } + } + @Override public Operator visitTimeJoin(TimeJoinNode node, LocalExecutionPlanContext context) { List<Operator> children =
