Repository: incubator-reef
Updated Branches:
  refs/heads/master b88ea6b6c -> e82a7e31e


[REEF-182] Clean up Link and LinkListener for better message and error handling

  This removes the throw clause of the Link write method, removes unnecessary
  try catch statements, modifies LinkListener to get notifications on
  successful or failed send events, and creates new LinkListener
  implementation.

JIRA:
  [REEF-182] https://issues.apache.org/jira/browse/REEF-182

Pull Request:
  Closes #96

Author:
  Geon Woo Kim [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e82a7e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e82a7e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e82a7e31

Branch: refs/heads/master
Commit: e82a7e31e6e423b0d66eedf2035291669cf56f7a
Parents: b88ea6b
Author: Geon Woo Kim <[email protected]>
Authored: Sun Mar 1 18:44:41 2015 +0900
Committer: Byung-Gon Chun <[email protected]>
Committed: Mon Mar 2 16:35:39 2015 +0900

----------------------------------------------------------------------
 .../reef/io/network/impl/NSConnection.java      | 14 +++---
 .../reef/io/network/impl/NetworkService.java    |  8 ++--
 .../io/network/naming/NameRegistryClient.java   |  6 +--
 .../reef/io/network/naming/NameServerImpl.java  | 16 +------
 .../remote/impl/RemoteSenderEventHandler.java   |  5 +--
 .../apache/reef/wake/remote/transport/Link.java |  8 ++--
 .../wake/remote/transport/LinkListener.java     | 17 ++++++--
 .../transport/netty/LoggingLinkListener.java    | 24 ++++++++---
 .../wake/remote/transport/netty/NettyLink.java  | 45 ++++++++++++++------
 .../reef/wake/test/remote/LargeMsgTest.java     |  6 +--
 10 files changed, 80 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
index a4b4538..43d3573 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NSConnection.java
@@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.transport.LinkListener;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -101,12 +102,7 @@ class NSConnection<T> implements Connection<T> {
    */
   @Override
   public void write(final T obj) throws NetworkException {
-    try {
-      this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
-    } catch (final IOException ex) {
-      LOG.log(Level.WARNING, "Could not write to " + this.destId, ex);
-      throw new NetworkException(ex);
-    }
+    this.link.write(new NSMessage<T>(this.srcId, this.destId, obj));
   }
 
   /**
@@ -129,6 +125,10 @@ final class NSMessageLinkListener<T> implements 
LinkListener<NSMessage<T>> {
   }
 
   @Override
-  public void messageReceived(final NSMessage<T> message) {
+  public void onSuccess(final NSMessage<T> message) {
+  }
+
+  @Override
+  public void onException(final Throwable cause, final SocketAddress 
remoteAddress, final NSMessage<T> message) {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
index 6120193..37d6128 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkService.java
@@ -39,9 +39,11 @@ import org.apache.reef.wake.remote.Codec;
 import org.apache.reef.wake.remote.impl.TransportEvent;
 import org.apache.reef.wake.remote.transport.LinkListener;
 import org.apache.reef.wake.remote.transport.Transport;
+import org.apache.reef.wake.remote.transport.netty.LoggingLinkListener;
 
 import javax.inject.Inject;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
@@ -204,11 +206,7 @@ public final class NetworkService<T> implements Stage, 
ConnectionFactory<T> {
     }
 
     final Connection<T> newConnection = new NSConnection<T>(
-        this.myId, destId, new LinkListener<T>() {
-      @Override
-      public void messageReceived(final Object message) {
-      }
-    }, this);
+        this.myId, destId, new LoggingLinkListener<T>(), this);
 
     final Connection<T> existing = this.idToConnMap.putIfAbsent(destId, 
newConnection);
     return existing == null ? newConnection : existing;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
index 554a33b..938dab8 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameRegistryClient.java
@@ -143,11 +143,7 @@ public class NameRegistryClient implements Stage, 
NamingRegistry {
   @Override
   public void unregister(Identifier id) throws IOException {
     Link<NamingMessage> link = transport.open(serverSocketAddr, codec,
-        new LinkListener<NamingMessage>() {
-          @Override
-          public void messageReceived(NamingMessage message) {
-          }
-        });
+        new LoggingLinkListener<NamingMessage>());
     link.write(new NamingUnregisterRequest(id));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index 306682a..4cb0cc6 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -242,13 +242,7 @@ class NamingLookupRequestHandler implements 
EventHandler<NamingLookupRequest> {
   public void onNext(final NamingLookupRequest value) {
     final List<NameAssignment> nas = server.lookup(value.getIdentifiers());
     final byte[] resp = codec.encode(new NamingLookupResponse(nas));
-    try {
-      value.getLink().write(resp);
-    } catch (final IOException e) {
-      //Actually, there is no way Link.write can throw and IOException
-      //after netty4 merge. This needs to cleaned up
-      LOG.throwing("NamingLookupRequestHandler", "onNext", e);
-    }
+    value.getLink().write(resp);
   }
 }
 
@@ -272,13 +266,7 @@ class NamingRegisterRequestHandler implements 
EventHandler<NamingRegisterRequest
   public void onNext(final NamingRegisterRequest value) {
     server.register(value.getNameAssignment().getIdentifier(), 
value.getNameAssignment().getAddress());
     final byte[] resp = codec.encode(new NamingRegisterResponse(value));
-    try {
-      value.getLink().write(resp);
-    } catch (final IOException e) {
-      //Actually, there is no way Link.write can throw and IOException
-      //after netty4 merge. This needs to cleaned up
-      LOG.throwing("NamingRegisterRequestHandler", "onNext", e);
-    }
+    value.getLink().write(resp);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
index 779a1fe..04a96d8 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java
@@ -75,7 +75,7 @@ class RemoteSenderEventHandler<T> implements 
EventHandler<RemoteEvent<T>> {
         LOG.log(Level.FINEST, "{0}", event);
         linkRef.get().write(encoder.encode(event));
       }
-    } catch (InterruptedException | IOException e) {
+    } catch (InterruptedException e) {
       e.printStackTrace();
       throw new RemoteRuntimeException(e);
     }
@@ -112,9 +112,6 @@ class RemoteSenderEventHandler<T> implements 
EventHandler<RemoteEvent<T>> {
           LOG.log(Level.FINEST, "Send an event from " + 
linkRef.get().getLocalAddress() + " to " + linkRef.get().getRemoteAddress() + " 
value " + value);
         linkRef.get().write(encoder.encode(value));
       }
-    } catch (IOException ex) {
-      ex.printStackTrace();
-      throw new RemoteRuntimeException(ex);
     } catch (RemoteRuntimeException ex2) {
       ex2.printStackTrace();
       throw ex2;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java
index c0aad44..a05e8f7 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Link.java
@@ -18,7 +18,6 @@
  */
 package org.apache.reef.wake.remote.transport;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 
 /**
@@ -26,7 +25,7 @@ import java.net.SocketAddress;
  *
  * @param <T> type of the message.
  */
-public interface Link<T> extends LinkListener<T> {
+public interface Link<T> {
 
   /**
    * Gets its local address.
@@ -43,10 +42,9 @@ public interface Link<T> extends LinkListener<T> {
   SocketAddress getRemoteAddress();
 
   /**
-   * Writes the value to this link.
+   * Asynchronously writes the value to this link.
    *
    * @param value the data value.
-   * @throws IOException
    */
-  void write(T value) throws IOException;
+  void write(T value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java
index f10d153..7cd101d 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/LinkListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.reef.wake.remote.transport;
 
+import java.net.SocketAddress;
+
 /**
  * Link listener
  *
@@ -26,9 +28,18 @@ package org.apache.reef.wake.remote.transport;
 public interface LinkListener<T> {
 
   /**
-   * Handles the received message
+   * Called when the sent message is successfully transferred
+   *
+   * @param message the sent message
+   */
+  public void onSuccess(T message);
+
+  /**
+   * Called when the sent message to remoteAddress is failed to be transferred.
    *
-   * @param message the message
+   * @param cause the cause of exception
+   * @param remoteAddress the exception occurred remote address
+   * @param message the send message
    */
-  public void messageReceived(T message);
+  public void onException(Throwable cause, SocketAddress remoteAddress, T 
message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
index c551ccf..1f77684 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java
@@ -20,11 +20,12 @@ package org.apache.reef.wake.remote.transport.netty;
 
 import org.apache.reef.wake.remote.transport.LinkListener;
 
+import java.net.SocketAddress;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
- * Link listener that logs a message received
+ * Link listener that logs whether the message is sent successfully
  *
  * @param <T> type
  */
@@ -33,14 +34,23 @@ public class LoggingLinkListener<T> implements 
LinkListener<T> {
   private static final Logger LOG = 
Logger.getLogger(LoggingLinkListener.class.getName());
 
   /**
-   * Handles the message received
-   *
-   * @param message the message
+   * Called when the sent message is transferred successfully
    */
   @Override
-  public void messageReceived(T message) {
-    if (LOG.isLoggable(Level.FINEST))
-      LOG.log(Level.FINEST, "The linklistener " + this.getClass().toString() + 
"has received " + message);
+  public void onSuccess(T message) {
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "The message is successfully sent : {0}", new 
Object[]{message});
+    }
   }
 
+  /**
+   * Called when the sent message to remoteAddress is failed to be transferred.
+   */
+  @Override
+  public void onException(Throwable cause, SocketAddress remoteAddress, T 
message) {
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message 
: {1}, cause : {2}"
+          , new Object[]{remoteAddress, message, cause});
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
index ce70193..5971d98 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java
@@ -20,17 +20,21 @@ package org.apache.reef.wake.remote.transport.netty;
 
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import org.apache.reef.wake.remote.Encoder;
 import org.apache.reef.wake.remote.transport.Link;
 import org.apache.reef.wake.remote.transport.LinkListener;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
  * Link implementation with Netty
+ *
+ * If you set a LinkListener<T>, it keeps message until writeAndFlush 
operation completes
+ * and notifies whether the sent message transferred successfully through the 
listener.
  */
 public class NettyLink<T> implements Link<T> {
 
@@ -71,22 +75,15 @@ public class NettyLink<T> implements Link<T> {
    * @param message the message
    */
   @Override
-  public void write(final T message) throws IOException {
+  public void write(final T message) {
     LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message});
     byte[] allData = encoder.encode(message);
     // byte[] -> ByteBuf
-    channel.writeAndFlush(Unpooled.wrappedBuffer(allData));
-  }
-
-  /**
-   * Handles the message received
-   *
-   * @param message the message
-   */
-  @Override
-  public void messageReceived(final T message) {
-    if (listener != null) {
-      listener.messageReceived(message);
+    if (listener !=  null) {
+      channel.writeAndFlush(Unpooled.wrappedBuffer(allData))
+          .addListener(new NettyChannelFutureListener<>(message, listener));
+    } else {
+      channel.writeAndFlush(Unpooled.wrappedBuffer(allData));
     }
   }
 
@@ -115,3 +112,23 @@ public class NettyLink<T> implements Link<T> {
     return "localAddr: " + getLocalAddress() + " remoteAddr: " + 
getRemoteAddress();
   }
 }
+
+class NettyChannelFutureListener<T> implements ChannelFutureListener {
+
+  private final T message;
+  private LinkListener<T> listener;
+
+  NettyChannelFutureListener(final T message, final LinkListener<T> listener) {
+    this.message = message;
+    this.listener = listener;
+  }
+
+  @Override
+  public void operationComplete(ChannelFuture channelFuture) throws Exception {
+    if (channelFuture.isSuccess()) {
+      listener.onSuccess(message);
+    } else {
+      listener.onException(channelFuture.cause(), 
channelFuture.channel().remoteAddress(), message);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e82a7e31/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
index 40721e1..db59b34 100644
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/LargeMsgTest.java
@@ -86,11 +86,7 @@ public class LargeMsgTest {
 
       @Override
       public void onNext(byte[] value) {
-        try {
-          link.write(value);
-        } catch (IOException e) {
-          e.printStackTrace();
-        }
+        link.write(value);
       }
     }, 3, new LoggingEventHandler<Throwable>());
     writeSubmitter.onNext(values[0]);

Reply via email to