Repository: flink Updated Branches: refs/heads/master f399b3fbb -> fcdd56e54
[FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol - removes one level of (unneeded) abstraction for clarity This closes #4528. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57cef728 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57cef728 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57cef728 Branch: refs/heads/master Commit: 57cef728dbec5c806ad4068e25f97d9b53b2d1af Parents: f399b3f Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Aug 10 16:58:19 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Fri Jan 5 14:56:58 2018 +0100 ---------------------------------------------------------------------- .../network/netty/NettyConnectionManager.java | 4 +- .../runtime/io/network/netty/NettyProtocol.java | 120 +++++++++++++++++- .../network/netty/PartitionRequestProtocol.java | 127 ------------------- .../netty/CancelPartitionRequestTest.java | 4 +- .../netty/ClientTransportErrorHandlingTest.java | 26 ++-- .../network/netty/NettyClientServerSslTest.java | 27 ++-- .../NettyServerLowAndHighWatermarkTest.java | 2 +- .../PartitionRequestClientFactoryTest.java | 3 +- .../netty/ServerTransportErrorHandlingTest.java | 8 +- 9 files changed, 149 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index fcf618a..1d98715 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -46,8 +46,8 @@ public class NettyConnectionManager implements ConnectionManager { @Override public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { - PartitionRequestProtocol partitionRequestProtocol = - new PartitionRequestProtocol(partitionProvider, taskEventDispatcher); + NettyProtocol partitionRequestProtocol = + new NettyProtocol(partitionProvider, taskEventDispatcher); client.init(partitionRequestProtocol, bufferPool); server.init(partitionRequestProtocol, bufferPool); http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java index bcfe558..7de00e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java @@ -18,12 +18,126 @@ package org.apache.flink.runtime.io.network.netty; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; + import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -public interface NettyProtocol { +import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; + +/** + * Defines the server and client channel handlers, i.e. the protocol, used by netty. + */ +public class NettyProtocol { + + private final NettyMessage.NettyMessageEncoder + messageEncoder = new NettyMessage.NettyMessageEncoder(); + + private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); + + private final ResultPartitionProvider partitionProvider; + private final TaskEventDispatcher taskEventDispatcher; + + NettyProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { + this.partitionProvider = partitionProvider; + this.taskEventDispatcher = taskEventDispatcher; + } + + /** + * Returns the server channel handlers. + * + * <pre> + * +-------------------------------------------------------------------+ + * | SERVER CHANNEL PIPELINE | + * | | + * | +----------+----------+ (3) write +----------------------+ | + * | | Queue of queues +----------->| Message encoder | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ \|/ | + * | | (2) enqueue | | + * | +----------+----------+ | | + * | | Request handler | | | + * | +----------+----------+ | | + * | /|\ | | + * | | | | + * | +----------+----------+ | | + * | | Message decoder | | | + * | +----------+----------+ | | + * | /|\ | | + * | | | | + * | +----------+----------+ | | + * | | Frame decoder | | | + * | +----------+----------+ | | + * | /|\ | | + * +---------------+-----------------------------------+---------------+ + * | | (1) client request \|/ + * +---------------+-----------------------------------+---------------+ + * | | | | + * | [ Socket.read() ] [ Socket.write() ] | + * | | + * | Netty Internal I/O Threads (Transport Implementation) | + * +-------------------------------------------------------------------+ + * </pre> + * + * @return channel handlers + */ + public ChannelHandler[] getServerChannelHandlers() { + PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); + PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( + partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - ChannelHandler[] getServerChannelHandlers(); + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + serverHandler, + queueOfPartitionQueues + }; + } - ChannelHandler[] getClientChannelHandlers(); + /** + * Returns the client channel handlers. + * + * <pre> + * +-----------+----------+ +----------------------+ + * | Remote input channel | | request client | + * +-----------+----------+ +-----------+----------+ + * | | (1) write + * +---------------+-----------------------------------+---------------+ + * | | CLIENT CHANNEL PIPELINE | | + * | | \|/ | + * | +----------+----------+ +----------------------+ | + * | | Request handler + | Message encoder | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ \|/ | + * | | | | + * | +----------+----------+ | | + * | | Message decoder | | | + * | +----------+----------+ | | + * | /|\ | | + * | | | | + * | +----------+----------+ | | + * | | Frame decoder | | | + * | +----------+----------+ | | + * | /|\ | | + * +---------------+-----------------------------------+---------------+ + * | | (3) server response \|/ (2) client request + * +---------------+-----------------------------------+---------------+ + * | | | | + * | [ Socket.read() ] [ Socket.write() ] | + * | | + * | Netty Internal I/O Threads (Transport Implementation) | + * +-------------------------------------------------------------------+ + * </pre> + * + * @return channel handlers + */ + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[] { + messageEncoder, + createFrameLengthDecoder(), + messageDecoder, + new PartitionRequestClientHandler()}; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java deleted file mode 100644 index b6614b6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestProtocol.java +++ /dev/null @@ -1,127 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.netty; - -import org.apache.flink.runtime.io.network.TaskEventDispatcher; -import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; - -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; - -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder; -import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder; - -class PartitionRequestProtocol implements NettyProtocol { - - private final NettyMessageEncoder messageEncoder = new NettyMessageEncoder(); - - private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder(); - - private final ResultPartitionProvider partitionProvider; - private final TaskEventDispatcher taskEventDispatcher; - - PartitionRequestProtocol(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) { - this.partitionProvider = partitionProvider; - this.taskEventDispatcher = taskEventDispatcher; - } - - // +-------------------------------------------------------------------+ - // | SERVER CHANNEL PIPELINE | - // | | - // | +----------+----------+ (3) write +----------------------+ | - // | | Queue of queues +----------->| Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | (2) enqueue | | - // | +----------+----------+ | | - // | | Request handler | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (1) client request \|/ - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getServerChannelHandlers() { - PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue(); - PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler( - partitionProvider, taskEventDispatcher, queueOfPartitionQueues); - - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - serverHandler, - queueOfPartitionQueues - }; - } - - // +-----------+----------+ +----------------------+ - // | Remote input channel | | request client | - // +-----------+----------+ +-----------+----------+ - // | | (1) write - // +---------------+-----------------------------------+---------------+ - // | | CLIENT CHANNEL PIPELINE | | - // | | \|/ | - // | +----------+----------+ +----------------------+ | - // | | Request handler + | Message encoder | | - // | +----------+----------+ +-----------+----------+ | - // | /|\ \|/ | - // | | | | - // | +----------+----------+ | | - // | | Message decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // | | | | - // | +----------+----------+ | | - // | | Frame decoder | | | - // | +----------+----------+ | | - // | /|\ | | - // +---------------+-----------------------------------+---------------+ - // | | (3) server response \|/ (2) client request - // +---------------+-----------------------------------+---------------+ - // | | | | - // | [ Socket.read() ] [ Socket.write() ] | - // | | - // | Netty Internal I/O Threads (Transport Implementation) | - // +-------------------------------------------------------------------+ - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new ChannelHandler[] { - messageEncoder, - createFrameLengthDecoder(), - messageDecoder, - new PartitionRequestClientHandler()}; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java index 912fae2..c9f063b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java @@ -89,7 +89,7 @@ public class CancelPartitionRequestTest { } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); @@ -140,7 +140,7 @@ public class CancelPartitionRequestTest { } }); - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( partitions, mock(TaskEventDispatcher.class)); serverAndClient = initServerAndClient(protocol); http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java index 5754e36..eebdc29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java @@ -76,18 +76,14 @@ public class ClientTransportErrorHandlingTest { @Test public void testExceptionOnWrite() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; // We need a real server and client in this test, because Netty's EmbeddedChannel is @@ -215,7 +211,10 @@ public class ClientTransportErrorHandlingTest { @Test public void testExceptionOnRemoteClose() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class)) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[] { @@ -230,13 +229,6 @@ public class ClientTransportErrorHandlingTest { } }; } - - @Override - public ChannelHandler[] getClientChannelHandlers() { - return new PartitionRequestProtocol( - mock(ResultPartitionProvider.class), - mock(TaskEventDispatcher.class)).getClientChannelHandlers(); - } }; NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig()); @@ -380,7 +372,7 @@ public class ClientTransportErrorHandlingTest { // --------------------------------------------------------------------------------------------- private EmbeddedChannel createEmbeddedChannel() { - PartitionRequestProtocol protocol = new PartitionRequestProtocol( + NettyProtocol protocol = new NettyProtocol( mock(ResultPartitionProvider.class), mock(TaskEventDispatcher.class)); http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java index 3f2d363..20031b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java @@ -38,19 +38,20 @@ import static org.junit.Assert.assertTrue; public class NettyClientServerSslTest { /** - * Verify valid ssl configuration and connection - * + * Verify valid ssl configuration and connection. */ @Test public void testValidSslConnection() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; NettyConfig nettyConfig = new NettyConfig( @@ -72,19 +73,20 @@ public class NettyClientServerSslTest { } /** - * Verify failure on invalid ssl configuration - * + * Verify failure on invalid ssl configuration. */ @Test public void testInvalidSslConfiguration() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); @@ -110,19 +112,20 @@ public class NettyClientServerSslTest { } /** - * Verify SSL handshake error when untrusted server certificate is used - * + * Verify SSL handshake error when untrusted server certificate is used. */ @Test public void testSslHandshakeError() throws Exception { - NettyProtocol protocol = new NettyProtocol() { + NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; } @Override - public ChannelHandler[] getClientChannelHandlers() { return new ChannelHandler[0]; } + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[0]; + } }; Configuration config = createSslConfig(); http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java index 0fbfcac..9291bc0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyServerLowAndHighWatermarkTest.java @@ -81,7 +81,7 @@ public class NettyServerLowAndHighWatermarkTest { final int expectedHighWatermark = 2 * pageSize; final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); - final NettyProtocol protocol = new NettyProtocol() { + final NettyProtocol protocol = new NettyProtocol(null, null) { @Override public ChannelHandler[] getServerChannelHandlers() { // The channel handler implements the test http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 91a052f..d971634 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -57,7 +57,8 @@ public class PartitionRequestClientFactoryTest { final CountDownLatch syncOnConnect = new CountDownLatch(1); final Tuple2<NettyServer, NettyClient> netty = createNettyServerAndClient( - new NettyProtocol() { + new NettyProtocol(null, null) { + @Override public ChannelHandler[] getServerChannelHandlers() { return new ChannelHandler[0]; http://git-wip-us.apache.org/repos/asf/flink/blob/57cef728/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java index d365fba..5916162 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java @@ -73,13 +73,7 @@ public class ServerTransportErrorHandlingTest { } }); - NettyProtocol protocol = new NettyProtocol() { - @Override - public ChannelHandler[] getServerChannelHandlers() { - return new PartitionRequestProtocol( - partitionManager, - mock(TaskEventDispatcher.class)).getServerChannelHandlers(); - } + NettyProtocol protocol = new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) { @Override public ChannelHandler[] getClientChannelHandlers() {