This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ee6a4de [FLINK-10606][network][test] Construct NetworkEnvironment simple for tests
ee6a4de is described below
commit ee6a4dead18348f72166f980b01868d896e1f88f
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Mon Oct 22 11:13:41 2018 +0800
[FLINK-10606][network][test] Construct NetworkEnvironment simple for tests
---
 .../runtime/io/network/NetworkEnvironment.java | 50 ++++++++++++++++------
 .../runtime/io/network/NetworkEnvironmentTest.java | 31 +-------------
 .../partition/consumer/SingleInputGateTest.java | 43 ++++---------------
 .../TaskManagerComponentsStartupShutdownTest.java | 15 +------
 4 files changed, 49 insertions(+), 90 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f254756..1363175 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -90,19 +90,43 @@ public class NetworkEnvironment {
 private boolean isShutdown;
 public NetworkEnvironment(
- NetworkBufferPool networkBufferPool,
- ConnectionManager connectionManager,
- ResultPartitionManager resultPartitionManager,
- TaskEventDispatcher taskEventDispatcher,
- KvStateRegistry kvStateRegistry,
- KvStateServer kvStateServer,
- KvStateClientProxy kvStateClientProxy,
- IOMode defaultIOMode,
- int partitionRequestInitialBackoff,
- int partitionRequestMaxBackoff,
- int networkBuffersPerChannel,
- int extraNetworkBuffersPerGate,
- boolean enableCreditBased) {
+ int numBuffers,
+ int memorySegmentSize,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased) {
+ this(
+ new NetworkBufferPool(numBuffers, memorySegmentSize),
+ new LocalConnectionManager(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ null,
+ IOManager.IOMode.SYNC,
+ partitionRequestInitialBackoff,
+ partitionRequestMaxBackoff,
+ networkBuffersPerChannel,
+ extraNetworkBuffersPerGate,
+ enableCreditBased);
+ }
+
+ public NetworkEnvironment(
+ NetworkBufferPool networkBufferPool,
+ ConnectionManager connectionManager,
+ ResultPartitionManager resultPartitionManager,
+ TaskEventDispatcher taskEventDispatcher,
+ KvStateRegistry kvStateRegistry,
+ KvStateServer kvStateServer,
+ KvStateClientProxy kvStateClientProxy,
+ IOMode defaultIOMode,
+ int partitionRequestInitialBackoff,
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate,
+ boolean enableCreditBased) {
 this.networkBufferPool = checkNotNull(networkBufferPool);
 this.connectionManager = checkNotNull(connectionManager);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f0f1926..8c2fb7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -79,21 +77,8 @@ public class NetworkEnvironmentTest {
 */
 @Test
 public void testRegisterTaskUsesBoundedBuffers() throws Exception {
- final NetworkEnvironment network = new NetworkEnvironment(
- new NetworkBufferPool(numBuffers, memorySegmentSize),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- 0,
- 0,
- 2,
- 8,
- enableCreditBasedFlowControl);
+ numBuffers, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 // result partitions
 ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -197,19 +182,7 @@ public class NetworkEnvironmentTest {
 private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 final NetworkEnvironment network = new NetworkEnvironment(
- new NetworkBufferPool(bufferPoolSize, memorySegmentSize),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- 0,
- 0,
- 2,
- 8,
- enableCreditBasedFlowControl);
+ bufferPoolSize, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 final ConnectionManager connManager = createDummyConnectionManager();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4bf5b22..63f1855 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,7 +44,6 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
@@ -71,7 +69,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -344,7 +341,8 @@ public class SingleInputGateTest {
 int initialBackoff = 137;
 int maxBackoff = 1001;
- final NetworkEnvironment netEnv = createNetworkEnvironment(2, 8, initialBackoff, maxBackoff);
+ final NetworkEnvironment netEnv = new NetworkEnvironment(
+ 100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl);
 SingleInputGate gate = SingleInputGate.create(
 "TestTask",
@@ -403,8 +401,8 @@ public class SingleInputGateTest {
 final SingleInputGate
inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 int buffersPerChannel = 2;
 int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel,
- extraNetworkBuffersPerGate, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 try {
 final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -415,8 +413,6 @@ public class SingleInputGateTest {
 NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 if (enableCreditBasedFlowControl) {
- verify(bufferPool,
- times(1)).requestMemorySegments(buffersPerChannel);
 RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 .get(resultPartitionId.getPartitionId());
 // only the exclusive buffers should be assigned/available now
@@ -444,7 +440,8 @@ public class SingleInputGateTest {
 final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 int buffersPerChannel = 2;
 int extraNetworkBuffersPerGate = 8;
- final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 try {
 final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -454,8 +451,6 @@ public class SingleInputGateTest {
 NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 if (enableCreditBasedFlowControl) {
- verify(bufferPool, times(0)).requestMemorySegments(buffersPerChannel);
-
 assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
 bufferPool.getNumberOfAvailableMemorySegments());
 // note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
@@ -471,8 +466,6 @@ public class SingleInputGateTest {
 ResultPartitionLocation.createRemote(connectionId)));
 if
(enableCreditBasedFlowControl) {
- verify(bufferPool,
- times(1)).requestMemorySegments(buffersPerChannel);
 RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 .get(resultPartitionId.getPartitionId());
 // only the exclusive buffers should be assigned/available now
@@ -499,7 +492,8 @@ public class SingleInputGateTest {
 public void testUpdateUnknownInputChannel() throws Exception {
 final SingleInputGate inputGate = createInputGate(2);
 int buffersPerChannel = 2;
- final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+ final NetworkEnvironment network = new NetworkEnvironment(
+ 100, 32, 0, 0, buffersPerChannel, 8, enableCreditBasedFlowControl);
 try {
 final ResultPartitionID localResultPartitionId = new ResultPartitionID();
@@ -543,27 +537,6 @@ public class SingleInputGateTest {
 // ---------------------------------------------------------------------------------------------
- private NetworkEnvironment createNetworkEnvironment(
- int buffersPerChannel,
- int extraNetworkBuffersPerGate,
- int initialBackoff,
- int maxBackoff) {
- return new NetworkEnvironment(
- spy(new NetworkBufferPool(100, 32)),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
- IOManager.IOMode.SYNC,
- initialBackoff,
- maxBackoff,
- buffersPerChannel,
- extraNetworkBuffersPerGate,
- enableCreditBasedFlowControl);
- }
-
 private SingleInputGate createInputGate() {
 return createInputGate(2);
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9669513..c3118c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -33,11 +33,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -45,7 +41,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -146,14 +141,8 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 final NetworkEnvironment network = new NetworkEnvironment(
- new NetworkBufferPool(32, netConf.networkBufferSize()),
- new LocalConnectionManager(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new KvStateRegistry(),
- null,
- null,
-
netConf.ioMode(),
+ 32,
+ netConf.networkBufferSize(),
 netConf.partitionRequestInitialBackoff(),
 netConf.partitionRequestMaxBackoff(),
 netConf.networkBuffersPerChannel(),