This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ee6a4de  [FLINK-10606][network][test] Construct NetworkEnvironment 
simple for tests
ee6a4de is described below

commit ee6a4dead18348f72166f980b01868d896e1f88f
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Mon Oct 22 11:13:41 2018 +0800

    [FLINK-10606][network][test] Construct NetworkEnvironment simple for tests
---
 .../runtime/io/network/NetworkEnvironment.java     | 50 ++++++++++++++++------
 .../runtime/io/network/NetworkEnvironmentTest.java | 31 +-------------
 .../partition/consumer/SingleInputGateTest.java    | 43 ++++---------------
 .../TaskManagerComponentsStartupShutdownTest.java  | 15 +------
 4 files changed, 49 insertions(+), 90 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f254756..1363175 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -90,19 +90,43 @@ public class NetworkEnvironment {
        private boolean isShutdown;
 
        public NetworkEnvironment(
-                       NetworkBufferPool networkBufferPool,
-                       ConnectionManager connectionManager,
-                       ResultPartitionManager resultPartitionManager,
-                       TaskEventDispatcher taskEventDispatcher,
-                       KvStateRegistry kvStateRegistry,
-                       KvStateServer kvStateServer,
-                       KvStateClientProxy kvStateClientProxy,
-                       IOMode defaultIOMode,
-                       int partitionRequestInitialBackoff,
-                       int partitionRequestMaxBackoff,
-                       int networkBuffersPerChannel,
-                       int extraNetworkBuffersPerGate,
-                       boolean enableCreditBased) {
+               int numBuffers,
+               int memorySegmentSize,
+               int partitionRequestInitialBackoff,
+               int partitionRequestMaxBackoff,
+               int networkBuffersPerChannel,
+               int extraNetworkBuffersPerGate,
+               boolean enableCreditBased) {
+               this(
+                       new NetworkBufferPool(numBuffers, memorySegmentSize),
+                       new LocalConnectionManager(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       null,
+                       IOManager.IOMode.SYNC,
+                       partitionRequestInitialBackoff,
+                       partitionRequestMaxBackoff,
+                       networkBuffersPerChannel,
+                       extraNetworkBuffersPerGate,
+                       enableCreditBased);
+       }
+
+       public NetworkEnvironment(
+               NetworkBufferPool networkBufferPool,
+               ConnectionManager connectionManager,
+               ResultPartitionManager resultPartitionManager,
+               TaskEventDispatcher taskEventDispatcher,
+               KvStateRegistry kvStateRegistry,
+               KvStateServer kvStateServer,
+               KvStateClientProxy kvStateClientProxy,
+               IOMode defaultIOMode,
+               int partitionRequestInitialBackoff,
+               int partitionRequestMaxBackoff,
+               int networkBuffersPerChannel,
+               int extraNetworkBuffersPerGate,
+               boolean enableCreditBased) {
 
                this.networkBufferPool = checkNotNull(networkBufferPool);
                this.connectionManager = checkNotNull(connectionManager);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f0f1926..8c2fb7a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -31,7 +30,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
@@ -79,21 +77,8 @@ public class NetworkEnvironmentTest {
         */
        @Test
        public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-
                final NetworkEnvironment network = new NetworkEnvironment(
-                       new NetworkBufferPool(numBuffers, memorySegmentSize),
-                       new LocalConnectionManager(),
-                       new ResultPartitionManager(),
-                       new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
-                       IOManager.IOMode.SYNC,
-                       0,
-                       0,
-                       2,
-                       8,
-                       enableCreditBasedFlowControl);
+                       numBuffers, memorySegmentSize, 0, 0, 2, 8, 
enableCreditBasedFlowControl);
 
                // result partitions
                ResultPartition rp1 = 
createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -197,19 +182,7 @@ public class NetworkEnvironmentTest {
 
        private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) 
throws Exception {
                final NetworkEnvironment network = new NetworkEnvironment(
-                       new NetworkBufferPool(bufferPoolSize, 
memorySegmentSize),
-                       new LocalConnectionManager(),
-                       new ResultPartitionManager(),
-                       new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
-                       IOManager.IOMode.SYNC,
-                       0,
-                       0,
-                       2,
-                       8,
-                       enableCreditBasedFlowControl);
+                       bufferPoolSize, memorySegmentSize, 0, 0, 2, 8, 
enableCreditBasedFlowControl);
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4bf5b22..63f1855 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,7 +44,6 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.junit.Test;
@@ -71,7 +69,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -344,7 +341,8 @@ public class SingleInputGateTest {
                int initialBackoff = 137;
                int maxBackoff = 1001;
 
-               final NetworkEnvironment netEnv = createNetworkEnvironment(2, 
8, initialBackoff, maxBackoff);
+               final NetworkEnvironment netEnv = new NetworkEnvironment(
+                       100, 32, initialBackoff, maxBackoff, 2, 8, 
enableCreditBasedFlowControl);
 
                SingleInputGate gate = SingleInputGate.create(
                        "TestTask",
@@ -403,8 +401,8 @@ public class SingleInputGateTest {
                final SingleInputGate inputGate = createInputGate(1, 
ResultPartitionType.PIPELINED_BOUNDED);
                int buffersPerChannel = 2;
                int extraNetworkBuffersPerGate = 8;
-               final NetworkEnvironment network = 
createNetworkEnvironment(buffersPerChannel,
-                       extraNetworkBuffersPerGate, 0, 0);
+               final NetworkEnvironment network = new NetworkEnvironment(
+                       100, 32, 0, 0, buffersPerChannel, 
extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
                try {
                        final ResultPartitionID resultPartitionId = new 
ResultPartitionID();
@@ -415,8 +413,6 @@ public class SingleInputGateTest {
 
                        NetworkBufferPool bufferPool = 
network.getNetworkBufferPool();
                        if (enableCreditBasedFlowControl) {
-                               verify(bufferPool,
-                                       
times(1)).requestMemorySegments(buffersPerChannel);
                                RemoteInputChannel remote = 
(RemoteInputChannel) inputGate.getInputChannels()
                                        
.get(resultPartitionId.getPartitionId());
                                // only the exclusive buffers should be 
assigned/available now
@@ -444,7 +440,8 @@ public class SingleInputGateTest {
                final SingleInputGate inputGate = createInputGate(1, 
ResultPartitionType.PIPELINED_BOUNDED);
                int buffersPerChannel = 2;
                int extraNetworkBuffersPerGate = 8;
-               final NetworkEnvironment network = 
createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
+               final NetworkEnvironment network = new NetworkEnvironment(
+                       100, 32, 0, 0, buffersPerChannel, 
extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
                try {
                        final ResultPartitionID resultPartitionId = new 
ResultPartitionID();
@@ -454,8 +451,6 @@ public class SingleInputGateTest {
                        NetworkBufferPool bufferPool = 
network.getNetworkBufferPool();
 
                        if (enableCreditBasedFlowControl) {
-                               verify(bufferPool, 
times(0)).requestMemorySegments(buffersPerChannel);
-
                                
assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
                                        
bufferPool.getNumberOfAvailableMemorySegments());
                                // note: exclusive buffers are not handed out 
into LocalBufferPool and are thus not counted
@@ -471,8 +466,6 @@ public class SingleInputGateTest {
                                
ResultPartitionLocation.createRemote(connectionId)));
 
                        if (enableCreditBasedFlowControl) {
-                               verify(bufferPool,
-                                       
times(1)).requestMemorySegments(buffersPerChannel);
                                RemoteInputChannel remote = 
(RemoteInputChannel) inputGate.getInputChannels()
                                        
.get(resultPartitionId.getPartitionId());
                                // only the exclusive buffers should be 
assigned/available now
@@ -499,7 +492,8 @@ public class SingleInputGateTest {
        public void testUpdateUnknownInputChannel() throws Exception {
                final SingleInputGate inputGate = createInputGate(2);
                int buffersPerChannel = 2;
-               final NetworkEnvironment network = 
createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+               final NetworkEnvironment network = new NetworkEnvironment(
+                       100, 32, 0, 0, buffersPerChannel, 8, 
enableCreditBasedFlowControl);
 
                try {
                        final ResultPartitionID localResultPartitionId = new 
ResultPartitionID();
@@ -543,27 +537,6 @@ public class SingleInputGateTest {
 
        // 
---------------------------------------------------------------------------------------------
 
-       private NetworkEnvironment createNetworkEnvironment(
-                       int buffersPerChannel,
-                       int extraNetworkBuffersPerGate,
-                       int initialBackoff,
-                       int maxBackoff) {
-               return new NetworkEnvironment(
-                       spy(new NetworkBufferPool(100, 32)),
-                       new LocalConnectionManager(),
-                       new ResultPartitionManager(),
-                       new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
-                       IOManager.IOMode.SYNC,
-                       initialBackoff,
-                       maxBackoff,
-                       buffersPerChannel,
-                       extraNetworkBuffersPerGate,
-                       enableCreditBasedFlowControl);
-       }
-
        private SingleInputGate createInputGate() {
                return createInputGate(2);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9669513..c3118c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -33,11 +33,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -45,7 +41,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -146,14 +141,8 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                        final MemoryManager memManager = new 
MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, 
false);
                        final IOManager ioManager = new IOManagerAsync(TMP_DIR);
                        final NetworkEnvironment network = new 
NetworkEnvironment(
-                               new NetworkBufferPool(32, 
netConf.networkBufferSize()),
-                               new LocalConnectionManager(),
-                               new ResultPartitionManager(),
-                               new TaskEventDispatcher(),
-                               new KvStateRegistry(),
-                               null,
-                               null,
-                               netConf.ioMode(),
+                               32,
+                               netConf.networkBufferSize(),
                                netConf.partitionRequestInitialBackoff(),
                                netConf.partitionRequestMaxBackoff(),
                                netConf.networkBuffersPerChannel(),

Reply via email to