[FLINK-7343][utils] Add network proxy utility to simulate network failures
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f96f79 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f96f79 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f96f79 Branch: refs/heads/master Commit: b7f96f79e7665f10880333d816d1694a227c5437 Parents: e2d3e1f Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Tue Aug 1 18:11:27 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Aug 8 10:13:02 2017 +0200 ---------------------------------------------------------------------- .../flink/networking/NetworkFailureHandler.java | 178 +++++++++++++++++++ .../flink/networking/NetworkFailuresProxy.java | 125 +++++++++++++ .../org/apache/flink/networking/EchoServer.java | 113 ++++++++++++ .../networking/NetworkFailuresProxyTest.java | 124 +++++++++++++ 4 files changed, 540 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java new file mode 100644 index 0000000..0ce0b12 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.networking; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort + * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked + * flag. 
+ */ +class NetworkFailureHandler extends SimpleChannelUpstreamHandler { + private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class); + private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler"; + + // mapping between source and target channels, used for finding correct target channel to use for given source. + private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>(); + private final Consumer<NetworkFailureHandler> onClose; + private final ClientSocketChannelFactory channelFactory; + private final String remoteHost; + private final int remotePort; + + private final AtomicBoolean blocked; + + public NetworkFailureHandler( + AtomicBoolean blocked, + Consumer<NetworkFailureHandler> onClose, + ClientSocketChannelFactory channelFactory, + String remoteHost, + int remotePort) { + this.blocked = blocked; + this.onClose = onClose; + this.channelFactory = channelFactory; + this.remoteHost = remoteHost; + this.remotePort = remotePort; + } + + /** + * Closes the specified channel after all queued write requests are flushed. + */ + static void closeOnFlush(Channel channel) { + if (channel.isConnected()) { + channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } + + public void closeConnections() { + for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) { + // target channel is closed on source's channel channelClosed even + entry.getKey().close(); + } + } + + @Override + public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception { + // Suspend incoming traffic until connected to the remote host. + final Channel sourceChannel = event.getChannel(); + sourceChannel.setReadable(false); + + if (blocked.get()) { + sourceChannel.close(); + return; + } + + // Start the connection attempt. 
+ ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory); + targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), blocked)); + ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort)); + sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel()); + + connectFuture.addListener(future -> { + if (future.isSuccess()) { + // Connection attempt succeeded: + // Begin to accept incoming traffic. + sourceChannel.setReadable(true); + } else { + // Close the connection if the connection attempt has failed. + sourceChannel.close(); + } + }); + } + + @Override + public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception { + if (blocked.get()) { + return; + } + + ChannelBuffer msg = (ChannelBuffer) event.getMessage(); + Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); + if (targetChannel == null) { + throw new IllegalStateException("Could not find a target channel for the source channel"); + } + targetChannel.write(msg); + } + + @Override + public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception { + Channel targetChannel = sourceToTargetChannels.get(event.getChannel()); + if (targetChannel == null) { + return; + } + closeOnFlush(targetChannel); + sourceToTargetChannels.remove(event.getChannel()); + onClose.accept(this); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception { + LOG.error("Closing communication channel because of an exception", event.getCause()); + closeOnFlush(event.getChannel()); + } + + private static class TargetChannelHandler extends SimpleChannelUpstreamHandler { + private final Channel sourceChannel; + private final AtomicBoolean blocked; + + TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) { + this.sourceChannel = 
sourceChannel; + this.blocked = blocked; + } + + @Override + public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception { + if (blocked.get()) { + return; + } + ChannelBuffer msg = (ChannelBuffer) event.getMessage(); + sourceChannel.write(msg); + } + + @Override + public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception { + closeOnFlush(sourceChannel); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception { + LOG.error("Closing communication channel because of an exception", event.getCause()); + closeOnFlush(event.getChannel()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java new file mode 100644 index 0000000..7030049 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.networking; + +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class acts as a network proxy - listening on local port and forwarding all of the network to the remote + * host/port. It allows to simulate a network failures in the communication. 
+ */ +public class NetworkFailuresProxy implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class); + private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler"; + + private final Executor executor = Executors.newCachedThreadPool(); + private final ServerBootstrap serverBootstrap; + private final Channel channel; + private final AtomicBoolean blocked = new AtomicBoolean(); + // collection of networkFailureHandlers so that we can call {@link NetworkFailureHandler.closeConnections} on them. + private final Set<NetworkFailureHandler> networkFailureHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) { + LOG.info("Proxying [*:{}] to [{}:{}]", localPort, remoteHost, remotePort); + + // Configure the bootstrap. + serverBootstrap = new ServerBootstrap( + new NioServerSocketChannelFactory(executor, executor)); + + // Set up the event pipeline factory. 
+ ClientSocketChannelFactory channelFactory = new NioClientSocketChannelFactory(executor, executor); + serverBootstrap.setOption("child.tcpNoDelay", true); + serverBootstrap.setOption("child.keepAlive", true); + serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = Channels.pipeline(); + + // synchronized for a race between blocking and creating new handlers + synchronized (networkFailureHandlers) { + NetworkFailureHandler failureHandler = new NetworkFailureHandler( + blocked, + networkFailureHandler -> networkFailureHandlers.remove(networkFailureHandler), + channelFactory, + remoteHost, + remotePort); + networkFailureHandlers.add(failureHandler); + pipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, failureHandler); + } + return pipeline; + } + }); + channel = serverBootstrap.bind(new InetSocketAddress(localPort)); + + } + + /** + * @return local port on which {@link NetworkFailuresProxy} is listening. + */ + public int getLocalPort() { + return ((InetSocketAddress) channel.getLocalAddress()).getPort(); + } + + /** + * Blocks all ongoing traffic, closes all ongoing and closes any new incoming connections. + */ + public void blockTraffic() { + setTrafficBlocked(true); + } + + /** + * Resumes normal communication. 
+ */ + public void unblockTraffic() { + setTrafficBlocked(false); + } + + @Override + public void close() throws Exception { + channel.close(); + } + + private void setTrafficBlocked(boolean blocked) { + this.blocked.set(blocked); + if (blocked) { + // synchronized for a race between blocking and creating new handlers + synchronized (networkFailureHandlers) { + for (NetworkFailureHandler failureHandler : networkFailureHandlers) { + failureHandler.closeConnections(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java new file mode 100644 index 0000000..06e77ea --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * TCP EchoServer for test purposes.
 *
 * <p>The server thread accepts connections on an ephemeral local port and starts one worker thread per
 * connection; every worker sends each received line back to its client. {@link #close()} stops the
 * acceptor and all workers and releases their sockets.
 */
public class EchoServer extends Thread implements AutoCloseable {
	private final ServerSocket serverSocket = new ServerSocket(0);
	private final int socketTimeout;
	private final List<EchoWorkerThread> workerThreads = Collections.synchronizedList(new ArrayList<>());

	private volatile boolean close = false;
	// written by the acceptor thread, read by the thread calling close()
	private volatile Exception threadException;

	/**
	 * @param socketTimeout timeout in milliseconds applied to {@code accept()} and to reads on accepted
	 *                      connections
	 * @throws IOException if the server socket could not be opened or configured
	 */
	public EchoServer(int socketTimeout) throws IOException {
		this.socketTimeout = socketTimeout;
		try {
			serverSocket.setSoTimeout(socketTimeout);
		} catch (IOException | RuntimeException e) {
			// do not leak the already bound socket if it cannot be configured
			serverSocket.close();
			throw e;
		}
	}

	public int getLocalPort() {
		return serverSocket.getLocalPort();
	}

	@Override
	public void run() {
		while (!close) {
			try {
				startWorker(serverSocket.accept());
			} catch (java.net.SocketTimeoutException ignored) {
				// No client connected within the timeout. This is not a failure: re-check the close flag.
			} catch (IOException e) {
				if (close) {
					// expected: close() closed the server socket to wake us up from accept()
					break;
				}
				if (threadException == null) {
					threadException = e;
				}
				if (serverSocket.isClosed()) {
					// accept() can never succeed again; do not spin
					break;
				}
			}
		}
	}

	/**
	 * Creates, registers and starts the worker for a freshly accepted connection.
	 */
	private void startWorker(Socket clientSocket) throws IOException {
		final EchoWorkerThread worker;
		try {
			worker = new EchoWorkerThread(clientSocket, socketTimeout);
		} catch (IOException | RuntimeException e) {
			try {
				clientSocket.close();
			} catch (IOException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
		// register before starting, so that close() can always find and stop the worker
		workerThreads.add(worker);
		worker.start();
	}

	/**
	 * Stops accepting connections, closes all client connections and waits for all threads to finish.
	 *
	 * @throws Exception the failure recorded by the acceptor thread, if any, otherwise the first failure
	 *                   encountered while shutting down; it is thrown only after all resources have been
	 *                   released
	 */
	@Override
	public void close() throws Exception {
		close = true;

		Exception failure = null;
		try {
			// closing the socket makes a pending accept() fail, which ends the acceptor loop
			serverSocket.close();
			this.join();
		} catch (IOException | InterruptedException e) {
			failure = e;
		}

		// The acceptor has been stopped above, so iterate over a snapshot of the registered workers.
		for (EchoWorkerThread worker : new ArrayList<>(workerThreads)) {
			try {
				worker.close();
			} catch (IOException | InterruptedException e) {
				failure = merge(failure, e);
			}
		}

		Exception acceptFailure = threadException;
		if (acceptFailure != null) {
			throw merge(acceptFailure, failure);
		}
		if (failure != null) {
			throw failure;
		}
	}

	/**
	 * Returns {@code primary} with {@code secondary} attached as suppressed, or {@code secondary} if there
	 * is no primary failure.
	 */
	private static Exception merge(Exception primary, Exception secondary) {
		if (primary == null) {
			return secondary;
		}
		if (secondary != null) {
			primary.addSuppressed(secondary);
		}
		return primary;
	}

	/**
	 * Thread echoing the lines received on a single client connection.
	 */
	private static class EchoWorkerThread extends Thread implements AutoCloseable {
		private final Socket clientSocket;
		private final PrintWriter output;
		private final BufferedReader input;

		private volatile boolean close;

		public EchoWorkerThread(Socket clientSocket, int socketTimeout) throws IOException {
			this.clientSocket = clientSocket;
			output = new PrintWriter(clientSocket.getOutputStream(), true);
			input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
			clientSocket.setSoTimeout(socketTimeout);
		}

		@Override
		public void run() {
			try {
				String inputLine;
				while (!close && (inputLine = input.readLine()) != null) {
					output.println(inputLine);
				}
			} catch (IOException ignored) {
				// The connection was reset, timed out or closed by close(). Either way it is over; a
				// vanishing peer is expected when the server is used to test network failures.
			} finally {
				try {
					clientSocket.close();
				} catch (IOException ignored) {
					// best effort: the connection is finished anyway
				}
			}
		}

		/**
		 * Closes the connection and waits for the worker thread to finish.
		 */
		@Override
		public void close() throws IOException, InterruptedException {
			close = true;
			// Close the socket rather than the reader: this makes a readLine() pending in the worker
			// thread fail immediately, whereas closing the reader could wait behind that pending read.
			clientSocket.close();
			this.join();
		}
	}
}
+ */ + +package org.apache.flink.networking; + +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.net.Socket; +import java.net.SocketException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for NetworkFailuresProxy. + */ +public class NetworkFailuresProxyTest { + public static final int SOCKET_TIMEOUT = 500_000; + + @Test + public void testProxy() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); + EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + echoServer.start(); + + assertEquals("42", echoClient.write("42")); + assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); + } + } + + @Test + public void testMultipleConnections() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort()); + EchoClient echoClient1 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT); + EchoClient echoClient2 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + echoServer.start(); + + assertEquals("42", echoClient1.write("42")); + assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!")); + assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!")); + } + } + + @Test + public void testBlockTraffic() throws Exception { + try ( + EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT); + NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) { + echoServer.start(); + + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals("42", echoClient.write("42")); + proxy.blockTraffic(); + try { + 
echoClient.write("Ala ma kota!"); + } catch (SocketException ex) { + assertEquals("Connection reset", ex.getMessage()); + } + } + + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals(null, echoClient.write("42")); + } catch (SocketException ex) { + assertEquals("Connection reset", ex.getMessage()); + } + + proxy.unblockTraffic(); + try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) { + assertEquals("42", echoClient.write("42")); + assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!")); + } + } + } + + /** + * Simple echo client that sends a message over the network and waits for the answer. + */ + public static class EchoClient implements AutoCloseable { + private final Socket socket; + private final PrintWriter output; + private final BufferedReader input; + + public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException { + socket = new Socket(hostName, portNumber); + socket.setSoTimeout(socketTimeout); + output = new PrintWriter(socket.getOutputStream(), true); + input = new BufferedReader(new InputStreamReader(socket.getInputStream())); + } + + public String write(String message) throws IOException { + output.println(message); + return input.readLine(); + } + + @Override + public void close() throws Exception { + input.close(); + output.close(); + socket.close(); + } + } +}