This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch MemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d0a2ba4e967ee9b33ff89cbe2ace68c433cc1bcf Author: zyk990424 <[email protected]> AuthorDate: Fri Aug 12 12:46:16 2022 +0800 add schema operator memory UT --- .../schema/NodeManageMemoryMergeOperator.java | 4 +- .../operator/schema/NodePathsCountOperator.java | 4 +- .../mpp/execution/operator/OperatorMemoryTest.java | 454 +++++++++++++++++++++ 3 files changed, 458 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java index b7688b023a..315872a27f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodeManageMemoryMergeOperator.java @@ -136,7 +136,7 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator { public long calculateMaxPeekMemory() { // todo calculate the result based on all the scan node; currently, this is shadowed by // schemaQueryMergeNode - return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory()); } @Override @@ -146,6 +146,6 @@ public class NodeManageMemoryMergeOperator implements ProcessOperator { @Override public long calculateRetainedSizeAfterCallingNext() { - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java index 37b05c4e58..1417f302e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/schema/NodePathsCountOperator.java @@ -104,7 +104,7 @@ public class NodePathsCountOperator implements ProcessOperator { public long calculateMaxPeekMemory() { // todo calculate the result based on all the scan node; currently, this is shadowed by // schemaQueryMergeNode - return 2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory()); } @Override @@ -114,6 +114,6 @@ public class NodePathsCountOperator implements ProcessOperator { @Override public long calculateRetainedSizeAfterCallingNext() { - return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; + return DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext(); } } diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index c718942108..d0598e6494 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@ -45,6 +45,21 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryMergeOpe import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.LevelTimeSeriesCountOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.NodeManageMemoryMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsConvertOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsCountOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.NodePathsSchemaScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.PathsUsingTemplateScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaFetchScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryMergeOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.SchemaQueryOrderByHeatOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesCountOperator; +import org.apache.iotdb.db.mpp.execution.operator.schema.TimeSeriesSchemaScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator; @@ -72,6 +87,7 @@ import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Random; @@ -619,4 +635,442 @@ public class OperatorMemoryTest { operator.calculateMaxReturnSize()); assertEquals(512, operator.calculateRetainedSizeAfterCallingNext()); } + + @Test + public void TimeSeriesSchemaScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + TimeSeriesSchemaScanOperator operator = + new TimeSeriesSchemaScanOperator( + planNodeId, + fragmentInstanceContext.getOperatorContexts().get(0), + 0, + 0, + null, + null, + null, + false, + false, + false, + null); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void DeviceSchemaScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + DevicesSchemaScanOperator operator = + new DevicesSchemaScanOperator( + planNodeId, + fragmentInstanceContext.getOperatorContexts().get(0), + 0, + 0, + null, + false, + false); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void PathsUsingTemplateScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + PathsUsingTemplateScanOperator operator = + new PathsUsingTemplateScanOperator( + planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), 0); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void TimeSeriesCountOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + TimeSeriesCountOperator operator = + new TimeSeriesCountOperator( + planNodeId, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + false, + null, + null, + false); + + assertEquals(4L, operator.calculateMaxPeekMemory()); + assertEquals(4L, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void LevelTimeSeriesCountOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + LevelTimeSeriesCountOperator operator = + new LevelTimeSeriesCountOperator( + planNodeId, + fragmentInstanceContext.getOperatorContexts().get(0), + null, + false, + 4, + null, + null, + false); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void DevicesCountOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + DevicesCountOperator operator = + new DevicesCountOperator( + planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, false); + + assertEquals(4L, operator.calculateMaxPeekMemory()); + assertEquals(4L, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void SchemaQueryMergeOperatorTest() { + QueryId queryId = new QueryId("stub_query"); + List<Operator> children = new ArrayList<>(4); + + long expectedMaxReturnSize = 0; + long expectedMaxPeekMemory = 0; + long expectedRetainedSize = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, child.calculateMaxPeekMemory()); + expectedMaxReturnSize = Math.max(expectedMaxReturnSize, child.calculateMaxReturnSize()); + expectedRetainedSize += child.calculateRetainedSizeAfterCallingNext(); + children.add(child); + } + + SchemaQueryMergeOperator operator = + new SchemaQueryMergeOperator( + queryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), children); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void CountMergeOperatorTest() { + QueryId queryId = new QueryId("stub_query"); + List<Operator> children = new ArrayList<>(4); + + long expectedMaxReturnSize = 0; + long expectedMaxPeekMemory = 0; + long expectedRetainedSize = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, child.calculateMaxPeekMemory()); + expectedMaxReturnSize = Math.max(expectedMaxReturnSize, child.calculateMaxReturnSize()); + expectedRetainedSize += child.calculateRetainedSizeAfterCallingNext(); + children.add(child); + } + + CountMergeOperator operator = + new CountMergeOperator( + queryId.genPlanNodeId(), Mockito.mock(OperatorContext.class), children); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void SchemaFetchScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + SchemaFetchScanOperator operator = + new SchemaFetchScanOperator( + planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, null, null); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void SchemaFetchMergeOperatorTest() { + List<Operator> children = new ArrayList<>(4); + + long expectedMaxReturnSize = 0; + long expectedMaxPeekMemory = 0; + long expectedRetainedSize = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + expectedMaxPeekMemory = Math.max(expectedMaxPeekMemory, child.calculateMaxPeekMemory()); + expectedMaxReturnSize = Math.max(expectedMaxReturnSize, child.calculateMaxReturnSize()); + expectedRetainedSize += child.calculateRetainedSizeAfterCallingNext(); + children.add(child); + } + + SchemaFetchMergeOperator operator = + new SchemaFetchMergeOperator(Mockito.mock(OperatorContext.class), children, null); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void SchemaQueryOrderByHeatOperatorTest() { + List<Operator> children = new ArrayList<>(4); + + long expectedMaxReturnSize = 0; + long expectedMaxPeekMemory = 0; + long expectedRetainedSize = 0; + + for (int i = 0; i < 4; i++) { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + expectedMaxPeekMemory += child.calculateMaxReturnSize(); + expectedMaxReturnSize += child.calculateMaxReturnSize(); + expectedRetainedSize += + child.calculateRetainedSizeAfterCallingNext() + child.calculateMaxReturnSize(); + children.add(child); + } + + SchemaQueryOrderByHeatOperator operator = + new SchemaQueryOrderByHeatOperator(Mockito.mock(OperatorContext.class), children); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void NodePathsSchemaScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesScanOperator.class.getSimpleName()); + + NodePathsSchemaScanOperator operator = + new NodePathsSchemaScanOperator( + planNodeId, fragmentInstanceContext.getOperatorContexts().get(0), null, 4); + + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxPeekMemory()); + assertEquals(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, operator.calculateMaxReturnSize()); + assertEquals(0, operator.calculateRetainedSizeAfterCallingNext()); + + } finally { + instanceNotificationExecutor.shutdown(); + } + } + + @Test + public void NodePathsConvertOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + + long expectedMaxPeekMemory = child.calculateMaxPeekMemory() + child.calculateMaxReturnSize(); + long expectedMaxReturnSize = child.calculateMaxReturnSize(); + long expectedRetainedSize = child.calculateRetainedSizeAfterCallingNext(); + + NodePathsConvertOperator operator = + new NodePathsConvertOperator(Mockito.mock(OperatorContext.class), child); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void NodePathsCountOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + + long expectedMaxPeekMemory = + Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory()); + long expectedMaxReturnSize = + Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); + long expectedRetainedSize = + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext(); + + NodePathsCountOperator operator = + new NodePathsCountOperator(Mockito.mock(OperatorContext.class), child); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } + + @Test + public void NodeManageMemoryMergeOperatorTest() { + Operator child = Mockito.mock(Operator.class); + Mockito.when(child.calculateMaxPeekMemory()).thenReturn(128 * 1024L); + Mockito.when(child.calculateMaxReturnSize()).thenReturn(64 * 1024L); + Mockito.when(child.calculateRetainedSizeAfterCallingNext()).thenReturn(0L); + + long expectedMaxPeekMemory = + Math.max(2L * DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxPeekMemory()); + long expectedMaxReturnSize = + Math.max(DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, child.calculateMaxReturnSize()); + long expectedRetainedSize = + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES + child.calculateRetainedSizeAfterCallingNext(); + + NodeManageMemoryMergeOperator operator = + new NodeManageMemoryMergeOperator( + Mockito.mock(OperatorContext.class), Collections.emptySet(), child); + + assertEquals(expectedMaxPeekMemory, operator.calculateMaxPeekMemory()); + assertEquals(expectedMaxReturnSize, operator.calculateMaxReturnSize()); + assertEquals(expectedRetainedSize, operator.calculateRetainedSizeAfterCallingNext()); + } }
