This is an automated email from the ASF dual-hosted git repository. weihao pushed a commit to branch fix1.3Sort in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a28db929e112dcf773e928f67af2e10c0776fc63 Author: Weihao Li <[email protected]> AuthorDate: Tue Feb 3 15:57:28 2026 +0800 Fix according review Signed-off-by: Weihao Li <[email protected]> --- .../execution/operator/process/SortOperator.java | 4 +- .../iotdb/db/utils/sort/FileSpillerReader.java | 5 + .../apache/iotdb/db/utils/sort/MemoryReader.java | 5 + .../org/apache/iotdb/db/utils/sort/SortReader.java | 3 + .../operator/SortOperatorSortBranchTest.java | 206 +++++++++++++++++++++ 5 files changed, 221 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java index 834429cd1c3..b2bfc3785e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java @@ -266,7 +266,7 @@ public class SortOperator implements ProcessOperator { mergeSortHeap.push(mergeSortKey); } else { noMoreData[readerIndex] = true; - sortBufferManager.releaseOneSortBranch(); + sortReaders.get(readerIndex).releaseMemory(); } // break if time is out or tsBlockBuilder is full or sortBuffer is not enough @@ -289,7 +289,7 @@ public class SortOperator implements ProcessOperator { mergeSortHeap.push(mergeSortKey); } else { noMoreData[i] = true; - sortBufferManager.releaseOneSortBranch(); + sortReader.releaseMemory(); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java index d3ce760cdc8..98ca07dcd03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java @@ -136,6 +136,11 @@ public class FileSpillerReader implements SortReader { return true; } + @Override + public void releaseMemory() { + sortBufferManager.releaseOneSortBranch(); + } + @Override public void close() throws IoTDBException { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java index 65537daad70..1adc68be334 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java @@ -49,6 +49,11 @@ public class MemoryReader implements SortReader { return cachedData != null && rowIndex != size; } + @Override + public void releaseMemory() { + // do nothing, memory reader will not occupy sort buffer + } + @Override public void close() throws IoTDBException { // do nothing diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java index 734d8932825..d3c8a7099bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java @@ -39,6 +39,9 @@ public interface SortReader { */ boolean hasNext() throws IoTDBException; + /** Release the memory resource (managed by {@link SortBufferManager}) of reader. */ + void releaseMemory(); + /** * Close the sortReader and release resources. * diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java new file mode 100644 index 00000000000..0c2e1064bd9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator; +import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; +import org.apache.iotdb.db.utils.datastructure.SortKey; + +import com.google.common.collect.ImmutableList; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir; +import static org.junit.Assert.assertEquals; + +public class SortOperatorSortBranchTest { + private final String sortDir = "target" + File.separator + "sort"; + private final String sortTmpPrefixPath = sortDir + File.separator + "tmp"; + + private int dataNodeId; + + private int maxTsBlockSizeInBytes; + + private long sortBufferSize; + + @Before + public void setUp() throws MetadataException, IOException, WriteProcessException { + dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + maxTsBlockSizeInBytes = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + sortBufferSize = IoTDBDescriptor.getInstance().getConfig().getSortBufferSize(); + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(15); + IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(150); + } + + @After + public void tearDown() throws IOException { + cleanDir(sortDir); + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId); + TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes); + IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize); + } + + private SortOperator genSortOperator() { + + // Construct operator tree + QueryId queryId = new QueryId("stub_query"); + + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine( + instanceId, + IoTDBThreadPoolFactory.newFixedThreadPool( + 1, "sort-operator-test-instance-notification")); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, SortOperator.class.getSimpleName()); + List<TSDataType> outputTypes = ImmutableList.of(TSDataType.INT32); + + Operator childOperator = + new Operator() { + int index = 0; + private final List<int[]> data = + ImmutableList.of( + new int[] { + 20, 20, 21, 22, 23, 24, 25, 26, + }, + new int[] {}); + + @Override + public OperatorContext getOperatorContext() { + return driverContext.getOperatorContexts().get(0); + } + + @Override + public TsBlock next() { + TsBlockBuilder builder = new TsBlockBuilder(outputTypes); + int[] currentData = data.get(index); + index++; + + ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders(); + for (int i = 0; i < currentData.length; i++) { + columnBuilders[0].writeInt(currentData[i]); + builder.getTimeColumnBuilder().writeLong(currentData[i]); + } + builder.declarePositions(currentData.length); + TsBlock result = builder.build(); + return result; + } + + @Override + public boolean hasNext() throws Exception { + return !isFinished(); + } + + @Override + public void close() throws Exception {} + + @Override + public boolean isFinished() throws Exception { + return index == data.size(); + } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return 0; + } + + @Override + public long ramBytesUsed() { + return 0; + } + }; + + OperatorContext operatorContext = driverContext.getOperatorContexts().get(1); + Comparator<SortKey> comparator = + MergeSortComparator.getComparator( + Arrays.asList(new SortItem(OrderByKey.DATANODEID, Ordering.ASC)), + ImmutableList.of(0), + outputTypes); + return new SortOperator( + operatorContext, + childOperator, + ImmutableList.of(TSDataType.INT32), + sortTmpPrefixPath, + comparator); + } + + @Test + public void sortTest() throws Exception { + SortOperator operator = genSortOperator(); + int[] expected = + new int[] { + 20, 20, 21, 22, 23, 24, 25, 26, + }; + int index = 0; + while (operator.hasNext()) { + TsBlock block = operator.next(); + if (block != null) { + for (int i = 0; i < block.getPositionCount(); i++, index++) { + assertEquals(expected[index], block.getColumn(0).getInt(i)); + } + } + } + } +}
