[REEF-1935] Improve logging in Wake NettyMessagingTransport and related classes
Summary of changes: * Implement `.toString()` for `LinkReference` * Implement `.toString()` for `RemoteEventDecoder` * Implement `.toString()` for `RemoteEventEncoder` * Implement `.toString()` for `NettyMessagingTransport` * Improve logging and remove redundant code in `NettyMessagingTransport` constructor JIRA: [REEF-1935](https://issues.apache.org/jira/browse/REEF-1935) Pull Request: This closes #1401 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/48e65c0d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/48e65c0d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/48e65c0d Branch: refs/heads/REEF-335 Commit: 48e65c0dcb52a47774dbb09300c0a142becf52d5 Parents: daf33d5 Author: Sergiy Matusevych <[email protected]> Authored: Wed Oct 25 16:37:35 2017 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Oct 25 20:38:33 2017 -0700 ---------------------------------------------------------------------- .../wake/remote/impl/RemoteEventDecoder.java | 4 + .../wake/remote/impl/RemoteEventEncoder.java | 4 + .../remote/transport/netty/LinkReference.java | 5 ++ .../netty/NettyMessagingTransport.java | 84 +++++++++----------- 4 files changed, 49 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/48e65c0d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java index fdf10b2..f65646f 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java @@ -59,4 +59,8 @@ public class 
RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> { } } + @Override + public String toString() { + return String.format("RemoteEventDecoder: { decoder: %s }", this.decoder); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/48e65c0d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java index 29e3be6..4a3fa0b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java @@ -61,4 +61,8 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> { return builder.build().toByteArray(); } + @Override + public String toString() { + return String.format("RemoteEventEncoder: { encoder: %s }", this.encoder); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/48e65c0d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java index 6e780b8..21cd4d5 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LinkReference.java @@ -49,4 +49,9 @@ final class LinkReference { AtomicInteger getConnectInProgress() { return this.connectInProgress; } + + @Override + public String toString() { + return 
String.format("LinkReference: { link: %s }", this.link); // NettyLink has a good .toString() implementation + } } http://git-wip-us.apache.org/repos/asf/reef/blob/48e65c0d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index 2643030..ea221ec 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -54,7 +54,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; -import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -86,14 +85,12 @@ public final class NettyMessagingTransport implements Transport { private final EventLoopGroup serverWorkerGroup; private final Bootstrap clientBootstrap; - private final ServerBootstrap serverBootstrap; private final Channel acceptor; private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private final int serverPort; - private final SocketAddress localAddress; + private final InetSocketAddress localAddress; private final NettyClientEventListener clientEventListener; private final NettyServerEventListener serverEventListener; @@ -105,7 +102,7 @@ public final class NettyMessagingTransport implements Transport { * Constructs a messaging 
transport. * * @param hostAddress the server host address - * @param port the server listening port; when it is 0, randomly assign a port number + * @param listenPort the server listening port; when it is 0, randomly assign a port number * @param clientStage the client-side stage that handles transport events * @param serverStage the server-side stage that handles transport events * @param numberOfTries the number of tries of connection @@ -115,7 +112,7 @@ public final class NettyMessagingTransport implements Transport { @Inject private NettyMessagingTransport( @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, - @Parameter(RemoteConfiguration.Port.class) final int port, + @Parameter(RemoteConfiguration.Port.class) final int listenPort, @Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage, @Parameter(RemoteConfiguration.RemoteServerStage.class) final EStage<TransportEvent> serverStage, @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, @@ -123,9 +120,8 @@ public final class NettyMessagingTransport implements Transport { final TcpPortProvider tcpPortProvider, final LocalAddressProvider localAddressProvider) { - int p = port; - if (p < 0) { - throw new RemoteRuntimeException("Invalid server port: " + p); + if (listenPort < 0) { + throw new RemoteRuntimeException("Invalid server port: " + listenPort); } final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? 
localAddressProvider.getLocalAddress() : hostAddress; @@ -142,16 +138,16 @@ public final class NettyMessagingTransport implements Transport { this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + ":ClientWorker")); - this.clientBootstrap = new Bootstrap(); - this.clientBootstrap.group(this.clientWorkerGroup) + this.clientBootstrap = new Bootstrap() + .group(this.clientWorkerGroup) .channel(NioSocketChannel.class) .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client", this.clientChannelGroup, this.clientEventListener))) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, true); - this.serverBootstrap = new ServerBootstrap(); - this.serverBootstrap.group(this.serverBossGroup, this.serverWorkerGroup) + final ServerBootstrap serverBootstrap = new ServerBootstrap() + .group(this.serverBossGroup, this.serverWorkerGroup) .channel(NioServerSocketChannel.class) .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server", this.serverChannelGroup, this.serverEventListener))) @@ -159,56 +155,43 @@ public final class NettyMessagingTransport implements Transport { .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true); - LOG.log(Level.FINE, "Binding to {0}", p); + LOG.log(Level.FINE, "Binding to {0}:{1}", new Object[] {host, listenPort}); - Channel acceptorFound = null; try { - if (p > 0) { - acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel(); + if (listenPort > 0) { + this.localAddress = new InetSocketAddress(host, listenPort); + this.acceptor = serverBootstrap.bind(this.localAddress).sync().channel(); } else { - final Iterator<Integer> ports = tcpPortProvider.iterator(); - while (acceptorFound == null) { - if (!ports.hasNext()) { - throw new IllegalStateException("tcpPortProvider cannot find a free port."); - } - p = ports.next(); - LOG.log(Level.FINEST, "Try port 
{0}", p); + InetSocketAddress socketAddr = null; + Channel acceptorFound = null; + for (int port : tcpPortProvider) { + LOG.log(Level.FINEST, "Try port {0}", port); try { - acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel(); + socketAddr = new InetSocketAddress(host, port); + acceptorFound = serverBootstrap.bind(socketAddr).sync().channel(); + break; } catch (final Exception ex) { - if (ex instanceof BindException) { - LOG.log(Level.FINEST, "The port {0} is already bound. Try again", p); + if (ex instanceof BindException) { // Not visible to catch :( + LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); } else { throw ex; } } } + if (acceptorFound == null) { + throw new IllegalStateException("TcpPortProvider could not find a free port."); + } + this.localAddress = socketAddr; + this.acceptor = acceptorFound; } - } catch (final IllegalStateException ex) { - final RuntimeException transportException = - new TransportRuntimeException("tcpPortProvider failed to return free ports.", ex); - LOG.log(Level.SEVERE, "Cannot find a free port with " + tcpPortProvider, transportException); - + } catch (final IllegalStateException | InterruptedException ex) { + LOG.log(Level.SEVERE, "Cannot bind to port " + listenPort, ex); this.clientWorkerGroup.shutdownGracefully(); this.serverBossGroup.shutdownGracefully(); this.serverWorkerGroup.shutdownGracefully(); - throw transportException; - - } catch (final Exception ex) { - final RuntimeException transportException = - new TransportRuntimeException("Cannot bind to port " + p, ex); - LOG.log(Level.SEVERE, "Cannot bind to port " + p, ex); - - this.clientWorkerGroup.shutdownGracefully(); - this.serverBossGroup.shutdownGracefully(); - this.serverWorkerGroup.shutdownGracefully(); - throw transportException; + throw new TransportRuntimeException("Cannot bind to port " + listenPort, ex); } - this.acceptor = acceptorFound; - this.serverPort = p; - this.localAddress = new 
InetSocketAddress(host, this.serverPort); - LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress); } @@ -372,7 +355,7 @@ public final class NettyMessagingTransport implements Transport { */ @Override public int getListeningPort() { - return this.serverPort; + return this.localAddress.getPort(); } /** @@ -385,4 +368,9 @@ public final class NettyMessagingTransport implements Transport { this.clientEventListener.registerErrorHandler(handler); this.serverEventListener.registerErrorHandler(handler); } + + @Override + public String toString() { + return String.format("NettyMessagingTransport: { address: %s }", this.localAddress); + } }
