Repository: incubator-reef
Updated Branches:
  refs/heads/master a9919be4a -> 8c494adf9


[REEF-641] NetworkConnectionService should look up a correct end point id with 
deprecated methods
This addresses the issue by:
 * Making NetworkConnectionFactory open a link without a connection factory id 
if it was registered by the deprecated method
 * Changing NetworkConnectionServiceImpl#unregisterConnectionFactory so that it 
does not unregister the local end point id of a NetworkConnectionFactory (NCF) 
that was registered by the deprecated method
 * Adding DeprecatedNetworkConnectionServiceTest, which tests the deprecated 
methods of NetworkConnectionService

JIRA:
  [REEF-641](https://issues.apache.org/jira/browse/REEF-641)

Pull Request:
  Closes #411


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8c494adf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8c494adf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8c494adf

Branch: refs/heads/master
Commit: 8c494adf9ae902fcb0a5d6e62f88a9fe15866cb1
Parents: a9919be
Author: Kim_Geon_Woo <[email protected]>
Authored: Tue Aug 25 14:47:27 2015 +0900
Committer: Brian Cho <[email protected]>
Committed: Fri Aug 28 14:00:45 2015 +0900

----------------------------------------------------------------------
 .../io/network/NetworkConnectionService.java    |   5 +
 .../BindNetworkConnectionServiceToTask.java     |   1 +
 .../network/impl/NetworkConnectionFactory.java  |  15 +
 .../impl/NetworkConnectionServiceImpl.java      |  40 +-
 .../UnbindNetworkConnectionServiceFromTask.java |   2 +
 .../DeprecatedNetworkConnectionServiceTest.java | 407 +++++++++++++++++++
 .../DeprecatedNetworkMessagingTestService.java  | 150 +++++++
 7 files changed, 617 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
index 448b941..f87490b 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java
@@ -26,6 +26,7 @@ import org.apache.reef.wake.Identifier;
 import org.apache.reef.wake.remote.Codec;
 import org.apache.reef.wake.remote.transport.LinkListener;
 
+// TODO[JIRA REEF-637] Annotate the class as @Unstable.
 /**
  * NetworkConnectionService.
  *
@@ -49,6 +50,7 @@ import org.apache.reef.wake.remote.transport.LinkListener;
 @DefaultImplementation(NetworkConnectionServiceImpl.class)
 public interface NetworkConnectionService extends AutoCloseable {
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * Registers an instance of ConnectionFactory corresponding to the 
connectionFactoryId.
    * Binds Codec, EventHandler and LinkListener to the ConnectionFactory.
@@ -109,6 +111,7 @@ public interface NetworkConnectionService extends 
AutoCloseable {
    */
   void close() throws Exception;
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * Registers a network connection service identifier.
    * This can be used for destination identifier
@@ -119,6 +122,7 @@ public interface NetworkConnectionService extends 
AutoCloseable {
   @Deprecated
   void registerId(final Identifier ncsId);
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * Unregister a network connection service identifier.
    * @param ncsId network connection service identifier
@@ -127,6 +131,7 @@ public interface NetworkConnectionService extends 
AutoCloseable {
   @Deprecated
   void unregisterId(final Identifier ncsId);
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * Gets a network connection service client id which is equal to the 
registered id.
    * @deprecated in 0.13. Use ConnectionFactory.getLocalEndPointId instead.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
index ad2f47f..e7d58e3 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java
@@ -27,6 +27,7 @@ import org.apache.reef.wake.IdentifierFactory;
 
 import javax.inject.Inject;
 
+// TODO[JIRA REEF-637] Remove the deprecated class.
 /**
  * TaskStart event handler for registering NetworkConnectionService.
  * Users have to bind this handler into ServiceConfiguration.ON_TASK_STARTED.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
index fc7c11e..4224ec8 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java
@@ -77,9 +77,23 @@ final class NetworkConnectionFactory<T> implements 
ConnectionFactory<T> {
   }
 
   Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteId) 
throws NetworkException {
+    // TODO[JIRA REEF-637] : Remove below if statement.
+    if (isRegisteredByDeprecatedMethod()) {
+      return networkService.openLink(remoteId);
+    }
+
     return networkService.openLink(connectionFactoryId, remoteId);
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
+  /**
+   * @deprecated in 0.13.
+   */
+  @Deprecated
+  boolean isRegisteredByDeprecatedMethod() {
+    return localEndPointId == null;
+  }
+
   @Override
   public Identifier getConnectionFactoryId() {
     return connectionFactoryId;
@@ -90,6 +104,7 @@ final class NetworkConnectionFactory<T> implements 
ConnectionFactory<T> {
     return localEndPointId;
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * @deprecated in 0.13. Use getLocalEndPointId() instead.
    */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
index 7ad86c8..c11f8ea 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java
@@ -72,6 +72,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
    * A map of (id of connection factory, a connection factory instance).
    */
   private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap;
+  // TODO[JIRA REEF-637] Remove the deprecated field.
   /**
    * A network connection service identifier.
    * @deprecated in 0.13. Use ConnectionFactory.getLocalEndPointId instead.
@@ -152,6 +153,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
     this.isClosed = new AtomicBoolean();
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * @deprecated in 0.13. Use registerConnectionFactory(Identifier, Codec, 
EventHandler, LinkListener, Identifier)
    * instead.
@@ -188,6 +190,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
           "The ConnectionFactoryId " + connectionFactoryId + " should not 
contain " + DELIMITER);
     }
   }
+
   @Override
   public <T> ConnectionFactory<T> registerConnectionFactory(
       final Identifier connectionFactoryId,
@@ -207,21 +210,29 @@ public final class NetworkConnectionServiceImpl 
implements NetworkConnectionServ
       throw new NetworkRuntimeException("ConnectionFactory " + 
connectionFactoryId + " was already registered.");
     }
 
+    LOG.log(Level.INFO, "ConnectionFactory {0} was registered", id);
+
     return connectionFactory;
   }
 
   @Override
   public void unregisterConnectionFactory(final Identifier connFactoryId) {
     final String id = connFactoryId.toString();
-    final ConnectionFactory connFactory = connFactoryMap.remove(id);
+    final NetworkConnectionFactory connFactory = connFactoryMap.remove(id);
     if (connFactory != null) {
-      final Identifier localId = 
getEndPointIdWithConnectionFactoryId(connFactoryId, 
connFactory.getLocalEndPointId());
-      nameServiceUnregisteringStage.onNext(localId);
+      LOG.log(Level.INFO, "ConnectionFactory {0} was unregistered", id);
+
+      if (!connFactory.isRegisteredByDeprecatedMethod()) { // TODO[JIRA 
REEF-637] : Remove the redundant check.
+        final Identifier localId = getEndPointIdWithConnectionFactoryId(
+            connFactoryId, connFactory.getLocalEndPointId());
+        nameServiceUnregisteringStage.onNext(localId);
+      }
     } else {
       LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id);
     }
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * Registers a source identifier of NetworkConnectionService.
    * @param ncsId
@@ -243,6 +254,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
 
   /**
    * Open a channel for destination identifier of NetworkConnectionService.
+   * @param connectionFactoryId
    * @param remoteEndPointId
    * @throws NetworkException
    */
@@ -260,6 +272,26 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
     }
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
+  /**
+   * Open a channel for destination identifier of NetworkConnectionService.
+   * @param remoteEndPointId
+   * @throws NetworkException
+   * @deprecated in 0.13. Use openLink(Identifier, Identifier) instead.
+   */
+  @Deprecated
+  <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier 
remoteEndPointId) throws NetworkException {
+    try {
+      final SocketAddress address = nameResolver.lookup(remoteEndPointId);
+      if (address == null) {
+        throw new NetworkException("Lookup " + remoteEndPointId + " is null");
+      }
+      return transport.open(address, nsCodec, nsLinkListener);
+    } catch(final Exception e) {
+      throw new NetworkException(e);
+    }
+  }
+
   private Identifier getEndPointIdWithConnectionFactoryId(
       final Identifier connectionFactoryId, final Identifier endPointId) {
     final String identifier = connectionFactoryId.toString() + DELIMITER + 
endPointId.toString();
@@ -279,6 +311,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
     return connFactory;
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * @param ncsId network connection service identifier
    * @deprecated in 0.13.
@@ -292,6 +325,7 @@ public final class NetworkConnectionServiceImpl implements 
NetworkConnectionServ
     this.nameServiceUnregisteringStage.onNext(ncsId);
   }
 
+  // TODO[JIRA REEF-637] Remove the deprecated method.
   /**
    * @return the identifier of this NetworkConnectionService
    * @deprecated in 0.13.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
index 6b94a8b..b9a31fb 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java
@@ -26,6 +26,8 @@ import org.apache.reef.wake.EventHandler;
 import org.apache.reef.wake.IdentifierFactory;
 
 import javax.inject.Inject;
+
+// TODO[JIRA REEF-637] Remove the deprecated class.
 /**
  * TaskStop event handler for unregistering NetworkConnectionService.
  * Users have to bind this handler into ServiceConfiguration.ON_TASK_STOP.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
new file mode 100644
index 0000000..76ccbc9
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.services.network.util.*;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO[JIRA REEF-637] Remove the deprecated class.
+/**
+ * Tests for the methods of NetworkConnectionService that were deprecated in 
0.13.
+ */
+@Deprecated
+public final class DeprecatedNetworkConnectionServiceTest {
+  private static final Logger LOG = 
Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName());
+
+  private final LocalAddressProvider localAddressProvider;
+  private final String localAddress;
+  private final Identifier groupCommClientId;
+  private final Identifier shuffleClientId;
+
+  public DeprecatedNetworkConnectionServiceTest() throws InjectionException {
+    localAddressProvider = LocalAddressProviderFactory.getInstance();
+    localAddress = localAddressProvider.getLocalAddress();
+
+    final IdentifierFactory idFac = new StringIdentifierFactory();
+    this.groupCommClientId = idFac.getNewInstance("groupComm");
+    this.shuffleClientId = idFac.getNewInstance("shuffle");
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private void runMessagingNetworkConnectionService(final Codec<String> codec) 
throws Exception {
+    final int numMessages = 2000;
+    final Monitor monitor = new Monitor();
+    try (final DeprecatedNetworkMessagingTestService messagingTestService
+             = new DeprecatedNetworkMessagingTestService(localAddress)) {
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+      try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+        try {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            // send messages to the receiver.
+            conn.write("hello" + count);
+          }
+          monitor.mwait();
+        } catch (final NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkConnectionService messaging test.
+   */
+  @Test
+  public void testMessagingNetworkConnectionService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StringCodec());
+  }
+
+  /**
+   * NetworkConnectionService streaming messaging test.
+   */
+  @Test
+  public void testStreamingMessagingNetworkConnectionService() throws 
Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StreamingStringCodec());
+  }
+
+  public void runNetworkConnServiceWithMultipleConnFactories(final 
Codec<String> stringCodec,
+                                                             final 
Codec<Integer> integerCodec)
+      throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    final int groupcommMessages = 1000;
+    final Monitor monitor = new Monitor();
+    try (final DeprecatedNetworkMessagingTestService messagingTestService
+             = new DeprecatedNetworkMessagingTestService(localAddress)) {
+
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
+
+      final int shuffleMessages = 2000;
+      final Monitor monitor2 = new Monitor();
+      messagingTestService.registerTestConnectionFactory(shuffleClientId, 
shuffleMessages, monitor2, integerCodec);
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<String> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+            conn.open();
+            for (int count = 0; count < groupcommMessages; ++count) {
+              // send messages to the receiver.
+              conn.write("hello" + count);
+            }
+            monitor.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<Integer> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
+            conn.open();
+            for (int count = 0; count < shuffleMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(count);
+            }
+            monitor2.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      monitor.mwait();
+      monitor2.mwait();
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new 
ObjectSerializableCodec<Integer>());
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories with 
streaming codecs.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), 
new StreamingIntegerCodec());
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          try {
+            conn.open();
+            for (int count = 0; count < numMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
+
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages 
/ runtime +
+              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+    final int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    final ExecutorService e = Executors.newCachedThreadPool();
+    for (int t = 0; t < numThreads; t++) {
+      final int tt = t;
+
+      e.submit(new Runnable() {
+        public void run() {
+          try (final DeprecatedNetworkMessagingTestService messagingTestService
+                   = new DeprecatedNetworkMessagingTestService(localAddress)) {
+            final Monitor monitor = new Monitor();
+            final Codec<String> codec = new StringCodec();
+
+            
messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+            try (final Connection<String> conn =
+                     
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+              // build the message
+              final StringBuilder msb = new StringBuilder();
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+                monitor.mwait();
+              } catch (final NetworkException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+              }
+            }
+          } catch (final Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    // start and time
+    final long start = System.currentTimeMillis();
+    final Object ignore = new Object();
+    for (int i = 0; i < numThreads; i++) {
+      barrier.add(ignore);
+    }
+    e.shutdown();
+    e.awaitTermination(100, TimeUnit.SECONDS);
+    final long end = System.currentTimeMillis();
+    final double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages 
/ runtime +
+        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
+  }
+
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() 
throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int numThreads = 2;
+      final int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
+
+        final ExecutorService e = Executors.newCachedThreadPool();
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          for (int i = 0; i < numThreads; i++) {
+            e.submit(new Runnable() {
+              @Override
+              public void run() {
+
+                try {
+                  conn.open();
+                  for (int count = 0; count < numMessages; ++count) {
+                    // send messages to the receiver.
+                    conn.write(message);
+                  }
+                } catch (final Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            });
+          }
+
+          e.shutdown();
+          e.awaitTermination(30, TimeUnit.SECONDS);
+          monitor.mwait();
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) 
/ runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try {
+            for (int i = 0; i < numMessages; i++) {
+              final StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          final long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
new file mode 100644
index 0000000..4c4bd69
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.services.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO[JIRA REEF-637] Remove the deprecated class.
+/**
+ * Helper class for DeprecatedNetworkConnectionService test, deprecated in 0.13.
+ *
+ * Creates a name server and two NetworkConnectionService instances ("receiver" and "sender")
+ * whose name resolvers are configured with that name server's port, and registers both end
+ * point ids through {@code registerId}.
+ */
+@Deprecated
+public final class DeprecatedNetworkMessagingTestService implements AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final String receiver;
+  private final String sender;
+  private final NameServer nameServer;
+  private final NameResolver receiverResolver;
+  private final NameResolver senderResolver;
+
+  /**
+   * Instantiates the name server, then the receiver and sender services.
+   *
+   * @param localAddress the host name handed to the name resolver configuration
+   * @throws InjectionException if Tang fails to inject any of the components
+   */
+  public DeprecatedNetworkMessagingTestService(final String localAddress) throws InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+    // network service for receiver
+    this.receiver = "receiver";
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
+    this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+    this.receiverNetworkConnService.registerId(this.factory.getNewInstance(this.receiver));
+
+    // network service for sender
+    this.sender = "sender";
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    this.senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+    this.senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+    this.senderResolver = injectorSender.getInstance(NameResolver.class);
+  }
+
+  /**
+   * Registers a connection factory with the same id and codec on both the receiver and the
+   * sender service. Each side gets its own MessageHandler and TestListener.
+   *
+   * @param connFactoryId the connection factory id to register on both services
+   * @param numMessages the number of messages after which a handler notifies the monitor
+   * @param monitor the monitor notified once a handler has seen {@code numMessages} messages
+   * @param codec the codec passed to both registrations
+   * @param <T> the message payload type
+   * @throws NetworkException if either registration fails
+   */
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final Monitor monitor,
+                                                final Codec<T> codec) throws NetworkException {
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+  }
+
+  /**
+   * Creates a new connection from the sender service to the receiver's end point id.
+   *
+   * @param connFactoryId the id of a connection factory looked up on the sender service
+   * @param <T> the message payload type expected by the caller
+   * @return a connection whose destination is the receiver id
+   */
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+    final Identifier destId = factory.getNewInstance(receiver);
+    // The payload type of the looked-up factory is not visible here; the caller is trusted to
+    // request the same T that was used when registering connFactoryId.
+    @SuppressWarnings("unchecked")
+    final Connection<T> connection =
+        (Connection<T>) senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+    return connection;
+  }
+
+  /**
+   * Closes the sender service, the receiver service, the name server and both resolvers, in
+   * that order. Every resource is attempted even when an earlier close throws.
+   *
+   * @throws Exception if closing any of the resources fails
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      senderNetworkConnService.close();
+    } finally {
+      try {
+        receiverNetworkConnService.close();
+      } finally {
+        try {
+          nameServer.close();
+        } finally {
+          try {
+            receiverResolver.close();
+          } finally {
+            senderResolver.close();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Counts received messages and notifies the monitor when the expected number is reached.
+   *
+   * @param <T> the message payload type
+   */
+  public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected) {
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(final Message<T> value) {
+      // Use the value returned by the atomic increment so that, even with concurrent calls,
+      // exactly one invocation observes count == expected.
+      final int received = count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", received);
+      LOG.log(Level.FINE,
+          "OUT: received {0} from {1} to {2}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      if (received == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Link listener that logs successful sends and rethrows failures as RuntimeException.
+   *
+   * @param <T> the message payload type
+   */
+  public static final class TestListener<T> implements LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(final Message<T> message) {
+      LOG.log(Level.FINE, "success: " + message);
+    }
+
+    @Override
+    public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
+      LOG.log(Level.WARNING, "exception: " + cause + message);
+      throw new RuntimeException(cause);
+    }
+  }
+}

Reply via email to