[REEF-664] REEF-IO test and main packages do not match

- Move tests that were in org.apache.reef.services to org.apache.reef.io.
- Remove a duplicate class and an unused class

JIRA:
  [REEF-664](https://issues.apache.org/jira/browse/REEF-664)

Pull Request:
  Closes #433


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3325748d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3325748d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3325748d

Branch: refs/heads/master
Commit: 3325748dc61f0db8285efd2f0cff225a9dd45a18
Parents: 9cab341
Author: Brian Cho <[email protected]>
Authored: Fri Aug 28 16:49:58 2015 +0900
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 28 11:24:06 2015 -0700

----------------------------------------------------------------------
 .../DeprecatedNetworkConnectionServiceTest.java | 405 ++++++++++++++
 .../network/NetworkConnectionServiceTest.java   | 397 ++++++++++++++
 .../reef/io/network/NetworkServiceTest.java     | 548 +++++++++++++++++++
 .../network/naming/LocalNameResolverTest.java   | 102 ++++
 .../reef/io/network/naming/NameClientTest.java  | 161 ++++++
 .../reef/io/network/naming/NamingTest.java      | 401 ++++++++++++++
 .../reef/io/network/naming/package-info.java    |  22 +
 .../apache/reef/io/network/package-info.java    |  22 +
 .../DeprecatedNetworkMessagingTestService.java  | 150 +++++
 .../reef/io/network/util/LoggingUtils.java      |  49 ++
 .../apache/reef/io/network/util/Monitor.java    |  40 ++
 .../util/NetworkMessagingTestService.java       | 157 ++++++
 .../io/network/util/StreamingIntegerCodec.java  |  57 ++
 .../io/network/util/StreamingStringCodec.java   |  56 ++
 .../reef/io/network/util/TimeoutHandler.java    |  36 ++
 .../apache/reef/io/storage/ExternalMapTest.java |  94 ++++
 .../org/apache/reef/io/storage/FramingTest.java | 102 ++++
 .../reef/io/storage/MergingIteratorTest.java    |  53 ++
 .../reef/io/storage/SortingSpoolTest.java       | 117 ++++
 .../apache/reef/io/storage/SpoolFileTest.java   | 207 +++++++
 .../reef/io/storage/TupleSerializerTest.java    | 103 ++++
 .../apache/reef/io/storage/package-info.java    |  22 +
 .../DeprecatedNetworkConnectionServiceTest.java | 407 --------------
 .../services/network/LocalNameResolverTest.java | 104 ----
 .../reef/services/network/NameClientTest.java   | 162 ------
 .../reef/services/network/NamingTest.java       | 402 --------------
 .../network/NetworkConnectionServiceTest.java   | 400 --------------
 .../services/network/NetworkServiceTest.java    | 547 ------------------
 .../apache/reef/services/network/TestEvent.java |  38 --
 .../reef/services/network/package-info.java     |  22 -
 .../DeprecatedNetworkMessagingTestService.java  | 150 -----
 .../services/network/util/LoggingUtils.java     |  49 --
 .../reef/services/network/util/Monitor.java     |  40 --
 .../util/NetworkMessagingTestService.java       | 157 ------
 .../network/util/StreamingIntegerCodec.java     |  57 --
 .../network/util/StreamingStringCodec.java      |  56 --
 .../reef/services/network/util/StringCodec.java |  34 --
 .../services/network/util/TimeoutHandler.java   |  36 --
 .../services/network/util/package-info.java     |  22 -
 .../reef/services/storage/ExternalMapTest.java  |  94 ----
 .../reef/services/storage/FramingTest.java      | 104 ----
 .../services/storage/MergingIteratorTest.java   |  54 --
 .../reef/services/storage/SortingSpoolTest.java | 117 ----
 .../reef/services/storage/SpoolFileTest.java    | 207 -------
 .../services/storage/TupleSerializerTest.java   | 105 ----
 .../reef/services/storage/package-info.java     |  22 -
 46 files changed, 3301 insertions(+), 3386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java
new file mode 100644
index 0000000..2fcb125
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.util.*;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO[JIRA REEF-637] Remove the deprecated class.
+/**
+ * Test for deprecated methods, which are deprecated in 0.13, of 
NetworkConnectionService.
+ */
+@Deprecated
+public final class DeprecatedNetworkConnectionServiceTest {
+  private static final Logger LOG = 
Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName());
+
+  private final LocalAddressProvider localAddressProvider;
+  private final String localAddress;
+  private final Identifier groupCommClientId;
+  private final Identifier shuffleClientId;
+
+  public DeprecatedNetworkConnectionServiceTest() throws InjectionException {
+    localAddressProvider = LocalAddressProviderFactory.getInstance();
+    localAddress = localAddressProvider.getLocalAddress();
+
+    final IdentifierFactory idFac = new StringIdentifierFactory();
+    this.groupCommClientId = idFac.getNewInstance("groupComm");
+    this.shuffleClientId = idFac.getNewInstance("shuffle");
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private void runMessagingNetworkConnectionService(final Codec<String> codec) 
throws Exception {
+    final int numMessages = 2000;
+    final Monitor monitor = new Monitor();
+    try (final DeprecatedNetworkMessagingTestService messagingTestService
+             = new DeprecatedNetworkMessagingTestService(localAddress)) {
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+      try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+        try {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            // send messages to the receiver.
+            conn.write("hello" + count);
+          }
+          monitor.mwait();
+        } catch (final NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkConnectionService messaging test.
+   */
+  @Test
+  public void testMessagingNetworkConnectionService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StringCodec());
+  }
+
+  /**
+   * NetworkConnectionService streaming messaging test.
+   */
+  @Test
+  public void testStreamingMessagingNetworkConnectionService() throws 
Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StreamingStringCodec());
+  }
+
+  public void runNetworkConnServiceWithMultipleConnFactories(final 
Codec<String> stringCodec,
+                                                             final 
Codec<Integer> integerCodec)
+      throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    final int groupcommMessages = 1000;
+    final Monitor monitor = new Monitor();
+    try (final DeprecatedNetworkMessagingTestService messagingTestService
+             = new DeprecatedNetworkMessagingTestService(localAddress)) {
+
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
+
+      final int shuffleMessages = 2000;
+      final Monitor monitor2 = new Monitor();
+      messagingTestService.registerTestConnectionFactory(shuffleClientId, 
shuffleMessages, monitor2, integerCodec);
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<String> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+            conn.open();
+            for (int count = 0; count < groupcommMessages; ++count) {
+              // send messages to the receiver.
+              conn.write("hello" + count);
+            }
+            monitor.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<Integer> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
+            conn.open();
+            for (int count = 0; count < shuffleMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(count);
+            }
+            monitor2.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      monitor.mwait();
+      monitor2.mwait();
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new 
ObjectSerializableCodec<Integer>());
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories with 
Streamingcodec.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), 
new StreamingIntegerCodec());
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          try {
+            conn.open();
+            for (int count = 0; count < numMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
+
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages 
/ runtime +
+              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+    final int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    final ExecutorService e = Executors.newCachedThreadPool();
+    for (int t = 0; t < numThreads; t++) {
+      final int tt = t;
+
+      e.submit(new Runnable() {
+        public void run() {
+          try (final DeprecatedNetworkMessagingTestService messagingTestService
+                   = new DeprecatedNetworkMessagingTestService(localAddress)) {
+            final Monitor monitor = new Monitor();
+            final Codec<String> codec = new StringCodec();
+
+            
messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+            try (final Connection<String> conn =
+                     
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+              // build the message
+              final StringBuilder msb = new StringBuilder();
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+                monitor.mwait();
+              } catch (final NetworkException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+              }
+            }
+          } catch (final Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    // start and time
+    final long start = System.currentTimeMillis();
+    final Object ignore = new Object();
+    for (int i = 0; i < numThreads; i++) {
+      barrier.add(ignore);
+    }
+    e.shutdown();
+    e.awaitTermination(100, TimeUnit.SECONDS);
+    final long end = System.currentTimeMillis();
+    final double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages 
/ runtime +
+        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
+  }
+
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() 
throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int numThreads = 2;
+      final int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
+
+        final ExecutorService e = Executors.newCachedThreadPool();
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          for (int i = 0; i < numThreads; i++) {
+            e.submit(new Runnable() {
+              @Override
+              public void run() {
+
+                try {
+                  conn.open();
+                  for (int count = 0; count < numMessages; ++count) {
+                    // send messages to the receiver.
+                    conn.write(message);
+                  }
+                } catch (final Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            });
+          }
+
+          e.shutdown();
+          e.awaitTermination(30, TimeUnit.SECONDS);
+          monitor.mwait();
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) 
/ runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final DeprecatedNetworkMessagingTestService messagingTestService
+               = new DeprecatedNetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try {
+            for (int i = 0; i < numMessages; i++) {
+              final StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          final long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
new file mode 100644
index 0000000..e5f6313
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.util.*;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default Network connection service test.
+ */
+public class NetworkConnectionServiceTest {
+  private static final Logger LOG = 
Logger.getLogger(NetworkConnectionServiceTest.class.getName());
+
+  private final LocalAddressProvider localAddressProvider;
+  private final String localAddress;
+  private final Identifier groupCommClientId;
+  private final Identifier shuffleClientId;
+
+  public NetworkConnectionServiceTest() throws InjectionException {
+    localAddressProvider = LocalAddressProviderFactory.getInstance();
+    localAddress = localAddressProvider.getLocalAddress();
+
+    final IdentifierFactory idFac = new StringIdentifierFactory();
+    this.groupCommClientId = idFac.getNewInstance("groupComm");
+    this.shuffleClientId = idFac.getNewInstance("shuffle");
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  private void runMessagingNetworkConnectionService(final Codec<String> codec) 
throws Exception {
+    final int numMessages = 2000;
+    final Monitor monitor = new Monitor();
+    try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+      try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+        try {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            // send messages to the receiver.
+            conn.write("hello" + count);
+          }
+          monitor.mwait();
+        } catch (final NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkConnectionService messaging test.
+   */
+  @Test
+  public void testMessagingNetworkConnectionService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StringCodec());
+  }
+
+  /**
+   * NetworkConnectionService streaming messaging test.
+   */
+  @Test
+  public void testStreamingMessagingNetworkConnectionService() throws 
Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runMessagingNetworkConnectionService(new StreamingStringCodec());
+  }
+
+  public void runNetworkConnServiceWithMultipleConnFactories(final 
Codec<String> stringCodec,
+                                                             final 
Codec<Integer> integerCodec)
+      throws Exception {
+    final ExecutorService executor = Executors.newFixedThreadPool(5);
+
+    final int groupcommMessages = 1000;
+    final Monitor monitor = new Monitor();
+    try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+
+      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
+
+      final int shuffleMessages = 2000;
+      final Monitor monitor2 = new Monitor();
+      messagingTestService.registerTestConnectionFactory(shuffleClientId, 
shuffleMessages, monitor2, integerCodec);
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<String> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+            conn.open();
+            for (int count = 0; count < groupcommMessages; ++count) {
+              // send messages to the receiver.
+              conn.write("hello" + count);
+            }
+            monitor.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try (final Connection<Integer> conn =
+                   
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
+            conn.open();
+            for (int count = 0; count < shuffleMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(count);
+            }
+            monitor2.mwait();
+          } catch (final Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+      });
+
+      monitor.mwait();
+      monitor2.mwait();
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new 
ObjectSerializableCodec<Integer>());
+  }
+
+  /**
+   * Test NetworkService registering multiple connection factories with 
Streamingcodec.
+   */
+  @Test
+  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), 
new StreamingIntegerCodec());
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          try {
+            conn.open();
+            for (int count = 0; count < numMessages; ++count) {
+              // send messages to the receiver.
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
+
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages 
/ runtime +
+              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+    final int numThreads = 4;
+    final int size = 2000;
+    final int numMessages = 300000 / (Math.max(1, size / 512));
+    final int totalNumMessages = numMessages * numThreads;
+
+    final ExecutorService e = Executors.newCachedThreadPool();
+    for (int t = 0; t < numThreads; t++) {
+      final int tt = t;
+
+      e.submit(new Runnable() {
+        public void run() {
+          try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+            final Monitor monitor = new Monitor();
+            final Codec<String> codec = new StringCodec();
+
+            
messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+            try (final Connection<String> conn =
+                     
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+              // build the message
+              final StringBuilder msb = new StringBuilder();
+              for (int i = 0; i < size; i++) {
+                msb.append("1");
+              }
+              final String message = msb.toString();
+
+              try {
+                conn.open();
+                for (int count = 0; count < numMessages; ++count) {
+                  // send messages to the receiver.
+                  conn.write(message);
+                }
+                monitor.mwait();
+              } catch (final NetworkException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+              }
+            }
+          } catch (final Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    // start and time
+    final long start = System.currentTimeMillis();
+    final Object ignore = new Object();
+    for (int i = 0; i < numThreads; i++) {
+      barrier.add(ignore);
+    }
+    e.shutdown();
+    e.awaitTermination(100, TimeUnit.SECONDS);
+    final long end = System.currentTimeMillis();
+    final double runtime = ((double) end - start) / 1000;
+    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages 
/ runtime +
+        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
+  }
+
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() 
throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int numThreads = 2;
+      final int totalNumMessages = numMessages * numThreads;
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
+
+        final ExecutorService e = Executors.newCachedThreadPool();
+
+        // build the message
+        final StringBuilder msb = new StringBuilder();
+        for (int i = 0; i < size; i++) {
+          msb.append("1");
+        }
+        final String message = msb.toString();
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          final long start = System.currentTimeMillis();
+          for (int i = 0; i < numThreads; i++) {
+            e.submit(new Runnable() {
+              @Override
+              public void run() {
+
+                try {
+                  conn.open();
+                  for (int count = 0; count < numMessages; ++count) {
+                    // send messages to the receiver.
+                    conn.write(message);
+                  }
+                } catch (final Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            });
+          }
+
+          e.shutdown();
+          e.awaitTermination(30, TimeUnit.SECONDS);
+          monitor.mwait();
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + 
+              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) 
/ runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final int batchSize = 1024 * 1024;
+    final int[] messageSizes = {32, 64, 512};
+
+    for (final int size : messageSizes) {
+      final int numMessages = 300 / (Math.max(1, size / 512));
+      final Monitor monitor = new Monitor();
+      final Codec<String> codec = new StringCodec();
+      try (final NetworkMessagingTestService messagingTestService = new 
NetworkMessagingTestService(localAddress)) {
+        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
+        try (final Connection<String> conn =
+                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try {
+            for (int i = 0; i < numMessages; i++) {
+              final StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          final long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
new file mode 100644
index 0000000..5f6f937
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.impl.NetworkService;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.io.network.naming.NameServerParameters;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.io.network.util.Monitor;
+import org.apache.reef.io.network.util.StringCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Network service test.
+ */
+public class NetworkServiceTest {
+  private static final Logger LOG = 
Logger.getLogger(NetworkServiceTest.class.getName());
+
+  private final LocalAddressProvider localAddressProvider;
+  private final String localAddress;
+
+  public NetworkServiceTest() throws InjectionException {
+    localAddressProvider = LocalAddressProviderFactory.getInstance();
+    localAddress = localAddressProvider.getLocalAddress();
+  }
+
+  @Rule
+  public TestName name = new TestName();
+
+  /**
+   * NetworkService messaging test.
+   */
+  @Test
+  public void testMessagingNetworkService() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      final int numMessages = 10;
+      final Monitor monitor = new Monitor();
+
+      // network service
+      final String name2 = "task2";
+      final String name1 = "task1";
+      final Configuration nameResolverConf =
+          
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+          .build())
+          .build();
+
+      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+      LOG.log(Level.FINEST, "=== Test network service receiver start");
+      LOG.log(Level.FINEST, "=== Test network service sender start");
+      try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+           NetworkService<String> ns2 = new NetworkService<String>(factory, 0, 
nameResolver,
+               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+               new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+           final NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
+               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+               new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+        ns2.registerId(factory.getNewInstance(name2));
+        final int port2 = ns2.getTransport().getListeningPort();
+        server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+        ns1.registerId(factory.getNewInstance(name1));
+        final int port1 = ns1.getTransport().getListeningPort();
+        server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+        final Identifier destId = factory.getNewInstance(name2);
+
+        try (final Connection<String> conn = ns1.newConnection(destId)) {
+          conn.open();
+          for (int count = 0; count < numMessages; ++count) {
+            conn.write("hello! " + count);
+          }
+          monitor.mwait();
+
+        } catch (final NetworkException e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkServiceRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
+
+      for (final int size : messageSizes) {
+        final int numMessages = 300000 / (Math.max(1, size / 512));
+        final Monitor monitor = new Monitor();
+
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf =
+            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
+
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          final Identifier destId = factory.getNewInstance(name2);
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try (Connection<String> conn = ns1.newConnection(destId)) {
+            for (int i = 0; i < numMessages; i++) {
+              conn.open();
+              conn.write(message);
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkServiceRateDisjoint() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
+
+      final int numThreads = 4;
+      final int size = 2000;
+      final int numMessages = 300000 / (Math.max(1, size / 512));
+      final int totalNumMessages = numMessages * numThreads;
+
+      final ExecutorService e = Executors.newCachedThreadPool();
+      for (int t = 0; t < numThreads; t++) {
+        final int tt = t;
+
+        e.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              final Monitor monitor = new Monitor();
+
+              // network service
+              final String name2 = "task2-" + tt;
+              final String name1 = "task1-" + tt;
+              final Configuration nameResolverConf =
+                  
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+                  .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
localAddress)
+                  .set(NameResolverConfiguration.NAME_SERVICE_PORT, 
nameServerPort)
+                  .build())
+                  .build();
+
+              final Injector injector = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+              LOG.log(Level.FINEST, "=== Test network service receiver start");
+              LOG.log(Level.FINEST, "=== Test network service sender start");
+              try (final NameResolver nameResolver = 
injector.getInstance(NameResolver.class);
+                   NetworkService<String> ns2 = new 
NetworkService<String>(factory, 0, nameResolver,
+                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                       new MessageHandler<String>(name2, monitor, numMessages),
+                       new ExceptionHandler(), localAddressProvider);
+                   NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
+                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                       new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+                ns2.registerId(factory.getNewInstance(name2));
+                final int port2 = ns2.getTransport().getListeningPort();
+                server.register(factory.getNewInstance(name2), new 
InetSocketAddress(localAddress, port2));
+
+                ns1.registerId(factory.getNewInstance(name1));
+                final int port1 = ns1.getTransport().getListeningPort();
+                server.register(factory.getNewInstance(name1), new 
InetSocketAddress(localAddress, port1));
+
+                final Identifier destId = factory.getNewInstance(name2);
+
+                // build the message
+                final StringBuilder msb = new StringBuilder();
+                for (int i = 0; i < size; i++) {
+                  msb.append("1");
+                }
+                final String message = msb.toString();
+
+                try (Connection<String> conn = ns1.newConnection(destId)) {
+                  for (int i = 0; i < numMessages; i++) {
+                    conn.open();
+                    conn.write(message);
+                  }
+                  monitor.mwait();
+                } catch (final NetworkException e) {
+                  e.printStackTrace();
+                  throw new RuntimeException(e);
+                }
+              }
+            } catch (final Exception e) {
+              e.printStackTrace();
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+
+      // start and time
+      final long start = System.currentTimeMillis();
+      final Object ignore = new Object();
+      for (int i = 0; i < numThreads; i++) {
+        barrier.add(ignore);
+      }
+      e.shutdown();
+      e.awaitTermination(100, TimeUnit.SECONDS);
+      final long end = System.currentTimeMillis();
+
+      final double runtime = ((double) end - start) / 1000;
+      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + 
+          " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
+    }
+  }
+
+  @Test
+  public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws 
Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
+
+      for (final int size : messageSizes) {
+        final int numMessages = 300000 / (Math.max(1, size / 512));
+        final int numThreads = 2;
+        final int totalNumMessages = numMessages * numThreads;
+        final Monitor monitor = new Monitor();
+
+
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf =
+            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
+
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, totalNumMessages),
+                 new ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          final Identifier destId = factory.getNewInstance(name2);
+
+          try (final Connection<String> conn = ns1.newConnection(destId)) {
+            conn.open();
+
+            // build the message
+            final StringBuilder msb = new StringBuilder();
+            for (int i = 0; i < size; i++) {
+              msb.append("1");
+            }
+            final String message = msb.toString();
+
+            final ExecutorService e = Executors.newCachedThreadPool();
+
+            final long start = System.currentTimeMillis();
+            for (int i = 0; i < numThreads; i++) {
+              e.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                  for (int i = 0; i < numMessages; i++) {
+                    conn.write(message);
+                  }
+                }
+              });
+            }
+
+
+            e.shutdown();
+            e.awaitTermination(30, TimeUnit.SECONDS);
+            monitor.mwait();
+
+            final long end = System.currentTimeMillis();
+            final double runtime = ((double) end - start) / 1000;
+
+            LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + 
+                " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * 
size) / runtime); // x2 for unicode chars
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * NetworkService messaging rate benchmark.
+   */
+  @Test
+  public void testMessagingNetworkServiceBatchingRate() throws Exception {
+    LOG.log(Level.FINEST, name.getMethodName());
+
+    final IdentifierFactory factory = new StringIdentifierFactory();
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int nameServerPort = server.getPort();
+
+      final int batchSize = 1024 * 1024;
+      final int[] messageSizes = {32, 64, 512};
+
+      for (final int size : messageSizes) {
+        final int numMessages = 300 / (Math.max(1, size / 512));
+        final Monitor monitor = new Monitor();
+
+        // network service
+        final String name2 = "task2";
+        final String name1 = "task1";
+        final Configuration nameResolverConf =
+            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
+            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
+            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
+            .build())
+            .build();
+
+        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
+
+        LOG.log(Level.FINEST, "=== Test network service receiver start");
+        LOG.log(Level.FINEST, "=== Test network service sender start");
+        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
+             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
+             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
+                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
+                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
+
+          ns2.registerId(factory.getNewInstance(name2));
+          final int port2 = ns2.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
+
+          ns1.registerId(factory.getNewInstance(name1));
+          final int port1 = ns1.getTransport().getListeningPort();
+          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
+
+          final Identifier destId = factory.getNewInstance(name2);
+
+          // build the message
+          final StringBuilder msb = new StringBuilder();
+          for (int i = 0; i < size; i++) {
+            msb.append("1");
+          }
+          final String message = msb.toString();
+
+          final long start = System.currentTimeMillis();
+          try (Connection<String> conn = ns1.newConnection(destId)) {
+            for (int i = 0; i < numMessages; i++) {
+              final StringBuilder sb = new StringBuilder();
+              for (int j = 0; j < batchSize / size; j++) {
+                sb.append(message);
+              }
+              conn.open();
+              conn.write(sb.toString());
+            }
+            monitor.mwait();
+          } catch (final NetworkException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          final long end = System.currentTimeMillis();
+          final double runtime = ((double) end - start) / 1000;
+          final long numAppMessages = numMessages * batchSize / size;
+          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
+              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
+        }
+      }
+    }
+  }
+
+  /**
+   * Test message handler.
+   */
+  class MessageHandler<T> implements EventHandler<Message<T>> {
+
+    private final String name;
+    private final int expected;
+    private final Monitor monitor;
+    private AtomicInteger count = new AtomicInteger(0);
+
+    MessageHandler(final String name, final Monitor monitor, final int 
expected) {
+      this.name = name;
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(final Message<T> value) {
+
+      count.incrementAndGet();
+
+      LOG.log(Level.FINEST,
+          "OUT: {0} received {1} from {2} to {3}",
+          new Object[]{name, value.getData(), value.getSrcId(), 
value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINEST, "OUT: data: {0}", obj);
+      }
+
+      if (count.get() == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Test exception handler.
+   */
+  class ExceptionHandler implements EventHandler<Exception> {
+    @Override
+    public void onNext(final Exception error) {
+      System.err.println(error);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/LocalNameResolverTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/LocalNameResolverTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/LocalNameResolverTest.java
new file mode 100644
index 0000000..bd1c25a
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/LocalNameResolverTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class LocalNameResolverTest {
+
+  private final LocalAddressProvider localAddressProvider;
+
+  public LocalNameResolverTest() throws InjectionException {
+    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  }
+
+  /**
+   * Test method for {@link 
org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}.
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testClose() throws Exception {
+    final String localAddress = localAddressProvider.getLocalAddress();
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    try (final NameResolver resolver = 
Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000)
+        .build()).getInstance(NameResolver.class)) {
+      final Identifier id = factory.getNewInstance("Task1");
+      resolver.register(id, new InetSocketAddress(localAddress, 7001));
+      resolver.unregister(id);
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Test method for {@link 
org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}.
+   * To check caching behavior with expireAfterAccess & expireAfterWrite
+   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testLookup() throws Exception {
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final String localAddress = localAddressProvider.getLocalAddress();
+    try (final NameResolver resolver = 
Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
+        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150)
+        .build()).getInstance(NameResolver.class)) {
+      final Identifier id = factory.getNewInstance("Task1");
+      final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 
7001);
+      resolver.register(id, socketAddr);
+      InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry
+      Assert.assertTrue(socketAddr.equals(lookupAddr));
+      resolver.unregister(id);
+      Thread.sleep(100);
+      try {
+        lookupAddr = resolver.lookup(id);
+        Thread.sleep(100);
+        //With expireAfterAccess, the previous lookup would reset expiry to 
150ms
+        //more and 100ms wait will not expire the item and will return the 
cached value
+        //With expireAfterWrite, the extra wait of 100 ms will expire the item
+        //resulting in NamingException and the test passes
+        lookupAddr = resolver.lookup(id);
+        Assert.assertNull("resolver.lookup(id)", lookupAddr);
+      } catch (final Exception e) {
+        if (e instanceof ExecutionException) {
+          Assert.assertTrue("Execution Exception cause is instanceof 
NamingException",
+              e.getCause() instanceof NamingException);
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NameClientTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NameClientTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NameClientTest.java
new file mode 100644
index 0000000..5c81afc
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NameClientTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.network.naming.exception.NamingException;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+
+public class NameClientTest {
+
+  private final LocalAddressProvider localAddressProvider;
+
+  public NameClientTest() throws InjectionException {
+    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  }
+
+  private static final int RETRY_COUNT, RETRY_TIMEOUT;
+
+  static {
+    final Tang tang = Tang.Factory.getTang();
+    try {
+      RETRY_COUNT = 
tang.newInjector().getNamedInstance(NameResolverRetryCount.class);
+      RETRY_TIMEOUT = 
tang.newInjector().getNamedInstance(NameResolverRetryTimeout.class);
+    } catch (final InjectionException e1) {
+      throw new RuntimeException("Exception while trying to find default 
values for retryCount & Timeout", e1);
+    }
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  /**
+   * Test method for {@link 
org.apache.reef.io.network.naming.NameClient#close()}.
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testClose() throws Exception {
+    final String localAddress = localAddressProvider.getLocalAddress();
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int serverPort = server.getPort();
+      final Configuration nameResolverConf = NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
+          .set(NameResolverConfiguration.CACHE_TIMEOUT, 10000)
+          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
+          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
+          .build();
+
+      try (final NameResolver client =
+               
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
+        final Identifier id = factory.getNewInstance("Task1");
+        client.register(id, new InetSocketAddress(localAddress, 7001));
+        client.unregister(id);
+        Thread.sleep(100);
+      }
+    }
+  }
+
+  /**
+   * Test method for {@link 
org.apache.reef.io.network.naming.NameClient#lookup()}.
+   * To check caching behavior with expireAfterAccess & expireAfterWrite
+   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
+   *
+   * @throws Exception
+   */
+  @Test
+  public final void testLookup() throws Exception {
+    final String localAddress = localAddressProvider.getLocalAddress();
+    final IdentifierFactory factory = new StringIdentifierFactory();
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      final int serverPort = server.getPort();
+      final Configuration nameResolverConf = NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
+          .set(NameResolverConfiguration.CACHE_TIMEOUT, 150)
+          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
+          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
+          .build();
+
+      try (final NameResolver client =
+               
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
+        final Identifier id = factory.getNewInstance("Task1");
+        client.register(id, new InetSocketAddress(localAddress, 7001));
+        client.lookup(id); // caches the entry
+        client.unregister(id);
+        Thread.sleep(100);
+        try {
+          InetSocketAddress addr = client.lookup(id);
+          Thread.sleep(100);
+          //With expireAfterAccess, the previous lookup would reset expiry to 
150ms
+          //more and 100ms wait will not expire the item and will return the 
cached value
+          //With expireAfterWrite, the extra wait of 100 ms will expire the 
item
+          //resulting in NamingException and the test passes
+          addr = client.lookup(id);
+          Assert.assertNull("client.lookup(id)", addr);
+        } catch (final Exception e) {
+          if (e instanceof ExecutionException) {
+            Assert.assertTrue("Execution Exception cause is instanceof 
NamingException",
+                e.getCause() instanceof NamingException);
+          } else {
+            throw e;
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NamingTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NamingTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NamingTest.java
new file mode 100644
index 0000000..ed881ab
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/NamingTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.naming;
+
+import org.apache.reef.io.naming.NameAssignment;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
+import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
+import org.apache.reef.io.network.util.StringIdentifierFactory;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Naming server and client test.
+ */
+public class NamingTest {
+
+  private static final Logger LOG = 
Logger.getLogger(NamingTest.class.getName());
+  private static final int RETRY_COUNT;
+  private static final int RETRY_TIMEOUT;
+
+  static {
+    try {
+      final Injector injector = Tang.Factory.getTang().newInjector();
+      RETRY_COUNT = injector.getNamedInstance(NameResolverRetryCount.class);
+      RETRY_TIMEOUT = 
injector.getNamedInstance(NameResolverRetryTimeout.class);
+    } catch (final InjectionException ex) {
+      final String msg = "Exception while trying to find default values for 
retryCount & Timeout";
+      LOG.log(Level.SEVERE, msg, ex);
+      throw new RuntimeException(msg, ex);
+    }
+  }
+
+  private final LocalAddressProvider localAddressProvider;
+  @Rule
+  public final TestName name = new TestName();
+  static final long TTL = 30000;
+  private final IdentifierFactory factory = new StringIdentifierFactory();
+  private int port;
+
+  public NamingTest() throws InjectionException {
+    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
+  }
+
+  /**
+   * NameServer and NameLookupClient test.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNamingLookup() throws Exception {
+
+    final String localAddress = localAddressProvider.getLocalAddress();
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    // names 
+    final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+    idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+    idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+
+    // run a server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
+      for (final Identifier id : idToAddrMap.keySet()) {
+        server.register(id, idToAddrMap.get(id));
+      }
+
+      // run a client
+      try (final NameLookupClient client = new NameLookupClient(localAddress, 
this.port,
+          10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new 
NameCache(this.TTL), this.localAddressProvider)) {
+
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new 
HashMap<Identifier, InetSocketAddress>();
+        final InetSocketAddress addr1 = client.lookup(id1);
+        respMap.put(id1, addr1);
+        final InetSocketAddress addr2 = client.lookup(id2);
+        respMap.put(id2, addr2);
+
+        for (final Identifier id : respMap.keySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+      }
+    }
+  }
+
+  /**
+   * Test concurrent lookups (threads share a client).
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNamingLookup() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    final String localAddress = localAddressProvider.getLocalAddress();
+    // test it 3 times to make failure likely
+    for (int i = 0; i < 3; i++) {
+
+      LOG.log(Level.FINEST, "test {0}", i);
+
+      // names 
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+      idToAddrMap.put(this.factory.getNewInstance("task3"), new 
InetSocketAddress(localAddress, 7003));
+
+      // run a server
+      final Injector injector = Tang.Factory.getTang().newInjector();
+      
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
+      injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+      try (final NameServer server = injector.getInstance(NameServer.class)) {
+        this.port = server.getPort();
+        for (final Identifier id : idToAddrMap.keySet()) {
+          server.register(id, idToAddrMap.get(id));
+        }
+
+        // run a client
+        try (final NameLookupClient client = new 
NameLookupClient(localAddress, this.port,
+            10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new 
NameCache(this.TTL), this.localAddressProvider)) {
+
+          final Identifier id1 = this.factory.getNewInstance("task1");
+          final Identifier id2 = this.factory.getNewInstance("task2");
+          final Identifier id3 = this.factory.getNewInstance("task3");
+
+          final ExecutorService e = Executors.newCachedThreadPool();
+
+          final ConcurrentMap<Identifier, InetSocketAddress> respMap = 
+              new ConcurrentHashMap<Identifier, InetSocketAddress>();
+
+          final Future<?> f1 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id1);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id1, addr);
+            }
+          });
+          final Future<?> f2 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id2);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id2, addr);
+            }
+          });
+          final Future<?> f3 = e.submit(new Runnable() {
+            @Override
+            public void run() {
+              InetSocketAddress addr = null;
+              try {
+                addr = client.lookup(id3);
+              } catch (final Exception e) {
+                LOG.log(Level.SEVERE, "Lookup failed", e);
+                Assert.fail(e.toString());
+              }
+              respMap.put(id3, addr);
+            }
+          });
+
+          f1.get();
+          f2.get();
+          f3.get();
+
+          for (final Identifier id : respMap.keySet()) {
+            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+          }
+
+          Assert.assertTrue(isEqual(idToAddrMap, respMap));
+        }
+      }
+    }
+  }
+
+  /**
+   * NameServer and NameRegistryClient test.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNamingRegistry() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
+      final String localAddress = localAddressProvider.getLocalAddress();
+
+      // names to start with
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+
+      // registration
+      // invoke registration from the client side
+      try (final NameRegistryClient client = 
+          new NameRegistryClient(localAddress, this.port, this.factory, 
this.localAddressProvider)) {
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.register(id, idToAddrMap.get(id));
+        }
+
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // check the server side
+        Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, 
InetSocketAddress>();
+        Iterable<NameAssignment> nas = server.lookup(ids);
+
+        for (final NameAssignment na : nas) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
+              new Object[]{na.getIdentifier(), na.getAddress()});
+          serverMap.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, serverMap));
+
+        // un-registration
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.unregister(id);
+        }
+
+        // wait
+        busyWait(server, 0, ids);
+
+        serverMap = new HashMap<Identifier, InetSocketAddress>();
+        nas = server.lookup(ids);
+        for (final NameAssignment na : nas) {
+          serverMap.put(na.getIdentifier(), na.getAddress());
+        }
+
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
+  }
+
+  /**
+   * NameServer and NameClient test.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNameClient() throws Exception {
+
+    LOG.log(Level.FINEST, this.name.getMethodName());
+
+    final String localAddress = localAddressProvider.getLocalAddress();
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 this.factory);
+    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
+    try (final NameServer server = injector.getInstance(NameServer.class)) {
+      this.port = server.getPort();
+
+      final Map<Identifier, InetSocketAddress> idToAddrMap = new 
HashMap<Identifier, InetSocketAddress>();
+      idToAddrMap.put(this.factory.getNewInstance("task1"), new 
InetSocketAddress(localAddress, 7001));
+      idToAddrMap.put(this.factory.getNewInstance("task2"), new 
InetSocketAddress(localAddress, 7002));
+
+      // registration
+      // invoke registration from the client side
+      final Configuration nameResolverConf = NameResolverConfiguration.CONF
+          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+          .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
+          .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
+          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
+          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
+          .build();
+
+      try (final NameResolver client
+               = 
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.register(id, idToAddrMap.get(id));
+        }
+
+        // wait
+        final Set<Identifier> ids = idToAddrMap.keySet();
+        busyWait(server, ids.size(), ids);
+
+        // lookup
+        final Identifier id1 = this.factory.getNewInstance("task1");
+        final Identifier id2 = this.factory.getNewInstance("task2");
+
+        final Map<Identifier, InetSocketAddress> respMap = new 
HashMap<Identifier, InetSocketAddress>();
+        InetSocketAddress addr1 = client.lookup(id1);
+        respMap.put(id1, addr1);
+        InetSocketAddress addr2 = client.lookup(id2);
+        respMap.put(id2, addr2);
+
+        for (final Identifier id : respMap.keySet()) {
+          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, 
respMap.get(id)});
+        }
+
+        Assert.assertTrue(isEqual(idToAddrMap, respMap));
+
+        // un-registration
+        for (final Identifier id : idToAddrMap.keySet()) {
+          client.unregister(id);
+        }
+
+        // wait
+        busyWait(server, 0, ids);
+
+        final Map<Identifier, InetSocketAddress> serverMap = new 
HashMap<Identifier, InetSocketAddress>();
+        addr1 = server.lookup(id1);
+        if (addr1 != null) {
+          serverMap.put(id1, addr1);
+        }
+        addr2 = server.lookup(id1);
+        if (addr2 != null) {
+          serverMap.put(id2, addr2);
+        }
+
+        Assert.assertEquals(0, serverMap.size());
+      }
+    }
+  }
+
+  private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
+                          final Map<Identifier, InetSocketAddress> map2) {
+
+    if (map1.size() != map2.size()) {
+      return false;
+    }
+
+    for (final Identifier id : map1.keySet()) {
+      final InetSocketAddress addr1 = map1.get(id);
+      final InetSocketAddress addr2 = map2.get(id);
+      if (!addr1.equals(addr2)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private void busyWait(final NameServer server, final int expected, final 
Set<Identifier> ids) {
+    int count = 0;
+    for (;;) {
+      final Iterable<NameAssignment> nas = server.lookup(ids);
+      for (@SuppressWarnings("unused") final NameAssignment na : nas) {
+        ++count;
+      }
+      if (count == expected) {
+        break;
+      }
+      count = 0;
+    }
+  }
+}


Reply via email to