This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7054c8ab8a2 Add result size limit and time slice control for TransformOperator
7054c8ab8a2 is described below

commit 7054c8ab8a259740f9160eda914bb74c36b12873
Author: Weihao Li <[email protected]>
AuthorDate: Mon Mar 30 20:44:02 2026 +0800

    Add result size limit and time slice control for TransformOperator
---
 .../operator/process/TransformOperator.java        |  18 +--
 .../execution/operator/TransformOperatorTest.java  | 136 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
index 28a80462ea7..ad5192f9807 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 public class TransformOperator implements ProcessOperator {
 
@@ -225,6 +226,7 @@ public class TransformOperator implements ProcessOperator {
   @Override
   public TsBlock next() throws Exception {
 
+    long start = System.nanoTime();
     try {
       YieldableState yieldableState = iterateAllColumnsToNextValid();
       if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -236,9 +238,10 @@ public class TransformOperator implements ProcessOperator {
       final TimeColumnBuilder timeBuilder = 
tsBlockBuilder.getTimeColumnBuilder();
       final ColumnBuilder[] columnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
       final int columnCount = columnBuilders.length;
-
-      int rowCount = 0;
-      while (!timeHeap.isEmpty()) {
+      long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+      while (!timeHeap.isEmpty()
+          && !tsBlockBuilder.isFull()
+          && System.nanoTime() - start < maxRuntime) {
         final long currentTime = timeHeap.pollFirst();
 
         // time
@@ -253,25 +256,26 @@ public class TransformOperator implements ProcessOperator 
{
             }
             timeHeap.add(currentTime);
 
-            tsBlockBuilder.declarePositions(rowCount);
             return tsBlockBuilder.build();
           }
         }
 
         prepareEachColumn(columnCount);
 
-        ++rowCount;
+        tsBlockBuilder.declarePosition();
 
         yieldableState = iterateAllColumnsToNextValid();
         if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
-          tsBlockBuilder.declarePositions(rowCount);
           return tsBlockBuilder.build();
         }
 
         inputLayer.updateRowRecordListEvictionUpperBound();
       }
 
-      tsBlockBuilder.declarePositions(rowCount);
+      if (tsBlockBuilder.isEmpty()) {
+        return null;
+      }
+
       return tsBlockBuilder.build();
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
index fb9daccde03..044f99b8b88 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java
@@ -20,7 +20,10 @@
 package org.apache.iotdb.db.queryengine.execution.operator;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.NodeRef;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
@@ -28,18 +31,28 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.LongColumn;
 import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
@@ -139,4 +152,127 @@ public class TransformOperatorTest {
     reader.yield();
     reader.consumedAll();
   }
+
+  @Test
+  public void testTransformResultLimit() throws Exception {
+    UDFClassLoaderManager.setupAndGetInstance();
+    int savedMaxLine = 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
+    try {
+      int rowCount = 2001;
+      int maxLine = 200;
+      TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(200);
+      QueryId queryId = new QueryId("stub_query_chunk");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(
+              instanceId,
+              IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification"));
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+      PlanNodeId scanNodeId = new PlanNodeId("scan");
+      driverContext.addOperatorContext(1, scanNodeId, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId transformNodeId = new PlanNodeId("transform");
+      driverContext.addOperatorContext(2, transformNodeId, 
TransformOperator.class.getSimpleName());
+
+      long[] times = new long[rowCount];
+      long[] values = new long[rowCount];
+      for (int i = 0; i < rowCount; i++) {
+        times[i] = i;
+        values[i] = i * 10L;
+      }
+      TsBlock oneBatch =
+          new TsBlock(
+              new TimeColumn(rowCount, times), new LongColumn(rowCount, 
Optional.empty(), values));
+
+      Operator childOperator =
+          new Operator() {
+            boolean consumed = false;
+
+            @Override
+            public OperatorContext getOperatorContext() {
+              return driverContext.getOperatorContexts().get(0);
+            }
+
+            @Override
+            public TsBlock next() {
+              if (!consumed) {
+                consumed = true;
+                return oneBatch;
+              }
+              return null;
+            }
+
+            @Override
+            public boolean hasNext() {
+              return !consumed;
+            }
+
+            @Override
+            public void close() {}
+
+            @Override
+            public boolean isFinished() {
+              return consumed;
+            }
+
+            @Override
+            public long calculateMaxPeekMemory() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateMaxReturnSize() {
+              return oneBatch.getSizeInBytes();
+            }
+
+            @Override
+            public long calculateRetainedSizeAfterCallingNext() {
+              return 0;
+            }
+
+            @Override
+            public long ramBytesUsed() {
+              return 0;
+            }
+          };
+
+      TimeSeriesOperand s1 =
+          new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), 
TSDataType.INT64);
+      Map<String, List<InputLocation>> inputLocations =
+          ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new 
InputLocation(0, 0)));
+      Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
+      expressionTypes.put(NodeRef.of(s1), TSDataType.INT64);
+
+      TransformOperator transform =
+          new TransformOperator(
+              driverContext.getOperatorContexts().get(1),
+              childOperator,
+              ImmutableList.of(TSDataType.INT64),
+              inputLocations,
+              new Expression[] {s1},
+              true,
+              ZoneId.systemDefault(),
+              expressionTypes,
+              true);
+
+      int totalOutRows = 0;
+      int nonNullNextCount = 0;
+      while (transform.hasNext()) {
+        TsBlock out = transform.next();
+        if (out != null) {
+          nonNullNextCount++;
+          Assert.assertTrue(
+              "Each batch must be at most " + maxLine + " rows", 
out.getPositionCount() <= maxLine);
+          totalOutRows += out.getPositionCount();
+        }
+      }
+      Assert.assertEquals(rowCount, totalOutRows);
+      System.out.println(nonNullNextCount);
+      Assert.assertTrue(nonNullNextCount >= 11);
+    } finally {
+      
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
+    }
+  }
 }

Reply via email to