This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7ef6db  [FLINK-12154][network] Remove legacy fields for 
SingleInputGate (#8136)
c7ef6db is described below

commit c7ef6db71a807cf21cc1ef3a13f4d95520dfc0fe
Author: zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Mon Apr 15 19:01:08 2019 +0800

    [FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136)
    
    This work is a preparation for FLINK-11726.
    
    In SingleInputGate#create, we could remove unused parameter 
ExecutionAttemptID.
    And for the constructor of SingleInputGate, we could remove unused 
parameter TaskIOMetricGroup.
    Then we introduce createSingleInputGate for reusing the process of creating 
SingleInputGate in related tests.
---
 .../partition/consumer/SingleInputGate.java        |  5 +--
 .../org/apache/flink/runtime/taskmanager/Task.java |  1 -
 .../runtime/io/network/NetworkEnvironmentTest.java | 18 ++------
 ...editBasedPartitionRequestClientHandlerTest.java | 10 ++---
 .../netty/PartitionRequestClientHandlerTest.java   | 25 +----------
 .../network/netty/PartitionRequestClientTest.java  |  6 +--
 .../network/partition/InputChannelTestUtils.java   | 24 ++++++++++
 .../network/partition/InputGateConcurrentTest.java | 33 ++------------
 .../network/partition/InputGateFairnessTest.java   | 52 +++++++---------------
 .../partition/consumer/LocalInputChannelTest.java  | 30 +++++--------
 .../partition/consumer/RemoteInputChannelTest.java | 36 +++++----------
 .../partition/consumer/SingleInputGateTest.java    | 16 ++-----
 .../partition/consumer/TestSingleInputGate.java    | 21 +--------
 .../partition/consumer/UnionInputGateTest.java     | 24 ++--------
 .../StreamNetworkBenchmarkEnvironment.java         |  1 -
 15 files changed, 87 insertions(+), 215 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index e7822fe..9c0196e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -193,7 +192,6 @@ public class SingleInputGate implements InputGate {
                int consumedSubpartitionIndex,
                int numberOfInputChannels,
                TaskActions taskActions,
-               TaskIOMetricGroup metrics,
                boolean isCreditBased) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
@@ -664,7 +662,6 @@ public class SingleInputGate implements InputGate {
        public static SingleInputGate create(
                String owningTaskName,
                JobID jobId,
-               ExecutionAttemptID executionId,
                InputGateDeploymentDescriptor igdd,
                NetworkEnvironment networkEnvironment,
                TaskEventPublisher taskEventPublisher,
@@ -683,7 +680,7 @@ public class SingleInputGate implements InputGate {
 
                final SingleInputGate inputGate = new SingleInputGate(
                        owningTaskName, jobId, consumedResultId, 
consumedPartitionType, consumedSubpartitionIndex,
-                       icdd.length, taskActions, metrics, 
networkConfig.isCreditBased());
+                       icdd.length, taskActions, 
networkConfig.isCreditBased());
 
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b3d952d..5e7174c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -404,7 +404,6 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
                        SingleInputGate gate = SingleInputGate.create(
                                taskNameWithSubtaskAndId,
                                jobId,
-                               executionId,
                                inputGateDeploymentDescriptor,
                                networkEnvironment,
                                taskEventDispatcher,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index b8803c5..1d8bae0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -28,7 +29,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfigurationBuilder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
@@ -300,23 +300,13 @@ public class NetworkEnvironmentTest {
         *
         * @param partitionType
         *              the consumed partition type
-        * @param channels
+        * @param numberOfChannels
         *              the number of input channels
         *
         * @return input gate with some fake settings
         */
-       private SingleInputGate createSingleInputGate(
-                       final ResultPartitionType partitionType, final int 
channels) {
-               return spy(new SingleInputGate(
-                       "Test Task Name",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       partitionType,
-                       0,
-                       channels,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       enableCreditBasedFlowControl));
+       private SingleInputGate createSingleInputGate(ResultPartitionType 
partitionType, int numberOfChannels) {
+               return 
spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, 
partitionType, enableCreditBasedFlowControl));
        }
 
        private static void createRemoteInputChannel(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 78c2a67..8cdf221 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -45,8 +45,8 @@ import org.junit.Test;
 
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createBufferResponse;
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
-import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -137,7 +137,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
        @Test
        public void testReceiveBuffer() throws Exception {
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(8, 8);
@@ -170,7 +170,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
         */
        @Test
        public void testThrowExceptionForNoAvailableBuffer() throws Exception {
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
 
                final CreditBasedPartitionRequestClientHandler handler = new 
CreditBasedPartitionRequestClientHandler();
@@ -246,7 +246,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel1 = 
createRemoteInputChannel(inputGate, client);
                final RemoteInputChannel inputChannel2 = 
createRemoteInputChannel(inputGate, client);
                try {
@@ -347,7 +347,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate, client);
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(6, 6);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 129485b..b9ad1f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -30,14 +29,11 @@ import 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
@@ -48,6 +44,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -131,7 +128,7 @@ public class PartitionRequestClientHandlerTest {
        @Test
        public void testReceiveBuffer() throws Exception {
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                try {
                        final BufferPool bufferPool = 
networkBufferPool.createBufferPool(8, 8);
@@ -208,24 +205,6 @@ public class PartitionRequestClientHandlerTest {
        // 
---------------------------------------------------------------------------------------------
 
        /**
-        * Creates and returns the single input gate for credit-based testing.
-        *
-        * @return The new created single input gate.
-        */
-       static SingleInputGate createSingleInputGate() {
-               return new SingleInputGate(
-                       "InputGate",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       ResultPartitionType.PIPELINED,
-                       0,
-                       1,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true);
-       }
-
-       /**
         * Creates and returns a remote input channel for the specific input 
gate.
         *
         * @param inputGate The input gate owns the created input channel.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
index 42206a6..91a9d5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import org.junit.Test;
 
 import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createRemoteInputChannel;
-import static 
org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandlerTest.createSingleInputGate;
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -53,7 +53,7 @@ public class PartitionRequestClientTest {
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate, client, 1, 2);
 
                try {
@@ -107,7 +107,7 @@ public class PartitionRequestClientTest {
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate, client);
 
                try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index f7db40b..57a9212 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -18,9 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,6 +64,26 @@ public class InputChannelTestUtils {
                return manager;
        }
 
+       public static SingleInputGate createSingleInputGate(int 
numberOfChannels) {
+               return createSingleInputGate(numberOfChannels, 
ResultPartitionType.PIPELINED, true);
+       }
+
+       public static SingleInputGate createSingleInputGate(
+               int numberOfChannels,
+               ResultPartitionType partitionType,
+               boolean isCreditBased) {
+
+               return new SingleInputGate(
+                       "InputGate",
+                       new JobID(),
+                       new IntermediateDataSetID(),
+                       partitionType,
+                       0,
+                       numberOfChannels,
+                       new NoOpTaskActions(),
+                       isCreditBased);
+       }
+
        public static ConnectionManager createDummyConnectionManager() throws 
Exception {
                final PartitionRequestClient mockClient = 
mock(PartitionRequestClient.class);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index f972750..13987c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -29,10 +28,8 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.junit.Test;
 
@@ -43,6 +40,7 @@ import java.util.Random;
 
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
@@ -64,14 +62,7 @@ public class InputGateConcurrentTest {
 
                final ResultPartitionManager resultPartitionManager = 
createResultPartitionManager(partitions);
 
-               final SingleInputGate gate = new SingleInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-                               0, numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createSingleInputGate(numberOfChannels);
 
                for (int i = 0; i < numberOfChannels; i++) {
                        LocalInputChannel channel = new LocalInputChannel(gate, 
i, new ResultPartitionID(),
@@ -100,15 +91,7 @@ public class InputGateConcurrentTest {
                final ConnectionManager connManager = 
createDummyConnectionManager();
                final Source[] sources = new Source[numberOfChannels];
 
-               final SingleInputGate gate = new SingleInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-                               0,
-                               numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createSingleInputGate(numberOfChannels);
 
                for (int i = 0; i < numberOfChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
@@ -150,15 +133,7 @@ public class InputGateConcurrentTest {
 
                final Source[] sources = new Source[numberOfChannels];
 
-               final SingleInputGate gate = new SingleInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-                               0,
-                               numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createSingleInputGate(numberOfChannels);
 
                for (int i = 0, local = 0; i < numberOfChannels; i++) {
                        if (localOrRemote.get(i)) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 9d83f47..d6768fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -91,14 +90,7 @@ public class InputGateFairnessTest {
 
                ResultPartitionManager resultPartitionManager = 
createResultPartitionManager(sources);
 
-               SingleInputGate gate = new FairnessVerifyingInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(),
-                               0, numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createFairnessVerifyingInputGate(numberOfChannels);
 
                for (int i = 0; i < numberOfChannels; i++) {
                        LocalInputChannel channel = new LocalInputChannel(gate, 
i, new ResultPartitionID(),
@@ -145,14 +137,7 @@ public class InputGateFairnessTest {
 
                        ResultPartitionManager resultPartitionManager = 
createResultPartitionManager(sources);
 
-                       SingleInputGate gate = new FairnessVerifyingInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(),
-                               0, numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+                       final SingleInputGate gate = 
createFairnessVerifyingInputGate(numberOfChannels);
 
                        for (int i = 0; i < numberOfChannels; i++) {
                                LocalInputChannel channel = new 
LocalInputChannel(gate, i, new ResultPartitionID(),
@@ -196,14 +181,7 @@ public class InputGateFairnessTest {
 
                // ----- create some source channels and fill them with buffers 
-----
 
-               SingleInputGate gate = new FairnessVerifyingInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(),
-                               0, numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createFairnessVerifyingInputGate(numberOfChannels);
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
 
@@ -252,14 +230,7 @@ public class InputGateFairnessTest {
 
                // ----- create some source channels and fill them with buffers 
-----
 
-               SingleInputGate gate = new FairnessVerifyingInputGate(
-                               "Test Task Name",
-                               new JobID(),
-                               new IntermediateDataSetID(),
-                               0, numberOfChannels,
-                               new NoOpTaskActions(),
-                               
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                               true);
+               final SingleInputGate gate = 
createFairnessVerifyingInputGate(numberOfChannels);
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
 
@@ -304,6 +275,17 @@ public class InputGateFairnessTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
+       private SingleInputGate createFairnessVerifyingInputGate(int 
numberOfChannels) {
+               return new FairnessVerifyingInputGate(
+                       "Test Task Name",
+                       new JobID(),
+                       new IntermediateDataSetID(),
+                       0,
+                       numberOfChannels,
+                       new NoOpTaskActions(),
+                       true);
+       }
+
        private void fillRandom(PipelinedSubpartition[] partitions, int 
numPerPartition, BufferConsumer buffer) throws Exception {
                ArrayList<Integer> poss = new ArrayList<>(partitions.length * 
numPerPartition);
 
@@ -357,12 +339,10 @@ public class InputGateFairnessTest {
                                int consumedSubpartitionIndex,
                                int numberOfInputChannels,
                                TaskActions taskActions,
-                               TaskIOMetricGroup metrics,
                                boolean isCreditBased) {
 
                        super(owningTaskName, jobId, consumedResultId, 
ResultPartitionType.PIPELINED,
-                               consumedSubpartitionIndex,
-                                       numberOfInputChannels, taskActions, 
metrics, isCreditBased);
+                               consumedSubpartitionIndex, 
numberOfInputChannels, taskActions, isCreditBased);
 
                        try {
                                Field f = 
SingleInputGate.class.getDeclaredField("inputChannelsWithData");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d89defa..33c191e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -66,6 +66,7 @@ import java.util.concurrent.TimeUnit;
 
 import scala.Tuple2;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -293,17 +294,7 @@ public class LocalInputChannelTest {
         */
        @Test
        public void testConcurrentReleaseAndRetriggerPartitionRequest() throws 
Exception {
-               final SingleInputGate gate = new SingleInputGate(
-                       "test task name",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       ResultPartitionType.PIPELINED,
-                       0,
-                       1,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true
-               );
+               final SingleInputGate gate = createSingleInputGate(1);
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                when(partitionManager
@@ -492,15 +483,14 @@ public class LocalInputChannelTest {
                        checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
                        this.inputGate = new SingleInputGate(
-                                       "Test Name",
-                                       new JobID(),
-                                       new IntermediateDataSetID(),
-                                       ResultPartitionType.PIPELINED,
-                                       subpartitionIndex,
-                                       numberOfInputChannels,
-                                       new NoOpTaskActions(),
-                                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                                       true);
+                               "Test Name",
+                               new JobID(),
+                               new IntermediateDataSetID(),
+                               ResultPartitionType.PIPELINED,
+                               subpartitionIndex,
+                               numberOfInputChannels,
+                               new NoOpTaskActions(),
+                               true);
 
                        // Set buffer pool
                        inputGate.setBufferPool(bufferPool);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 5868ef8..ec60813 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -29,11 +28,8 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -53,6 +49,7 @@ import java.util.concurrent.Future;
 
 import scala.Tuple2;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.isA;
@@ -353,7 +350,7 @@ public class RemoteInputChannelTest {
                final int numExclusiveBuffers = 2;
                final int numFloatingBuffers = 14;
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -493,7 +490,7 @@ public class RemoteInputChannelTest {
                final int numExclusiveBuffers = 2;
                final int numFloatingBuffers = 14;
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -569,7 +566,7 @@ public class RemoteInputChannelTest {
                final int numExclusiveBuffers = 2;
                final int numFloatingBuffers = 14;
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -659,7 +656,7 @@ public class RemoteInputChannelTest {
                final int numExclusiveBuffers = 2;
                final int numFloatingBuffers = 3;
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel channel1 = 
spy(createRemoteInputChannel(inputGate));
                final RemoteInputChannel channel2 = 
spy(createRemoteInputChannel(inputGate));
                final RemoteInputChannel channel3 = 
spy(createRemoteInputChannel(inputGate));
@@ -731,7 +728,7 @@ public class RemoteInputChannelTest {
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(
                        numTotalBuffers, 32);
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel successfulRemoteIC = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), 
successfulRemoteIC);
 
@@ -796,7 +793,7 @@ public class RemoteInputChannelTest {
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(2);
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -859,7 +856,7 @@ public class RemoteInputChannelTest {
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(3);
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -911,7 +908,7 @@ public class RemoteInputChannelTest {
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(3);
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -966,7 +963,7 @@ public class RemoteInputChannelTest {
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(2);
 
-               final SingleInputGate inputGate = createSingleInputGate();
+               final SingleInputGate inputGate = createSingleInputGate(1);
                final RemoteInputChannel inputChannel  = 
createRemoteInputChannel(inputGate);
                
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), 
inputChannel);
                Throwable thrown = null;
@@ -1033,19 +1030,6 @@ public class RemoteInputChannelTest {
 
        // 
---------------------------------------------------------------------------------------------
 
-       private SingleInputGate createSingleInputGate() {
-               return new SingleInputGate(
-                       "InputGate",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       ResultPartitionType.PIPELINED,
-                       0,
-                       1,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true);
-       }
-
        private RemoteInputChannel createRemoteInputChannel(SingleInputGate 
inputGate)
                        throws IOException, InterruptedException {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 2fc5232..e7aa3cd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -59,6 +59,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
@@ -352,7 +353,6 @@ public class SingleInputGateTest {
                SingleInputGate gate = SingleInputGate.create(
                        "TestTask",
                        new JobID(),
-                       new ExecutionAttemptID(),
                        gateDesc,
                        netEnv,
                        new TaskEventDispatcher(),
@@ -547,18 +547,8 @@ public class SingleInputGateTest {
                return createInputGate(numberOfInputChannels, 
ResultPartitionType.PIPELINED);
        }
 
-       private SingleInputGate createInputGate(
-                       int numberOfInputChannels, ResultPartitionType 
partitionType) {
-               SingleInputGate inputGate = new SingleInputGate(
-                       "Test Task Name",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       partitionType,
-                       0,
-                       numberOfInputChannels,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       enableCreditBasedFlowControl);
+       private SingleInputGate createInputGate(int numberOfInputChannels, 
ResultPartitionType partitionType) {
+               SingleInputGate inputGate = 
createSingleInputGate(numberOfInputChannels, partitionType, 
enableCreditBasedFlowControl);
 
                assertEquals(partitionType, 
inputGate.getConsumedPartitionType());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 68d3dc4..464ab7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -31,6 +26,7 @@ import org.mockito.stubbing.Answer;
 import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -45,23 +41,10 @@ public class TestSingleInputGate {
 
        protected final TestInputChannel[] inputChannels;
 
-       public TestSingleInputGate(int numberOfInputChannels) {
-               this(numberOfInputChannels, true);
-       }
-
        public TestSingleInputGate(int numberOfInputChannels, boolean 
initialize) {
                checkArgument(numberOfInputChannels >= 1);
 
-               SingleInputGate realGate = new SingleInputGate(
-                       "Test Task Name",
-                       new JobID(),
-                       new IntermediateDataSetID(),
-                       ResultPartitionType.PIPELINED,
-                       0,
-                       numberOfInputChannels,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true);
+               SingleInputGate realGate = 
createSingleInputGate(numberOfInputChannels);
 
                this.inputGate = spy(realGate);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 2eaa3c1..082ccec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,14 +18,9 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
-
 import org.junit.Test;
 
+import static 
org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,21 +41,8 @@ public class UnionInputGateTest {
        @Test(timeout = 120 * 1000)
        public void testBasicGetNextLogic() throws Exception {
                // Setup
-               final String testTaskName = "Test Task";
-               final SingleInputGate ig1 = new SingleInputGate(
-                       testTaskName, new JobID(),
-                       new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-                       0, 3,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true);
-               final SingleInputGate ig2 = new SingleInputGate(
-                       testTaskName, new JobID(),
-                       new IntermediateDataSetID(), 
ResultPartitionType.PIPELINED,
-                       0, 5,
-                       new NoOpTaskActions(),
-                       
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
-                       true);
+               final SingleInputGate ig1 = createSingleInputGate(3);
+               final SingleInputGate ig2 = createSingleInputGate(5);
 
                final UnionInputGate union = new UnionInputGate(new 
SingleInputGate[]{ig1, ig2});
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 40ff046..87dfa13 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -256,7 +256,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        SingleInputGate gate = SingleInputGate.create(
                                "receiving task[" + channel + "]",
                                jobId,
-                               executionAttemptID,
                                gateDescriptor,
                                environment,
                                new TaskEventDispatcher(),

Reply via email to