http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
deleted file mode 100644
index d550dae..0000000
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.io.naming.NameAssignment;
-import org.apache.reef.io.network.naming.*;
-import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
-import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Naming server and client test.
- */
-public class NamingTest {
-
-  private static final Logger LOG = Logger.getLogger(NamingTest.class.getName());
-  private static final int RETRY_COUNT;
-  private static final int RETRY_TIMEOUT;
-
-  static {
-    try {
-      final Injector injector = Tang.Factory.getTang().newInjector();
-      RETRY_COUNT = injector.getNamedInstance(NameResolverRetryCount.class);
-      RETRY_TIMEOUT = injector.getNamedInstance(NameResolverRetryTimeout.class);
-    } catch (final InjectionException ex) {
-      final String msg = "Exception while trying to find default values for retryCount & Timeout";
-      LOG.log(Level.SEVERE, msg, ex);
-      throw new RuntimeException(msg, ex);
-    }
-  }
-
-  private final LocalAddressProvider localAddressProvider;
-  @Rule
-  public final TestName name = new TestName();
-  static final long TTL = 30000;
-  private final IdentifierFactory factory = new StringIdentifierFactory();
-  private int port;
-
-  public NamingTest() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
-  }
-
-  /**
-   * NameServer and NameLookupClient test.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testNamingLookup() throws Exception {
-
-    final String localAddress = localAddressProvider.getLocalAddress();
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    // names 
-    final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-    idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001));
-    idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002));
-
-    // run a server
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      this.port = server.getPort();
-      for (final Identifier id : idToAddrMap.keySet()) {
-        server.register(id, idToAddrMap.get(id));
-      }
-
-      // run a client
-      try (final NameLookupClient client = new NameLookupClient(localAddress, this.port,
-          10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) {
-
-        final Identifier id1 = this.factory.getNewInstance("task1");
-        final Identifier id2 = this.factory.getNewInstance("task2");
-
-        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
-        final InetSocketAddress addr1 = client.lookup(id1);
-        respMap.put(id1, addr1);
-        final InetSocketAddress addr2 = client.lookup(id2);
-        respMap.put(id2, addr2);
-
-        for (final Identifier id : respMap.keySet()) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, respMap));
-      }
-    }
-  }
-
-  /**
-   * Test concurrent lookups (threads share a client).
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testConcurrentNamingLookup() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    final String localAddress = localAddressProvider.getLocalAddress();
-    // test it 3 times to make failure likely
-    for (int i = 0; i < 3; i++) {
-
-      LOG.log(Level.FINEST, "test {0}", i);
-
-      // names 
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002));
-      idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(localAddress, 7003));
-
-      // run a server
-      final Injector injector = Tang.Factory.getTang().newInjector();
-      injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory);
-      injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
-      try (final NameServer server = injector.getInstance(NameServer.class)) {
-        this.port = server.getPort();
-        for (final Identifier id : idToAddrMap.keySet()) {
-          server.register(id, idToAddrMap.get(id));
-        }
-
-        // run a client
-        try (final NameLookupClient client = new NameLookupClient(localAddress, this.port,
-            10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) {
-
-          final Identifier id1 = this.factory.getNewInstance("task1");
-          final Identifier id2 = this.factory.getNewInstance("task2");
-          final Identifier id3 = this.factory.getNewInstance("task3");
-
-          final ExecutorService e = Executors.newCachedThreadPool();
-
-          final ConcurrentMap<Identifier, InetSocketAddress> respMap = 
-              new ConcurrentHashMap<Identifier, InetSocketAddress>();
-
-          final Future<?> f1 = e.submit(new Runnable() {
-            @Override
-            public void run() {
-              InetSocketAddress addr = null;
-              try {
-                addr = client.lookup(id1);
-              } catch (final Exception e) {
-                LOG.log(Level.SEVERE, "Lookup failed", e);
-                Assert.fail(e.toString());
-              }
-              respMap.put(id1, addr);
-            }
-          });
-          final Future<?> f2 = e.submit(new Runnable() {
-            @Override
-            public void run() {
-              InetSocketAddress addr = null;
-              try {
-                addr = client.lookup(id2);
-              } catch (final Exception e) {
-                LOG.log(Level.SEVERE, "Lookup failed", e);
-                Assert.fail(e.toString());
-              }
-              respMap.put(id2, addr);
-            }
-          });
-          final Future<?> f3 = e.submit(new Runnable() {
-            @Override
-            public void run() {
-              InetSocketAddress addr = null;
-              try {
-                addr = client.lookup(id3);
-              } catch (final Exception e) {
-                LOG.log(Level.SEVERE, "Lookup failed", e);
-                Assert.fail(e.toString());
-              }
-              respMap.put(id3, addr);
-            }
-          });
-
-          f1.get();
-          f2.get();
-          f3.get();
-
-          for (final Identifier id : respMap.keySet()) {
-            LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
-          }
-
-          Assert.assertTrue(isEqual(idToAddrMap, respMap));
-        }
-      }
-    }
-  }
-
-  /**
-   * NameServer and NameRegistryClient test.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testNamingRegistry() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      this.port = server.getPort();
-      final String localAddress = localAddressProvider.getLocalAddress();
-
-      // names to start with
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002));
-
-      // registration
-      // invoke registration from the client side
-      try (final NameRegistryClient client = 
-          new NameRegistryClient(localAddress, this.port, this.factory, this.localAddressProvider)) {
-        for (final Identifier id : idToAddrMap.keySet()) {
-          client.register(id, idToAddrMap.get(id));
-        }
-
-        // wait
-        final Set<Identifier> ids = idToAddrMap.keySet();
-        busyWait(server, ids.size(), ids);
-
-        // check the server side
-        Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
-        Iterable<NameAssignment> nas = server.lookup(ids);
-
-        for (final NameAssignment na : nas) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}",
-              new Object[]{na.getIdentifier(), na.getAddress()});
-          serverMap.put(na.getIdentifier(), na.getAddress());
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, serverMap));
-
-        // un-registration
-        for (final Identifier id : idToAddrMap.keySet()) {
-          client.unregister(id);
-        }
-
-        // wait
-        busyWait(server, 0, ids);
-
-        serverMap = new HashMap<Identifier, InetSocketAddress>();
-        nas = server.lookup(ids);
-        for (final NameAssignment na : nas) {
-          serverMap.put(na.getIdentifier(), na.getAddress());
-        }
-
-        Assert.assertEquals(0, serverMap.size());
-      }
-    }
-  }
-
-  /**
-   * NameServer and NameClient test.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testNameClient() throws Exception {
-
-    LOG.log(Level.FINEST, this.name.getMethodName());
-
-    final String localAddress = localAddressProvider.getLocalAddress();
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      this.port = server.getPort();
-
-      final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>();
-      idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001));
-      idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002));
-
-      // registration
-      // invoke registration from the client side
-      final Configuration nameResolverConf = NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port)
-          .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL)
-          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
-          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
-          .build();
-
-      try (final NameResolver client
-               = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) {
-        for (final Identifier id : idToAddrMap.keySet()) {
-          client.register(id, idToAddrMap.get(id));
-        }
-
-        // wait
-        final Set<Identifier> ids = idToAddrMap.keySet();
-        busyWait(server, ids.size(), ids);
-
-        // lookup
-        final Identifier id1 = this.factory.getNewInstance("task1");
-        final Identifier id2 = this.factory.getNewInstance("task2");
-
-        final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>();
-        InetSocketAddress addr1 = client.lookup(id1);
-        respMap.put(id1, addr1);
-        InetSocketAddress addr2 = client.lookup(id2);
-        respMap.put(id2, addr2);
-
-        for (final Identifier id : respMap.keySet()) {
-          LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)});
-        }
-
-        Assert.assertTrue(isEqual(idToAddrMap, respMap));
-
-        // un-registration
-        for (final Identifier id : idToAddrMap.keySet()) {
-          client.unregister(id);
-        }
-
-        // wait
-        busyWait(server, 0, ids);
-
-        final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>();
-        addr1 = server.lookup(id1);
-        if (addr1 != null) {
-          serverMap.put(id1, addr1);
-        }
-        addr2 = server.lookup(id1);
-        if (addr2 != null) {
-          serverMap.put(id2, addr2);
-        }
-
-        Assert.assertEquals(0, serverMap.size());
-      }
-    }
-  }
-
-  private boolean isEqual(final Map<Identifier, InetSocketAddress> map1,
-                          final Map<Identifier, InetSocketAddress> map2) {
-
-    if (map1.size() != map2.size()) {
-      return false;
-    }
-
-    for (final Identifier id : map1.keySet()) {
-      final InetSocketAddress addr1 = map1.get(id);
-      final InetSocketAddress addr2 = map2.get(id);
-      if (!addr1.equals(addr2)) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) {
-    int count = 0;
-    for (;;) {
-      final Iterable<NameAssignment> nas = server.lookup(ids);
-      for (@SuppressWarnings("unused") final NameAssignment na : nas) {
-        ++count;
-      }
-      if (count == expected) {
-        break;
-      }
-      count = 0;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
deleted file mode 100644
index 6bc0ac9..0000000
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.util.StringCodec;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.services.network.util.*;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Default Network connection service test.
- */
-public class NetworkConnectionServiceTest {
-  private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName());
-
-  private final LocalAddressProvider localAddressProvider;
-  private final String localAddress;
-  private final Identifier groupCommClientId;
-  private final Identifier shuffleClientId;
-
-  public NetworkConnectionServiceTest() throws InjectionException {
-    localAddressProvider = LocalAddressProviderFactory.getInstance();
-    localAddress = localAddressProvider.getLocalAddress();
-
-    final IdentifierFactory idFac = new StringIdentifierFactory();
-    this.groupCommClientId = idFac.getNewInstance("groupComm");
-    this.shuffleClientId = idFac.getNewInstance("shuffle");
-  }
-
-  @Rule
-  public TestName name = new TestName();
-
-  private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception {
-    final int numMessages = 2000;
-    final Monitor monitor = new Monitor();
-    try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-      messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
-
-      try (final Connection<String> conn = messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-        try {
-          conn.open();
-          for (int count = 0; count < numMessages; ++count) {
-            // send messages to the receiver.
-            conn.write("hello" + count);
-          }
-          monitor.mwait();
-        } catch (final NetworkException e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkConnectionService messaging test.
-   */
-  @Test
-  public void testMessagingNetworkConnectionService() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runMessagingNetworkConnectionService(new StringCodec());
-  }
-
-  /**
-   * NetworkConnectionService streaming messaging test.
-   */
-  @Test
-  public void testStreamingMessagingNetworkConnectionService() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runMessagingNetworkConnectionService(new StreamingStringCodec());
-  }
-
-  public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec,
-                                                             final Codec<Integer> integerCodec)
-      throws Exception {
-    final ExecutorService executor = Executors.newFixedThreadPool(5);
-
-    final int groupcommMessages = 1000;
-    final Monitor monitor = new Monitor();
-    try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-
-      messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec);
-
-      final int shuffleMessages = 2000;
-      final Monitor monitor2 = new Monitor();
-      messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec);
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try (final Connection<String> conn =
-                   messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-            conn.open();
-            for (int count = 0; count < groupcommMessages; ++count) {
-              // send messages to the receiver.
-              conn.write("hello" + count);
-            }
-            monitor.mwait();
-          } catch (final Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      });
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try (final Connection<Integer> conn =
-                   messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
-            conn.open();
-            for (int count = 0; count < shuffleMessages; ++count) {
-              // send messages to the receiver.
-              conn.write(count);
-            }
-            monitor2.mwait();
-          } catch (final Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      });
-
-      monitor.mwait();
-      monitor2.mwait();
-      executor.shutdown();
-    }
-  }
-
-  /**
-   * Test NetworkService registering multiple connection factories.
-   */
-  @Test
-  public void testMultipleConnectionFactoriesTest() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>());
-  }
-
-  /**
-   * Test NetworkService registering multiple connection factories with Streamingcodec.
-   */
-  @Test
-  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec());
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
-
-        // build the message
-        final StringBuilder msb = new StringBuilder();
-        for (int i = 0; i < size; i++) {
-          msb.append("1");
-        }
-        final String message = msb.toString();
-
-        try (final Connection<String> conn =
-                 messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          final long start = System.currentTimeMillis();
-          try {
-            conn.open();
-            for (int count = 0; count < numMessages; ++count) {
-              // send messages to the receiver.
-              conn.write(message);
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-          final long end = System.currentTimeMillis();
-
-          final double runtime = ((double) end - start) / 1000;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
-    final int numThreads = 4;
-    final int size = 2000;
-    final int numMessages = 300000 / (Math.max(1, size / 512));
-    final int totalNumMessages = numMessages * numThreads;
-
-    final ExecutorService e = Executors.newCachedThreadPool();
-    for (int t = 0; t < numThreads; t++) {
-      final int tt = t;
-
-      e.submit(new Runnable() {
-        public void run() {
-          try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-            final Monitor monitor = new Monitor();
-            final Codec<String> codec = new StringCodec();
-
-            messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
-            try (final Connection<String> conn =
-                     messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-              // build the message
-              final StringBuilder msb = new StringBuilder();
-              for (int i = 0; i < size; i++) {
-                msb.append("1");
-              }
-              final String message = msb.toString();
-
-              try {
-                conn.open();
-                for (int count = 0; count < numMessages; ++count) {
-                  // send messages to the receiver.
-                  conn.write(message);
-                }
-                monitor.mwait();
-              } catch (final NetworkException e) {
-                e.printStackTrace();
-                throw new RuntimeException(e);
-              }
-            }
-          } catch (final Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    }
-
-    // start and time
-    final long start = System.currentTimeMillis();
-    final Object ignore = new Object();
-    for (int i = 0; i < numThreads; i++) {
-      barrier.add(ignore);
-    }
-    e.shutdown();
-    e.awaitTermination(100, TimeUnit.SECONDS);
-    final long end = System.currentTimeMillis();
-    final double runtime = ((double) end - start) / 1000;
-    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime +
-        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars
-  }
-
-  @Test
-  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final int numThreads = 2;
-      final int totalNumMessages = numMessages * numThreads;
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec);
-
-        final ExecutorService e = Executors.newCachedThreadPool();
-
-        // build the message
-        final StringBuilder msb = new StringBuilder();
-        for (int i = 0; i < size; i++) {
-          msb.append("1");
-        }
-        final String message = msb.toString();
-        try (final Connection<String> conn =
-                 messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          final long start = System.currentTimeMillis();
-          for (int i = 0; i < numThreads; i++) {
-            e.submit(new Runnable() {
-              @Override
-              public void run() {
-
-                try {
-                  conn.open();
-                  for (int count = 0; count < numMessages; ++count) {
-                    // send messages to the receiver.
-                    conn.write(message);
-                  }
-                } catch (final Exception e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            });
-          }
-
-          e.shutdown();
-          e.awaitTermination(30, TimeUnit.SECONDS);
-          monitor.mwait();
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + 
-              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final int batchSize = 1024 * 1024;
-    final int[] messageSizes = {32, 64, 512};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec);
-        try (final Connection<String> conn =
-                 messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          // build the message
-          final StringBuilder msb = new StringBuilder();
-          for (int i = 0; i < size; i++) {
-            msb.append("1");
-          }
-          final String message = msb.toString();
-
-          final long start = System.currentTimeMillis();
-          try {
-            for (int i = 0; i < numMessages; i++) {
-              final StringBuilder sb = new StringBuilder();
-              for (int j = 0; j < batchSize / size; j++) {
-                sb.append(message);
-              }
-              conn.open();
-              conn.write(sb.toString());
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          final long numAppMessages = numMessages * batchSize / size;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
deleted file mode 100644
index 6045be2..0000000
--- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.impl.NetworkService;
-import org.apache.reef.io.network.naming.*;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.services.network.util.Monitor;
-import org.apache.reef.services.network.util.StringCodec;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Network service test.
- */
-public class NetworkServiceTest {
-  private static final Logger LOG = 
Logger.getLogger(NetworkServiceTest.class.getName());
-
-  private final LocalAddressProvider localAddressProvider;
-  private final String localAddress;
-
-  public NetworkServiceTest() throws InjectionException {
-    localAddressProvider = LocalAddressProviderFactory.getInstance();
-    localAddress = localAddressProvider.getLocalAddress();
-  }
-
-  @Rule
-  public TestName name = new TestName();
-
-  /**
-   * NetworkService messaging test.
-   */
-  @Test
-  public void testMessagingNetworkService() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int nameServerPort = server.getPort();
-
-      final int numMessages = 10;
-      final Monitor monitor = new Monitor();
-
-      // network service
-      final String name2 = "task2";
-      final String name1 = "task1";
-      final Configuration nameResolverConf =
-          
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-          .build())
-          .build();
-
-      final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-
-      LOG.log(Level.FINEST, "=== Test network service receiver start");
-      LOG.log(Level.FINEST, "=== Test network service sender start");
-      try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-           NetworkService<String> ns2 = new NetworkService<String>(factory, 0, 
nameResolver,
-               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-               new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-           final NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
-               new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-               new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
-
-        ns2.registerId(factory.getNewInstance(name2));
-        final int port2 = ns2.getTransport().getListeningPort();
-        server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-        ns1.registerId(factory.getNewInstance(name1));
-        final int port1 = ns1.getTransport().getListeningPort();
-        server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-        final Identifier destId = factory.getNewInstance(name2);
-
-        try (final Connection<String> conn = ns1.newConnection(destId)) {
-          conn.open();
-          for (int count = 0; count < numMessages; ++count) {
-            conn.write("hello! " + count);
-          }
-          monitor.mwait();
-
-        } catch (final NetworkException e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int nameServerPort = server.getPort();
-
-      final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
-      for (final int size : messageSizes) {
-        final int numMessages = 300000 / (Math.max(1, size / 512));
-        final Monitor monitor = new Monitor();
-
-        // network service
-        final String name2 = "task2";
-        final String name1 = "task1";
-        final Configuration nameResolverConf =
-            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-            .build())
-            .build();
-
-        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-
-        LOG.log(Level.FINEST, "=== Test network service receiver start");
-        LOG.log(Level.FINEST, "=== Test network service sender start");
-        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
-
-          ns2.registerId(factory.getNewInstance(name2));
-          final int port2 = ns2.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-          ns1.registerId(factory.getNewInstance(name1));
-          final int port1 = ns1.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-          final Identifier destId = factory.getNewInstance(name2);
-
-          // build the message
-          final StringBuilder msb = new StringBuilder();
-          for (int i = 0; i < size; i++) {
-            msb.append("1");
-          }
-          final String message = msb.toString();
-
-          final long start = System.currentTimeMillis();
-          try (Connection<String> conn = ns1.newConnection(destId)) {
-            for (int i = 0; i < numMessages; i++) {
-              conn.open();
-              conn.write(message);
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkServiceRateDisjoint() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int nameServerPort = server.getPort();
-
-      final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
-      final int numThreads = 4;
-      final int size = 2000;
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final int totalNumMessages = numMessages * numThreads;
-
-      final ExecutorService e = Executors.newCachedThreadPool();
-      for (int t = 0; t < numThreads; t++) {
-        final int tt = t;
-
-        e.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              final Monitor monitor = new Monitor();
-
-              // network service
-              final String name2 = "task2-" + tt;
-              final String name1 = "task1-" + tt;
-              final Configuration nameResolverConf =
-                  
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-                  .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
localAddress)
-                  .set(NameResolverConfiguration.NAME_SERVICE_PORT, 
nameServerPort)
-                  .build())
-                  .build();
-
-              final Injector injector = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-
-              LOG.log(Level.FINEST, "=== Test network service receiver start");
-              LOG.log(Level.FINEST, "=== Test network service sender start");
-              try (final NameResolver nameResolver = 
injector.getInstance(NameResolver.class);
-                   NetworkService<String> ns2 = new 
NetworkService<String>(factory, 0, nameResolver,
-                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                       new MessageHandler<String>(name2, monitor, numMessages),
-                       new ExceptionHandler(), localAddressProvider);
-                   NetworkService<String> ns1 = new 
NetworkService<String>(factory, 0, nameResolver,
-                       new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                       new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
-
-                ns2.registerId(factory.getNewInstance(name2));
-                final int port2 = ns2.getTransport().getListeningPort();
-                server.register(factory.getNewInstance(name2), new 
InetSocketAddress(localAddress, port2));
-
-                ns1.registerId(factory.getNewInstance(name1));
-                final int port1 = ns1.getTransport().getListeningPort();
-                server.register(factory.getNewInstance(name1), new 
InetSocketAddress(localAddress, port1));
-
-                final Identifier destId = factory.getNewInstance(name2);
-
-                // build the message
-                final StringBuilder msb = new StringBuilder();
-                for (int i = 0; i < size; i++) {
-                  msb.append("1");
-                }
-                final String message = msb.toString();
-
-                try (Connection<String> conn = ns1.newConnection(destId)) {
-                  for (int i = 0; i < numMessages; i++) {
-                    conn.open();
-                    conn.write(message);
-                  }
-                  monitor.mwait();
-                } catch (final NetworkException e) {
-                  e.printStackTrace();
-                  throw new RuntimeException(e);
-                }
-              }
-            } catch (final Exception e) {
-              e.printStackTrace();
-              throw new RuntimeException(e);
-            }
-          }
-        });
-      }
-
-      // start and time
-      final long start = System.currentTimeMillis();
-      final Object ignore = new Object();
-      for (int i = 0; i < numThreads; i++) {
-        barrier.add(ignore);
-      }
-      e.shutdown();
-      e.awaitTermination(100, TimeUnit.SECONDS);
-      final long end = System.currentTimeMillis();
-
-      final double runtime = ((double) end - start) / 1000;
-      LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + 
-          " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
-    }
-  }
-
-  @Test
-  public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws 
Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int nameServerPort = server.getPort();
-
-      final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
-
-      for (final int size : messageSizes) {
-        final int numMessages = 300000 / (Math.max(1, size / 512));
-        final int numThreads = 2;
-        final int totalNumMessages = numMessages * numThreads;
-        final Monitor monitor = new Monitor();
-
-
-        // network service
-        final String name2 = "task2";
-        final String name1 = "task1";
-        final Configuration nameResolverConf =
-            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-            .build())
-            .build();
-
-        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-
-        LOG.log(Level.FINEST, "=== Test network service receiver start");
-        LOG.log(Level.FINEST, "=== Test network service sender start");
-        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name2, monitor, totalNumMessages),
-                 new ExceptionHandler(), localAddressProvider);
-             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
-
-          ns2.registerId(factory.getNewInstance(name2));
-          final int port2 = ns2.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-          ns1.registerId(factory.getNewInstance(name1));
-          final int port1 = ns1.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-          final Identifier destId = factory.getNewInstance(name2);
-
-          try (final Connection<String> conn = ns1.newConnection(destId)) {
-            conn.open();
-
-            // build the message
-            final StringBuilder msb = new StringBuilder();
-            for (int i = 0; i < size; i++) {
-              msb.append("1");
-            }
-            final String message = msb.toString();
-
-            final ExecutorService e = Executors.newCachedThreadPool();
-
-            final long start = System.currentTimeMillis();
-            for (int i = 0; i < numThreads; i++) {
-              e.submit(new Runnable() {
-
-                @Override
-                public void run() {
-                  for (int i = 0; i < numMessages; i++) {
-                    conn.write(message);
-                  }
-                }
-              });
-            }
-
-
-            e.shutdown();
-            e.awaitTermination(30, TimeUnit.SECONDS);
-            monitor.mwait();
-
-            final long end = System.currentTimeMillis();
-            final double runtime = ((double) end - start) / 1000;
-
-            LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime + 
-                " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * 
size) / runtime); // x2 for unicode chars
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkServiceBatchingRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final IdentifierFactory factory = new StringIdentifierFactory();
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int nameServerPort = server.getPort();
-
-      final int batchSize = 1024 * 1024;
-      final int[] messageSizes = {32, 64, 512};
-
-      for (final int size : messageSizes) {
-        final int numMessages = 300 / (Math.max(1, size / 512));
-        final Monitor monitor = new Monitor();
-
-        // network service
-        final String name2 = "task2";
-        final String name1 = "task1";
-        final Configuration nameResolverConf =
-            
Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
-            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, 
this.localAddress)
-            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
-            .build())
-            .build();
-
-        final Injector injector2 = 
Tang.Factory.getTang().newInjector(nameResolverConf);
-
-        LOG.log(Level.FINEST, "=== Test network service receiver start");
-        LOG.log(Level.FINEST, "=== Test network service sender start");
-        try (final NameResolver nameResolver = 
injector2.getInstance(NameResolver.class);
-             NetworkService<String> ns2 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name2, monitor, numMessages), new 
ExceptionHandler(), localAddressProvider);
-             NetworkService<String> ns1 = new NetworkService<String>(factory, 
0, nameResolver,
-                 new StringCodec(), new 
MessagingTransportFactory(localAddressProvider),
-                 new MessageHandler<String>(name1, null, 0), new 
ExceptionHandler(), localAddressProvider)) {
-
-          ns2.registerId(factory.getNewInstance(name2));
-          final int port2 = ns2.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task2"), new 
InetSocketAddress(this.localAddress, port2));
-
-          ns1.registerId(factory.getNewInstance(name1));
-          final int port1 = ns1.getTransport().getListeningPort();
-          server.register(factory.getNewInstance("task1"), new 
InetSocketAddress(this.localAddress, port1));
-
-          final Identifier destId = factory.getNewInstance(name2);
-
-          // build the message
-          final StringBuilder msb = new StringBuilder();
-          for (int i = 0; i < size; i++) {
-            msb.append("1");
-          }
-          final String message = msb.toString();
-
-          final long start = System.currentTimeMillis();
-          try (Connection<String> conn = ns1.newConnection(destId)) {
-            for (int i = 0; i < numMessages; i++) {
-              final StringBuilder sb = new StringBuilder();
-              for (int j = 0; j < batchSize / size; j++) {
-                sb.append(message);
-              }
-              conn.open();
-              conn.write(sb.toString());
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          final long numAppMessages = numMessages * batchSize / size;
-          LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * Test message handler.
-   */
-  class MessageHandler<T> implements EventHandler<Message<T>> {
-
-    private final String name;
-    private final int expected;
-    private final Monitor monitor;
-    private AtomicInteger count = new AtomicInteger(0);
-
-    MessageHandler(final String name, final Monitor monitor, final int 
expected) {
-      this.name = name;
-      this.monitor = monitor;
-      this.expected = expected;
-    }
-
-    @Override
-    public void onNext(final Message<T> value) {
-
-      count.incrementAndGet();
-
-      LOG.log(Level.FINEST,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{name, value.getData(), value.getSrcId(), 
value.getDestId()});
-
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINEST, "OUT: data: {0}", obj);
-      }
-
-      if (count.get() == expected) {
-        monitor.mnotify();
-      }
-    }
-  }
-
-  /**
-   * Test exception handler.
-   */
-  class ExceptionHandler implements EventHandler<Exception> {
-    @Override
-    public void onNext(final Exception error) {
-      System.err.println(error);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
deleted file mode 100644
index 92ba655..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import java.io.Serializable;
-
-/**
- * Event for testing.
- */
-public class TestEvent implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-  private String message;
-
-  public TestEvent(final String message) {
-    this.message = message;
-  }
-
-  public String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java
deleted file mode 100644
index 2556f21..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * TODO: Document.
- */
-package org.apache.reef.services.network;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
deleted file mode 100644
index 4c4bd69..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.NetworkConnectionService;
-import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
-import org.apache.reef.io.network.naming.NameResolver;
-import org.apache.reef.io.network.naming.NameResolverConfiguration;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.transport.LinkListener;
-
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-// TODO[JIRA REEF-637] Remove the deprecated class.
-/**
- * Helper class for DeprecatedNetworkConnectionService test, deprecated in 
0.13.
- */
-@Deprecated
-public final class DeprecatedNetworkMessagingTestService implements 
AutoCloseable {
-  private static final Logger LOG = 
Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName());
-
-  private final IdentifierFactory factory;
-  private final NetworkConnectionService receiverNetworkConnService;
-  private final NetworkConnectionService senderNetworkConnService;
-  private final String receiver;
-  private final String sender;
-  private final NameServer nameServer;
-  private final NameResolver receiverResolver;
-  private final NameResolver senderResolver;
-
-  public DeprecatedNetworkMessagingTestService(final String localAddress) 
throws InjectionException {
-    // name server
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    this.nameServer = injector.getInstance(NameServer.class);
-    final Configuration netConf = NameResolverConfiguration.CONF
-        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
-        .build();
-
-    LOG.log(Level.FINEST, "=== Test network connection service receiver 
start");
-    // network service for receiver
-    this.receiver = "receiver";
-    final Injector injectorReceiver = injector.forkInjector(netConf);
-    this.receiverNetworkConnService = 
injectorReceiver.getInstance(NetworkConnectionService.class);
-    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
-    this.factory = 
injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
-    
this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
-
-    // network service for sender
-    this.sender = "sender";
-    LOG.log(Level.FINEST, "=== Test network connection service sender start");
-    final Injector injectorSender = injector.forkInjector(netConf);
-    senderNetworkConnService = 
injectorSender.getInstance(NetworkConnectionService.class);
-    
senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
-    this.senderResolver = injectorSender.getInstance(NameResolver.class);
-  }
-
-  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
-                                                final int numMessages, final 
Monitor monitor,
-                                                final Codec<T> codec) throws 
NetworkException {
-    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec,
-        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
-    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec,
-        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
-  }
-
-  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier 
connFactoryId) {
-    final Identifier destId = factory.getNewInstance(receiver);
-    return 
(Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
-  }
-
-  public void close() throws Exception {
-    senderNetworkConnService.close();
-    receiverNetworkConnService.close();
-    nameServer.close();
-    receiverResolver.close();
-    senderResolver.close();
-  }
-
-  public static final class MessageHandler<T> implements 
EventHandler<Message<T>> {
-    private final int expected;
-    private final Monitor monitor;
-    private AtomicInteger count = new AtomicInteger(0);
-
-    public MessageHandler(final Monitor monitor,
-                          final int expected) {
-      this.monitor = monitor;
-      this.expected = expected;
-    }
-
-    @Override
-    public void onNext(final Message<T> value) {
-      count.incrementAndGet();
-      LOG.log(Level.FINE, "Count: {0}", count.get());
-      LOG.log(Level.FINE,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{value, value.getSrcId(), value.getDestId()});
-
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINE, "OUT: data: {0}", obj);
-      }
-
-      if (count.get() == expected) {
-        monitor.mnotify();
-      }
-    }
-  }
-
-  public static final class TestListener<T> implements 
LinkListener<Message<T>> {
-    @Override
-    public void onSuccess(final Message<T> message) {
-      LOG.log(Level.FINE, "success: " + message);
-    }
-    @Override
-    public void onException(final Throwable cause, final SocketAddress 
remoteAddress, final Message<T> message) {
-      LOG.log(Level.WARNING, "exception: " + cause + message);
-      throw new RuntimeException(cause);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
deleted file mode 100644
index b10d876..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import java.util.logging.ConsoleHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-public final class LoggingUtils {
-  public static void setLoggingLevel(final Level level) {
-    final Handler[] handlers = Logger.getLogger("").getHandlers();
-    ConsoleHandler ch = null;
-    for (final Handler h : handlers) {
-      if (h instanceof ConsoleHandler) {
-        ch = (ConsoleHandler) h;
-        break;
-      }
-    }
-    if (ch == null) {
-      ch = new ConsoleHandler();
-      Logger.getLogger("").addHandler(ch);
-    }
-    ch.setLevel(level);
-    Logger.getLogger("").setLevel(level);
-  }
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private LoggingUtils() {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
deleted file mode 100644
index 161ee6a..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class Monitor {
-  private AtomicBoolean finished = new AtomicBoolean(false);
-
-  public void mwait() throws InterruptedException {
-    synchronized (this) {
-      while (!finished.get()) {
-        this.wait();
-      }
-    }
-  }
-
-  public void mnotify() {
-    synchronized (this) {
-      finished.compareAndSet(false, true);
-      this.notifyAll();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
deleted file mode 100644
index 3dc33e8..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.Message;
-import org.apache.reef.io.network.NetworkConnectionService;
-import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
-import org.apache.reef.io.network.naming.NameResolver;
-import org.apache.reef.io.network.naming.NameResolverConfiguration;
-import org.apache.reef.io.network.naming.NameServer;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.transport.LinkListener;
-
-import java.net.SocketAddress;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Helper class for NetworkConnectionService test.
- */
-public final class NetworkMessagingTestService implements AutoCloseable {
-  private static final Logger LOG = 
Logger.getLogger(NetworkMessagingTestService.class.getName());
-
-  private final IdentifierFactory factory;
-  private final NetworkConnectionService receiverNetworkConnService;
-  private final NetworkConnectionService senderNetworkConnService;
-  private final NameServer nameServer;
-  private final NameResolver receiverResolver;
-  private final NameResolver senderResolver;
-
-  public NetworkMessagingTestService(final String localAddress) throws 
InjectionException {
-    // name server
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    this.nameServer = injector.getInstance(NameServer.class);
-    final Configuration netConf = NameResolverConfiguration.CONF
-        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
-        .build();
-
-    LOG.log(Level.FINEST, "=== Test network connection service receiver 
start");
-    // network service for receiver
-    final Injector injectorReceiver = injector.forkInjector(netConf);
-    this.receiverNetworkConnService = 
injectorReceiver.getInstance(NetworkConnectionService.class);
-    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
-    this.factory = 
injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
-
-    // network service for sender
-    LOG.log(Level.FINEST, "=== Test network connection service sender start");
-    final Injector injectorSender = injector.forkInjector(netConf);
-    senderNetworkConnService = 
injectorSender.getInstance(NetworkConnectionService.class);
-    this.senderResolver = injectorSender.getInstance(NameResolver.class);
-  }
-
-  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
-                                                final int numMessages, final 
Monitor monitor,
-                                                final Codec<T> codec) throws 
NetworkException {
-    final Identifier receiverEndPointId = factory.getNewInstance("receiver");
-    final Identifier senderEndPointId = factory.getNewInstance("sender");
-    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec,
-        new MessageHandler<T>(monitor, numMessages, senderEndPointId, 
receiverEndPointId),
-        new TestListener<T>(), receiverEndPointId);
-    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec,
-        new MessageHandler<T>(monitor, numMessages, receiverEndPointId, 
senderEndPointId),
-        new TestListener<T>(), senderEndPointId);
-  }
-
-  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier 
connFactoryId) {
-    final Identifier receiverEndPointId = factory.getNewInstance("receiver");
-    return (Connection<T>)senderNetworkConnService
-        .getConnectionFactory(connFactoryId)
-        .newConnection(receiverEndPointId);
-  }
-
-  public void close() throws Exception {
-    senderNetworkConnService.close();
-    receiverNetworkConnService.close();
-    nameServer.close();
-    receiverResolver.close();
-    senderResolver.close();
-  }
-
-  public static final class MessageHandler<T> implements 
EventHandler<Message<T>> {
-    private final int expected;
-    private final Monitor monitor;
-    private final Identifier expectedSrcId;
-    private final Identifier expectedDestId;
-    private AtomicInteger count = new AtomicInteger(0);
-
-    public MessageHandler(final Monitor monitor,
-                          final int expected,
-                          final Identifier expectedSrcId,
-                          final Identifier expectedDestId) {
-      this.monitor = monitor;
-      this.expected = expected;
-      this.expectedSrcId = expectedSrcId;
-      this.expectedDestId = expectedDestId;
-    }
-
-    @Override
-    public void onNext(final Message<T> value) {
-      count.incrementAndGet();
-      LOG.log(Level.FINE, "Count: {0}", count.get());
-      LOG.log(Level.FINE,
-          "OUT: {0} received {1} from {2} to {3}",
-          new Object[]{value, value.getSrcId(), value.getDestId()});
-
-      for (final T obj : value.getData()) {
-        LOG.log(Level.FINE, "OUT: data: {0}", obj);
-      }
-
-      assert value.getSrcId().equals(expectedSrcId);
-      assert value.getDestId().equals(expectedDestId);
-
-      if (count.get() == expected) {
-        monitor.mnotify();
-      }
-    }
-  }
-
-  public static final class TestListener<T> implements 
LinkListener<Message<T>> {
-    @Override
-    public void onSuccess(final Message<T> message) {
-      LOG.log(Level.FINE, "success: " + message);
-    }
-    @Override
-    public void onException(final Throwable cause, final SocketAddress 
remoteAddress, final Message<T> message) {
-      LOG.log(Level.WARNING, "exception: " + cause + message);
-      throw new RuntimeException(cause);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
deleted file mode 100644
index 757a803..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.io.network.impl.StreamingCodec;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-
-public class StreamingIntegerCodec implements StreamingCodec<Integer> {
-
-  @Override
-  public void encodeToStream(final Integer obj, final DataOutputStream stream) 
{
-    try {
-      stream.writeInt(obj);
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Integer decodeFromStream(final DataInputStream stream) {
-    try {
-      return stream.readInt();
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Integer decode(final byte[] data) {
-    return null;
-  }
-
-  @Override
-  public byte[] encode(final Integer obj) {
-    return new byte[0];
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
deleted file mode 100644
index 43efaf6..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.io.network.impl.StreamingCodec;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-
-public class StreamingStringCodec implements StreamingCodec<String> {
-  @Override
-  public byte[] encode(final String obj) {
-    return obj.getBytes();
-  }
-
-  @Override
-  public String decode(final byte[] buf) {
-    return new String(buf);
-  }
-
-  @Override
-  public void encodeToStream(final String obj, final DataOutputStream stream) {
-    try {
-      stream.writeUTF(obj);
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public String decodeFromStream(final DataInputStream stream) {
-    try {
-      return stream.readUTF();
-    } catch (final IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
deleted file mode 100644
index d939b8e..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.wake.remote.Codec;
-
-
-public class StringCodec implements Codec<String> {
-  @Override
-  public byte[] encode(final String obj) {
-    return obj.getBytes();
-  }
-
-  @Override
-  public String decode(final byte[] buf) {
-    return new String(buf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
deleted file mode 100644
index 146a6db..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network.util;
-
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.impl.PeriodicEvent;
-
-public class TimeoutHandler implements EventHandler<PeriodicEvent> {
-
-  private final Monitor monitor;
-
-  public TimeoutHandler(final Monitor monitor) {
-    this.monitor = monitor;
-  }
-
-  @Override
-  public void onNext(final PeriodicEvent event) {
-    monitor.mnotify();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
deleted file mode 100644
index 5630dec..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-/**
- * TODO: Document.
- */
-package org.apache.reef.services.network.util;


Reply via email to