This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch fix1.3Sort
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a28db929e112dcf773e928f67af2e10c0776fc63
Author: Weihao Li <[email protected]>
AuthorDate: Tue Feb 3 15:57:28 2026 +0800

    Fix according review
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../execution/operator/process/SortOperator.java   |   4 +-
 .../iotdb/db/utils/sort/FileSpillerReader.java     |   5 +
 .../apache/iotdb/db/utils/sort/MemoryReader.java   |   5 +
 .../org/apache/iotdb/db/utils/sort/SortReader.java |   3 +
 .../operator/SortOperatorSortBranchTest.java       | 206 +++++++++++++++++++++
 5 files changed, 221 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index 834429cd1c3..b2bfc3785e7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -266,7 +266,7 @@ public class SortOperator implements ProcessOperator {
         mergeSortHeap.push(mergeSortKey);
       } else {
         noMoreData[readerIndex] = true;
-        sortBufferManager.releaseOneSortBranch();
+        sortReaders.get(readerIndex).releaseMemory();
       }
 
       // break if time is out or tsBlockBuilder is full or sortBuffer is not enough
@@ -289,7 +289,7 @@ public class SortOperator implements ProcessOperator {
           mergeSortHeap.push(mergeSortKey);
         } else {
           noMoreData[i] = true;
-          sortBufferManager.releaseOneSortBranch();
+          sortReader.releaseMemory();
         }
       }
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
index d3ce760cdc8..98ca07dcd03 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/FileSpillerReader.java
@@ -136,6 +136,11 @@ public class FileSpillerReader implements SortReader {
     return true;
   }
 
+  @Override
+  public void releaseMemory() {
+    sortBufferManager.releaseOneSortBranch();
+  }
+
   @Override
   public void close() throws IoTDBException {
     try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java
index 65537daad70..1adc68be334 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/MemoryReader.java
@@ -49,6 +49,11 @@ public class MemoryReader implements SortReader {
     return cachedData != null && rowIndex != size;
   }
 
+  @Override
+  public void releaseMemory() {
+    // do nothing, memory reader will not occupy sort buffer
+  }
+
   @Override
   public void close() throws IoTDBException {
     // do nothing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java
index 734d8932825..d3c8a7099bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/sort/SortReader.java
@@ -39,6 +39,9 @@ public interface SortReader {
    */
   boolean hasNext() throws IoTDBException;
 
+  /** Release the memory resource (managed by {@link SortBufferManager}) of reader. */
+  void releaseMemory();
+
   /**
    * Close the sortReader and release resources.
    *
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java
new file mode 100644
index 00000000000..0c2e1064bd9
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/SortOperatorSortBranchTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.apache.iotdb.db.utils.EnvironmentUtils.cleanDir;
+import static org.junit.Assert.assertEquals;
+
+public class SortOperatorSortBranchTest {
+  private final String sortDir = "target" + File.separator + "sort";
+  private final String sortTmpPrefixPath = sortDir + File.separator + "tmp";
+
+  private int dataNodeId;
+
+  private int maxTsBlockSizeInBytes;
+
+  private long sortBufferSize;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+    maxTsBlockSizeInBytes = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+    sortBufferSize = IoTDBDescriptor.getInstance().getConfig().getSortBufferSize();
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(15);
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(150);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    cleanDir(sortDir);
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId);
+    TSFileDescriptor.getInstance().getConfig().setMaxTsBlockSizeInBytes(maxTsBlockSizeInBytes);
+    IoTDBDescriptor.getInstance().getConfig().setSortBufferSize(sortBufferSize);
+  }
+
+  private SortOperator genSortOperator() {
+
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(
+            instanceId,
+            IoTDBThreadPoolFactory.newFixedThreadPool(
+                1, "sort-operator-test-instance-notification"));
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    driverContext.addOperatorContext(2, planNodeId2, SortOperator.class.getSimpleName());
+    List<TSDataType> outputTypes = ImmutableList.of(TSDataType.INT32);
+
+    Operator childOperator =
+        new Operator() {
+          int index = 0;
+          private final List<int[]> data =
+              ImmutableList.of(
+                  new int[] {
+                    20, 20, 21, 22, 23, 24, 25, 26,
+                  },
+                  new int[] {});
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return driverContext.getOperatorContexts().get(0);
+          }
+
+          @Override
+          public TsBlock next() {
+            TsBlockBuilder builder = new TsBlockBuilder(outputTypes);
+            int[] currentData = data.get(index);
+            index++;
+
+            ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+            for (int i = 0; i < currentData.length; i++) {
+              columnBuilders[0].writeInt(currentData[i]);
+              builder.getTimeColumnBuilder().writeLong(currentData[i]);
+            }
+            builder.declarePositions(currentData.length);
+            TsBlock result = builder.build();
+            return result;
+          }
+
+          @Override
+          public boolean hasNext() throws Exception {
+            return !isFinished();
+          }
+
+          @Override
+          public void close() throws Exception {}
+
+          @Override
+          public boolean isFinished() throws Exception {
+            return index == data.size();
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+
+          @Override
+          public long ramBytesUsed() {
+            return 0;
+          }
+        };
+
+    OperatorContext operatorContext = driverContext.getOperatorContexts().get(1);
+    Comparator<SortKey> comparator =
+        MergeSortComparator.getComparator(
+            Arrays.asList(new SortItem(OrderByKey.DATANODEID, Ordering.ASC)),
+            ImmutableList.of(0),
+            outputTypes);
+    return new SortOperator(
+        operatorContext,
+        childOperator,
+        ImmutableList.of(TSDataType.INT32),
+        sortTmpPrefixPath,
+        comparator);
+  }
+
+  @Test
+  public void sortTest() throws Exception {
+    SortOperator operator = genSortOperator();
+    int[] expected =
+        new int[] {
+          20, 20, 21, 22, 23, 24, 25, 26,
+        };
+    int index = 0;
+    while (operator.hasNext()) {
+      TsBlock block = operator.next();
+      if (block != null) {
+        for (int i = 0; i < block.getPositionCount(); i++, index++) {
+          assertEquals(expected[index], block.getColumn(0).getInt(i));
+        }
+      }
+    }
+  }
+}

Reply via email to