This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a49958c1f30 Correct binary column memory calculation (#14796)
a49958c1f30 is described below

commit a49958c1f30c5e9d6a795a03489421346c28594d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Feb 7 15:38:09 2025 +0800

    Correct binary column memory calculation (#14796)
    
    * use getSizeInBytes instead of getRetainedSizeInBytes in some places
    
    * change tsfile version
---
 .../db/queryengine/execution/exchange/SharedTsBlockQueue.java |  8 ++++----
 .../db/queryengine/execution/exchange/sink/SinkChannel.java   | 11 +++++------
 .../execution/exchange/source/LocalSourceHandle.java          |  4 +---
 .../db/queryengine/execution/operator/AbstractOperator.java   |  2 +-
 .../execution/operator/process/AbstractSortOperator.java      |  2 +-
 .../queryengine/execution/operator/process/SortOperator.java  |  2 +-
 .../execution/operator/process/TableStreamSortOperator.java   |  2 +-
 .../aggregation/grouped/HashAggregationOperator.java          |  4 +---
 .../aggregation/grouped/StreamingAggregationOperator.java     |  9 ++++-----
 .../aggregation/grouped/StreamingHashAggregationOperator.java |  4 +---
 .../apache/iotdb/db/queryengine/execution/exchange/Utils.java |  3 +++
 .../db/queryengine/execution/operator/OperatorMemoryTest.java |  2 ++
 pom.xml                                                       |  2 +-
 13 files changed, 26 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index b3b94f66282..f3810426398 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,8 @@ public class SharedTsBlockQueue {
             localFragmentInstanceId.getQueryId(),
             fullFragmentInstanceId,
             localPlanNodeId,
-            tsBlock.getRetainedSizeInBytes());
-    bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+            tsBlock.getSizeInBytes());
+    bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
     // Every time LocalSourceHandle consumes a TsBlock, it needs to send the 
event to
     // corresponding LocalSinkChannel.
     if (sinkChannel != null) {
@@ -236,10 +236,10 @@ public class SharedTsBlockQueue {
                 localFragmentInstanceId.getQueryId(),
                 fullFragmentInstanceId,
                 localPlanNodeId,
-                tsBlock.getRetainedSizeInBytes(),
+                tsBlock.getSizeInBytes(),
                 maxBytesCanReserve);
     blockedOnMemory = pair.left;
-    bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+    bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
 
     // reserve memory failed, we should wait until there is enough memory
     if (!Boolean.TRUE.equals(pair.right)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 9f29a8a753f..8915938eb03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -201,7 +201,7 @@ public class SinkChannel implements ISinkChannel {
       if (noMoreTsBlocks) {
         return;
       }
-      long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
+      long sizeInBytes = tsBlock.getSizeInBytes();
       int startSequenceId;
       startSequenceId = nextSequenceId;
       blocked =
@@ -211,17 +211,16 @@ public class SinkChannel implements ISinkChannel {
                   localFragmentInstanceId.getQueryId(),
                   fullFragmentInstanceId,
                   localPlanNodeId,
-                  retainedSizeInBytes,
+                  sizeInBytes,
                   maxBytesCanReserve)
               .left;
-      bufferRetainedSizeInBytes += retainedSizeInBytes;
+      bufferRetainedSizeInBytes += sizeInBytes;
 
       sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, 
currentTsBlockSize));
       nextSequenceId += 1;
-      currentTsBlockSize = retainedSizeInBytes;
+      currentTsBlockSize = sizeInBytes;
 
-      // TODO: consider merge multiple NewDataBlockEvent for less network 
traffic.
-      submitSendNewDataBlockEventTask(startSequenceId, 
ImmutableList.of(retainedSizeInBytes));
+      submitSendNewDataBlockEventTask(startSequenceId, 
ImmutableList.of(sizeInBytes));
     } finally {
       DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
           SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 4da89f72d07..63feb571cb0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -124,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle {
       if (tsBlock != null) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug(
-              "[GetTsBlockFromQueue] TsBlock:{} size:{}",
-              currSequenceId,
-              tsBlock.getRetainedSizeInBytes());
+              "[GetTsBlockFromQueue] TsBlock:{} size:{}", currSequenceId, 
tsBlock.getSizeInBytes());
         }
         currSequenceId++;
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
index 1278dc15d8d..5c6d731f9a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
@@ -44,7 +44,7 @@ public abstract class AbstractOperator implements Operator {
     long oneTupleSize =
         Math.max(
             1,
-            (tsBlock.getRetainedSizeInBytes() - tsBlock.getTotalInstanceSize())
+            (tsBlock.getSizeInBytes() - tsBlock.getTotalInstanceSize())
                 / tsBlock.getPositionCount());
     if (oneTupleSize > maxReturnSize) {
       // make sure at least one-tuple-at-a-time
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index c017cded6b7..0bfa1d3e767 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -162,7 +162,7 @@ public abstract class AbstractSortOperator implements 
ProcessOperator {
   }
 
   protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
-    long bytesSize = tsBlock.getRetainedSizeInBytes();
+    long bytesSize = tsBlock.getSizeInBytes();
     if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
       cachedBytes += bytesSize;
       for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index eb556e1cfac..31029a30da6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -62,7 +62,7 @@ public abstract class SortOperator extends 
AbstractSortOperator {
       if (tsBlock == null) {
         return null;
       }
-      dataSize += tsBlock.getRetainedSizeInBytes();
+      dataSize += tsBlock.getSizeInBytes();
       cacheTsBlock(tsBlock);
     } catch (IoTDBException e) {
       clear();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
index 71e938f4f30..90565a85875 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
@@ -119,7 +119,7 @@ public class TableStreamSortOperator extends 
AbstractSortOperator {
           return null;
         }
         // record total sorted data size
-        dataSize += currentTsBlock.getRetainedSizeInBytes();
+        dataSize += currentTsBlock.getSizeInBytes();
 
         // if currentTsBlock line count + remainingCount is still less than 
minLinesToOutput, just
         // cache it
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
index 31ddcbe293b..ce7e91f8448 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
@@ -45,8 +45,6 @@ public class HashAggregationOperator extends AbstractOperator 
{
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(HashAggregationOperator.class);
 
-  private final OperatorContext operatorContext;
-
   private final Operator child;
 
   private final List<Type> groupByTypes;
@@ -81,7 +79,7 @@ public class HashAggregationOperator extends AbstractOperator 
{
       long maxPartialMemory,
       boolean spillEnabled,
       long unspillMemoryLimit) {
-    this.operatorContext = operatorContext;
+    super.operatorContext = operatorContext;
     this.child = child;
     this.groupByTypes = ImmutableList.copyOf(groupByTypes);
     this.groupByChannels = ImmutableList.copyOf(groupByChannels);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
index 7b0d37c365f..3ed2bbb1bde 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -23,7 +23,7 @@ import 
org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -52,8 +52,6 @@ public class StreamingAggregationOperator extends 
AbstractOperator {
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(StreamingAggregationOperator.class);
 
-  private final OperatorContext operatorContext;
-
   private final Operator child;
 
   private final List<TableAggregator> aggregators;
@@ -85,7 +83,7 @@ public class StreamingAggregationOperator extends 
AbstractOperator {
       long maxPartialMemory,
       boolean spillEnabled,
       long unSpillMemoryLimit) {
-    this.operatorContext = operatorContext;
+    super.operatorContext = operatorContext;
     this.child = child;
     this.groupByChannels = Ints.toArray(groupByChannels);
     this.groupKeyComparator = groupKeyComparator;
@@ -208,7 +206,8 @@ public class StreamingAggregationOperator extends 
AbstractOperator {
       outputs.add(
           resultBuilder.build(
               new RunLengthEncodedColumn(
-                  TableScanOperator.TIME_COLUMN_TEMPLATE, 
resultBuilder.getPositionCount())));
+                  AbstractTableScanOperator.TIME_COLUMN_TEMPLATE,
+                  resultBuilder.getPositionCount())));
       resultBuilder.reset();
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
index 374a6964de2..8862a813718 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
@@ -53,8 +53,6 @@ public class StreamingHashAggregationOperator extends 
AbstractOperator {
   private static final long INSTANCE_SIZE =
       
RamUsageEstimator.shallowSizeOfInstance(StreamingHashAggregationOperator.class);
 
-  private final OperatorContext operatorContext;
-
   private final Operator child;
 
   private final int[] preGroupedChannels;
@@ -103,7 +101,7 @@ public class StreamingHashAggregationOperator extends 
AbstractOperator {
       long maxPartialMemory,
       boolean spillEnabled,
       long unSpillMemoryLimit) {
-    this.operatorContext = operatorContext;
+    super.operatorContext = operatorContext;
     this.child = child;
 
     this.preGroupedChannels = Ints.toArray(preGroupedChannels);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
index 0c6eac0dbed..327d4a34c39 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
@@ -41,6 +41,7 @@ public class Utils {
     for (int i = 0; i < numOfTsBlocks; i++) {
       TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
       
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+      Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
       mockTsBlocks.add(mockTsBlock);
     }
 
@@ -50,6 +51,7 @@ public class Utils {
   public static TsBlock createMockTsBlock(long mockTsBlockSize) {
     TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
     
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+    Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
     return mockTsBlock;
   }
 
@@ -144,6 +146,7 @@ public class Utils {
     TsBlockSerde mockTsBlockSerde = Mockito.mock(TsBlockSerde.class);
     TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
     
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+    Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
     Mockito.when(mockTsBlockSerde.deserialize(Mockito.any(ByteBuffer.class)))
         .thenReturn(mockTsBlock);
     return mockTsBlockSerde;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 1bba28545bf..e7f662ab573 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -253,6 +253,7 @@ public class OperatorMemoryTest {
   public void lastCacheScanOperatorTest() {
     TsBlock tsBlock = Mockito.mock(TsBlock.class);
     Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L);
+    Mockito.when(tsBlock.getSizeInBytes()).thenReturn(1024L);
     LastCacheScanOperator lastCacheScanOperator = new 
LastCacheScanOperator(null, null, tsBlock);
 
     assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
@@ -383,6 +384,7 @@ public class OperatorMemoryTest {
   public void lastQuerySortOperatorTest() {
     TsBlock tsBlock = Mockito.mock(TsBlock.class);
     Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
+    Mockito.when(tsBlock.getSizeInBytes()).thenReturn(16 * 1024L);
     Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
     List<Operator> children = new ArrayList<>(4);
 
diff --git a/pom.xml b/pom.xml
index 1df4651c4d7..fe95956c1bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,7 +167,7 @@
         <thrift.version>0.14.1</thrift.version>
         <xz.version>1.9</xz.version>
         <zstd-jni.version>1.5.6-3</zstd-jni.version>
-        <tsfile.version>2.1.0-250206-SNAPSHOT</tsfile.version>
+        <tsfile.version>2.1.0-250207-SNAPSHOT</tsfile.version>
     </properties>
     <!--
     if we claim dependencies in dependencyManagement, then we do not claim

Reply via email to