This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7054c8ab8a2 Add result size limit and time slice control for TransformOperator
7054c8ab8a2 is described below
commit 7054c8ab8a259740f9160eda914bb74c36b12873
Author: Weihao Li <[email protected]>
AuthorDate: Mon Mar 30 20:44:02 2026 +0800
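Add result size limit and time slice control for TransformOperator

TransformOperator.next() now stops appending rows once the output TsBlockBuilder is full or
once the operator's max run time has elapsed since next() was entered, and returns null
instead of an empty TsBlock when no row was produced in that call. Positions are now declared
per row via declarePosition() rather than with a trailing declarePositions(rowCount).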
---
.../operator/process/TransformOperator.java | 18 +--
.../execution/operator/TransformOperatorTest.java | 136 +++++++++++++++++++++
2 files changed, 147 insertions(+), 7 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
index 28a80462ea7..ad5192f9807 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
public class TransformOperator implements ProcessOperator {
@@ -225,6 +226,7 @@ public class TransformOperator implements ProcessOperator {
@Override
public TsBlock next() throws Exception {
+ long start = System.nanoTime();
try {
YieldableState yieldableState = iterateAllColumnsToNextValid();
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -236,9 +238,10 @@ public class TransformOperator implements ProcessOperator {
final TimeColumnBuilder timeBuilder =
tsBlockBuilder.getTimeColumnBuilder();
final ColumnBuilder[] columnBuilders =
tsBlockBuilder.getValueColumnBuilders();
final int columnCount = columnBuilders.length;
-
- int rowCount = 0;
- while (!timeHeap.isEmpty()) {
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+ while (!timeHeap.isEmpty()
+ && !tsBlockBuilder.isFull()
+ && System.nanoTime() - start < maxRuntime) {
final long currentTime = timeHeap.pollFirst();
// time
@@ -253,25 +256,26 @@ public class TransformOperator {
}
timeHeap.add(currentTime);
- tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
}
}
prepareEachColumn(columnCount);
- ++rowCount;
+ tsBlockBuilder.declarePosition();
yieldableState = iterateAllColumnsToNextValid();
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
- tsBlockBuilder.declarePositions(rowCount);
return tsBlockBuilder.build();
}
inputLayer.updateRowRecordListEvictionUpperBound();
}
- tsBlockBuilder.declarePositions(rowCount);
+ if (tsBlockBuilder.isEmpty()) {
+ return null;
+ }
+
return tsBlockBuilder.build();
} catch (Exception e) {
throw new RuntimeException(e);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
index fb9daccde03..044f99b8b88 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
@@ -20,7 +20,10 @@
package org.apache.iotdb.db.queryengine.execution.operator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
@@ -28,18 +31,28 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import
org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
import
org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
import
org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.LongColumn;
import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.junit.Assert;
import org.junit.Test;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -139,4 +152,150 @@ public class TransformOperatorTest {
reader.yield();
reader.consumedAll();
}
+
+  /**
+   * Feeds one child TsBlock that is much larger than the configured max TsBlock line number and
+   * checks that TransformOperator splits the output into bounded batches without losing,
+   * duplicating or reordering rows.
+   */
+  @Test
+  public void testTransformResultLimit() throws Exception {
+    UDFClassLoaderManager.setupAndGetInstance();
+    final int rowCount = 2001;
+    final int maxLine = 200;
+    final int savedMaxLine =
+        TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    // Shut down in finally so the notification thread does not outlive the test.
+    final java.util.concurrent.ExecutorService notificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // Lower the per-TsBlock row limit so the 2001-row input cannot fit in one output block.
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxLine);
+      QueryId queryId = new QueryId("stub_query_chunk");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, notificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+      PlanNodeId scanNodeId = new PlanNodeId("scan");
+      driverContext.addOperatorContext(
+          1, scanNodeId, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId transformNodeId = new PlanNodeId("transform");
+      driverContext.addOperatorContext(
+          2, transformNodeId, TransformOperator.class.getSimpleName());
+
+      long[] times = new long[rowCount];
+      long[] values = new long[rowCount];
+      for (int i = 0; i < rowCount; i++) {
+        times[i] = i;
+        values[i] = i * 10L;
+      }
+      TsBlock oneBatch =
+          new TsBlock(
+              new TimeColumn(rowCount, times),
+              new LongColumn(rowCount, Optional.empty(), values));
+
+      // Child that hands over the whole input as one oversized TsBlock and then finishes.
+      Operator childOperator =
+          new Operator() {
+            boolean consumed = false;
+
+            @Override
+            public OperatorContext getOperatorContext() {
+              return driverContext.getOperatorContexts().get(0);
+            }
+
+            @Override
+            public TsBlock next() {
+              if (!consumed) {
+                consumed = true;
+                return oneBatch;
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return !consumed;
+            }
+
+            @Override
+            public void close() {}
+
+            @Override
+            public boolean isFinished() {
+              return consumed;
+            }
+
+            @Override
+            public long calculateMaxPeekMemory() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateMaxReturnSize() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateRetainedSizeAfterCallingNext() {
+              return 0;
+            }
+
+            @Override
+            public long ramBytesUsed() {
+              return 0;
+            }
+          };
+
+      TimeSeriesOperand s1 =
+          new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), TSDataType.INT64);
+      Map<String, List<InputLocation>> inputLocations =
+          ImmutableMap.of(
+              s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0)));
+      Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
+      expressionTypes.put(NodeRef.of(s1), TSDataType.INT64);
+
+      TransformOperator transform =
+          new TransformOperator(
+              driverContext.getOperatorContexts().get(1),
+              childOperator,
+              ImmutableList.of(TSDataType.INT64),
+              inputLocations,
+              new Expression[] {s1},
+              true,
+              ZoneId.systemDefault(),
+              expressionTypes,
+              true);
+
+      int totalOutRows = 0;
+      int nonNullNextCount = 0;
+      while (transform.hasNext()) {
+        TsBlock out = transform.next();
+        // next() returns null when it produced no row during this call; just poll again.
+        if (out == null) {
+          continue;
+        }
+        nonNullNextCount++;
+        Assert.assertTrue(
+            "Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine);
+        // Rows must come out contiguous and in input order: time t carries value t * 10.
+        for (int i = 0; i < out.getPositionCount(); i++) {
+          long expectedTime = totalOutRows + i;
+          Assert.assertEquals(expectedTime, out.getTimeByIndex(i));
+          Assert.assertEquals(expectedTime * 10L, out.getColumn(0).getLong(i));
+        }
+        totalOutRows += out.getPositionCount();
+      }
+      Assert.assertEquals(rowCount, totalOutRows);
+      // With at most maxLine rows per batch, rowCount rows need at least ceil(rowCount / maxLine)
+      // non-null batches (11 for 2001 / 200).
+      Assert.assertTrue(nonNullNextCount >= (rowCount + maxLine - 1) / maxLine);
+    } finally {
+      notificationExecutor.shutdownNow();
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
+    }
+  }
}