Repository: incubator-reef Updated Branches: refs/heads/master b88ea6b6c -> e82a7e31e
[REEF-182] Clean up Link and LinkListener for better message and error handling This removes the throw clause of the Link write method, removes unnecessary try catch statements, modifies LinkListener to get notifications on successful or failed send events, and creates new LinkListener implementation. JIRA: [REEF-182] https://issues.apache.org/jira/browse/REEF-182 Pull Request: Closes #96 Author: Geon Woo Kim [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e82a7e31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e82a7e31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e82a7e31 Branch: refs/heads/master Commit: e82a7e31e6e423b0d66eedf2035291669cf56f7a Parents: b88ea6b Author: Geon Woo Kim <[email protected]> Authored: Sun Mar 1 18:44:41 2015 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Mon Mar 2 16:35:39 2015 +0900 ---------------------------------------------------------------------- .../reef/io/network/impl/NSConnection.java | 14 +++--- .../reef/io/network/impl/NetworkService.java | 8 ++-- .../io/network/naming/NameRegistryClient.java | 6 +-- .../reef/io/network/naming/NameServerImpl.java | 16 +------ .../remote/impl/RemoteSenderEventHandler.java | 5 +-- .../apache/reef/wake/remote/transport/Link.java | 8 ++-- .../wake/remote/transport/LinkListener.java | 17 ++++++-- .../transport/netty/LoggingLinkListener.java | 24 ++++++++--- .../wake/remote/transport/netty/NettyLink.java | 45 ++++++++++++++------ .../reef/wake/test/remote/LargeMsgTest.java | 6 +-- 10 files changed, 80 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java index a4b4538..43d3573 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java @@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.transport.LinkListener; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.logging.Level; import java.util.logging.Logger; @@ -101,12 +102,7 @@ class NSConnection<T> implements Connection<T> { */ @Override public void write(final T obj) throws NetworkException { - try { - this.link.write(new NSMessage<T>(this.srcId, this.destId, obj)); - } catch (final IOException ex) { - LOG.log(Level.WARNING, "Could not write to " + this.destId, ex); - throw new NetworkException(ex); - } + this.link.write(new NSMessage<T>(this.srcId, this.destId, obj)); } /** @@ -129,6 +125,10 @@ final class NSMessageLinkListener<T> implements LinkListener<NSMessage<T>> { } @Override - public void messageReceived(final NSMessage<T> message) { + public void onSuccess(final NSMessage<T> message) { + } + + @Override + public void onException(final Throwable cause, final SocketAddress remoteAddress, final NSMessage<T> message) { } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java index 6120193..37d6128 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java @@ -39,9 +39,11 @@ import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.impl.TransportEvent; import org.apache.reef.wake.remote.transport.LinkListener; import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener; import javax.inject.Inject; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; @@ -204,11 +206,7 @@ public final class NetworkService<T> implements Stage, ConnectionFactory<T> { } final Connection<T> newConnection = new NSConnection<T>( - this.myId, destId, new LinkListener<T>() { - @Override - public void messageReceived(final Object message) { - } - }, this); + this.myId, destId, new LoggingLinkListener<T>(), this); final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, newConnection); return existing == null ? newConnection : existing; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java index 554a33b..938dab8 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java @@ -143,11 +143,7 @@ public class NameRegistryClient implements Stage, NamingRegistry { @Override public void unregister(Identifier id) throws IOException { Link<NamingMessage> link = transport.open(serverSocketAddr, codec, - new LinkListener<NamingMessage>() { - @Override - public void messageReceived(NamingMessage message) { - } - }); + new LoggingLinkListener<NamingMessage>()); link.write(new NamingUnregisterRequest(id)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index 306682a..4cb0cc6 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -242,13 +242,7 @@ class NamingLookupRequestHandler implements EventHandler<NamingLookupRequest> { public void onNext(final NamingLookupRequest value) { final List<NameAssignment> nas = server.lookup(value.getIdentifiers()); final byte[] resp = codec.encode(new NamingLookupResponse(nas)); - try { - value.getLink().write(resp); - } catch (final IOException e) { - //Actually, there is no way Link.write can throw and IOException - //after netty4 merge. This needs to cleaned up - LOG.throwing("NamingLookupRequestHandler", "onNext", e); - } + value.getLink().write(resp); } } @@ -272,13 +266,7 @@ class NamingRegisterRequestHandler implements EventHandler<NamingRegisterRequest public void onNext(final NamingRegisterRequest value) { server.register(value.getNameAssignment().getIdentifier(), value.getNameAssignment().getAddress()); final byte[] resp = codec.encode(new NamingRegisterResponse(value)); - try { - value.getLink().write(resp); - } catch (final IOException e) { - //Actually, there is no way Link.write can throw and IOException - //after netty4 merge. This needs to cleaned up - LOG.throwing("NamingRegisterRequestHandler", "onNext", e); - } + value.getLink().write(resp); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java index 779a1fe..04a96d8 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java @@ -75,7 +75,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { LOG.log(Level.FINEST, "{0}", event); linkRef.get().write(encoder.encode(event)); } - } catch (InterruptedException | IOException e) { + } catch (InterruptedException e) { e.printStackTrace(); throw new RemoteRuntimeException(e); } @@ -112,9 +112,6 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " + linkRef.get().getRemoteAddress() + " value " + value); linkRef.get().write(encoder.encode(value)); } - } catch (IOException ex) { - ex.printStackTrace(); - throw new RemoteRuntimeException(ex); } catch (RemoteRuntimeException ex2) { ex2.printStackTrace(); throw ex2; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java index c0aad44..a05e8f7 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java @@ -18,7 +18,6 @@ */ package org.apache.reef.wake.remote.transport; -import java.io.IOException; import java.net.SocketAddress; /** @@ -26,7 +25,7 @@ import java.net.SocketAddress; * * @param <T> type of the message. */ -public interface Link<T> extends LinkListener<T> { +public interface Link<T> { /** * Gets its local address. @@ -43,10 +42,9 @@ public interface Link<T> extends LinkListener<T> { SocketAddress getRemoteAddress(); /** - * Writes the value to this link. + * Asynchronously writes the value to this link. * * @param value the data value. - * @throws IOException */ - void write(T value) throws IOException; + void write(T value); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java index f10d153..7cd101d 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java @@ -18,6 +18,8 @@ */ package org.apache.reef.wake.remote.transport; +import java.net.SocketAddress; + /** * Link listener * @@ -26,9 +28,18 @@ package org.apache.reef.wake.remote.transport; public interface LinkListener<T> { /** - * Handles the received message + * Called when the sent message is successfully transferred + * + * @param message the sent message + */ + public void onSuccess(T message); + + /** + * Called when the sent message to remoteAddress is failed to be transferred. * - * @param message the message + * @param cause the cause of exception + * @param remoteAddress the exception occurred remote address + * @param message the send message */ - public void messageReceived(T message); + public void onException(Throwable cause, SocketAddress remoteAddress, T message); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java index c551ccf..1f77684 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java @@ -20,11 +20,12 @@ package org.apache.reef.wake.remote.transport.netty; import org.apache.reef.wake.remote.transport.LinkListener; +import java.net.SocketAddress; import java.util.logging.Level; import java.util.logging.Logger; /** - * Link listener that logs a message received + * Link listener that logs whether the message is sent successfully * * @param <T> type */ @@ -33,14 +34,23 @@ public class LoggingLinkListener<T> implements LinkListener<T> { private static final Logger LOG = Logger.getLogger(LoggingLinkListener.class.getName()); /** - * Handles the message received - * - * @param message the message + * Called when the sent message is transferred successfully */ @Override - public void messageReceived(T message) { - if (LOG.isLoggable(Level.FINEST)) - LOG.log(Level.FINEST, "The linklistener " + this.getClass().toString() + "has received " + message); + public void onSuccess(T message) { + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message}); + } } + /** + * Called when the sent message to remoteAddress is failed to be transferred. + */ + @Override + public void onException(Throwable cause, SocketAddress remoteAddress, T message) { + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}" + , new Object[]{remoteAddress, message, cause}); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java index ce70193..5971d98 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java @@ -20,17 +20,21 @@ package org.apache.reef.wake.remote.transport.netty; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import org.apache.reef.wake.remote.Encoder; import org.apache.reef.wake.remote.transport.Link; import org.apache.reef.wake.remote.transport.LinkListener; -import java.io.IOException; import java.net.SocketAddress; import java.util.logging.Level; import java.util.logging.Logger; /** * Link implementation with Netty + * + * If you set a LinkListener<T>, it keeps message until writeAndFlush operation completes + * and notifies whether the sent message transferred successfully through the listener. */ public class NettyLink<T> implements Link<T> { @@ -71,22 +75,15 @@ public class NettyLink<T> implements Link<T> { * @param message the message */ @Override - public void write(final T message) throws IOException { + public void write(final T message) { LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message}); byte[] allData = encoder.encode(message); // byte[] -> ByteBuf - channel.writeAndFlush(Unpooled.wrappedBuffer(allData)); - } - - /** - * Handles the message received - * - * @param message the message - */ - @Override - public void messageReceived(final T message) { - if (listener != null) { - listener.messageReceived(message); + if (listener != null) { + channel.writeAndFlush(Unpooled.wrappedBuffer(allData)) + .addListener(new NettyChannelFutureListener<>(message, listener)); + } else { + channel.writeAndFlush(Unpooled.wrappedBuffer(allData)); } } @@ -115,3 +112,23 @@ public class NettyLink<T> implements Link<T> { return "localAddr: " + getLocalAddress() + " remoteAddr: " + getRemoteAddress(); } } + +class NettyChannelFutureListener<T> implements ChannelFutureListener { + + private final T message; + private LinkListener<T> listener; + + NettyChannelFutureListener(final T message, final LinkListener<T> listener) { + this.message = message; + this.listener = listener; + } + + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (channelFuture.isSuccess()) { + listener.onSuccess(message); + } else { + listener.onException(channelFuture.cause(), channelFuture.channel().remoteAddress(), message); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java index 40721e1..db59b34 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java @@ -86,11 +86,7 @@ public class LargeMsgTest { @Override public void onNext(byte[] value) { - try { - link.write(value); - } catch (IOException e) { - e.printStackTrace(); - } + link.write(value); } }, 3, new LoggingEventHandler<Throwable>()); writeSubmitter.onNext(values[0]);
