This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch perfectTransform in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8f2087bc7c9907262985831635be4fef13d88af5 Author: Weihao Li <[email protected]> AuthorDate: Mon Mar 30 18:05:06 2026 +0800 add Signed-off-by: Weihao Li <[email protected]> --- .../operator/process/TransformOperator.java | 18 ++- .../process/join/FullOuterTimeJoinOperator.java | 8 +- .../execution/operator/TransformOperatorTest.java | 136 ++++++++++++++++++ .../join/FullOuterTimeJoinOperatorTest.java | 155 +++++++++++++++++++++ 4 files changed, 309 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java index 28a80462ea7..ad5192f9807 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java @@ -54,6 +54,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; public class TransformOperator implements ProcessOperator { @@ -225,6 +226,7 @@ public class TransformOperator implements ProcessOperator { @Override public TsBlock next() throws Exception { + long start = System.nanoTime(); try { YieldableState yieldableState = iterateAllColumnsToNextValid(); if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) { @@ -236,9 +238,10 @@ public class TransformOperator implements ProcessOperator { final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); final int columnCount = columnBuilders.length; - - int rowCount = 0; - while (!timeHeap.isEmpty()) { + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + while (!timeHeap.isEmpty() + && 
!tsBlockBuilder.isFull() + && System.nanoTime() - start < maxRuntime) { final long currentTime = timeHeap.pollFirst(); // time @@ -253,25 +256,26 @@ public class TransformOperator implements ProcessOperator { } timeHeap.add(currentTime); - tsBlockBuilder.declarePositions(rowCount); return tsBlockBuilder.build(); } } prepareEachColumn(columnCount); - ++rowCount; + tsBlockBuilder.declarePosition(); yieldableState = iterateAllColumnsToNextValid(); if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) { - tsBlockBuilder.declarePositions(rowCount); return tsBlockBuilder.build(); } inputLayer.updateRowRecordListEvictionUpperBound(); } - tsBlockBuilder.declarePositions(rowCount); + if (tsBlockBuilder.isEmpty()) { + return null; + } + return tsBlockBuilder.build(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java index 819a1562225..eb9f940a015 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperator.java @@ -39,6 +39,7 @@ import org.apache.tsfile.utils.RamUsageEstimator; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.successfulAsList; @@ -161,6 +162,8 @@ public class FullOuterTimeJoinOperator extends AbstractConsumeAllOperator { } TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder(); + long startTime = System.nanoTime(); + long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); long currentTime; do { currentTime = timeSelector.pollFirst(); @@ -171,7 +174,10 @@ public class FullOuterTimeJoinOperator extends AbstractConsumeAllOperator { prepareForTimeHeap(); - } while (comparator.lessThan(currentTime, currentEndTime) && !timeSelector.isEmpty()); + } while (comparator.lessThan(currentTime, currentEndTime) + && !timeSelector.isEmpty() + && !tsBlockBuilder.isFull() + && System.nanoTime() - startTime < maxRuntime); resultTsBlock = tsBlockBuilder.build(); return checkTsBlockSizeAndGetResult(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java index fb9daccde03..044f99b8b88 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java @@ -20,7 +20,10 @@ package org.apache.iotdb.db.queryengine.execution.operator; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.NodeRef; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; @@ -28,18 +31,28 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator; import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.transformation.api.LayerReader; import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer; import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; import org.apache.tsfile.read.common.block.column.LongColumn; import org.apache.tsfile.read.common.block.column.TimeColumn; +import org.junit.Assert; import org.junit.Test; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; @@ -139,4 +152,127 @@ public class TransformOperatorTest { reader.yield(); reader.consumedAll(); } + + @Test + public void testTransformResultLimit() throws Exception { + UDFClassLoaderManager.setupAndGetInstance(); + int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + try { + int rowCount = 2001; + int maxLine = 200; + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(200); + QueryId queryId = new QueryId("stub_query_chunk"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine( + instanceId, + 
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification")); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId scanNodeId = new PlanNodeId("scan"); + driverContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName()); + PlanNodeId transformNodeId = new PlanNodeId("transform"); + driverContext.addOperatorContext(2, transformNodeId, TransformOperator.class.getSimpleName()); + + long[] times = new long[rowCount]; + long[] values = new long[rowCount]; + for (int i = 0; i < rowCount; i++) { + times[i] = i; + values[i] = i * 10L; + } + TsBlock oneBatch = + new TsBlock( + new TimeColumn(rowCount, times), new LongColumn(rowCount, Optional.empty(), values)); + + Operator childOperator = + new Operator() { + boolean consumed = false; + + @Override + public OperatorContext getOperatorContext() { + return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + if (!consumed) { + consumed = true; + return oneBatch; + } + return null; + } + + @Override + public boolean hasNext() { + return !consumed; + } + + @Override + public void close() {} + + @Override + public boolean isFinished() { + return consumed; + } + + @Override + public long calculateMaxPeekMemory() { + return oneBatch.getSizeInBytes(); + } + + @Override + public long calculateMaxReturnSize() { + return oneBatch.getSizeInBytes(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + }; + + TimeSeriesOperand s1 = + new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"), TSDataType.INT64); + Map<String, List<InputLocation>> inputLocations = + ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0))); + Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>(); 
+ expressionTypes.put(NodeRef.of(s1), TSDataType.INT64); + + TransformOperator transform = + new TransformOperator( + driverContext.getOperatorContexts().get(1), + childOperator, + ImmutableList.of(TSDataType.INT64), + inputLocations, + new Expression[] {s1}, + true, + ZoneId.systemDefault(), + expressionTypes, + true); + + int totalOutRows = 0; + int nonNullNextCount = 0; + while (transform.hasNext()) { + TsBlock out = transform.next(); + if (out != null) { + nonNullNextCount++; + Assert.assertTrue( + "Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine); + totalOutRows += out.getPositionCount(); + } + } + Assert.assertEquals(rowCount, totalOutRows); + System.out.println(nonNullNextCount); + Assert.assertTrue(nonNullNextCount >= 11); + } finally { + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java new file mode 100644 index 00000000000..d6693351049 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/FullOuterTimeJoinOperatorTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; +import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.Duration; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class FullOuterTimeJoinOperatorTest { + + @Test + public void testResultSizeLimit() throws Exception { + final int rowCount = 2000; + final int maxLine = 128; + + int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + Duration savedRunTimeSlice = OperatorContext.getMaxRunTime(); + try { + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxLine); + OperatorContext.setMaxRunTime(new Duration(1, TimeUnit.MINUTES)); + + OperatorContext 
operatorContext = Mockito.mock(OperatorContext.class); + Operator child1 = createSingleBatchOperator(operatorContext, rowCount, 0); + Operator child2 = createSingleBatchOperator(operatorContext, rowCount, 10000); + + List<TSDataType> dataTypes = Arrays.asList(TSDataType.INT32, TSDataType.INT32); + FullOuterTimeJoinOperator fullOuterTimeJoinOperator = + new FullOuterTimeJoinOperator( + operatorContext, + Arrays.asList(child1, child2), + Ordering.ASC, + dataTypes, + Arrays.asList( + new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()), + new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())), + new AscTimeComparator()); + + int totalRows = 0; + int nonNullTsBlockCount = 0; + ListenableFuture<?> listenableFuture = fullOuterTimeJoinOperator.isBlocked(); + listenableFuture.get(); + while (!fullOuterTimeJoinOperator.isFinished() && fullOuterTimeJoinOperator.hasNext()) { + TsBlock tsBlock = fullOuterTimeJoinOperator.next(); + if (tsBlock != null && !tsBlock.isEmpty()) { + nonNullTsBlockCount++; + Assert.assertTrue(tsBlock.getPositionCount() <= maxLine); + totalRows += tsBlock.getPositionCount(); + } + listenableFuture = fullOuterTimeJoinOperator.isBlocked(); + listenableFuture.get(); + } + + Assert.assertEquals(rowCount, totalRows); + Assert.assertTrue(nonNullTsBlockCount >= 16); + } finally { + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine); + OperatorContext.setMaxRunTime(savedRunTimeSlice); + } + } + + private Operator createSingleBatchOperator( + OperatorContext operatorContext, int rowCount, int valueOffset) { + return new Operator() { + private boolean consumed = false; + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public TsBlock next() { + if (consumed) { + return null; + } + consumed = true; + + TsBlockBuilder builder = new TsBlockBuilder(rowCount, Arrays.asList(TSDataType.INT32)); + for (int i = 0; i < rowCount; i++) { + 
builder.getTimeColumnBuilder().writeLong(i); + builder.getColumnBuilder(0).writeInt(valueOffset + i); + } + builder.declarePositions(rowCount); + return builder.build(); + } + + @Override + public boolean hasNext() { + return !consumed; + } + + @Override + public void close() {} + + @Override + public boolean isFinished() { + return consumed; + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + }; + } +}
