http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java
new file mode 100644
index 0000000..f315d02
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/naming/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for naming.
+ */
+package org.apache.reef.io.network.naming;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java
new file mode 100644
index 0000000..b5c06d5
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for the network package.
+ */
+package org.apache.reef.io.network;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java
new file mode 100644
index 0000000..8c3506f
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/DeprecatedNetworkMessagingTestService.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO[JIRA REEF-637] Remove the deprecated class.
+/**
+ * Helper class for DeprecatedNetworkConnectionService test, deprecated in 0.13.
+ *
+ * <p>The constructor obtains a {@link NameServer} and two {@link NetworkConnectionService}
+ * instances (a "receiver" and a "sender") from injectors forked off a shared root injector,
+ * and registers the identifiers "receiver" and "sender" with them. Everything obtained in
+ * the constructor is closed again by {@link #close()}.
+ */
+@Deprecated
+public final class DeprecatedNetworkMessagingTestService implements AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final String receiver;
+  private final String sender;
+  private final NameServer nameServer;
+  private final NameResolver receiverResolver;
+  private final NameResolver senderResolver;
+
+  /**
+   * Sets up the name server and the receiver and sender network connection services.
+   *
+   * @param localAddress host name handed to the name resolver configuration
+   * @throws InjectionException if any of the instances cannot be injected
+   */
+  public DeprecatedNetworkMessagingTestService(final String localAddress) throws InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+    // network service for receiver
+    this.receiver = "receiver";
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
+    this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+    this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver));
+
+    // network service for sender
+    this.sender = "sender";
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+    senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender));
+    this.senderResolver = injectorSender.getInstance(NameResolver.class);
+  }
+
+  /**
+   * Registers a connection factory under the same id on both the receiver and the sender
+   * service, each with its own {@link MessageHandler} and {@link TestListener}.
+   *
+   * @param connFactoryId id under which the connection factory is registered
+   * @param numMessages   number of messages after which each handler notifies the monitor
+   * @param monitor       monitor notified by the handlers
+   * @param codec         codec passed to both registrations
+   * @param <T>           message payload type
+   * @throws NetworkException declared by the underlying registration calls
+   */
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final Monitor monitor,
+                                                final Codec<T> codec) throws NetworkException {
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages), new TestListener<T>());
+  }
+
+  /**
+   * Creates a connection from the sender service to the "receiver" identifier.
+   *
+   * @param connFactoryId id of a previously registered connection factory
+   * @param <T>           message payload type
+   * @return a new connection obtained from the sender's connection factory
+   */
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+    final Identifier destId = factory.getNewInstance(receiver);
+    return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId);
+  }
+
+  /**
+   * Closes both network connection services, the name server and both name resolvers.
+   *
+   * @throws Exception if any of the close calls fails
+   */
+  @Override
+  public void close() throws Exception {
+    senderNetworkConnService.close();
+    receiverNetworkConnService.close();
+    nameServer.close();
+    receiverResolver.close();
+    senderResolver.close();
+  }
+
+  /**
+   * Counts incoming messages, logs them, and notifies the monitor once the expected
+   * number of messages has been received.
+   *
+   * @param <T> message payload type
+   */
+  public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected) {
+      this.monitor = monitor;
+      this.expected = expected;
+    }
+
+    @Override
+    public void onNext(final Message<T> value) {
+      // Keep the value returned by the atomic increment: re-reading the counter later could
+      // observe increments made by other threads and skip over the "expected" value.
+      final int received = count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", received);
+      // One placeholder per supplied argument.
+      LOG.log(Level.FINE,
+          "OUT: received {0} from {1} to {2}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      if (received == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Link listener that logs successful sends and turns send failures into a
+   * {@link RuntimeException}.
+   *
+   * @param <T> message payload type
+   */
+  public static final class TestListener<T> implements LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(final Message<T> message) {
+      LOG.log(Level.FINE, "success: " + message);
+    }
+    @Override
+    public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
+      LOG.log(Level.WARNING, "exception: " + cause + message);
+      throw new RuntimeException(cause);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java
new file mode 100644
index 0000000..e2cf7f9
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/LoggingUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility for adjusting the logging level used by tests.
+ */
+public final class LoggingUtils {
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private LoggingUtils() {
+  }
+
+  /**
+   * Sets the given level on the root logger and on its first console handler.
+   * A new {@link ConsoleHandler} is attached to the root logger when it has none.
+   *
+   * @param level the level to apply
+   */
+  public static void setLoggingLevel(final Level level) {
+    final Logger rootLogger = Logger.getLogger("");
+    ConsoleHandler console = findConsoleHandler(rootLogger);
+    if (console == null) {
+      console = new ConsoleHandler();
+      rootLogger.addHandler(console);
+    }
+    console.setLevel(level);
+    rootLogger.setLevel(level);
+  }
+
+  /**
+   * Returns the first console handler attached to the logger, or null if there is none.
+   */
+  private static ConsoleHandler findConsoleHandler(final Logger logger) {
+    for (final Handler candidate : logger.getHandlers()) {
+      if (candidate instanceof ConsoleHandler) {
+        return (ConsoleHandler) candidate;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/Monitor.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/Monitor.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/Monitor.java
new file mode 100644
index 0000000..32c61ad
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/Monitor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * One-shot wait/notify helper: {@link #mwait()} blocks until {@link #mnotify()} has been
+ * called at least once. Both methods lock on this instance.
+ */
+public class Monitor {
+  private AtomicBoolean finished = new AtomicBoolean(false);
+
+  /**
+   * Blocks the calling thread until the finished flag is set.
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting
+   */
+  public synchronized void mwait() throws InterruptedException {
+    // Re-check the flag after every wake-up before returning.
+    for (;;) {
+      if (finished.get()) {
+        return;
+      }
+      wait();
+    }
+  }
+
+  /**
+   * Sets the finished flag and wakes up every thread waiting on this instance.
+   */
+  public synchronized void mnotify() {
+    finished.set(true);
+    notifyAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
new file mode 100644
index 0000000..a05f266
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/NetworkMessagingTestService.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.exception.evaluator.NetworkException;
+import org.apache.reef.io.network.Connection;
+import org.apache.reef.io.network.Message;
+import org.apache.reef.io.network.NetworkConnectionService;
+import 
org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory;
+import org.apache.reef.io.network.naming.NameResolver;
+import org.apache.reef.io.network.naming.NameResolverConfiguration;
+import org.apache.reef.io.network.naming.NameServer;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.Identifier;
+import org.apache.reef.wake.IdentifierFactory;
+import org.apache.reef.wake.remote.Codec;
+import org.apache.reef.wake.remote.transport.LinkListener;
+
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class for NetworkConnectionService test.
+ *
+ * <p>The constructor obtains a {@link NameServer} and two {@link NetworkConnectionService}
+ * instances (one used as receiver, one as sender) from injectors forked off a shared root
+ * injector. Everything obtained in the constructor is closed again by {@link #close()}.
+ */
+public final class NetworkMessagingTestService implements AutoCloseable {
+  private static final Logger LOG = Logger.getLogger(NetworkMessagingTestService.class.getName());
+
+  private final IdentifierFactory factory;
+  private final NetworkConnectionService receiverNetworkConnService;
+  private final NetworkConnectionService senderNetworkConnService;
+  private final NameServer nameServer;
+  private final NameResolver receiverResolver;
+  private final NameResolver senderResolver;
+
+  /**
+   * Sets up the name server and the receiver and sender network connection services.
+   *
+   * @param localAddress host name handed to the name resolver configuration
+   * @throws InjectionException if any of the instances cannot be injected
+   */
+  public NetworkMessagingTestService(final String localAddress) throws InjectionException {
+    // name server
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    this.nameServer = injector.getInstance(NameServer.class);
+    final Configuration netConf = NameResolverConfiguration.CONF
+        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
+        .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort())
+        .build();
+
+    LOG.log(Level.FINEST, "=== Test network connection service receiver start");
+    // network service for receiver
+    final Injector injectorReceiver = injector.forkInjector(netConf);
+    this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class);
+    this.receiverResolver = injectorReceiver.getInstance(NameResolver.class);
+    this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class);
+
+    // network service for sender
+    LOG.log(Level.FINEST, "=== Test network connection service sender start");
+    final Injector injectorSender = injector.forkInjector(netConf);
+    senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class);
+    this.senderResolver = injectorSender.getInstance(NameResolver.class);
+  }
+
+  /**
+   * Registers a connection factory under the same id on both services, using the end point
+   * ids "receiver" and "sender". Each side gets a {@link MessageHandler} that expects
+   * messages coming from the opposite end point.
+   *
+   * @param connFactoryId id under which the connection factory is registered
+   * @param numMessages   number of messages after which each handler notifies the monitor
+   * @param monitor       monitor notified by the handlers
+   * @param codec         codec passed to both registrations
+   * @param <T>           message payload type
+   * @throws NetworkException declared by the underlying registration calls
+   */
+  public <T> void registerTestConnectionFactory(final Identifier connFactoryId,
+                                                final int numMessages, final Monitor monitor,
+                                                final Codec<T> codec) throws NetworkException {
+    final Identifier receiverEndPointId = factory.getNewInstance("receiver");
+    final Identifier senderEndPointId = factory.getNewInstance("sender");
+    receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages, senderEndPointId, receiverEndPointId),
+        new TestListener<T>(), receiverEndPointId);
+    senderNetworkConnService.registerConnectionFactory(connFactoryId, codec,
+        new MessageHandler<T>(monitor, numMessages, receiverEndPointId, senderEndPointId),
+        new TestListener<T>(), senderEndPointId);
+  }
+
+  /**
+   * Creates a connection from the sender service to the "receiver" end point.
+   *
+   * @param connFactoryId id of a previously registered connection factory
+   * @param <T>           message payload type
+   * @return a new connection obtained from the sender's connection factory
+   */
+  public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) {
+    final Identifier receiverEndPointId = factory.getNewInstance("receiver");
+    return (Connection<T>)senderNetworkConnService
+        .getConnectionFactory(connFactoryId)
+        .newConnection(receiverEndPointId);
+  }
+
+  /**
+   * Closes both network connection services, the name server and both name resolvers.
+   *
+   * @throws Exception if any of the close calls fails
+   */
+  @Override
+  public void close() throws Exception {
+    senderNetworkConnService.close();
+    receiverNetworkConnService.close();
+    nameServer.close();
+    receiverResolver.close();
+    senderResolver.close();
+  }
+
+  /**
+   * Counts incoming messages, logs them, asserts their source and destination ids, and
+   * notifies the monitor once the expected number of messages has been received.
+   *
+   * @param <T> message payload type
+   */
+  public static final class MessageHandler<T> implements EventHandler<Message<T>> {
+    private final int expected;
+    private final Monitor monitor;
+    private final Identifier expectedSrcId;
+    private final Identifier expectedDestId;
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public MessageHandler(final Monitor monitor,
+                          final int expected,
+                          final Identifier expectedSrcId,
+                          final Identifier expectedDestId) {
+      this.monitor = monitor;
+      this.expected = expected;
+      this.expectedSrcId = expectedSrcId;
+      this.expectedDestId = expectedDestId;
+    }
+
+    @Override
+    public void onNext(final Message<T> value) {
+      // Keep the value returned by the atomic increment: re-reading the counter later could
+      // observe increments made by other threads and skip over the "expected" value.
+      final int received = count.incrementAndGet();
+      LOG.log(Level.FINE, "Count: {0}", received);
+      // One placeholder per supplied argument.
+      LOG.log(Level.FINE,
+          "OUT: received {0} from {1} to {2}",
+          new Object[]{value, value.getSrcId(), value.getDestId()});
+
+      for (final T obj : value.getData()) {
+        LOG.log(Level.FINE, "OUT: data: {0}", obj);
+      }
+
+      // Only evaluated when the JVM runs with assertions enabled.
+      assert value.getSrcId().equals(expectedSrcId);
+      assert value.getDestId().equals(expectedDestId);
+
+      if (received == expected) {
+        monitor.mnotify();
+      }
+    }
+  }
+
+  /**
+   * Link listener that logs successful sends and turns send failures into a
+   * {@link RuntimeException}.
+   *
+   * @param <T> message payload type
+   */
+  public static final class TestListener<T> implements LinkListener<Message<T>> {
+    @Override
+    public void onSuccess(final Message<T> message) {
+      LOG.log(Level.FINE, "success: " + message);
+    }
+    @Override
+    public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) {
+      LOG.log(Level.WARNING, "exception: " + cause + message);
+      throw new RuntimeException(cause);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java
new file mode 100644
index 0000000..75ff0b6
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingIntegerCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Codec for {@link Integer} values used by the network tests.
+ *
+ * <p>The stream methods use {@code writeInt}/{@code readInt}; the byte-array methods use
+ * the same 4-byte big-endian layout, so both forms of the codec agree with each other.
+ */
+public class StreamingIntegerCodec implements StreamingCodec<Integer> {
+
+  /** Number of bytes in the encoded form of an int. */
+  private static final int INT_BYTES = Integer.SIZE / Byte.SIZE;
+
+  /**
+   * Writes the value to the stream with {@code writeInt}.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  public void encodeToStream(final Integer obj, final DataOutputStream stream) {
+    try {
+      stream.writeInt(obj);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads one value from the stream with {@code readInt}.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  public Integer decodeFromStream(final DataInputStream stream) {
+    try {
+      return stream.readInt();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Decodes the first four bytes of the array as a big-endian int.
+   *
+   * @throws java.nio.BufferUnderflowException if the array holds fewer than four bytes
+   */
+  @Override
+  public Integer decode(final byte[] data) {
+    return java.nio.ByteBuffer.wrap(data).getInt();
+  }
+
+  /**
+   * Encodes the value as four big-endian bytes.
+   */
+  @Override
+  public byte[] encode(final Integer obj) {
+    return java.nio.ByteBuffer.allocate(INT_BYTES).putInt(obj).array();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java
new file mode 100644
index 0000000..3e4edfc
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/StreamingStringCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.io.network.impl.StreamingCodec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Codec for {@link String} values used by the network tests.
+ *
+ * <p>The byte-array methods use UTF-8 explicitly so that the encoded form does not depend
+ * on the platform default charset of the JVM doing the encoding or decoding. The stream
+ * methods use {@code writeUTF}/{@code readUTF}.
+ */
+public class StreamingStringCodec implements StreamingCodec<String> {
+  /**
+   * Encodes the string as UTF-8 bytes.
+   */
+  @Override
+  public byte[] encode(final String obj) {
+    return obj.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Decodes UTF-8 bytes into a string.
+   */
+  @Override
+  public String decode(final byte[] buf) {
+    return new String(buf, java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Writes the string to the stream with {@code writeUTF}.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  public void encodeToStream(final String obj, final DataOutputStream stream) {
+    try {
+      stream.writeUTF(obj);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads one string from the stream with {@code readUTF}.
+   *
+   * @throws RuntimeException wrapping any {@link IOException} raised by the stream
+   */
+  @Override
+  public String decodeFromStream(final DataInputStream stream) {
+    try {
+      return stream.readUTF();
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java
new file mode 100644
index 0000000..6dc69a4
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/util/TimeoutHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.network.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.PeriodicEvent;
+
+/**
+ * Event handler that calls {@code mnotify()} on its {@link Monitor} for every
+ * {@link PeriodicEvent} it receives.
+ */
+public class TimeoutHandler implements EventHandler<PeriodicEvent> {
+
+  /** Monitor to notify when an event arrives. */
+  private final Monitor timeoutMonitor;
+
+  /**
+   * @param monitor the monitor notified on each event
+   */
+  public TimeoutHandler(final Monitor monitor) {
+    timeoutMonitor = monitor;
+  }
+
+  /**
+   * Notifies the monitor; the event itself is not inspected.
+   */
+  @Override
+  public void onNext(final PeriodicEvent event) {
+    timeoutMonitor.mnotify();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java
new file mode 100644
index 0000000..2437717
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/ExternalMapTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.io.ExternalMap;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.storage.ram.CodecRamMap;
+import org.apache.reef.io.storage.ram.RamMap;
+import org.apache.reef.io.storage.ram.RamStorageService;
+import org.apache.reef.io.storage.util.IntegerCodec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+
+/**
+ * Runs the same put/get/getAll/remove scenario against the RAM-backed
+ * {@link ExternalMap} implementations.
+ */
+public class ExternalMapTest {
+  /** Runs the shared scenario against a {@link CodecRamMap} using an {@link IntegerCodec}. */
+  @Test
+  public void testCodecRamMap() {
+    final RamStorageService ramStore = new RamStorageService();
+    final Codec<Integer> c = new IntegerCodec();
+    final ExternalMap<Integer> m = new CodecRamMap<>(ramStore, c);
+    genericTest(m);
+  }
+
+  /** Runs the shared scenario against a plain {@link RamMap}. */
+  @Test
+  public void testRamMap() {
+    final RamStorageService ramStore = new RamStorageService();
+    final ExternalMap<Integer> m = new RamMap<>(ramStore);
+    genericTest(m);
+  }
+
+
+  /**
+   * Shared scenario: inserts three entries, checks lookups and key membership, checks that
+   * {@code getAll} returns only the keys that are present, then removes one entry.
+   *
+   * @param m the (initially unused) map under test
+   */
+  void genericTest(final ExternalMap<Integer> m) {
+    m.put("foo", 42);
+    final Map<String, Integer> smallMap = new HashMap<>();
+    smallMap.put("bar", 43);
+    smallMap.put("baz", 44);
+
+    m.putAll(smallMap);
+
+    Assert.assertEquals(44, (int) m.get("baz"));
+    Assert.assertEquals(43, (int) m.get("bar"));
+    Assert.assertEquals(42, (int) m.get("foo"));
+    Assert.assertNull(m.get("quuz"));
+
+    Assert.assertTrue(m.containsKey("bar"));
+    Assert.assertFalse(m.containsKey("quuz"));
+
+    // "quuz" was never inserted, so getAll is expected to yield only "bar" and "baz".
+    final Set<String> barBaz = new HashSet<>();
+    barBaz.add("bar");
+    barBaz.add("baz");
+    barBaz.add("quuz");
+
+    final Iterable<Map.Entry<CharSequence, Integer>> it = m.getAll(barBaz);
+
+    // Collect into a sorted map so the keys can be checked in a fixed order.
+    final Map<CharSequence, Integer> found = new TreeMap<>();
+
+    for (final Map.Entry<CharSequence, Integer> e : it) {
+      found.put(e.getKey(), e.getValue());
+    }
+    final Iterator<CharSequence> it2 = found.keySet().iterator();
+    Assert.assertTrue(it2.hasNext());
+    CharSequence s = it2.next();
+    // JUnit's argument order is (expected, actual).
+    Assert.assertEquals("bar", s);
+    Assert.assertEquals(43, (int) found.get(s));
+    Assert.assertTrue(it2.hasNext());
+    s = it2.next();
+    Assert.assertEquals("baz", s);
+    Assert.assertEquals(44, (int) found.get(s));
+    Assert.assertFalse(it2.hasNext());
+
+    Assert.assertEquals(44, (int) m.remove("baz"));
+    Assert.assertFalse(m.containsKey("baz"));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java
new file mode 100644
index 0000000..33caa7c
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/FramingTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Round-trip test for FramingOutputStream and FramingInputStream.
+ */
+public class FramingTest {
+
+  /** Number of frames written; frame k holds k bytes, each equal to (byte) k. */
+  private static final int FRAME_COUNT = 256;
+
+  /**
+   * Builds the payload of the frame with the given index.
+   *
+   * @param index the frame number, which is also the payload length
+   * @return an array of {@code index} bytes, all set to {@code (byte) index}
+   */
+  private static byte[] frame(final int index) {
+    final byte[] payload = new byte[index];
+    Arrays.fill(payload, (byte) index);
+    return payload;
+  }
+
+  /**
+   * Iterates over the frames encoded in the given bytes and checks that they are
+   * exactly frames 0 .. FRAME_COUNT - 1, in order.
+   *
+   * @param encoded the framed byte stream to decode
+   */
+  private static void assertIteratesAllFrames(final byte[] encoded) throws IOException, ServiceException {
+    final FramingInputStream in = new FramingInputStream(new ByteArrayInputStream(encoded));
+    int seen = 0;
+    for (final byte[] actual : in) {
+      Assert.assertArrayEquals(frame(seen), actual);
+      seen++;
+    }
+    Assert.assertEquals(FRAME_COUNT, seen);
+    in.close();
+  }
+
+  /**
+   * Writes the same frames once through write()/nextFrame() and once through the
+   * accumulator, checks that both produce identical bytes, and reads them back
+   * via readFrame() and via iteration.
+   */
+  @Test
+  public void frameRoundTripTest() throws IOException, ServiceException {
+    final ByteArrayOutputStream directBytes = new ByteArrayOutputStream();
+    final ByteArrayOutputStream accumulatedBytes = new ByteArrayOutputStream();
+    final FramingOutputStream direct = new FramingOutputStream(directBytes);
+    final FramingOutputStream accumulated = new FramingOutputStream(accumulatedBytes);
+    final Accumulator<byte[]> accumulator = accumulated.accumulator();
+
+    int expectedOffset = 0;
+    for (int k = 0; k < FRAME_COUNT; k++) {
+      final byte[] payload = frame(k);
+      direct.write(payload);
+      // The last frame is terminated by closing the stream rather than by nextFrame().
+      if (k != FRAME_COUNT - 1) {
+        direct.nextFrame();
+      } else {
+        direct.close();
+      }
+      // The expected offset grows by the payload length plus 4 per frame.
+      expectedOffset += 4 + k;
+      Assert.assertEquals(expectedOffset, direct.getCurrentOffset());
+      accumulator.add(payload);
+      Assert.assertEquals(expectedOffset, accumulated.getCurrentOffset());
+    }
+    accumulator.close();
+    accumulated.close();
+
+    final byte[] b1 = directBytes.toByteArray();
+    final byte[] b2 = accumulatedBytes.toByteArray();
+    Assert.assertArrayEquals(b1, b2);
+
+    // Read both encodings back frame by frame.
+    final FramingInputStream inA1 = new FramingInputStream(new ByteArrayInputStream(b1));
+    final FramingInputStream inA2 = new FramingInputStream(new ByteArrayInputStream(b2));
+    for (int k = 0; k < FRAME_COUNT; k++) {
+      final byte[] expected = frame(k);
+      final byte[] fromDirect = inA1.readFrame();
+      final byte[] fromAccumulated = inA2.readFrame();
+      Assert.assertArrayEquals(expected, fromDirect);
+      Assert.assertArrayEquals(expected, fromAccumulated);
+    }
+    // One read past the last frame must yield null on both streams.
+    final byte[] pastEndDirect = inA1.readFrame();
+    final byte[] pastEndAccumulated = inA2.readFrame();
+    Assert.assertNull(pastEndDirect);
+    Assert.assertNull(pastEndAccumulated);
+    inA2.close();
+    inA1.close();
+
+    // Read both encodings back through the Iterable view.
+    assertIteratesAllFrames(b1);
+    assertIteratesAllFrames(b2);
+    Assert.assertArrayEquals(b1, b2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java
new file mode 100644
index 0000000..072c0b6
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/MergingIteratorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+
+/**
+ * Test for MergingIterator.
+ */
+public class MergingIteratorTest {
+
+  /**
+   * Merges three sorted, interleaved sequences and checks that the merged
+   * output is 1, 2, ..., 12 in ascending order.
+   */
+  @Test
+  public void testMergingIterator() {
+    final Comparator<Integer> ascending = new Comparator<Integer>() {
+      @Override
+      public int compare(final Integer left, final Integer right) {
+        return Integer.compare(left, right);
+      }
+    };
+    // Generic arrays cannot be created directly, hence the raw array and the suppression.
+    @SuppressWarnings("unchecked")
+    final Iterator<Integer>[] sources = new Iterator[]{
+        Arrays.asList(1, 4, 7, 10).iterator(),
+        Arrays.asList(2, 5, 8, 11).iterator(),
+        Arrays.asList(3, 6, 9, 12).iterator()
+    };
+    final MergingIterator<Integer> merged = new MergingIterator<Integer>(ascending, sources);
+    int expected = 1;
+    for (; merged.hasNext(); expected++) {
+      Assert.assertEquals(expected, (int) merged.next());
+    }
+    // All twelve elements were consumed, so the counter ends one past the last value.
+    Assert.assertEquals(13, expected);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java
new file mode 100644
index 0000000..68b7879
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SortingSpoolTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+import org.apache.reef.io.storage.ram.SortingRamSpool;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+/**
+ * Tests for SortingRamSpool: sorted iteration and the error cases around
+ * adding after close, removing through the iterator, and iterating before close.
+ */
+public class SortingSpoolTest {
+
+  /**
+   * A spool created without a comparator must iterate in ascending integer order.
+   */
+  @Test
+  public void testRamSpool() throws ServiceException {
+    genericTest(new SortingRamSpool<Integer>(), new Comparator<Integer>() {
+
+      @Override
+      public int compare(final Integer o1, final Integer o2) {
+        return Integer.compare(o1, o2);
+      }
+
+    });
+  }
+
+  /**
+   * A spool created with a reversing comparator must iterate in descending order.
+   */
+  @Test
+  public void testRamSpoolComparator() throws ServiceException {
+    final Comparator<Integer> backwards = new Comparator<Integer>() {
+
+      @Override
+      public int compare(final Integer o1, final Integer o2) {
+        // Reverse the order by swapping the operands instead of negating the result.
+        return o2.compareTo(o1);
+      }
+
+    };
+    genericTest(new SortingRamSpool<Integer>(backwards), backwards);
+  }
+
+  /**
+   * Adding to a closed accumulator is expected to throw IllegalStateException.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testRamSpoolAddAfterClose() throws ServiceException {
+    final Spool<Integer> s = new SortingRamSpool<>();
+    genericAddAfterCloseTest(s);
+  }
+
+  /**
+   * Removing through the spool's iterator is expected to throw UnsupportedOperationException.
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRamSpoolCantRemove() throws ServiceException {
+    final Spool<Integer> s = new SortingRamSpool<>();
+    genericCantRemove(s);
+  }
+
+  /**
+   * Requesting an iterator while the accumulator is still open is expected to
+   * throw IllegalStateException.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testIteratorBeforeClose() throws ServiceException {
+    final Spool<Integer> s = new SortingRamSpool<>();
+    genericIteratorBeforeClose(s);
+  }
+
+  /**
+   * Adds 100 pseudo-random integers to the spool and checks that iterating over
+   * it yields them in the order defined by the comparator.
+   *
+   * @param s          the spool under test
+   * @param comparator the order the spool is expected to produce
+   */
+  void genericTest(final Spool<Integer> s, final Comparator<Integer> comparator)
+      throws ServiceException {
+    final List<Integer> l = new ArrayList<Integer>();
+    // Fixed seed keeps the input reproducible; values lie in [0, 75), so duplicates occur.
+    final Random r = new Random(42);
+    while (l.size() < 100) {
+      l.add(r.nextInt(75));
+    }
+    final Accumulator<Integer> a = s.accumulator();
+    for (int i = 0; i < 100; i++) {
+      a.add(l.get(i));
+    }
+    a.close();
+    final List<Integer> m = new ArrayList<Integer>();
+    for (final int i : s) {
+      m.add(i);
+    }
+    // Sort a copy of the input with the same comparator and compare element by element.
+    final Integer[] sorted = l.toArray(new Integer[0]);
+    Arrays.sort(sorted, 0, sorted.length, comparator);
+    final Integer[] shouldBeSorted = m.toArray(new Integer[0]);
+    Assert.assertArrayEquals(sorted, shouldBeSorted);
+  }
+
+  /**
+   * Closes the spool's accumulator and then adds to it; callers expect this to throw.
+   *
+   * @param s the spool under test
+   */
+  void genericAddAfterCloseTest(final Spool<?> s) throws ServiceException {
+    final Accumulator<?> a = s.accumulator();
+    a.close();
+    // null is the only value that can be passed to an Accumulator<?>.
+    a.add(null);
+  }
+
+  /**
+   * Fills and closes the spool, then calls remove() on its iterator; callers expect this to throw.
+   *
+   * @param s the spool under test
+   */
+  void genericCantRemove(final Spool<Integer> s) throws ServiceException {
+    final Accumulator<Integer> a = s.accumulator();
+    a.add(10);
+    a.close();
+    final Iterator<?> it = s.iterator();
+    it.remove();
+  }
+
+  /**
+   * Requests an iterator without closing the accumulator first; callers expect this to throw.
+   *
+   * @param s the spool under test
+   */
+  void genericIteratorBeforeClose(final Spool<Integer> s) throws ServiceException {
+    final Accumulator<Integer> a = s.accumulator();
+    a.add(10);
+    s.iterator();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java
new file mode 100644
index 0000000..61bf349
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/SpoolFileTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulable;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Spool;
+import org.apache.reef.io.serialization.Codec;
+import org.apache.reef.io.serialization.Deserializer;
+import org.apache.reef.io.serialization.Serializer;
+import org.apache.reef.io.storage.local.CodecFileAccumulable;
+import org.apache.reef.io.storage.local.CodecFileIterable;
+import org.apache.reef.io.storage.local.LocalStorageService;
+import org.apache.reef.io.storage.local.SerializerFileSpool;
+import org.apache.reef.io.storage.ram.RamSpool;
+import org.apache.reef.io.storage.ram.RamStorageService;
+import org.apache.reef.io.storage.util.IntegerCodec;
+import org.apache.reef.tang.ConfigurationBuilder;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+
+/**
+ * Checks that integers written through an Accumulable can be read back in order
+ * from the matching Iterable, for a RAM spool, a serializer-backed file spool and
+ * a codec-backed file accumulable/iterable pair.
+ */
+public class SpoolFileTest {
+  /**
+   * Serializer that writes each Integer as four bytes, most significant byte first.
+   */
+  private final Serializer<Integer, OutputStream> serializer = new Serializer<Integer, OutputStream>() {
+    @Override
+    public Accumulable<Integer> create(final OutputStream out) {
+      return new Accumulable<Integer>() {
+
+        @Override
+        public Accumulator<Integer> accumulator() {
+          return new Accumulator<Integer>() {
+
+            @Override
+            public void add(final Integer datum) {
+              try {
+                final int d = datum;
+                out.write(new byte[]{(byte) (d >>> 24), (byte) (d >>> 16),
+                    (byte) (d >>> 8), (byte) d});
+              } catch (final IOException e) {
+                throw new IllegalStateException(e);
+              }
+            }
+
+            @Override
+            public void close() {
+              // Only flushes; the stream itself is not closed here.
+              try {
+                out.flush();
+              } catch (final IOException e) {
+                throw new IllegalStateException(e);
+              }
+            }
+          };
+        }
+      };
+    }
+  };
+  /**
+   * Deserializer matching {@link #serializer}: reads four bytes at a time and
+   * reassembles them into an Integer. The iterator reads one element ahead.
+   */
+  private final Deserializer<Integer, InputStream> deserializer = new Deserializer<Integer, InputStream>() {
+    @Override
+    public Iterable<Integer> create(final InputStream in) {
+      return new Iterable<Integer>() {
+        @Override
+        public Iterator<Integer> iterator() {
+          final Iterator<Integer> it = new Iterator<Integer>() {
+            private final byte[] inb = new byte[4];
+            // The element the next call to next() will return; null once the input is exhausted.
+            private Integer nextInt;
+
+            @Override
+            public boolean hasNext() {
+              return nextInt != null;
+            }
+
+            /**
+             * Reads the next four bytes into nextInt, or sets it to null if
+             * fewer than four bytes remain.
+             */
+            private void prime() {
+              // InputStream.read may deliver fewer bytes than requested, so keep
+              // reading until the buffer is full or the stream ends.
+              int read = 0;
+              try {
+                while (read < inb.length) {
+                  final int n = in.read(inb, read, inb.length - read);
+                  if (n < 0) {
+                    break;
+                  }
+                  read += n;
+                }
+              } catch (final IOException e) {
+                throw new IllegalStateException(e);
+              }
+              if (read != inb.length) {
+                nextInt = null;
+              } else {
+                nextInt = ((inb[0] & 0xFF) << 24) + ((inb[1] & 0xFF) << 16)
+                    + ((inb[2] & 0xFF) << 8) + (inb[3] & 0xFF);
+              }
+
+            }
+
+            @Override
+            public Integer next() {
+              final Integer ret = nextInt;
+              prime();
+              return ret;
+            }
+
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+          it.next(); // calls prime
+          return it;
+        }
+      };
+    }
+  };
+
+  /**
+   * Builds the RAM spool configuration, round-trips it through the Avro string
+   * serialization, injects a Spool from the result and runs the generic test on it.
+   */
+  @Test
+  public void testRam() throws BindException, InjectionException, ServiceException, IOException {
+    final Tang t = Tang.Factory.getTang();
+    final ConfigurationBuilder configurationBuilderOne = t.newConfigurationBuilder(RamConf.CONF.build());
+
+    final AvroConfigurationSerializer avroSerializer = new AvroConfigurationSerializer();
+    final String serializedConfiguration = avroSerializer.toString(configurationBuilderOne.build());
+    final ConfigurationBuilder configurationBuilderTwo =
+        t.newConfigurationBuilder(avroSerializer.fromString(serializedConfiguration));
+
+    @SuppressWarnings("unchecked")
+    final Spool<Integer> f = (Spool<Integer>) t.newInjector(configurationBuilderTwo.build()).getInstance(
+        Spool.class);
+    test(f);
+  }
+
+  /**
+   * Runs the generic test against a SerializerFileSpool using the serializer and
+   * deserializer defined above.
+   */
+  @Test
+  public void testFile() throws ServiceException {
+    final LocalStorageService service = new LocalStorageService("spoolTest", "file");
+    try {
+      final Spool<Integer> f = new SerializerFileSpool<Integer>(service, serializer,
+          deserializer);
+      test(f);
+    } finally {
+      // Attempt the scratch-space cleanup even when an assertion above fails.
+      service.getScratchSpace().delete();
+    }
+  }
+
+  /**
+   * Writes through a CodecFileAccumulable and reads the same file back through a
+   * CodecFileIterable.
+   */
+  @Test
+  public void testInterop() throws ServiceException {
+    final LocalStorageService service = new LocalStorageService("spoolTest", "file");
+    try {
+      final Codec<Integer> c = new IntegerCodec();
+
+      final CodecFileAccumulable<Integer, Codec<Integer>> f = new CodecFileAccumulable<Integer, Codec<Integer>>(
+          service, c);
+      final CodecFileIterable<Integer, Codec<Integer>> g = new CodecFileIterable<Integer, Codec<Integer>>(
+          new File(f.getName()), c);
+      test(f, g);
+    } finally {
+      // Attempt the scratch-space cleanup even when an assertion above fails.
+      service.getScratchSpace().delete();
+    }
+  }
+
+  /**
+   * Runs the generic test with the spool acting as both writer and reader.
+   *
+   * @param f the spool under test
+   */
+  protected void test(final Spool<Integer> f) throws ServiceException {
+    test(f, f);
+  }
+
+  /**
+   * Writes 0..999 into {@code f} and checks that {@code g} yields exactly those
+   * values in order, both through a for-each loop and through two iterators that
+   * are advanced in lock step.
+   *
+   * @param f the sink the values are written to
+   * @param g the source the values are read back from
+   */
+  protected void test(final Accumulable<Integer> f, final Iterable<Integer> g) throws ServiceException {
+
+    try (Accumulator<Integer> acc = f.accumulator()) {
+      for (int i = 0; i < 1000; i++) {
+        acc.add(i);
+      }
+    }
+    int i = 0;
+    for (final int j : g) {
+      Assert.assertEquals(i, j);
+      i++;
+    }
+    // Without this check an iterable that stops early would pass the loop above.
+    Assert.assertEquals(1000, i);
+
+    // Two iterators obtained from the same iterable must not interfere with each other.
+    final Iterator<Integer> itA = g.iterator();
+    final Iterator<Integer> itB = g.iterator();
+
+    for (i = 0; i < 1000; i++) {
+      Assert.assertEquals(i, (int) itA.next());
+      Assert.assertEquals(i, (int) itB.next());
+    }
+    Assert.assertFalse(itA.hasNext());
+    Assert.assertFalse(itB.hasNext());
+  }
+
+  /**
+   * Configuration module that binds Spool to RamSpool.
+   */
+  public static final class RamConf extends ConfigurationModuleBuilder {
+    public static final ConfigurationModule CONF = new RamConf()
+        .bindImplementation(RamStorageService.class, RamStorageService.class)
+        .bindImplementation(Spool.class, RamSpool.class)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java
new file mode 100644
index 0000000..07cea31
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/TupleSerializerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.storage;
+
+import org.apache.reef.exception.evaluator.ServiceException;
+import org.apache.reef.io.Accumulator;
+import org.apache.reef.io.Tuple;
+import org.apache.reef.io.serialization.Deserializer;
+import org.apache.reef.io.serialization.Serializer;
+import org.apache.reef.io.storage.util.IntegerDeserializer;
+import org.apache.reef.io.storage.util.IntegerSerializer;
+import org.apache.reef.io.storage.util.StringDeserializer;
+import org.apache.reef.io.storage.util.StringSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for FramingTupleSerializer and FramingTupleDeserializer.
+ */
+public class TupleSerializerTest {
+
+  /** Number of tuples written in setup(); tuple k is (k, "k"). */
+  private static final int TUPLE_COUNT = 100;
+
+  /** Deserialized view over the tuples written in setup(). */
+  private Iterable<Tuple<Integer, String>> iterable;
+
+  /**
+   * Serializes TUPLE_COUNT tuples into a byte array and prepares an iterable
+   * that deserializes them again.
+   */
+  @Before
+  public void setup() throws ServiceException {
+    final Serializer<Integer, OutputStream> keySerializer = new IntegerSerializer();
+    final Serializer<String, OutputStream> valSerializer = new StringSerializer();
+    final Deserializer<Integer, InputStream> keyDeserializer = new IntegerDeserializer();
+    final Deserializer<String, InputStream> valDeserializer = new StringDeserializer();
+
+    final FramingTupleSerializer<Integer, String> tupleSerializer =
+        new FramingTupleSerializer<Integer, String>(keySerializer, valSerializer);
+    final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+    final Accumulator<Tuple<Integer, String>> writer = tupleSerializer.create(sink).accumulator();
+    for (int k = 0; k < TUPLE_COUNT; k++) {
+      writer.add(new Tuple<>(k, Integer.toString(k)));
+    }
+    writer.close();
+
+    final FramingTupleDeserializer<Integer, String> tupleDeserializer =
+        new FramingTupleDeserializer<Integer, String>(keyDeserializer, valDeserializer);
+    iterable = tupleDeserializer.create(new ByteArrayInputStream(sink.toByteArray()));
+  }
+
+  /**
+   * The deserialized tuples must equal the ones written, in the same order.
+   */
+  @Test
+  public void testFramingSerializer() throws ServiceException, IOException {
+    int count = 0;
+    for (final Tuple<Integer, String> actual : iterable) {
+      final Tuple<Integer, String> expected = new Tuple<>(count, Integer.toString(count));
+      Assert.assertEquals(expected, actual);
+      count++;
+    }
+    Assert.assertEquals(TUPLE_COUNT, count);
+  }
+
+  /**
+   * Calling next() after the last element must throw NoSuchElementException,
+   * but not before the iterator is exhausted.
+   */
+  @Test(expected = NoSuchElementException.class)
+  public void testReadOffEnd() {
+    final Iterator<Tuple<Integer, String>> tuples = iterable.iterator();
+    try {
+      while (tuples.hasNext()) {
+        tuples.next();
+        // Extra hasNext() call: hasNext() is then invoked twice between consecutive next() calls.
+        tuples.hasNext();
+      }
+    } catch (final NoSuchElementException tooEarly) {
+      // Rethrow as a different type so an early failure does not satisfy "expected".
+      throw new IllegalStateException("Errored out too early!", tooEarly);
+    }
+    tuples.next();
+  }
+
+  /**
+   * The iterator must not support remove().
+   */
+  @Test(expected = UnsupportedOperationException.class)
+  public void testCantRemove() {
+    final Iterator<Tuple<Integer, String>> tuples = iterable.iterator();
+    tuples.next();
+    tuples.remove();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java
new file mode 100644
index 0000000..b5207f3
--- /dev/null
+++ 
b/lang/java/reef-io/src/test/java/org/apache/reef/io/storage/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Tests for storage.
+ */
+package org.apache.reef.io.storage;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
deleted file mode 100644
index 76ccbc9..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.exception.evaluator.NetworkException;
-import org.apache.reef.io.network.Connection;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.services.network.util.*;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.Codec;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.util.concurrent.*;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-// TODO[JIRA REEF-637] Remove the deprecated class.
-/**
- * Test for deprecated methods, which are deprecated in 0.13, of 
NetworkConnectionService.
- */
-@Deprecated
-public final class DeprecatedNetworkConnectionServiceTest {
-  private static final Logger LOG = 
Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName());
-
-  private final LocalAddressProvider localAddressProvider;
-  private final String localAddress;
-  private final Identifier groupCommClientId;
-  private final Identifier shuffleClientId;
-
-  public DeprecatedNetworkConnectionServiceTest() throws InjectionException {
-    localAddressProvider = LocalAddressProviderFactory.getInstance();
-    localAddress = localAddressProvider.getLocalAddress();
-
-    final IdentifierFactory idFac = new StringIdentifierFactory();
-    this.groupCommClientId = idFac.getNewInstance("groupComm");
-    this.shuffleClientId = idFac.getNewInstance("shuffle");
-  }
-
-  @Rule
-  public TestName name = new TestName();
-
-  private void runMessagingNetworkConnectionService(final Codec<String> codec) 
throws Exception {
-    final int numMessages = 2000;
-    final Monitor monitor = new Monitor();
-    try (final DeprecatedNetworkMessagingTestService messagingTestService
-             = new DeprecatedNetworkMessagingTestService(localAddress)) {
-      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-
-      try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-        try {
-          conn.open();
-          for (int count = 0; count < numMessages; ++count) {
-            // send messages to the receiver.
-            conn.write("hello" + count);
-          }
-          monitor.mwait();
-        } catch (final NetworkException e) {
-          e.printStackTrace();
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkConnectionService messaging test.
-   */
-  @Test
-  public void testMessagingNetworkConnectionService() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runMessagingNetworkConnectionService(new StringCodec());
-  }
-
-  /**
-   * NetworkConnectionService streaming messaging test.
-   */
-  @Test
-  public void testStreamingMessagingNetworkConnectionService() throws 
Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runMessagingNetworkConnectionService(new StreamingStringCodec());
-  }
-
-  public void runNetworkConnServiceWithMultipleConnFactories(final 
Codec<String> stringCodec,
-                                                             final 
Codec<Integer> integerCodec)
-      throws Exception {
-    final ExecutorService executor = Executors.newFixedThreadPool(5);
-
-    final int groupcommMessages = 1000;
-    final Monitor monitor = new Monitor();
-    try (final DeprecatedNetworkMessagingTestService messagingTestService
-             = new DeprecatedNetworkMessagingTestService(localAddress)) {
-
-      messagingTestService.registerTestConnectionFactory(groupCommClientId, 
groupcommMessages, monitor, stringCodec);
-
-      final int shuffleMessages = 2000;
-      final Monitor monitor2 = new Monitor();
-      messagingTestService.registerTestConnectionFactory(shuffleClientId, 
shuffleMessages, monitor2, integerCodec);
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try (final Connection<String> conn =
-                   
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-            conn.open();
-            for (int count = 0; count < groupcommMessages; ++count) {
-              // send messages to the receiver.
-              conn.write("hello" + count);
-            }
-            monitor.mwait();
-          } catch (final Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      });
-
-      executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try (final Connection<Integer> conn =
-                   
messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) {
-            conn.open();
-            for (int count = 0; count < shuffleMessages; ++count) {
-              // send messages to the receiver.
-              conn.write(count);
-            }
-            monitor2.mwait();
-          } catch (final Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-      });
-
-      monitor.mwait();
-      monitor2.mwait();
-      executor.shutdown();
-    }
-  }
-
-  /**
-   * Test NetworkService registering multiple connection factories.
-   */
-  @Test
-  public void testMultipleConnectionFactoriesTest() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new 
ObjectSerializableCodec<Integer>());
-  }
-
-  /**
-   * Test NetworkService registering multiple connection factories with 
Streamingcodec.
-   */
-  @Test
-  public void testMultipleConnectionFactoriesStreamingTest() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), 
new StreamingIntegerCodec());
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final DeprecatedNetworkMessagingTestService messagingTestService
-               = new DeprecatedNetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-
-        // build the message
-        final StringBuilder msb = new StringBuilder();
-        for (int i = 0; i < size; i++) {
-          msb.append("1");
-        }
-        final String message = msb.toString();
-
-        try (final Connection<String> conn =
-                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          final long start = System.currentTimeMillis();
-          try {
-            conn.open();
-            for (int count = 0; count < numMessages; ++count) {
-              // send messages to the receiver.
-              conn.write(message);
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-          final long end = System.currentTimeMillis();
-
-          final double runtime = ((double) end - start) / 1000;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages 
/ runtime +
-              " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / 
runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceRateDisjoint() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>();
-
-    final int numThreads = 4;
-    final int size = 2000;
-    final int numMessages = 300000 / (Math.max(1, size / 512));
-    final int totalNumMessages = numMessages * numThreads;
-
-    final ExecutorService e = Executors.newCachedThreadPool();
-    for (int t = 0; t < numThreads; t++) {
-      final int tt = t;
-
-      e.submit(new Runnable() {
-        public void run() {
-          try (final DeprecatedNetworkMessagingTestService messagingTestService
-                   = new DeprecatedNetworkMessagingTestService(localAddress)) {
-            final Monitor monitor = new Monitor();
-            final Codec<String> codec = new StringCodec();
-
-            
messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-            try (final Connection<String> conn =
-                     
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-              // build the message
-              final StringBuilder msb = new StringBuilder();
-              for (int i = 0; i < size; i++) {
-                msb.append("1");
-              }
-              final String message = msb.toString();
-
-              try {
-                conn.open();
-                for (int count = 0; count < numMessages; ++count) {
-                  // send messages to the receiver.
-                  conn.write(message);
-                }
-                monitor.mwait();
-              } catch (final NetworkException e) {
-                e.printStackTrace();
-                throw new RuntimeException(e);
-              }
-            }
-          } catch (final Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    }
-
-    // start and time
-    final long start = System.currentTimeMillis();
-    final Object ignore = new Object();
-    for (int i = 0; i < numThreads; i++) {
-      barrier.add(ignore);
-    }
-    e.shutdown();
-    e.awaitTermination(100, TimeUnit.SECONDS);
-    final long end = System.currentTimeMillis();
-    final double runtime = ((double) end - start) / 1000;
-    LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages 
/ runtime +
-        " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / 
runtime); // x2 for unicode chars
-  }
-
-  @Test
-  public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() 
throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300000 / (Math.max(1, size / 512));
-      final int numThreads = 2;
-      final int totalNumMessages = numMessages * numThreads;
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final DeprecatedNetworkMessagingTestService messagingTestService
-               = new DeprecatedNetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
totalNumMessages, monitor, codec);
-
-        final ExecutorService e = Executors.newCachedThreadPool();
-
-        // build the message
-        final StringBuilder msb = new StringBuilder();
-        for (int i = 0; i < size; i++) {
-          msb.append("1");
-        }
-        final String message = msb.toString();
-        try (final Connection<String> conn =
-                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          final long start = System.currentTimeMillis();
-          for (int i = 0; i < numThreads; i++) {
-            e.submit(new Runnable() {
-              @Override
-              public void run() {
-
-                try {
-                  conn.open();
-                  for (int count = 0; count < numMessages; ++count) {
-                    // send messages to the receiver.
-                    conn.write(message);
-                  }
-                } catch (final Exception e) {
-                  throw new RuntimeException(e);
-                }
-              }
-            });
-          }
-
-          e.shutdown();
-          e.awaitTermination(30, TimeUnit.SECONDS);
-          monitor.mwait();
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
totalNumMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) 
/ runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-
-  /**
-   * NetworkService messaging rate benchmark.
-   */
-  @Test
-  public void testMessagingNetworkConnServiceBatchingRate() throws Exception {
-    LOG.log(Level.FINEST, name.getMethodName());
-
-    final int batchSize = 1024 * 1024;
-    final int[] messageSizes = {32, 64, 512};
-
-    for (final int size : messageSizes) {
-      final int numMessages = 300 / (Math.max(1, size / 512));
-      final Monitor monitor = new Monitor();
-      final Codec<String> codec = new StringCodec();
-      try (final DeprecatedNetworkMessagingTestService messagingTestService
-               = new DeprecatedNetworkMessagingTestService(localAddress)) {
-        messagingTestService.registerTestConnectionFactory(groupCommClientId, 
numMessages, monitor, codec);
-        try (final Connection<String> conn =
-                 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) {
-
-          // build the message
-          final StringBuilder msb = new StringBuilder();
-          for (int i = 0; i < size; i++) {
-            msb.append("1");
-          }
-          final String message = msb.toString();
-
-          final long start = System.currentTimeMillis();
-          try {
-            for (int i = 0; i < numMessages; i++) {
-              final StringBuilder sb = new StringBuilder();
-              for (int j = 0; j < batchSize / size; j++) {
-                sb.append(message);
-              }
-              conn.open();
-              conn.write(sb.toString());
-            }
-            monitor.mwait();
-          } catch (final NetworkException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-
-          final long end = System.currentTimeMillis();
-          final double runtime = ((double) end - start) / 1000;
-          final long numAppMessages = numMessages * batchSize / size;
-          LOG.log(Level.INFO, "size: " + size + "; messages/s: " + 
numAppMessages / runtime +
-              " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / 
runtime); // x2 for unicode chars
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
deleted file mode 100644
index 47c8700..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/LocalNameResolverTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
-import org.apache.reef.io.network.naming.NameResolver;
-import org.apache.reef.io.network.naming.exception.NamingException;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
-
-public class LocalNameResolverTest {
-
-  private final LocalAddressProvider localAddressProvider;
-
-  public LocalNameResolverTest() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
-  }
-
-  /**
-   * Test method for {@link 
org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}.
-   *
-   * @throws Exception
-   */
-  @Test
-  public final void testClose() throws Exception {
-    final String localAddress = localAddressProvider.getLocalAddress();
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    try (final NameResolver resolver = 
Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
-        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000)
-        .build()).getInstance(NameResolver.class)) {
-      final Identifier id = factory.getNewInstance("Task1");
-      resolver.register(id, new InetSocketAddress(localAddress, 7001));
-      resolver.unregister(id);
-      Thread.sleep(100);
-    }
-  }
-
-  /**
-   * Test method for {@link 
org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}.
-   * To check caching behavior with expireAfterAccess & expireAfterWrite
-   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
-   *
-   * @throws Exception
-   */
-  @Test
-  public final void testLookup() throws Exception {
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final String localAddress = localAddressProvider.getLocalAddress();
-    try (final NameResolver resolver = 
Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
-        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150)
-        .build()).getInstance(NameResolver.class)) {
-      final Identifier id = factory.getNewInstance("Task1");
-      final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, 
7001);
-      resolver.register(id, socketAddr);
-      InetSocketAddress lookupAddr = resolver.lookup(id); // caches the entry
-      Assert.assertTrue(socketAddr.equals(lookupAddr));
-      resolver.unregister(id);
-      Thread.sleep(100);
-      try {
-        lookupAddr = resolver.lookup(id);
-        Thread.sleep(100);
-        //With expireAfterAccess, the previous lookup would reset expiry to 
150ms
-        //more and 100ms wait will not expire the item and will return the 
cached value
-        //With expireAfterWrite, the extra wait of 100 ms will expire the item
-        //resulting in NamingException and the test passes
-        lookupAddr = resolver.lookup(id);
-        Assert.assertNull("resolver.lookup(id)", lookupAddr);
-      } catch (final Exception e) {
-        if (e instanceof ExecutionException) {
-          Assert.assertTrue("Execution Exception cause is instanceof 
NamingException",
-              e.getCause() instanceof NamingException);
-        } else {
-          throw e;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
 
b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
deleted file mode 100644
index b66f3da..0000000
--- 
a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.services.network;
-
-import org.apache.reef.io.network.naming.*;
-import org.apache.reef.io.network.naming.exception.NamingException;
-import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
-import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
-import org.apache.reef.io.network.util.StringIdentifierFactory;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.Identifier;
-import org.apache.reef.wake.IdentifierFactory;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
-import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutionException;
-
-public class NameClientTest {
-
-  private final LocalAddressProvider localAddressProvider;
-
-  public NameClientTest() throws InjectionException {
-    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
-  }
-
-  private static final int RETRY_COUNT, RETRY_TIMEOUT;
-
-  static {
-    final Tang tang = Tang.Factory.getTang();
-    try {
-      RETRY_COUNT = 
tang.newInjector().getNamedInstance(NameResolverRetryCount.class);
-      RETRY_TIMEOUT = 
tang.newInjector().getNamedInstance(NameResolverRetryTimeout.class);
-    } catch (final InjectionException e1) {
-      throw new RuntimeException("Exception while trying to find default 
values for retryCount & Timeout", e1);
-    }
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
-  /**
-   * Test method for {@link 
org.apache.reef.io.network.naming.NameClient#close()}.
-   *
-   * @throws Exception
-   */
-  @Test
-  public final void testClose() throws Exception {
-    final String localAddress = localAddressProvider.getLocalAddress();
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int serverPort = server.getPort();
-      final Configuration nameResolverConf = NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
-          .set(NameResolverConfiguration.CACHE_TIMEOUT, 10000)
-          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
-          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
-          .build();
-
-      try (final NameResolver client =
-               
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
-        final Identifier id = factory.getNewInstance("Task1");
-        client.register(id, new InetSocketAddress(localAddress, 7001));
-        client.unregister(id);
-        Thread.sleep(100);
-      }
-    }
-  }
-
-  /**
-   * Test method for {@link 
org.apache.reef.io.network.naming.NameClient#lookup()}.
-   * To check caching behavior with expireAfterAccess & expireAfterWrite
-   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
-   *
-   * @throws Exception
-   */
-  @Test
-  public final void testLookup() throws Exception {
-    final String localAddress = localAddressProvider.getLocalAddress();
-    final IdentifierFactory factory = new StringIdentifierFactory();
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    
injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class,
 factory);
-    injector.bindVolatileInstance(LocalAddressProvider.class, 
this.localAddressProvider);
-
-    try (final NameServer server = injector.getInstance(NameServer.class)) {
-      final int serverPort = server.getPort();
-      final Configuration nameResolverConf = NameResolverConfiguration.CONF
-          .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress)
-          .set(NameResolverConfiguration.NAME_SERVICE_PORT, serverPort)
-          .set(NameResolverConfiguration.CACHE_TIMEOUT, 150)
-          .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
-          .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
-          .build();
-
-      try (final NameResolver client =
-               
Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class))
 {
-        final Identifier id = factory.getNewInstance("Task1");
-        client.register(id, new InetSocketAddress(localAddress, 7001));
-        client.lookup(id); // caches the entry
-        client.unregister(id);
-        Thread.sleep(100);
-        try {
-          InetSocketAddress addr = client.lookup(id);
-          Thread.sleep(100);
-          //With expireAfterAccess, the previous lookup would reset expiry to 
150ms
-          //more and 100ms wait will not expire the item and will return the 
cached value
-          //With expireAfterWrite, the extra wait of 100 ms will expire the 
item
-          //resulting in NamingException and the test passes
-          addr = client.lookup(id);
-          Assert.assertNull("client.lookup(id)", addr);
-        } catch (final Exception e) {
-          if (e instanceof ExecutionException) {
-            Assert.assertTrue("Execution Exception cause is instanceof 
NamingException",
-                e.getCause() instanceof NamingException);
-          } else {
-            throw e;
-          }
-        }
-      }
-    }
-  }
-
-}

Reply via email to