Repository: flink
Updated Branches:
  refs/heads/master f399b3fbb -> fcdd56e54


[FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol

- removes one level of (unneeded) abstraction for clarity

This closes #4528.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57cef728
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57cef728
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57cef728

Branch: refs/heads/master
Commit: 57cef728dbec5c806ad4068e25f97d9b53b2d1af
Parents: f399b3f
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Aug 10 16:58:19 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jan 5 14:56:58 2018 +0100

----------------------------------------------------------------------
 .../network/netty/NettyConnectionManager.java   |   4 +-
 .../runtime/io/network/netty/NettyProtocol.java | 120 +++++++++++++++++-
 .../network/netty/PartitionRequestProtocol.java | 127 -------------------
 .../netty/CancelPartitionRequestTest.java       |   4 +-
 .../netty/ClientTransportErrorHandlingTest.java |  26 ++--
 .../network/netty/NettyClientServerSslTest.java |  27 ++--
 .../NettyServerLowAndHighWatermarkTest.java     |   2 +-
 .../PartitionRequestClientFactoryTest.java      |   3 +-
 .../netty/ServerTransportErrorHandlingTest.java |   8 +-
 9 files changed, 149 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index fcf618a..1d98715 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -46,8 +46,8 @@ public class NettyConnectionManager implements ConnectionManager {
        @Override
        public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher)
                        throws IOException {
-               PartitionRequestProtocol partitionRequestProtocol =
-                               new PartitionRequestProtocol(partitionProvider, taskEventDispatcher);
+               NettyProtocol partitionRequestProtocol =
+                               new NettyProtocol(partitionProvider, taskEventDispatcher);
 
                client.init(partitionRequestProtocol, bufferPool);
                server.init(partitionRequestProtocol, bufferPool);

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index bcfe558..7de00e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -18,12 +18,126 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-public interface NettyProtocol {
+import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
+
+/**
+ * Defines the server and client channel handlers, i.e. the protocol, used by netty.
+ */
+public class NettyProtocol {
+
+       private final NettyMessage.NettyMessageEncoder
+               messageEncoder = new NettyMessage.NettyMessageEncoder();
+
+       private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
+
+       private final ResultPartitionProvider partitionProvider;
+       private final TaskEventDispatcher taskEventDispatcher;
+
+       NettyProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
+               this.partitionProvider = partitionProvider;
+               this.taskEventDispatcher = taskEventDispatcher;
+       }
+
+       /**
+        * Returns the server channel handlers.
+        *
+        * <pre>
+        * +-------------------------------------------------------------------+
+        * |                        SERVER CHANNEL PIPELINE                    |
+        * |                                                                   |
+        * |    +----------+----------+ (3) write  +----------------------+    |
+        * |    | Queue of queues     +----------->| Message encoder      |    |
+        * |    +----------+----------+            +-----------+----------+    |
+        * |              /|\                                 \|/              |
+        * |               | (2) enqueue                       |               |
+        * |    +----------+----------+                        |               |
+        * |    | Request handler     |                        |               |
+        * |    +----------+----------+                        |               |
+        * |              /|\                                  |               |
+        * |               |                                   |               |
+        * |    +----------+----------+                        |               |
+        * |    | Message decoder     |                        |               |
+        * |    +----------+----------+                        |               |
+        * |              /|\                                  |               |
+        * |               |                                   |               |
+        * |    +----------+----------+                        |               |
+        * |    | Frame decoder       |                        |               |
+        * |    +----------+----------+                        |               |
+        * |              /|\                                  |               |
+        * +---------------+-----------------------------------+---------------+
+        * |               | (1) client request               \|/
+        * +---------------+-----------------------------------+---------------+
+        * |               |                                   |               |
+        * |       [ Socket.read() ]                    [ Socket.write() ]     |
+        * |                                                                   |
+        * |  Netty Internal I/O Threads (Transport Implementation)            |
+        * +-------------------------------------------------------------------+
+        * </pre>
+        *
+        * @return channel handlers
+        */
+       public ChannelHandler[] getServerChannelHandlers() {
+               PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
+               PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
+                       partitionProvider, taskEventDispatcher, queueOfPartitionQueues);
 
-       ChannelHandler[] getServerChannelHandlers();
+               return new ChannelHandler[] {
+                       messageEncoder,
+                       createFrameLengthDecoder(),
+                       messageDecoder,
+                       serverHandler,
+                       queueOfPartitionQueues
+               };
+       }
 
-       ChannelHandler[] getClientChannelHandlers();
+       /**
+        * Returns the client channel handlers.
+        *
+        * <pre>
+        *     +-----------+----------+            +----------------------+
+        *     | Remote input channel |            | request client       |
+        *     +-----------+----------+            +-----------+----------+
+        *                 |                                   | (1) write
+        * +---------------+-----------------------------------+---------------+
+        * |               |     CLIENT CHANNEL PIPELINE       |               |
+        * |               |                                  \|/              |
+        * |    +----------+----------+            +----------------------+    |
+        * |    | Request handler     +            | Message encoder      |    |
+        * |    +----------+----------+            +-----------+----------+    |
+        * |              /|\                                 \|/              |
+        * |               |                                   |               |
+        * |    +----------+----------+                        |               |
+        * |    | Message decoder     |                        |               |
+        * |    +----------+----------+                        |               |
+        * |              /|\                                  |               |
+        * |               |                                   |               |
+        * |    +----------+----------+                        |               |
+        * |    | Frame decoder       |                        |               |
+        * |    +----------+----------+                        |               |
+        * |              /|\                                  |               |
+        * +---------------+-----------------------------------+---------------+
+        * |               | (3) server response              \|/ (2) client request
+        * +---------------+-----------------------------------+---------------+
+        * |               |                                   |               |
+        * |       [ Socket.read() ]                    [ Socket.write() ]     |
+        * |                                                                   |
+        * |  Netty Internal I/O Threads (Transport Implementation)            |
+        * +-------------------------------------------------------------------+
+        * </pre>
+        *
+        * @return channel handlers
+        */
+       public ChannelHandler[] getClientChannelHandlers() {
+               return new ChannelHandler[] {
+                       messageEncoder,
+                       createFrameLengthDecoder(),
+                       messageDecoder,
+                       new PartitionRequestClientHandler()};
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
deleted file mode 100644
index b6614b6..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
-
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-
-import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder;
-import static 
org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
-
-class PartitionRequestProtocol implements NettyProtocol {
-
-       private final NettyMessageEncoder messageEncoder = new 
NettyMessageEncoder();
-
-       private final NettyMessage.NettyMessageDecoder messageDecoder = new 
NettyMessage.NettyMessageDecoder();
-
-       private final ResultPartitionProvider partitionProvider;
-       private final TaskEventDispatcher taskEventDispatcher;
-
-       PartitionRequestProtocol(ResultPartitionProvider partitionProvider, 
TaskEventDispatcher taskEventDispatcher) {
-               this.partitionProvider = partitionProvider;
-               this.taskEventDispatcher = taskEventDispatcher;
-       }
-
-       // +-------------------------------------------------------------------+
-       // |                        SERVER CHANNEL PIPELINE                    |
-       // |                                                                   |
-       // |    +----------+----------+ (3) write  +----------------------+    |
-       // |    | Queue of queues     +----------->| Message encoder      |    |
-       // |    +----------+----------+            +-----------+----------+    |
-       // |              /|\                                 \|/              |
-       // |               | (2) enqueue                       |               |
-       // |    +----------+----------+                        |               |
-       // |    | Request handler     |                        |               |
-       // |    +----------+----------+                        |               |
-       // |              /|\                                  |               |
-       // |               |                                   |               |
-       // |    +----------+----------+                        |               |
-       // |    | Message decoder     |                        |               |
-       // |    +----------+----------+                        |               |
-       // |              /|\                                  |               |
-       // |               |                                   |               |
-       // |    +----------+----------+                        |               |
-       // |    | Frame decoder       |                        |               |
-       // |    +----------+----------+                        |               |
-       // |              /|\                                  |               |
-       // +---------------+-----------------------------------+---------------+
-       // |               | (1) client request               \|/
-       // +---------------+-----------------------------------+---------------+
-       // |               |                                   |               |
-       // |       [ Socket.read() ]                    [ Socket.write() ]     |
-       // |                                                                   |
-       // |  Netty Internal I/O Threads (Transport Implementation)            |
-       // +-------------------------------------------------------------------+
-
-       @Override
-       public ChannelHandler[] getServerChannelHandlers() {
-               PartitionRequestQueue queueOfPartitionQueues = new 
PartitionRequestQueue();
-               PartitionRequestServerHandler serverHandler = new 
PartitionRequestServerHandler(
-                               partitionProvider, taskEventDispatcher, 
queueOfPartitionQueues);
-
-               return new ChannelHandler[] {
-                               messageEncoder,
-                               createFrameLengthDecoder(),
-                               messageDecoder,
-                               serverHandler,
-                               queueOfPartitionQueues
-               };
-       }
-
-       //     +-----------+----------+            +----------------------+
-       //     | Remote input channel |            | request client       |
-       //     +-----------+----------+            +-----------+----------+
-       //                 |                                   | (1) write
-       // +---------------+-----------------------------------+---------------+
-       // |               |     CLIENT CHANNEL PIPELINE       |               |
-       // |               |                                  \|/              |
-       // |    +----------+----------+            +----------------------+    |
-       // |    | Request handler     +            | Message encoder      |    |
-       // |    +----------+----------+            +-----------+----------+    |
-       // |              /|\                                 \|/              |
-       // |               |                                   |               |
-       // |    +----------+----------+                        |               |
-       // |    | Message decoder     |                        |               |
-       // |    +----------+----------+                        |               |
-       // |              /|\                                  |               |
-       // |               |                                   |               |
-       // |    +----------+----------+                        |               |
-       // |    | Frame decoder       |                        |               |
-       // |    +----------+----------+                        |               |
-       // |              /|\                                  |               |
-       // +---------------+-----------------------------------+---------------+
-       // |               | (3) server response              \|/ (2) client 
request
-       // +---------------+-----------------------------------+---------------+
-       // |               |                                   |               |
-       // |       [ Socket.read() ]                    [ Socket.write() ]     |
-       // |                                                                   |
-       // |  Netty Internal I/O Threads (Transport Implementation)            |
-       // +-------------------------------------------------------------------+
-
-       @Override
-       public ChannelHandler[] getClientChannelHandlers() {
-               return new ChannelHandler[] {
-                               messageEncoder,
-                               createFrameLengthDecoder(),
-                               messageDecoder,
-                               new PartitionRequestClientHandler()};
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 912fae2..c9f063b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -89,7 +89,7 @@ public class CancelPartitionRequestTest {
                                        }
                                });
 
-                       PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
+                       NettyProtocol protocol = new NettyProtocol(
                                        partitions, 
mock(TaskEventDispatcher.class));
 
                        serverAndClient = initServerAndClient(protocol);
@@ -140,7 +140,7 @@ public class CancelPartitionRequestTest {
                                                }
                                        });
 
-                       PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
+                       NettyProtocol protocol = new NettyProtocol(
                                        partitions, 
mock(TaskEventDispatcher.class));
 
                        serverAndClient = initServerAndClient(protocol);

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 5754e36..eebdc29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -76,18 +76,14 @@ public class ClientTransportErrorHandlingTest {
        @Test
        public void testExceptionOnWrite() throws Exception {
 
-               NettyProtocol protocol = new NettyProtocol() {
+               NettyProtocol protocol = new NettyProtocol(
+                               mock(ResultPartitionProvider.class),
+                               mock(TaskEventDispatcher.class)) {
+
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new ChannelHandler[0];
                        }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new PartitionRequestProtocol(
-                                               
mock(ResultPartitionProvider.class),
-                                               
mock(TaskEventDispatcher.class)).getClientChannelHandlers();
-                       }
                };
 
                // We need a real server and client in this test, because 
Netty's EmbeddedChannel is
@@ -215,7 +211,10 @@ public class ClientTransportErrorHandlingTest {
        @Test
        public void testExceptionOnRemoteClose() throws Exception {
 
-               NettyProtocol protocol = new NettyProtocol() {
+               NettyProtocol protocol = new NettyProtocol(
+                               mock(ResultPartitionProvider.class),
+                               mock(TaskEventDispatcher.class)) {
+
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new ChannelHandler[] {
@@ -230,13 +229,6 @@ public class ClientTransportErrorHandlingTest {
                                                }
                                };
                        }
-
-                       @Override
-                       public ChannelHandler[] getClientChannelHandlers() {
-                               return new PartitionRequestProtocol(
-                                               
mock(ResultPartitionProvider.class),
-                                               
mock(TaskEventDispatcher.class)).getClientChannelHandlers();
-                       }
                };
 
                NettyServerAndClient serverAndClient = 
initServerAndClient(protocol, createConfig());
@@ -380,7 +372,7 @@ public class ClientTransportErrorHandlingTest {
        // 
---------------------------------------------------------------------------------------------
 
        private EmbeddedChannel createEmbeddedChannel() {
-               PartitionRequestProtocol protocol = new 
PartitionRequestProtocol(
+               NettyProtocol protocol = new NettyProtocol(
                                mock(ResultPartitionProvider.class),
                                mock(TaskEventDispatcher.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 3f2d363..20031b3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -38,19 +38,20 @@ import static org.junit.Assert.assertTrue;
 public class NettyClientServerSslTest {
 
        /**
-        * Verify valid ssl configuration and connection
-        *
+        * Verify valid ssl configuration and connection.
         */
        @Test
        public void testValidSslConnection() throws Exception {
-               NettyProtocol protocol = new NettyProtocol() {
+               NettyProtocol protocol = new NettyProtocol(null, null) {
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new ChannelHandler[0];
                        }
 
                        @Override
-                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+                       public ChannelHandler[] getClientChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
                };
 
                NettyConfig nettyConfig = new NettyConfig(
@@ -72,19 +73,20 @@ public class NettyClientServerSslTest {
        }
 
        /**
-        * Verify failure on invalid ssl configuration
-        *
+        * Verify failure on invalid ssl configuration.
         */
        @Test
        public void testInvalidSslConfiguration() throws Exception {
-               NettyProtocol protocol = new NettyProtocol() {
+               NettyProtocol protocol = new NettyProtocol(null, null) {
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new ChannelHandler[0];
                        }
 
                        @Override
-                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+                       public ChannelHandler[] getClientChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
                };
 
                Configuration config = createSslConfig();
@@ -110,19 +112,20 @@ public class NettyClientServerSslTest {
        }
 
        /**
-        * Verify SSL handshake error when untrusted server certificate is used
-        *
+        * Verify SSL handshake error when untrusted server certificate is used.
         */
        @Test
        public void testSslHandshakeError() throws Exception {
-               NettyProtocol protocol = new NettyProtocol() {
+               NettyProtocol protocol = new NettyProtocol(null, null) {
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                return new ChannelHandler[0];
                        }
 
                        @Override
-                       public ChannelHandler[] getClientChannelHandlers() { 
return new ChannelHandler[0]; }
+                       public ChannelHandler[] getClientChannelHandlers() {
+                               return new ChannelHandler[0];
+                       }
                };
 
                Configuration config = createSslConfig();

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
index 0fbfcac..9291bc0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java
@@ -81,7 +81,7 @@ public class NettyServerLowAndHighWatermarkTest {
                final int expectedHighWatermark = 2 * pageSize;
 
                final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
-               final NettyProtocol protocol = new NettyProtocol() {
+               final NettyProtocol protocol = new NettyProtocol(null, null) {
                        @Override
                        public ChannelHandler[] getServerChannelHandlers() {
                                // The channel handler implements the test

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 91a052f..d971634 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -57,7 +57,8 @@ public class PartitionRequestClientFactoryTest {
                final CountDownLatch syncOnConnect = new CountDownLatch(1);
 
                final Tuple2<NettyServer, NettyClient> netty = 
createNettyServerAndClient(
-                               new NettyProtocol() {
+                               new NettyProtocol(null, null) {
+
                                        @Override
                                        public ChannelHandler[] 
getServerChannelHandlers() {
                                                return new ChannelHandler[0];

http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index d365fba..5916162 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -73,13 +73,7 @@ public class ServerTransportErrorHandlingTest {
                                }
                        });
 
-               NettyProtocol protocol = new NettyProtocol() {
-                       @Override
-                       public ChannelHandler[] getServerChannelHandlers() {
-                               return new PartitionRequestProtocol(
-                                       partitionManager,
-                                       
mock(TaskEventDispatcher.class)).getServerChannelHandlers();
-                       }
+               NettyProtocol protocol = new NettyProtocol(partitionManager, 
mock(TaskEventDispatcher.class)) {
 
                        @Override
                        public ChannelHandler[] getClientChannelHandlers() {

Reply via email to