[FLINK-7343][utils] Add network proxy utility to simulate network failures

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f96f79
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f96f79
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f96f79

Branch: refs/heads/master
Commit: b7f96f79e7665f10880333d816d1694a227c5437
Parents: e2d3e1f
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Aug 1 18:11:27 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../flink/networking/NetworkFailureHandler.java | 178 +++++++++++++++++++
 .../flink/networking/NetworkFailuresProxy.java  | 125 +++++++++++++
 .../org/apache/flink/networking/EchoServer.java | 113 ++++++++++++
 .../networking/NetworkFailuresProxyTest.java    | 124 +++++++++++++
 4 files changed, 540 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
new file mode 100644
index 0000000..0ce0b12
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that forwards inbound traffic from the source channel to a target channel connected to
+ * remoteHost:remotePort, and the responses in the opposite direction. While the shared {@code blocked}
+ * flag is set, newly opened source channels are closed immediately and messages in both directions
+ * are dropped.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+       private static final Logger LOG = LoggerFactory.getLogger(NetworkFailureHandler.class);
+       private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+       // mapping between source and target channels, used for finding correct target channel to use for given source.
+       private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+       // callback invoked with this handler every time one of its source channels has been closed
+       private final Consumer<NetworkFailureHandler> onClose;
+       private final ClientSocketChannelFactory channelFactory;
+       private final String remoteHost;
+       private final int remotePort;
+
+       // shared flag; while it is true no traffic is forwarded and new connections are rejected
+       private final AtomicBoolean blocked;
+
+       /**
+        * @param blocked flag consulted on every open/message event to decide whether traffic is blocked
+        * @param onClose callback invoked with this handler whenever a source channel has been closed
+        * @param channelFactory factory used to create the outgoing (target) connections
+        * @param remoteHost host to which the traffic is forwarded
+        * @param remotePort port to which the traffic is forwarded
+        */
+       public NetworkFailureHandler(
+                       AtomicBoolean blocked,
+                       Consumer<NetworkFailureHandler> onClose,
+                       ClientSocketChannelFactory channelFactory,
+                       String remoteHost,
+                       int remotePort) {
+               this.blocked = blocked;
+               this.onClose = onClose;
+               this.channelFactory = channelFactory;
+               this.remoteHost = remoteHost;
+               this.remotePort = remotePort;
+       }
+
+       /**
+        * Closes the specified channel after all queued write requests are flushed.
+        *
+        * <p>A channel that is not connected has nothing to flush and is closed directly, so that a
+        * channel whose connection attempt is still pending is not left open.
+        */
+       static void closeOnFlush(Channel channel) {
+               if (channel.isConnected()) {
+                       channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+               } else {
+                       channel.close();
+               }
+       }
+
+       /**
+        * Closes every source channel currently tracked by this handler.
+        */
+       public void closeConnections() {
+               for (Channel sourceChannel : sourceToTargetChannels.keySet()) {
+                       // target channel is closed on source's channel channelClosed event
+                       sourceChannel.close();
+               }
+       }
+
+       @Override
+       public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+               // Suspend incoming traffic until connected to the remote host.
+               final Channel sourceChannel = event.getChannel();
+               sourceChannel.setReadable(false);
+
+               if (blocked.get()) {
+                       sourceChannel.close();
+                       return;
+               }
+
+               // Start the connection attempt.
+               ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
+               targetConnectionBootstrap.getPipeline().addLast(
+                       TARGET_CHANNEL_HANDLER_NAME,
+                       new TargetChannelHandler(sourceChannel, blocked));
+               ChannelFuture connectFuture = targetConnectionBootstrap.connect(new InetSocketAddress(remoteHost, remotePort));
+               sourceToTargetChannels.put(sourceChannel, connectFuture.getChannel());
+
+               connectFuture.addListener(future -> {
+                       if (future.isSuccess()) {
+                               // Connection attempt succeeded:
+                               // Begin to accept incoming traffic.
+                               sourceChannel.setReadable(true);
+                       } else {
+                               // Close the connection if the connection attempt has failed.
+                               sourceChannel.close();
+                       }
+               });
+       }
+
+       @Override
+       public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+               if (blocked.get()) {
+                       // traffic is blocked: silently drop the message
+                       return;
+               }
+
+               ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+               Channel targetChannel = sourceToTargetChannels.get(event.getChannel());
+               if (targetChannel == null) {
+                       throw new IllegalStateException("Could not find a target channel for the source channel");
+               }
+               targetChannel.write(msg);
+       }
+
+       @Override
+       public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+               Channel targetChannel = sourceToTargetChannels.remove(event.getChannel());
+               if (targetChannel != null) {
+                       closeOnFlush(targetChannel);
+               }
+               // Notify the owner even if no target channel was registered (e.g. the connection was rejected
+               // in channelOpen because traffic is blocked); otherwise it would never learn that the source
+               // channel of this handler is gone.
+               onClose.accept(this);
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+               LOG.error("Closing communication channel because of an exception", event.getCause());
+               closeOnFlush(event.getChannel());
+       }
+
+       /**
+        * Handler installed on the target channel; it relays the responses back to the source channel.
+        */
+       private static class TargetChannelHandler extends SimpleChannelUpstreamHandler {
+               private final Channel sourceChannel;
+               private final AtomicBoolean blocked;
+
+               TargetChannelHandler(Channel sourceChannel, AtomicBoolean blocked) {
+                       this.sourceChannel = sourceChannel;
+                       this.blocked = blocked;
+               }
+
+               @Override
+               public void messageReceived(ChannelHandlerContext context, MessageEvent event) throws Exception {
+                       if (blocked.get()) {
+                               // traffic is blocked: silently drop the response
+                               return;
+                       }
+                       ChannelBuffer msg = (ChannelBuffer) event.getMessage();
+                       sourceChannel.write(msg);
+               }
+
+               @Override
+               public void channelClosed(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+                       closeOnFlush(sourceChannel);
+               }
+
+               @Override
+               public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) throws Exception {
+                       LOG.error("Closing communication channel because of an exception", event.getCause());
+                       closeOnFlush(event.getChannel());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
new file mode 100644
index 0000000..7030049
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailuresProxy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A network proxy that listens on a local port and forwards all of the traffic to the remote host/port.
+ * It makes it possible to simulate network failures in the communication.
+ */
+public class NetworkFailuresProxy implements AutoCloseable {
+       private static final Logger LOG = LoggerFactory.getLogger(NetworkFailuresProxy.class);
+       private static final String NETWORK_FAILURE_HANDLER_NAME = "network_failure_handler";
+
+       private final Executor executor = Executors.newCachedThreadPool();
+       private final ServerBootstrap serverBootstrap;
+       private final Channel channel;
+       private final AtomicBoolean blocked = new AtomicBoolean();
+       // handlers of the accepted connections, kept so that closeConnections() can be called on each of them
+       private final Set<NetworkFailureHandler> networkFailureHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+       /**
+        * Binds the proxy to {@code localPort} and forwards every accepted connection to
+        * {@code remoteHost:remotePort}.
+        */
+       public NetworkFailuresProxy(int localPort, String remoteHost, int remotePort) {
+               LOG.info("Proxying [*:{}] to [{}:{}]", localPort, remoteHost, remotePort);
+
+               // Server side: accepts the incoming (source) connections.
+               serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(executor, executor));
+
+               // Client side: used by the handlers to open the outgoing (target) connections.
+               ClientSocketChannelFactory targetChannelFactory = new NioClientSocketChannelFactory(executor, executor);
+               serverBootstrap.setOption("child.tcpNoDelay", true);
+               serverBootstrap.setOption("child.keepAlive", true);
+               serverBootstrap.setPipelineFactory(() -> createPipeline(targetChannelFactory, remoteHost, remotePort));
+               channel = serverBootstrap.bind(new InetSocketAddress(localPort));
+       }
+
+       /**
+        * @return local port on which {@link NetworkFailuresProxy} is listening.
+        */
+       public int getLocalPort() {
+               InetSocketAddress localAddress = (InetSocketAddress) channel.getLocalAddress();
+               return localAddress.getPort();
+       }
+
+       /**
+        * Blocks all ongoing traffic, closes all ongoing connections and closes any new incoming connections.
+        */
+       public void blockTraffic() {
+               setTrafficBlocked(true);
+       }
+
+       /**
+        * Resumes normal communication.
+        */
+       public void unblockTraffic() {
+               setTrafficBlocked(false);
+       }
+
+       /**
+        * Closes the listening channel.
+        */
+       @Override
+       public void close() throws Exception {
+               // NOTE(review): only the listening channel is closed here; the executor and the channel
+               // factories created in the constructor are not released - confirm whether that is intended.
+               channel.close();
+       }
+
+       /**
+        * Builds the pipeline for a newly accepted connection and registers its handler.
+        */
+       private ChannelPipeline createPipeline(
+                       ClientSocketChannelFactory targetChannelFactory,
+                       String remoteHost,
+                       int remotePort) {
+               ChannelPipeline newPipeline = Channels.pipeline();
+
+               // synchronized for a race between blocking and creating new handlers
+               synchronized (networkFailureHandlers) {
+                       NetworkFailureHandler handler = new NetworkFailureHandler(
+                               blocked,
+                               networkFailureHandlers::remove,
+                               targetChannelFactory,
+                               remoteHost,
+                               remotePort);
+                       networkFailureHandlers.add(handler);
+                       newPipeline.addLast(NETWORK_FAILURE_HANDLER_NAME, handler);
+               }
+               return newPipeline;
+       }
+
+       private void setTrafficBlocked(boolean blocked) {
+               this.blocked.set(blocked);
+               if (!blocked) {
+                       return;
+               }
+               // synchronized for a race between blocking and creating new handlers
+               synchronized (networkFailureHandlers) {
+                       networkFailureHandlers.forEach(NetworkFailureHandler::closeConnections);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
new file mode 100644
index 0000000..06e77ea
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/EchoServer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * TCP EchoServer for test purposes.
+ *
+ * <p>The server thread accepts connections on an ephemeral port and starts one worker thread per
+ * connection, which echoes back every line it receives. {@link #close()} stops the accept loop and
+ * all of the workers.
+ */
+public class EchoServer extends Thread implements AutoCloseable {
+       private final ServerSocket serverSocket = new ServerSocket(0);
+       private final int socketTimeout;
+       private final List<EchoWorkerThread> workerThreads = Collections.synchronizedList(new ArrayList<>());
+
+       private volatile boolean close = false;
+       // last failure of the accept loop that was not caused by shutting the server down
+       private volatile Exception threadException;
+
+       /**
+        * @param socketTimeout SO_TIMEOUT (in milliseconds) applied to the server socket and to every accepted socket
+        */
+       public EchoServer(int socketTimeout) throws IOException {
+               serverSocket.setSoTimeout(socketTimeout);
+               this.socketTimeout = socketTimeout;
+       }
+
+       /**
+        * @return local port on which the server socket is listening.
+        */
+       public int getLocalPort() {
+               return serverSocket.getLocalPort();
+       }
+
+       @Override
+       public void run() {
+               while (!close) {
+                       try {
+                               Socket clientSocket = serverSocket.accept();
+                               try {
+                                       EchoWorkerThread thread = new EchoWorkerThread(clientSocket, socketTimeout);
+                                       // register the worker before starting it, so that close() can always find it
+                                       workerThreads.add(thread);
+                                       thread.start();
+                               } catch (IOException e) {
+                                       // do not leak the accepted socket if the worker could not be set up
+                                       clientSocket.close();
+                                       throw e;
+                               }
+                       } catch (IOException e) {
+                               // closing the server socket in close() makes accept() fail; that is not an error
+                               if (!close) {
+                                       threadException = e;
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Stops accepting connections, stops all of the workers and waits for all threads to finish.
+        *
+        * @throws Exception a failure recorded by the server thread or by one of the workers, or a failure
+        *         to release a socket; further failures are attached as suppressed exceptions
+        */
+       @Override
+       public void close() throws Exception {
+               close = true;
+               Exception failure = null;
+               try {
+                       // unblocks a pending accept(), so that the accept loop can observe the close flag
+                       serverSocket.close();
+               } catch (IOException e) {
+                       failure = e;
+               }
+               // once the server thread has finished no new workers can be registered
+               this.join();
+
+               synchronized (workerThreads) {
+                       for (EchoWorkerThread thread : workerThreads) {
+                               try {
+                                       thread.close();
+                               } catch (Exception e) {
+                                       failure = merge(failure, e);
+                               }
+                       }
+               }
+
+               if (threadException != null) {
+                       failure = merge(threadException, failure);
+               }
+               if (failure != null) {
+                       throw failure;
+               }
+       }
+
+       /**
+        * Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no {@code first}.
+        */
+       private static Exception merge(Exception first, Exception next) {
+               if (first == null) {
+                       return next;
+               }
+               if (next != null) {
+                       first.addSuppressed(next);
+               }
+               return first;
+       }
+
+       /**
+        * Thread serving a single client connection: echoes every received line back to the client.
+        */
+       private static class EchoWorkerThread extends Thread implements AutoCloseable {
+               private final Socket clientSocket;
+               private final PrintWriter output;
+               private final BufferedReader input;
+
+               private volatile boolean close;
+               // failure of the echo loop that was not caused by closing this worker
+               private volatile Exception threadException;
+
+               public EchoWorkerThread(Socket clientSocket, int socketTimeout) throws IOException {
+                       this.clientSocket = clientSocket;
+                       output = new PrintWriter(clientSocket.getOutputStream(), true);
+                       input = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
+                       clientSocket.setSoTimeout(socketTimeout);
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               String inputLine;
+                               while (!close && (inputLine = input.readLine()) != null) {
+                                       output.println(inputLine);
+                               }
+                       } catch (IOException e) {
+                               // closing the socket in close() makes a pending readLine() fail; that is not an error
+                               if (!close) {
+                                       threadException = e;
+                               }
+                       }
+               }
+
+               /**
+                * Closes the client connection and waits for the worker thread to finish.
+                *
+                * @throws Exception the failure recorded by the echo loop, if any, or a failure to close the socket
+                */
+               @Override
+               public void close() throws Exception {
+                       close = true;
+                       try {
+                               // Close the socket rather than the reader: closing the reader would have to wait for
+                               // the lock held by a pending readLine(), whereas closing the socket aborts that read.
+                               // Closing the socket also closes its input and output streams.
+                               clientSocket.close();
+                       } finally {
+                               this.join();
+                       }
+                       if (threadException != null) {
+                               throw threadException;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7f96f79/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
new file mode 100644
index 0000000..0046868
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/networking/NetworkFailuresProxyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.Socket;
+import java.net.SocketException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for NetworkFailuresProxy.
+ *
+ * <p>Every test routes an {@link EchoClient} through a {@link NetworkFailuresProxy} to an
+ * {@link EchoServer} and checks what the client gets back.
+ */
+public class NetworkFailuresProxyTest {
+       public static final int SOCKET_TIMEOUT = 500_000;
+
+       /**
+        * Messages sent through the proxy are echoed back unchanged.
+        */
+       @Test
+       public void testProxy() throws Exception {
+               try (
+                               EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+                               NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+                               EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+                       echoServer.start();
+
+                       assertEquals("42", echoClient.write("42"));
+                       assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+               }
+       }
+
+       /**
+        * Two clients connected through the same proxy each receive their own echoes.
+        */
+       @Test
+       public void testMultipleConnections() throws Exception {
+               try (
+                               EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+                               NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort());
+                               EchoClient echoClient1 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT);
+                               EchoClient echoClient2 = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+                       echoServer.start();
+
+                       assertEquals("42", echoClient1.write("42"));
+                       assertEquals("Ala ma kota!", echoClient2.write("Ala ma kota!"));
+                       assertEquals("Ala hat eine Katze!", echoClient1.write("Ala hat eine Katze!"));
+               }
+       }
+
+       /**
+        * Blocking the traffic cuts an existing connection and rejects new ones; unblocking restores the echo.
+        */
+       @Test
+       public void testBlockTraffic() throws Exception {
+               try (
+                               EchoServer echoServer = new EchoServer(SOCKET_TIMEOUT);
+                               NetworkFailuresProxy proxy = new NetworkFailuresProxy(0, "localhost", echoServer.getLocalPort())) {
+                       echoServer.start();
+
+                       try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+                               assertEquals("42", echoClient.write("42"));
+                               proxy.blockTraffic();
+                               try {
+                                       // the connection was cut: either the end of the stream is reported (null)
+                                       // or the write fails with a connection reset; an echo must never come back
+                                       assertEquals(null, echoClient.write("Ala ma kota!"));
+                               } catch (SocketException ex) {
+                                       assertEquals("Connection reset", ex.getMessage());
+                               }
+                       }
+
+                       // a connection opened while the traffic is blocked is closed by the proxy straight away
+                       try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+                               assertEquals(null, echoClient.write("42"));
+                       } catch (SocketException ex) {
+                               assertEquals("Connection reset", ex.getMessage());
+                       }
+
+                       proxy.unblockTraffic();
+                       try (EchoClient echoClient = new EchoClient("localhost", proxy.getLocalPort(), SOCKET_TIMEOUT)) {
+                               assertEquals("42", echoClient.write("42"));
+                               assertEquals("Ala ma kota!", echoClient.write("Ala ma kota!"));
+                       }
+               }
+       }
+
+       /**
+        * Simple echo client that sends a message over the network and waits for the answer.
+        */
+       public static class EchoClient implements AutoCloseable {
+               private final Socket socket;
+               private final PrintWriter output;
+               private final BufferedReader input;
+
+               public EchoClient(String hostName, int portNumber, int socketTimeout) throws IOException {
+                       socket = new Socket(hostName, portNumber);
+                       socket.setSoTimeout(socketTimeout);
+                       output = new PrintWriter(socket.getOutputStream(), true);
+                       input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+               }
+
+               /**
+                * Sends {@code message} as a single line and blocks until one line of the answer is read.
+                *
+                * @return the line that was read, or {@code null} if the end of the stream was reached
+                */
+               public String write(String message) throws IOException {
+                       output.println(message);
+                       return input.readLine();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       input.close();
+                       output.close();
+                       socket.close();
+               }
+       }
+}

Reply via email to