This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a49958c1f30 Correct binary column memory calculation (#14796)
a49958c1f30 is described below
commit a49958c1f30c5e9d6a795a03489421346c28594d
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Feb 7 15:38:09 2025 +0800
Correct binary column memory calculation (#14796)
* use getSizeInBytes instead of getRetainedSizeInBytes in some places
* change tsfile version
---
.../db/queryengine/execution/exchange/SharedTsBlockQueue.java | 8 ++++----
.../db/queryengine/execution/exchange/sink/SinkChannel.java | 11 +++++------
.../execution/exchange/source/LocalSourceHandle.java | 4 +---
.../db/queryengine/execution/operator/AbstractOperator.java | 2 +-
.../execution/operator/process/AbstractSortOperator.java | 2 +-
.../queryengine/execution/operator/process/SortOperator.java | 2 +-
.../execution/operator/process/TableStreamSortOperator.java | 2 +-
.../aggregation/grouped/HashAggregationOperator.java | 4 +---
.../aggregation/grouped/StreamingAggregationOperator.java | 9 ++++-----
.../aggregation/grouped/StreamingHashAggregationOperator.java | 4 +---
.../apache/iotdb/db/queryengine/execution/exchange/Utils.java | 3 +++
.../db/queryengine/execution/operator/OperatorMemoryTest.java | 2 ++
pom.xml | 2 +-
13 files changed, 26 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index b3b94f66282..f3810426398 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,8 @@ public class SharedTsBlockQueue {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
- tsBlock.getRetainedSizeInBytes());
- bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
+ tsBlock.getSizeInBytes());
+ bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
// Every time LocalSourceHandle consumes a TsBlock, it needs to send the
event to
// corresponding LocalSinkChannel.
if (sinkChannel != null) {
@@ -236,10 +236,10 @@ public class SharedTsBlockQueue {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
- tsBlock.getRetainedSizeInBytes(),
+ tsBlock.getSizeInBytes(),
maxBytesCanReserve);
blockedOnMemory = pair.left;
- bufferRetainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
+ bufferRetainedSizeInBytes += tsBlock.getSizeInBytes();
// reserve memory failed, we should wait until there is enough memory
if (!Boolean.TRUE.equals(pair.right)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
index 9f29a8a753f..8915938eb03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.java
@@ -201,7 +201,7 @@ public class SinkChannel implements ISinkChannel {
if (noMoreTsBlocks) {
return;
}
- long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
+ long sizeInBytes = tsBlock.getSizeInBytes();
int startSequenceId;
startSequenceId = nextSequenceId;
blocked =
@@ -211,17 +211,16 @@ public class SinkChannel implements ISinkChannel {
localFragmentInstanceId.getQueryId(),
fullFragmentInstanceId,
localPlanNodeId,
- retainedSizeInBytes,
+ sizeInBytes,
maxBytesCanReserve)
.left;
- bufferRetainedSizeInBytes += retainedSizeInBytes;
+ bufferRetainedSizeInBytes += sizeInBytes;
sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock,
currentTsBlockSize));
nextSequenceId += 1;
- currentTsBlockSize = retainedSizeInBytes;
+ currentTsBlockSize = sizeInBytes;
- // TODO: consider merge multiple NewDataBlockEvent for less network
traffic.
- submitSendNewDataBlockEventTask(startSequenceId,
ImmutableList.of(retainedSizeInBytes));
+ submitSendNewDataBlockEventTask(startSequenceId,
ImmutableList.of(sizeInBytes));
} finally {
DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 4da89f72d07..63feb571cb0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -124,9 +124,7 @@ public class LocalSourceHandle implements ISourceHandle {
if (tsBlock != null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
- "[GetTsBlockFromQueue] TsBlock:{} size:{}",
- currSequenceId,
- tsBlock.getRetainedSizeInBytes());
+ "[GetTsBlockFromQueue] TsBlock:{} size:{}", currSequenceId,
tsBlock.getSizeInBytes());
}
currSequenceId++;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
index 1278dc15d8d..5c6d731f9a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/AbstractOperator.java
@@ -44,7 +44,7 @@ public abstract class AbstractOperator implements Operator {
long oneTupleSize =
Math.max(
1,
- (tsBlock.getRetainedSizeInBytes() - tsBlock.getTotalInstanceSize())
+ (tsBlock.getSizeInBytes() - tsBlock.getTotalInstanceSize())
/ tsBlock.getPositionCount());
if (oneTupleSize > maxReturnSize) {
// make sure at least one-tuple-at-a-time
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
index c017cded6b7..0bfa1d3e767 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AbstractSortOperator.java
@@ -162,7 +162,7 @@ public abstract class AbstractSortOperator implements
ProcessOperator {
}
protected void cacheTsBlock(TsBlock tsBlock) throws IoTDBException {
- long bytesSize = tsBlock.getRetainedSizeInBytes();
+ long bytesSize = tsBlock.getSizeInBytes();
if (bytesSize + cachedBytes < sortBufferManager.getSortBufferSize()) {
cachedBytes += bytesSize;
for (int i = 0; i < tsBlock.getPositionCount(); i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
index eb556e1cfac..31029a30da6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/SortOperator.java
@@ -62,7 +62,7 @@ public abstract class SortOperator extends
AbstractSortOperator {
if (tsBlock == null) {
return null;
}
- dataSize += tsBlock.getRetainedSizeInBytes();
+ dataSize += tsBlock.getSizeInBytes();
cacheTsBlock(tsBlock);
} catch (IoTDBException e) {
clear();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
index 71e938f4f30..90565a85875 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TableStreamSortOperator.java
@@ -119,7 +119,7 @@ public class TableStreamSortOperator extends
AbstractSortOperator {
return null;
}
// record total sorted data size
- dataSize += currentTsBlock.getRetainedSizeInBytes();
+ dataSize += currentTsBlock.getSizeInBytes();
// if currentTsBlock line count + remainingCount is still less than
minLinesToOutput, just
// cache it
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
index 31ddcbe293b..ce7e91f8448 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
@@ -45,8 +45,6 @@ public class HashAggregationOperator extends AbstractOperator
{
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(HashAggregationOperator.class);
- private final OperatorContext operatorContext;
-
private final Operator child;
private final List<Type> groupByTypes;
@@ -81,7 +79,7 @@ public class HashAggregationOperator extends AbstractOperator
{
long maxPartialMemory,
boolean spillEnabled,
long unspillMemoryLimit) {
- this.operatorContext = operatorContext;
+ super.operatorContext = operatorContext;
this.child = child;
this.groupByTypes = ImmutableList.copyOf(groupByTypes);
this.groupByChannels = ImmutableList.copyOf(groupByChannels);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
index 7b0d37c365f..3ed2bbb1bde 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingAggregationOperator.java
@@ -23,7 +23,7 @@ import
org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.utils.datastructure.SortKey;
@@ -52,8 +52,6 @@ public class StreamingAggregationOperator extends
AbstractOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(StreamingAggregationOperator.class);
- private final OperatorContext operatorContext;
-
private final Operator child;
private final List<TableAggregator> aggregators;
@@ -85,7 +83,7 @@ public class StreamingAggregationOperator extends
AbstractOperator {
long maxPartialMemory,
boolean spillEnabled,
long unSpillMemoryLimit) {
- this.operatorContext = operatorContext;
+ super.operatorContext = operatorContext;
this.child = child;
this.groupByChannels = Ints.toArray(groupByChannels);
this.groupKeyComparator = groupKeyComparator;
@@ -208,7 +206,8 @@ public class StreamingAggregationOperator extends
AbstractOperator {
outputs.add(
resultBuilder.build(
new RunLengthEncodedColumn(
- TableScanOperator.TIME_COLUMN_TEMPLATE,
resultBuilder.getPositionCount())));
+ AbstractTableScanOperator.TIME_COLUMN_TEMPLATE,
+ resultBuilder.getPositionCount())));
resultBuilder.reset();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
index 374a6964de2..8862a813718 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/StreamingHashAggregationOperator.java
@@ -53,8 +53,6 @@ public class StreamingHashAggregationOperator extends
AbstractOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(StreamingHashAggregationOperator.class);
- private final OperatorContext operatorContext;
-
private final Operator child;
private final int[] preGroupedChannels;
@@ -103,7 +101,7 @@ public class StreamingHashAggregationOperator extends
AbstractOperator {
long maxPartialMemory,
boolean spillEnabled,
long unSpillMemoryLimit) {
- this.operatorContext = operatorContext;
+ super.operatorContext = operatorContext;
this.child = child;
this.preGroupedChannels = Ints.toArray(preGroupedChannels);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
index 0c6eac0dbed..327d4a34c39 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/Utils.java
@@ -41,6 +41,7 @@ public class Utils {
for (int i = 0; i < numOfTsBlocks; i++) {
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+ Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
mockTsBlocks.add(mockTsBlock);
}
@@ -50,6 +51,7 @@ public class Utils {
public static TsBlock createMockTsBlock(long mockTsBlockSize) {
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+ Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
return mockTsBlock;
}
@@ -144,6 +146,7 @@ public class Utils {
TsBlockSerde mockTsBlockSerde = Mockito.mock(TsBlockSerde.class);
TsBlock mockTsBlock = Mockito.mock(TsBlock.class);
Mockito.when(mockTsBlock.getRetainedSizeInBytes()).thenReturn(mockTsBlockSize);
+ Mockito.when(mockTsBlock.getSizeInBytes()).thenReturn(mockTsBlockSize);
Mockito.when(mockTsBlockSerde.deserialize(Mockito.any(ByteBuffer.class)))
.thenReturn(mockTsBlock);
return mockTsBlockSerde;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 1bba28545bf..e7f662ab573 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -253,6 +253,7 @@ public class OperatorMemoryTest {
public void lastCacheScanOperatorTest() {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(1024L);
+ Mockito.when(tsBlock.getSizeInBytes()).thenReturn(1024L);
LastCacheScanOperator lastCacheScanOperator = new
LastCacheScanOperator(null, null, tsBlock);
assertEquals(1024, lastCacheScanOperator.calculateMaxPeekMemory());
@@ -383,6 +384,7 @@ public class OperatorMemoryTest {
public void lastQuerySortOperatorTest() {
TsBlock tsBlock = Mockito.mock(TsBlock.class);
Mockito.when(tsBlock.getRetainedSizeInBytes()).thenReturn(16 * 1024L);
+ Mockito.when(tsBlock.getSizeInBytes()).thenReturn(16 * 1024L);
Mockito.when(tsBlock.getPositionCount()).thenReturn(16);
List<Operator> children = new ArrayList<>(4);
diff --git a/pom.xml b/pom.xml
index 1df4651c4d7..fe95956c1bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,7 +167,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
- <tsfile.version>2.1.0-250206-SNAPSHOT</tsfile.version>
+ <tsfile.version>2.1.0-250207-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim