This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch lwh/1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7672bbc6f511a80f2025373d1b73daea120f7c34 Author: Weihao Li <[email protected]> AuthorDate: Mon Mar 30 18:19:04 2026 +0800 add Signed-off-by: Weihao Li <[email protected]> --- .../operator/process/TransformOperator.java | 18 +-- .../execution/operator/TransformOperatorTest.java | 136 +++++++++++++++++++++ 2 files changed, 147 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java index be418dba049..e5ae29f0c4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java @@ -55,6 +55,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; public class TransformOperator implements ProcessOperator { @@ -230,6 +231,7 @@ public class TransformOperator implements ProcessOperator { @Override public TsBlock next() throws Exception { + long start = System.nanoTime(); try { YieldableState yieldableState = iterateAllColumnsToNextValid(); if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) { @@ -241,9 +243,10 @@ public class TransformOperator implements ProcessOperator { final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); final int columnCount = columnBuilders.length; - - int rowCount = 0; - while (!timeHeap.isEmpty()) { + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + while (!timeHeap.isEmpty() + && !tsBlockBuilder.isFull() + && System.nanoTime() - start < maxRuntime) { final long currentTime = timeHeap.pollFirst(); // time @@ -258,25 +261,26 @@ public class TransformOperator implements ProcessOperator { } timeHeap.add(currentTime); - tsBlockBuilder.declarePositions(rowCount); return tsBlockBuilder.build(); } } prepareEachColumn(columnCount); - ++rowCount; + tsBlockBuilder.declarePosition(); yieldableState = iterateAllColumnsToNextValid(); if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) { - tsBlockBuilder.declarePositions(rowCount); return tsBlockBuilder.build(); } inputLayer.updateRowRecordListEvictionUpperBound(); } - tsBlockBuilder.declarePositions(rowCount); + if (tsBlockBuilder.isEmpty()) { + return null; + } + return tsBlockBuilder.build(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java index fb9daccde03..044f99b8b88 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java @@ -20,7 +20,10 @@ package org.apache.iotdb.db.queryengine.execution.operator; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; @@ -28,18 +31,28 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.api.LayerReader; import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer; import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.LongColumn; import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.junit.Assert; import org.junit.Test; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; @@ -139,4 +152,127 @@ public class TransformOperatorTest { reader.yield(); reader.consumedAll(); } + + @Test + public void testTransformResultLimit() throws Exception { + UDFClassLoaderManager.setupAndGetInstance(); + int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + try { + int rowCount = 2001; + int maxLine = 200; + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(200); + QueryId queryId = new QueryId("stub_query_chunk"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine( + instanceId, + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification")); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId scanNodeId = new PlanNodeId("scan"); + driverContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName()); + PlanNodeId transformNodeId = new PlanNodeId("transform"); + driverContext.addOperatorContext(2, transformNodeId, TransformOperator.class.getSimpleName()); + + long[] times = new long[rowCount]; + long[] values = new long[rowCount]; + for (int i = 0; i < rowCount; i++) { + times[i] = i; + values[i] = i * 10L; + } + TsBlock oneBatch = + new TsBlock( + new TimeColumn(rowCount, times), new LongColumn(rowCount, Optional.empty(), values)); + + Operator childOperator = + new Operator() { + boolean consumed = false; + + @Override + public OperatorContext getOperatorContext() { + return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + if (!consumed) { + consumed = true; + return oneBatch; + } + return null; + } + + @Override + public boolean hasNext() { + return !consumed; + } + + @Override + public void close() {} + + @Override + public boolean isFinished() { + return consumed; + } + + @Override + public long calculateMaxPeekMemory() { + return oneBatch.getSizeInBytes(); + } + + @Override + public long calculateMaxReturnSize() { + return oneBatch.getSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + }; + + TimeSeriesOperand s1 = + new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), TSDataType.INT64); + Map<String, List<InputLocation>> inputLocations = + ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0))); + Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>(); + expressionTypes.put(NodeRef.of(s1), TSDataType.INT64); + + TransformOperator transform = + new TransformOperator( + driverContext.getOperatorContexts().get(1), + childOperator, + ImmutableList.of(TSDataType.INT64), + inputLocations, + new Expression[] {s1}, + true, + ZoneId.systemDefault(), + expressionTypes, + true); + + int totalOutRows = 0; + int nonNullNextCount = 0; + while (transform.hasNext()) { + TsBlock out = transform.next(); + if (out != null) { + nonNullNextCount++; + Assert.assertTrue( + "Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine); + totalOutRows += out.getPositionCount(); + } + } + Assert.assertEquals(rowCount, totalOutRows); + System.out.println(nonNullNextCount); + Assert.assertTrue(nonNullNextCount >= 11); + } finally { + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine); + } + } }
