http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java deleted file mode 100644 index d550dae..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java +++ /dev/null @@ -1,402 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network; - -import org.apache.reef.io.naming.NameAssignment; -import org.apache.reef.io.network.naming.*; -import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount; -import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Naming server and client test. - */ -public class NamingTest { - - private static final Logger LOG = Logger.getLogger(NamingTest.class.getName()); - private static final int RETRY_COUNT; - private static final int RETRY_TIMEOUT; - - static { - try { - final Injector injector = Tang.Factory.getTang().newInjector(); - RETRY_COUNT = injector.getNamedInstance(NameResolverRetryCount.class); - RETRY_TIMEOUT = injector.getNamedInstance(NameResolverRetryTimeout.class); - } catch (final InjectionException ex) { - final String msg = "Exception while trying to find default values for retryCount & Timeout"; - LOG.log(Level.SEVERE, msg, ex); - throw new RuntimeException(msg, ex); - } - } - - private final LocalAddressProvider localAddressProvider; - @Rule - public final TestName name = new TestName(); - static final long TTL = 30000; - private final IdentifierFactory factory = new StringIdentifierFactory(); - private int port; - - public NamingTest() throws InjectionException { - this.localAddressProvider = LocalAddressProviderFactory.getInstance(); - } - - /** - * NameServer and NameLookupClient test. - * - * @throws Exception - */ - @Test - public void testNamingLookup() throws Exception { - - final String localAddress = localAddressProvider.getLocalAddress(); - LOG.log(Level.FINEST, this.name.getMethodName()); - - // names - final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); - idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); - idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); - - // run a server - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - try (final NameServer server = injector.getInstance(NameServer.class)) { - this.port = server.getPort(); - for (final Identifier id : idToAddrMap.keySet()) { - server.register(id, idToAddrMap.get(id)); - } - - // run a client - try (final NameLookupClient client = new NameLookupClient(localAddress, this.port, - 10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) { - - final Identifier id1 = this.factory.getNewInstance("task1"); - final Identifier id2 = this.factory.getNewInstance("task2"); - - final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>(); - final InetSocketAddress addr1 = client.lookup(id1); - respMap.put(id1, addr1); - final InetSocketAddress addr2 = client.lookup(id2); - respMap.put(id2, addr2); - - for (final Identifier id : respMap.keySet()) { - LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); - } - - Assert.assertTrue(isEqual(idToAddrMap, respMap)); - } - } - } - - /** - * Test concurrent lookups (threads share a client). - * - * @throws Exception - */ - @Test - public void testConcurrentNamingLookup() throws Exception { - - LOG.log(Level.FINEST, this.name.getMethodName()); - - final String localAddress = localAddressProvider.getLocalAddress(); - // test it 3 times to make failure likely - for (int i = 0; i < 3; i++) { - - LOG.log(Level.FINEST, "test {0}", i); - - // names - final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); - idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); - idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); - idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(localAddress, 7003)); - - // run a server - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - try (final NameServer server = injector.getInstance(NameServer.class)) { - this.port = server.getPort(); - for (final Identifier id : idToAddrMap.keySet()) { - server.register(id, idToAddrMap.get(id)); - } - - // run a client - try (final NameLookupClient client = new NameLookupClient(localAddress, this.port, - 10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) { - - final Identifier id1 = this.factory.getNewInstance("task1"); - final Identifier id2 = this.factory.getNewInstance("task2"); - final Identifier id3 = this.factory.getNewInstance("task3"); - - final ExecutorService e = Executors.newCachedThreadPool(); - - final ConcurrentMap<Identifier, InetSocketAddress> respMap = - new ConcurrentHashMap<Identifier, InetSocketAddress>(); - - final Future<?> f1 = e.submit(new Runnable() { - @Override - public void run() { - InetSocketAddress addr = null; - try { - addr = client.lookup(id1); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Lookup failed", e); - Assert.fail(e.toString()); - } - respMap.put(id1, addr); - } - }); - final Future<?> f2 = e.submit(new Runnable() { - @Override - public void run() { - InetSocketAddress addr = null; - try { - addr = client.lookup(id2); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Lookup failed", e); - Assert.fail(e.toString()); - } - respMap.put(id2, addr); - } - }); - final Future<?> f3 = e.submit(new Runnable() { - @Override - public void run() { - InetSocketAddress addr = null; - try { - addr = client.lookup(id3); - } catch (final Exception e) { - LOG.log(Level.SEVERE, "Lookup failed", e); - Assert.fail(e.toString()); - } - respMap.put(id3, addr); - } - }); - - f1.get(); - f2.get(); - f3.get(); - - for (final Identifier id : respMap.keySet()) { - LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); - } - - Assert.assertTrue(isEqual(idToAddrMap, respMap)); - } - } - } - } - - /** - * NameServer and NameRegistryClient test. - * - * @throws Exception - */ - @Test - public void testNamingRegistry() throws Exception { - - LOG.log(Level.FINEST, this.name.getMethodName()); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - try (final NameServer server = injector.getInstance(NameServer.class)) { - this.port = server.getPort(); - final String localAddress = localAddressProvider.getLocalAddress(); - - // names to start with - final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); - idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); - idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); - - // registration - // invoke registration from the client side - try (final NameRegistryClient client = - new NameRegistryClient(localAddress, this.port, this.factory, this.localAddressProvider)) { - for (final Identifier id : idToAddrMap.keySet()) { - client.register(id, idToAddrMap.get(id)); - } - - // wait - final Set<Identifier> ids = idToAddrMap.keySet(); - busyWait(server, ids.size(), ids); - - // check the server side - Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>(); - Iterable<NameAssignment> nas = server.lookup(ids); - - for (final NameAssignment na : nas) { - LOG.log(Level.FINEST, "Mapping: {0} -> {1}", - new Object[]{na.getIdentifier(), na.getAddress()}); - serverMap.put(na.getIdentifier(), na.getAddress()); - } - - Assert.assertTrue(isEqual(idToAddrMap, serverMap)); - - // un-registration - for (final Identifier id : idToAddrMap.keySet()) { - client.unregister(id); - } - - // wait - busyWait(server, 0, ids); - - serverMap = new HashMap<Identifier, InetSocketAddress>(); - nas = server.lookup(ids); - for (final NameAssignment na : nas) { - serverMap.put(na.getIdentifier(), na.getAddress()); - } - - Assert.assertEquals(0, serverMap.size()); - } - } - } - - /** - * NameServer and NameClient test. - * - * @throws Exception - */ - @Test - public void testNameClient() throws Exception { - - LOG.log(Level.FINEST, this.name.getMethodName()); - - final String localAddress = localAddressProvider.getLocalAddress(); - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - try (final NameServer server = injector.getInstance(NameServer.class)) { - this.port = server.getPort(); - - final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); - idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); - idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); - - // registration - // invoke registration from the client side - final Configuration nameResolverConf = NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port) - .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL) - .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT) - .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT) - .build(); - - try (final NameResolver client - = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) { - for (final Identifier id : idToAddrMap.keySet()) { - client.register(id, idToAddrMap.get(id)); - } - - // wait - final Set<Identifier> ids = idToAddrMap.keySet(); - busyWait(server, ids.size(), ids); - - // lookup - final Identifier id1 = this.factory.getNewInstance("task1"); - final Identifier id2 = this.factory.getNewInstance("task2"); - - final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>(); - InetSocketAddress addr1 = client.lookup(id1); - respMap.put(id1, addr1); - InetSocketAddress addr2 = client.lookup(id2); - respMap.put(id2, addr2); - - for (final Identifier id : respMap.keySet()) { - LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); - } - - Assert.assertTrue(isEqual(idToAddrMap, respMap)); - - // un-registration - for (final Identifier id : idToAddrMap.keySet()) { - client.unregister(id); - } - - // wait - busyWait(server, 0, ids); - - final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>(); - addr1 = server.lookup(id1); - if (addr1 != null) { - serverMap.put(id1, addr1); - } - addr2 = server.lookup(id1); - if (addr2 != null) { - serverMap.put(id2, addr2); - } - - Assert.assertEquals(0, serverMap.size()); - } - } - } - - private boolean isEqual(final Map<Identifier, InetSocketAddress> map1, - final Map<Identifier, InetSocketAddress> map2) { - - if (map1.size() != map2.size()) { - return false; - } - - for (final Identifier id : map1.keySet()) { - final InetSocketAddress addr1 = map1.get(id); - final InetSocketAddress addr2 = map2.get(id); - if (!addr1.equals(addr2)) { - return false; - } - } - - return true; - } - - private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) { - int count = 0; - for (;;) { - final Iterable<NameAssignment> nas = server.lookup(ids); - for (@SuppressWarnings("unused") final NameAssignment na : nas) { - ++count; - } - if (count == expected) { - break; - } - count = 0; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java deleted file mode 100644 index 6bc0ac9..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkConnectionServiceTest.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network; - -import org.apache.reef.exception.evaluator.NetworkException; -import org.apache.reef.io.network.Connection; -import org.apache.reef.io.network.util.StringCodec; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.services.network.util.*; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.util.concurrent.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Default Network connection service test. - */ -public class NetworkConnectionServiceTest { - private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName()); - - private final LocalAddressProvider localAddressProvider; - private final String localAddress; - private final Identifier groupCommClientId; - private final Identifier shuffleClientId; - - public NetworkConnectionServiceTest() throws InjectionException { - localAddressProvider = LocalAddressProviderFactory.getInstance(); - localAddress = localAddressProvider.getLocalAddress(); - - final IdentifierFactory idFac = new StringIdentifierFactory(); - this.groupCommClientId = idFac.getNewInstance("groupComm"); - this.shuffleClientId = idFac.getNewInstance("shuffle"); - } - - @Rule - public TestName name = new TestName(); - - private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception { - final int numMessages = 2000; - final Monitor monitor = new Monitor(); - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - - try (final Connection<String> conn = messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write("hello" + count); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - /** - * NetworkConnectionService messaging test. - */ - @Test - public void testMessagingNetworkConnectionService() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runMessagingNetworkConnectionService(new StringCodec()); - } - - /** - * NetworkConnectionService streaming messaging test. - */ - @Test - public void testStreamingMessagingNetworkConnectionService() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runMessagingNetworkConnectionService(new StreamingStringCodec()); - } - - public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec, - final Codec<Integer> integerCodec) - throws Exception { - final ExecutorService executor = Executors.newFixedThreadPool(5); - - final int groupcommMessages = 1000; - final Monitor monitor = new Monitor(); - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - - messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); - - final int shuffleMessages = 2000; - final Monitor monitor2 = new Monitor(); - messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec); - - executor.submit(new Runnable() { - @Override - public void run() { - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - conn.open(); - for (int count = 0; count < groupcommMessages; ++count) { - // send messages to the receiver. - conn.write("hello" + count); - } - monitor.mwait(); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - }); - - executor.submit(new Runnable() { - @Override - public void run() { - try (final Connection<Integer> conn = - messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) { - conn.open(); - for (int count = 0; count < shuffleMessages; ++count) { - // send messages to the receiver. - conn.write(count); - } - monitor2.mwait(); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - }); - - monitor.mwait(); - monitor2.mwait(); - executor.shutdown(); - } - } - - /** - * Test NetworkService registering multiple connection factories. - */ - @Test - public void testMultipleConnectionFactoriesTest() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); - } - - /** - * Test NetworkService registering multiple connection factories with Streamingcodec. - */ - @Test - public void testMultipleConnectionFactoriesStreamingTest() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - final long start = System.currentTimeMillis(); - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - final long end = System.currentTimeMillis(); - - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + - " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); - - final int numThreads = 4; - final int size = 2000; - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int totalNumMessages = numMessages * numThreads; - - final ExecutorService e = Executors.newCachedThreadPool(); - for (int t = 0; t < numThreads; t++) { - final int tt = t; - - e.submit(new Runnable() { - public void run() { - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - // start and time - final long start = System.currentTimeMillis(); - final Object ignore = new Object(); - for (int i = 0; i < numThreads; i++) { - barrier.add(ignore); - } - e.shutdown(); - e.awaitTermination(100, TimeUnit.SECONDS); - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - - @Test - public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int numThreads = 2; - final int totalNumMessages = numMessages * numThreads; - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec); - - final ExecutorService e = Executors.newCachedThreadPool(); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - final long start = System.currentTimeMillis(); - for (int i = 0; i < numThreads; i++) { - e.submit(new Runnable() { - @Override - public void run() { - - try { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - // send messages to the receiver. - conn.write(message); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - e.shutdown(); - e.awaitTermination(30, TimeUnit.SECONDS); - monitor.mwait(); - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkConnServiceBatchingRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final int batchSize = 1024 * 1024; - final int[] messageSizes = {32, 64, 512}; - - for (final int size : messageSizes) { - final int numMessages = 300 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - final Codec<String> codec = new StringCodec(); - try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { - messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); - try (final Connection<String> conn = - messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - final long start = System.currentTimeMillis(); - try { - for (int i = 0; i < numMessages; i++) { - final StringBuilder sb = new StringBuilder(); - for (int j = 0; j < batchSize / size; j++) { - sb.append(message); - } - conn.open(); - conn.write(sb.toString()); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - final long numAppMessages = numMessages * batchSize / size; - LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + - " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java deleted file mode 100644 index 6045be2..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network; - -import org.apache.reef.exception.evaluator.NetworkException; -import org.apache.reef.io.network.Connection; -import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.impl.NetworkService; -import org.apache.reef.io.network.naming.*; -import org.apache.reef.io.network.util.StringIdentifierFactory; -import org.apache.reef.services.network.util.Monitor; -import org.apache.reef.services.network.util.StringCodec; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import java.net.InetSocketAddress; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Network service test. - */ -public class NetworkServiceTest { - private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName()); - - private final LocalAddressProvider localAddressProvider; - private final String localAddress; - - public NetworkServiceTest() throws InjectionException { - localAddressProvider = LocalAddressProviderFactory.getInstance(); - localAddress = localAddressProvider.getLocalAddress(); - } - - @Rule - public TestName name = new TestName(); - - /** - * NetworkService messaging test. - */ - @Test - public void testMessagingNetworkService() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final IdentifierFactory factory = new StringIdentifierFactory(); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int nameServerPort = server.getPort(); - - final int numMessages = 10; - final Monitor monitor = new Monitor(); - - // network service - final String name2 = "task2"; - final String name1 = "task1"; - final Configuration nameResolverConf = - Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort) - .build()) - .build(); - - final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf); - - LOG.log(Level.FINEST, "=== Test network service receiver start"); - LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - final NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { - - ns2.registerId(factory.getNewInstance(name2)); - final int port2 = ns2.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task2"), new InetSocketAddress(this.localAddress, port2)); - - ns1.registerId(factory.getNewInstance(name1)); - final int port1 = ns1.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task1"), new InetSocketAddress(this.localAddress, port1)); - - final Identifier destId = factory.getNewInstance(name2); - - try (final Connection<String> conn = ns1.newConnection(destId)) { - conn.open(); - for (int count = 0; count < numMessages; ++count) { - conn.write("hello! " + count); - } - monitor.mwait(); - - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final IdentifierFactory factory = new StringIdentifierFactory(); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int nameServerPort = server.getPort(); - - final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - - // network service - final String name2 = "task2"; - final String name1 = "task1"; - final Configuration nameResolverConf = - Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort) - .build()) - .build(); - - final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf); - - LOG.log(Level.FINEST, "=== Test network service receiver start"); - LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { - - ns2.registerId(factory.getNewInstance(name2)); - final int port2 = ns2.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task2"), new InetSocketAddress(this.localAddress, port2)); - - ns1.registerId(factory.getNewInstance(name1)); - final int port1 = ns1.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task1"), new InetSocketAddress(this.localAddress, port1)); - - final Identifier destId = factory.getNewInstance(name2); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - final long start = System.currentTimeMillis(); - try (Connection<String> conn = ns1.newConnection(destId)) { - for (int i = 0; i < numMessages; i++) { - conn.open(); - conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numMessages / runtime + - " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkServiceRateDisjoint() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final IdentifierFactory factory = new StringIdentifierFactory(); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int nameServerPort = server.getPort(); - - final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); - - final int numThreads = 4; - final int size = 2000; - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int totalNumMessages = numMessages * numThreads; - - final ExecutorService e = Executors.newCachedThreadPool(); - for (int t = 0; t < numThreads; t++) { - final int tt = t; - - e.submit(new Runnable() { - @Override - public void run() { - try { - final Monitor monitor = new Monitor(); - - // network service - final String name2 = "task2-" + tt; - final String name1 = "task1-" + tt; - final Configuration nameResolverConf = - Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort) - .build()) - .build(); - - final Injector injector = Tang.Factory.getTang().newInjector(nameResolverConf); - - LOG.log(Level.FINEST, "=== Test network service receiver start"); - LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name2, monitor, numMessages), - new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { - - ns2.registerId(factory.getNewInstance(name2)); - final int port2 = ns2.getTransport().getListeningPort(); - server.register(factory.getNewInstance(name2), new InetSocketAddress(localAddress, port2)); - - ns1.registerId(factory.getNewInstance(name1)); - final int port1 = ns1.getTransport().getListeningPort(); - server.register(factory.getNewInstance(name1), new InetSocketAddress(localAddress, port1)); - - final Identifier destId = factory.getNewInstance(name2); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - try (Connection<String> conn = ns1.newConnection(destId)) { - for (int i = 0; i < numMessages; i++) { - conn.open(); - conn.write(message); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - }); - } - - // start and time - final long start = System.currentTimeMillis(); - final Object ignore = new Object(); - for (int i = 0; i < numThreads; i++) { - barrier.add(ignore); - } - e.shutdown(); - e.awaitTermination(100, TimeUnit.SECONDS); - final long end = System.currentTimeMillis(); - - final double runtime = ((double) end - start) / 1000; - LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - - @Test - public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final IdentifierFactory factory = new StringIdentifierFactory(); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int nameServerPort = server.getPort(); - - final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; - - for (final int size : messageSizes) { - final int numMessages = 300000 / (Math.max(1, size / 512)); - final int numThreads = 2; - final int totalNumMessages = numMessages * numThreads; - final Monitor monitor = new Monitor(); - - - // network service - final String name2 = "task2"; - final String name1 = "task1"; - final Configuration nameResolverConf = - Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort) - .build()) - .build(); - - final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf); - - LOG.log(Level.FINEST, "=== Test network service receiver start"); - LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name2, monitor, totalNumMessages), - new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { - - ns2.registerId(factory.getNewInstance(name2)); - final int port2 = ns2.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task2"), new InetSocketAddress(this.localAddress, port2)); - - ns1.registerId(factory.getNewInstance(name1)); - final int port1 = ns1.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task1"), new InetSocketAddress(this.localAddress, port1)); - - final Identifier destId = factory.getNewInstance(name2); - - try (final Connection<String> conn = ns1.newConnection(destId)) { - conn.open(); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - final ExecutorService e = Executors.newCachedThreadPool(); - - final long start = System.currentTimeMillis(); - for (int i = 0; i < numThreads; i++) { - e.submit(new Runnable() { - - @Override - public void run() { - for (int i = 0; i < numMessages; i++) { - conn.write(message); - } - } - }); - } - - - e.shutdown(); - e.awaitTermination(30, TimeUnit.SECONDS); - monitor.mwait(); - - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - - LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + totalNumMessages / runtime + - " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - } - - /** - * NetworkService messaging rate benchmark. - */ - @Test - public void testMessagingNetworkServiceBatchingRate() throws Exception { - LOG.log(Level.FINEST, name.getMethodName()); - - final IdentifierFactory factory = new StringIdentifierFactory(); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); - injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); - - try (final NameServer server = injector.getInstance(NameServer.class)) { - final int nameServerPort = server.getPort(); - - final int batchSize = 1024 * 1024; - final int[] messageSizes = {32, 64, 512}; - - for (final int size : messageSizes) { - final int numMessages = 300 / (Math.max(1, size / 512)); - final Monitor monitor = new Monitor(); - - // network service - final String name2 = "task2"; - final String name1 = "task1"; - final Configuration nameResolverConf = - Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort) - .build()) - .build(); - - final Injector injector2 = Tang.Factory.getTang().newInjector(nameResolverConf); - - LOG.log(Level.FINEST, "=== Test network service receiver start"); - LOG.log(Level.FINEST, "=== Test network service sender start"); - try (final NameResolver nameResolver = injector2.getInstance(NameResolver.class); - NetworkService<String> ns2 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name2, monitor, numMessages), new ExceptionHandler(), localAddressProvider); - NetworkService<String> ns1 = new NetworkService<String>(factory, 0, nameResolver, - new StringCodec(), new MessagingTransportFactory(localAddressProvider), - new MessageHandler<String>(name1, null, 0), new ExceptionHandler(), localAddressProvider)) { - - ns2.registerId(factory.getNewInstance(name2)); - final int port2 = ns2.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task2"), new InetSocketAddress(this.localAddress, port2)); - - ns1.registerId(factory.getNewInstance(name1)); - final int port1 = ns1.getTransport().getListeningPort(); - server.register(factory.getNewInstance("task1"), new InetSocketAddress(this.localAddress, port1)); - - final Identifier destId = factory.getNewInstance(name2); - - // build the message - final StringBuilder msb = new StringBuilder(); - for (int i = 0; i < size; i++) { - msb.append("1"); - } - final String message = msb.toString(); - - final long start = System.currentTimeMillis(); - try (Connection<String> conn = ns1.newConnection(destId)) { - for (int i = 0; i < numMessages; i++) { - final StringBuilder sb = new StringBuilder(); - for (int j = 0; j < batchSize / size; j++) { - sb.append(message); - } - conn.open(); - conn.write(sb.toString()); - } - monitor.mwait(); - } catch (final NetworkException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - final long end = System.currentTimeMillis(); - final double runtime = ((double) end - start) / 1000; - final long numAppMessages = numMessages * batchSize / size; - LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + numAppMessages / runtime + - " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars - } - } - } - } - - /** - * Test message handler. - */ - class MessageHandler<T> implements EventHandler<Message<T>> { - - private final String name; - private final int expected; - private final Monitor monitor; - private AtomicInteger count = new AtomicInteger(0); - - MessageHandler(final String name, final Monitor monitor, final int expected) { - this.name = name; - this.monitor = monitor; - this.expected = expected; - } - - @Override - public void onNext(final Message<T> value) { - - count.incrementAndGet(); - - LOG.log(Level.FINEST, - "OUT: {0} received {1} from {2} to {3}", - new Object[]{name, value.getData(), value.getSrcId(), value.getDestId()}); - - for (final T obj : value.getData()) { - LOG.log(Level.FINEST, "OUT: data: {0}", obj); - } - - if (count.get() == expected) { - monitor.mnotify(); - } - } - } - - /** - * Test exception handler. - */ - class ExceptionHandler implements EventHandler<Exception> { - @Override - public void onNext(final Exception error) { - System.err.println(error); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java deleted file mode 100644 index 92ba655..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/TestEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network; - -import java.io.Serializable; - -/** - * Event for testing. - */ -public class TestEvent implements Serializable { - - private static final long serialVersionUID = 1L; - private String message; - - public TestEvent(final String message) { - this.message = message; - } - - public String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java deleted file mode 100644 index 2556f21..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * TODO: Document. - */ -package org.apache.reef.services.network; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java deleted file mode 100644 index 4c4bd69..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.exception.evaluator.NetworkException; -import org.apache.reef.io.network.Connection; -import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.NetworkConnectionService; -import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; -import org.apache.reef.io.network.naming.NameResolver; -import org.apache.reef.io.network.naming.NameResolverConfiguration; -import org.apache.reef.io.network.naming.NameServer; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.transport.LinkListener; - -import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -// TODO[JIRA REEF-637] Remove the deprecated class. -/** - * Helper class for DeprecatedNetworkConnectionService test, deprecated in 0.13. - */ -@Deprecated -public final class DeprecatedNetworkMessagingTestService implements AutoCloseable { - private static final Logger LOG = Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName()); - - private final IdentifierFactory factory; - private final NetworkConnectionService receiverNetworkConnService; - private final NetworkConnectionService senderNetworkConnService; - private final String receiver; - private final String sender; - private final NameServer nameServer; - private final NameResolver receiverResolver; - private final NameResolver senderResolver; - - public DeprecatedNetworkMessagingTestService(final String localAddress) throws InjectionException { - // name server - final Injector injector = Tang.Factory.getTang().newInjector(); - this.nameServer = injector.getInstance(NameServer.class); - final Configuration netConf = NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort()) - .build(); - - LOG.log(Level.FINEST, "=== Test network connection service receiver start"); - // network service for receiver - this.receiver = "receiver"; - final Injector injectorReceiver = injector.forkInjector(netConf); - this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class); - this.receiverResolver = injectorReceiver.getInstance(NameResolver.class); - this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class); - this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver)); - - // network service for sender - this.sender = "sender"; - LOG.log(Level.FINEST, "=== Test network connection service sender start"); - final Injector injectorSender = injector.forkInjector(netConf); - senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class); - senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender)); - this.senderResolver = injectorSender.getInstance(NameResolver.class); - } - - public <T> void registerTestConnectionFactory(final Identifier connFactoryId, - final int numMessages, final Monitor monitor, - final Codec<T> codec) throws NetworkException { - receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, - new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); - senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, - new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); - } - - public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) { - final Identifier destId = factory.getNewInstance(receiver); - return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId); - } - - public void close() throws Exception { - senderNetworkConnService.close(); - receiverNetworkConnService.close(); - nameServer.close(); - receiverResolver.close(); - senderResolver.close(); - } - - public static final class MessageHandler<T> implements EventHandler<Message<T>> { - private final int expected; - private final Monitor monitor; - private AtomicInteger count = new AtomicInteger(0); - - public MessageHandler(final Monitor monitor, - final int expected) { - this.monitor = monitor; - this.expected = expected; - } - - @Override - public void onNext(final Message<T> value) { - count.incrementAndGet(); - LOG.log(Level.FINE, "Count: {0}", count.get()); - LOG.log(Level.FINE, - "OUT: {0} received {1} from {2} to {3}", - new Object[]{value, value.getSrcId(), value.getDestId()}); - - for (final T obj : value.getData()) { - LOG.log(Level.FINE, "OUT: data: {0}", obj); - } - - if (count.get() == expected) { - monitor.mnotify(); - } - } - } - - public static final class TestListener<T> implements LinkListener<Message<T>> { - @Override - public void onSuccess(final Message<T> message) { - LOG.log(Level.FINE, "success: " + message); - } - @Override - public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { - LOG.log(Level.WARNING, "exception: " + cause + message); - throw new RuntimeException(cause); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java deleted file mode 100644 index b10d876..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/LoggingUtils.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import java.util.logging.ConsoleHandler; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.Logger; - -public final class LoggingUtils { - public static void setLoggingLevel(final Level level) { - final Handler[] handlers = Logger.getLogger("").getHandlers(); - ConsoleHandler ch = null; - for (final Handler h : handlers) { - if (h instanceof ConsoleHandler) { - ch = (ConsoleHandler) h; - break; - } - } - if (ch == null) { - ch = new ConsoleHandler(); - Logger.getLogger("").addHandler(ch); - } - ch.setLevel(level); - Logger.getLogger("").setLevel(level); - } - - /** - * Empty private constructor to prohibit instantiation of utility class. - */ - private LoggingUtils() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java deleted file mode 100644 index 161ee6a..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/Monitor.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import java.util.concurrent.atomic.AtomicBoolean; - -public class Monitor { - private AtomicBoolean finished = new AtomicBoolean(false); - - public void mwait() throws InterruptedException { - synchronized (this) { - while (!finished.get()) { - this.wait(); - } - } - } - - public void mnotify() { - synchronized (this) { - finished.compareAndSet(false, true); - this.notifyAll(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java deleted file mode 100644 index 3dc33e8..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/NetworkMessagingTestService.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.exception.evaluator.NetworkException; -import org.apache.reef.io.network.Connection; -import org.apache.reef.io.network.Message; -import org.apache.reef.io.network.NetworkConnectionService; -import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; -import org.apache.reef.io.network.naming.NameResolver; -import org.apache.reef.io.network.naming.NameResolverConfiguration; -import org.apache.reef.io.network.naming.NameServer; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.Identifier; -import org.apache.reef.wake.IdentifierFactory; -import org.apache.reef.wake.remote.Codec; -import org.apache.reef.wake.remote.transport.LinkListener; - -import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Helper class for NetworkConnectionService test. - */ -public final class NetworkMessagingTestService implements AutoCloseable { - private static final Logger LOG = Logger.getLogger(NetworkMessagingTestService.class.getName()); - - private final IdentifierFactory factory; - private final NetworkConnectionService receiverNetworkConnService; - private final NetworkConnectionService senderNetworkConnService; - private final NameServer nameServer; - private final NameResolver receiverResolver; - private final NameResolver senderResolver; - - public NetworkMessagingTestService(final String localAddress) throws InjectionException { - // name server - final Injector injector = Tang.Factory.getTang().newInjector(); - this.nameServer = injector.getInstance(NameServer.class); - final Configuration netConf = NameResolverConfiguration.CONF - .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) - .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort()) - .build(); - - LOG.log(Level.FINEST, "=== Test network connection service receiver start"); - // network service for receiver - final Injector injectorReceiver = injector.forkInjector(netConf); - this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class); - this.receiverResolver = injectorReceiver.getInstance(NameResolver.class); - this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class); - - // network service for sender - LOG.log(Level.FINEST, "=== Test network connection service sender start"); - final Injector injectorSender = injector.forkInjector(netConf); - senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class); - this.senderResolver = injectorSender.getInstance(NameResolver.class); - } - - public <T> void registerTestConnectionFactory(final Identifier connFactoryId, - final int numMessages, final Monitor monitor, - final Codec<T> codec) throws NetworkException { - final Identifier receiverEndPointId = factory.getNewInstance("receiver"); - final Identifier senderEndPointId = factory.getNewInstance("sender"); - receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, - new MessageHandler<T>(monitor, numMessages, senderEndPointId, receiverEndPointId), - new TestListener<T>(), receiverEndPointId); - senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, - new MessageHandler<T>(monitor, numMessages, receiverEndPointId, senderEndPointId), - new TestListener<T>(), senderEndPointId); - } - - public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) { - final Identifier receiverEndPointId = factory.getNewInstance("receiver"); - return (Connection<T>)senderNetworkConnService - .getConnectionFactory(connFactoryId) - .newConnection(receiverEndPointId); - } - - public void close() throws Exception { - senderNetworkConnService.close(); - receiverNetworkConnService.close(); - nameServer.close(); - receiverResolver.close(); - senderResolver.close(); - } - - public static final class MessageHandler<T> implements EventHandler<Message<T>> { - private final int expected; - private final Monitor monitor; - private final Identifier expectedSrcId; - private final Identifier expectedDestId; - private AtomicInteger count = new AtomicInteger(0); - - public MessageHandler(final Monitor monitor, - final int expected, - final Identifier expectedSrcId, - final Identifier expectedDestId) { - this.monitor = monitor; - this.expected = expected; - this.expectedSrcId = expectedSrcId; - this.expectedDestId = expectedDestId; - } - - @Override - public void onNext(final Message<T> value) { - count.incrementAndGet(); - LOG.log(Level.FINE, "Count: {0}", count.get()); - LOG.log(Level.FINE, - "OUT: {0} received {1} from {2} to {3}", - new Object[]{value, value.getSrcId(), value.getDestId()}); - - for (final T obj : value.getData()) { - LOG.log(Level.FINE, "OUT: data: {0}", obj); - } - - assert value.getSrcId().equals(expectedSrcId); - assert value.getDestId().equals(expectedDestId); - - if (count.get() == expected) { - monitor.mnotify(); - } - } - } - - public static final class TestListener<T> implements LinkListener<Message<T>> { - @Override - public void onSuccess(final Message<T> message) { - LOG.log(Level.FINE, "success: " + message); - } - @Override - public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { - LOG.log(Level.WARNING, "exception: " + cause + message); - throw new RuntimeException(cause); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java deleted file mode 100644 index 757a803..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingIntegerCodec.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.io.network.impl.StreamingCodec; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - - -public class StreamingIntegerCodec implements StreamingCodec<Integer> { - - @Override - public void encodeToStream(final Integer obj, final DataOutputStream stream) { - try { - stream.writeInt(obj); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Integer decodeFromStream(final DataInputStream stream) { - try { - return stream.readInt(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Integer decode(final byte[] data) { - return null; - } - - @Override - public byte[] encode(final Integer obj) { - return new byte[0]; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java deleted file mode 100644 index 43efaf6..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StreamingStringCodec.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.io.network.impl.StreamingCodec; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - - -public class StreamingStringCodec implements StreamingCodec<String> { - @Override - public byte[] encode(final String obj) { - return obj.getBytes(); - } - - @Override - public String decode(final byte[] buf) { - return new String(buf); - } - - @Override - public void encodeToStream(final String obj, final DataOutputStream stream) { - try { - stream.writeUTF(obj); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public String decodeFromStream(final DataInputStream stream) { - try { - return stream.readUTF(); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java deleted file mode 100644 index d939b8e..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/StringCodec.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.wake.remote.Codec; - - -public class StringCodec implements Codec<String> { - @Override - public byte[] encode(final String obj) { - return obj.getBytes(); - } - - @Override - public String decode(final byte[] buf) { - return new String(buf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java deleted file mode 100644 index 146a6db..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/TimeoutHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.services.network.util; - -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.PeriodicEvent; - -public class TimeoutHandler implements EventHandler<PeriodicEvent> { - - private final Monitor monitor; - - public TimeoutHandler(final Monitor monitor) { - this.monitor = monitor; - } - - @Override - public void onNext(final PeriodicEvent event) { - monitor.mnotify(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java deleted file mode 100644 index 5630dec..0000000 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * TODO: Document. - */ -package org.apache.reef.services.network.util;
