This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch changcheng_0213 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 43056b51a26ac8477a70e9f51e1596d5ca8390e3 Author: Liao Lanyu <[email protected]> AuthorDate: Fri Feb 10 11:02:52 2023 +0800 [IOTDB-5501] Fix memory leak in MemoryPool --- .../iotdb/db/mpp/common/FragmentInstanceId.java | 7 ++++- .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 ++++++++++---- .../db/mpp/execution/exchange/SinkHandle.java | 23 ++++++++++---- .../db/mpp/execution/exchange/SourceHandle.java | 25 ++++++++++++---- .../iotdb/db/mpp/execution/memory/MemoryPool.java | 35 ++++++++++++++++------ .../execution/exchange/LocalSinkHandleTest.java | 14 +++++++-- .../db/mpp/execution/exchange/SinkHandleTest.java | 28 +++++++++++------ .../mpp/execution/exchange/SourceHandleTest.java | 10 +++++-- 8 files changed, 125 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java index b1c12bb729..793066b94a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java @@ -107,6 +107,11 @@ public class FragmentInstanceId { } public static String createFullId(String queryId, int fragmentId, String instanceId) { - return queryId + '.' + fragmentId + '.' + instanceId; + return queryId + "." + fragmentId + "." + instanceId; + } + + public static String createFragmentInstanceIdFromTFragmentInstanceId( + TFragmentInstanceId tFragmentInstanceId) { + return tFragmentInstanceId.getFragmentId() + "." + tFragmentInstanceId.getInstanceId(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java index ade913aebc..a40202de46 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -49,6 +50,8 @@ public class SharedTsBlockQueue { private final String localPlanNodeId; + private final String fullFragmentInstanceId; + private final LocalMemoryManager localMemoryManager; private boolean noMoreTsBlocks = false; @@ -81,6 +84,8 @@ public class SharedTsBlockQueue { LocalMemoryManager localMemoryManager) { this.localFragmentInstanceId = Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be null"); + this.fullFragmentInstanceId = + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId); this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be null"); this.localMemoryManager = Validate.notNull(localMemoryManager, "local memory manager cannot be null"); @@ -159,7 +164,7 @@ public class SharedTsBlockQueue { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, tsBlock.getRetainedSizeInBytes()); bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes(); @@ -186,7 +191,7 @@ public class SharedTsBlockQueue { .getQueryPool() .reserve( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, tsBlock.getRetainedSizeInBytes(), maxBytesCanReserve); @@ -233,11 +238,15 @@ public class SharedTsBlockQueue { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); } /** Destroy the queue and cancel the future. Should only be called in abnormal case */ @@ -258,11 +267,15 @@ public class SharedTsBlockQueue { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); } /** Destroy the queue and cancel the future. Should only be called in abnormal case */ @@ -283,7 +296,7 @@ public class SharedTsBlockQueue { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java index 139387d510..fbc36ed34d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; @@ -69,6 +70,8 @@ public class SinkHandle implements ISinkHandle { private final String localPlanNodeId; private final TFragmentInstanceId localFragmentInstanceId; + + private final String fullFragmentInstanceId; private final LocalMemoryManager localMemoryManager; private final ExecutorService executorService; private final TsBlockSerde serde; @@ -122,6 +125,8 @@ public class SinkHandle implements ISinkHandle { this.remotePlanNodeId = Validate.notNull(remotePlanNodeId); this.localPlanNodeId = Validate.notNull(localPlanNodeId); this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId); + this.fullFragmentInstanceId = + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId); this.localMemoryManager = Validate.notNull(localMemoryManager); this.executorService = Validate.notNull(executorService); this.serde = Validate.notNull(serde); @@ -138,7 +143,7 @@ public class SinkHandle implements ISinkHandle { .getQueryPool() .reserve( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know maxBytesCanReserve after @@ -179,7 +184,7 @@ public class SinkHandle implements ISinkHandle { .getQueryPool() .reserve( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, retainedSizeInBytes, maxBytesCanReserve) @@ -223,11 +228,15 @@ public class SinkHandle implements ISinkHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); sinkHandleListener.onAborted(this); logger.debug("[EndAbortSinkHandle]"); } @@ -243,11 +252,15 @@ public class SinkHandle implements ISinkHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); sinkHandleListener.onFinish(this); logger.debug("[EndCloseSinkHandle]"); } @@ -327,7 +340,7 @@ public class SinkHandle implements ISinkHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, freedBytes); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java index d164e9d1de..386fdd40b8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; @@ -68,6 +69,8 @@ public class SourceHandle implements ISourceHandle { private final TEndPoint remoteEndpoint; private final TFragmentInstanceId remoteFragmentInstanceId; private final TFragmentInstanceId localFragmentInstanceId; + + private final String fullFragmentInstanceId; private final String localPlanNodeId; private final LocalMemoryManager localMemoryManager; private final ExecutorService executorService; @@ -123,6 +126,8 @@ public class SourceHandle implements ISourceHandle { this.remoteEndpoint = Validate.notNull(remoteEndpoint); this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId); this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId); + this.fullFragmentInstanceId = + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId); this.localPlanNodeId = Validate.notNull(localPlanNodeId); this.localMemoryManager = Validate.notNull(localMemoryManager); this.executorService = Validate.notNull(executorService); @@ -172,7 +177,7 @@ public class SourceHandle implements ISourceHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, retainedSize); @@ -214,7 +219,7 @@ public class SourceHandle implements ISourceHandle { .getQueryPool() .reserve( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bytesToReserve, maxBytesCanReserve); @@ -316,11 +321,15 @@ public class SourceHandle implements ISourceHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); aborted = true; sourceHandleListener.onAborted(this); } @@ -349,11 +358,15 @@ public class SourceHandle implements ISourceHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, bufferRetainedSizeInBytes); bufferRetainedSizeInBytes = 0; } + localMemoryManager + .getQueryPool() + .clearMemoryReservationMap( + localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, localPlanNodeId); closed = true; currSequenceId = lastSequenceId + 1; sourceHandleListener.onFinished(this); @@ -417,7 +430,7 @@ public class SourceHandle implements ISourceHandle { "Query[%s]-[%s-%s-SourceHandle-%s]", localFragmentInstanceId.getQueryId(), localFragmentInstanceId.getFragmentId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId); } @@ -527,7 +540,7 @@ public class SourceHandle implements ISourceHandle { .getQueryPool() .free( localFragmentInstanceId.getQueryId(), - localFragmentInstanceId.getInstanceId(), + fullFragmentInstanceId, localPlanNodeId, reservedBytes); sourceHandleListener.onFailure(SourceHandle.this, t); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java index 190cf24eb0..6972111810 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java @@ -292,14 +292,11 @@ public class MemoryPool { Validate.isTrue(bytes <= queryReservedBytes); queryReservedBytes -= bytes; - if (queryReservedBytes == 0) { - queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId); - } else { - queryMemoryReservations - .get(queryId) - .get(fragmentInstanceId) - .put(planNodeId, queryReservedBytes); - } + queryMemoryReservations + .get(queryId) + .get(fragmentInstanceId) + .put(planNodeId, queryReservedBytes); + reservedBytes -= bytes; if (memoryReservationFutures.isEmpty()) { @@ -350,7 +347,7 @@ public class MemoryPool { future.set(null); } catch (Throwable t) { // ignore it, because we still need to notify other future - LOGGER.error("error happened while trying to free memory: ", t); + LOGGER.warn("error happened while trying to free memory: ", t); } } } @@ -369,4 +366,24 @@ public class MemoryPool { public long getReservedBytes() { return reservedBytes; } + + public void clearMemoryReservationMap( + String queryId, String fragmentInstanceId, String planNodeId) { + if (queryMemoryReservations.get(queryId) == null + || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == null) { + return; + } + Map<String, Long> planNodeIdToBytesReserved = + queryMemoryReservations.get(queryId).get(fragmentInstanceId); + if (planNodeIdToBytesReserved.get(planNodeId) == null + || planNodeIdToBytesReserved.get(planNodeId) <= 0) { + planNodeIdToBytesReserved.remove(planNodeId); + if (planNodeIdToBytesReserved.isEmpty()) { + queryMemoryReservations.get(queryId).remove(fragmentInstanceId); + } + if (queryMemoryReservations.get(queryId).isEmpty()) { + queryMemoryReservations.remove(queryId); + } + } + } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java index 0a022ae06b..8bcb1b300b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.execution.exchange; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.execution.memory.MemoryPool; @@ -86,7 +87,8 @@ public class LocalSinkHandleTest { Mockito.verify(spyMemoryPool, Mockito.times(11)) .reserve( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, Long.MAX_VALUE); @@ -102,7 +104,12 @@ public class LocalSinkHandleTest { Assert.assertFalse(localSinkHandle.isFinished()); Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes()); Mockito.verify(spyMemoryPool, Mockito.times(11)) - .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize); + .free( + queryId, + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + remoteFragmentInstanceId), + remotePlanNodeId, + mockTsBlockSize); // Set no-more-TsBlocks. localSinkHandle.setNoMoreTsBlocks(); @@ -169,7 +176,8 @@ public class LocalSinkHandleTest { Mockito.verify(spyMemoryPool, Mockito.times(11)) .reserve( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + remoteFragmentInstanceId), remotePlanNodeId, mockTsBlockSize, Long.MAX_VALUE); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java index 0c8ebf9f73..d50e69dd2a 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.execution.memory.MemoryPool; @@ -116,7 +117,8 @@ public class SinkHandleTest { // Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1)) // .reserve( // queryId, - // localFragmentInstanceId.getInstanceId(), + // + // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId), // localPlanNodeId, // mockTsBlockSize * numOfMockTsBlock, // Long.MAX_VALUE); @@ -164,7 +166,8 @@ public class SinkHandleTest { Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1)) .free( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, numOfMockTsBlock * mockTsBlockSize); Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle); @@ -201,7 +204,8 @@ public class SinkHandleTest { MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, numOfMockTsBlock, mockTsBlockSize); @@ -261,7 +265,8 @@ public class SinkHandleTest { // Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1)) // .reserve( // queryId, - // localFragmentInstanceId.getInstanceId(), + // + // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId), // localPlanNodeId, // mockTsBlockSize * numOfMockTsBlock, // Long.MAX_VALUE); @@ -302,7 +307,8 @@ public class SinkHandleTest { Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1)) .free( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, numOfMockTsBlock * mockTsBlockSize); @@ -318,7 +324,8 @@ public class SinkHandleTest { // Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3)) // .reserve( // queryId, - // localFragmentInstanceId.getInstanceId(), + // + // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId), // localPlanNodeId, // mockTsBlockSize * numOfMockTsBlock, // Long.MAX_VALUE); @@ -378,7 +385,8 @@ public class SinkHandleTest { Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2)) .free( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, numOfMockTsBlock * mockTsBlockSize); Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle); @@ -401,7 +409,8 @@ public class SinkHandleTest { MemoryPool mockMemoryPool = Utils.createMockBlockedMemoryPool( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, numOfMockTsBlock, mockTsBlockSize); @@ -462,7 +471,8 @@ public class SinkHandleTest { // Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1)) // .reserve( // queryId, - // localFragmentInstanceId.getInstanceId(), + // + // FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId), // localPlanNodeId, // mockTsBlockSize * numOfMockTsBlock, // Long.MAX_VALUE); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java index 1a5ad7560f..d0b8f49f29 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener; import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager; import org.apache.iotdb.db.mpp.execution.memory.MemoryPool; @@ -252,7 +253,8 @@ public class SourceHandleTest { Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6)) .reserve( queryId, - localFragmentInstanceId.getInstanceId(), + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), localPlanNodeId, MOCK_TSBLOCK_SIZE, maxBytesCanReserve); @@ -283,7 +285,11 @@ public class SourceHandleTest { for (int i = 0; i < numOfMockTsBlock; i++) { Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i)) .free( - queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, MOCK_TSBLOCK_SIZE); + queryId, + FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( + localFragmentInstanceId), + localPlanNodeId, + MOCK_TSBLOCK_SIZE); sourceHandle.receive(); try { if (i < 5) {
