[REEF-664] REEF-IO test and main packages do not match - Move tests that were in org.apache.reef.services to org.apache.reef.io. - Remove a duplicate class and an unused class
JIRA: [REEF-664](https://issues.apache.org/jira/browse/REEF-664) Pull Request: Closes #433 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/3325748d Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/3325748d Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/3325748d Branch: refs/heads/master Commit: 3325748dc61f0db8285efd2f0cff225a9dd45a18 Parents: 9cab341 Author: Brian Cho <[email protected]> Authored: Fri Aug 28 16:49:58 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Fri Aug 28 11:24:06 2015 -0700 ---------------------------------------------------------------------- .../DeprecatedNetworkConnectionServiceTest.java | 405 ++++++++++++++ .../network/NetworkConnectionServiceTest.java | 397 ++++++++++++++ .../reef/io/network/NetworkServiceTest.java | 548 +++++++++++++++++++ .../network/naming/LocalNameResolverTest.java | 102 ++++ .../reef/io/network/naming/NameClientTest.java | 161 ++++++ .../reef/io/network/naming/NamingTest.java | 401 ++++++++++++++ .../reef/io/network/naming/package-info.java | 22 + .../apache/reef/io/network/package-info.java | 22 + .../DeprecatedNetworkMessagingTestService.java | 150 +++++ .../reef/io/network/util/LoggingUtils.java | 49 ++ .../apache/reef/io/network/util/Monitor.java | 40 ++ .../util/NetworkMessagingTestService.java | 157 ++++++ .../io/network/util/StreamingIntegerCodec.java | 57 ++ .../io/network/util/StreamingStringCodec.java | 56 ++ .../reef/io/network/util/TimeoutHandler.java | 36 ++ .../apache/reef/io/storage/ExternalMapTest.java | 94 ++++ .../org/apache/reef/io/storage/FramingTest.java | 102 ++++ .../reef/io/storage/MergingIteratorTest.java | 53 ++ .../reef/io/storage/SortingSpoolTest.java | 117 ++++ .../apache/reef/io/storage/SpoolFileTest.java | 207 +++++++ .../reef/io/storage/TupleSerializerTest.java | 103 ++++ .../apache/reef/io/storage/package-info.java | 22 + 
.../DeprecatedNetworkConnectionServiceTest.java | 407 -------------- .../services/network/LocalNameResolverTest.java | 104 ---- .../reef/services/network/NameClientTest.java | 162 ------ .../reef/services/network/NamingTest.java | 402 -------------- .../network/NetworkConnectionServiceTest.java | 400 -------------- .../services/network/NetworkServiceTest.java | 547 ------------------ .../apache/reef/services/network/TestEvent.java | 38 -- .../reef/services/network/package-info.java | 22 - .../DeprecatedNetworkMessagingTestService.java | 150 ----- .../services/network/util/LoggingUtils.java | 49 -- .../reef/services/network/util/Monitor.java | 40 -- .../util/NetworkMessagingTestService.java | 157 ------ .../network/util/StreamingIntegerCodec.java | 57 -- .../network/util/StreamingStringCodec.java | 56 -- .../reef/services/network/util/StringCodec.java | 34 -- .../services/network/util/TimeoutHandler.java | 36 -- .../services/network/util/package-info.java | 22 - .../reef/services/storage/ExternalMapTest.java | 94 ---- .../reef/services/storage/FramingTest.java | 104 ---- .../services/storage/MergingIteratorTest.java | 54 -- .../reef/services/storage/SortingSpoolTest.java | 117 ---- .../reef/services/storage/SpoolFileTest.java | 207 ------- .../services/storage/TupleSerializerTest.java | 105 ---- .../reef/services/storage/package-info.java | 22 - 46 files changed, 3301 insertions(+), 3386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java new file mode 100644 index 
0000000..2fcb125 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/DeprecatedNetworkConnectionServiceTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.io.network; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.util.*; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +// TODO[JIRA REEF-637] Remove the deprecated class. +/** + * Test for deprecated methods, which are deprecated in 0.13, of NetworkConnectionService. 
+ */ +@Deprecated +public final class DeprecatedNetworkConnectionServiceTest { + private static final Logger LOG = Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName()); + + private final LocalAddressProvider localAddressProvider; + private final String localAddress; + private final Identifier groupCommClientId; + private final Identifier shuffleClientId; + + public DeprecatedNetworkConnectionServiceTest() throws InjectionException { + localAddressProvider = LocalAddressProviderFactory.getInstance(); + localAddress = localAddressProvider.getLocalAddress(); + + final IdentifierFactory idFac = new StringIdentifierFactory(); + this.groupCommClientId = idFac.getNewInstance("groupComm"); + this.shuffleClientId = idFac.getNewInstance("shuffle"); + } + + @Rule + public TestName name = new TestName(); + + private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception { + final int numMessages = 2000; + final Monitor monitor = new Monitor(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + try (final Connection<String> conn = messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + /** + * NetworkConnectionService messaging test. + */ + @Test + public void testMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StringCodec()); + } + + /** + * NetworkConnectionService streaming messaging test. 
+ */ + @Test + public void testStreamingMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StreamingStringCodec()); + } + + public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec, + final Codec<Integer> integerCodec) + throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(5); + + final int groupcommMessages = 1000; + final Monitor monitor = new Monitor(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + + messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); + + final int shuffleMessages = 2000; + final Monitor monitor2 = new Monitor(); + messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + conn.open(); + for (int count = 0; count < groupcommMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<Integer> conn = + messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) { + conn.open(); + for (int count = 0; count < shuffleMessages; ++count) { + // send messages to the receiver. 
+ conn.write(count); + } + monitor2.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + monitor.mwait(); + monitor2.mwait(); + executor.shutdown(); + } + } + + /** + * Test NetworkService registering multiple connection factories. + */ + @Test + public void testMultipleConnectionFactoriesTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); + } + + /** + * Test NetworkService registering multiple connection factories with Streamingcodec. + */ + @Test + public void testMultipleConnectionFactoriesStreamingTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // 
send messages to the receiver. + conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + final long end = System.currentTimeMillis(); + + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); + + final int numThreads = 4; + final int size = 2000; + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int totalNumMessages = numMessages * numThreads; + + final ExecutorService e = Executors.newCachedThreadPool(); + for (int t = 0; t < numThreads; t++) { + final int tt = t; + + e.submit(new Runnable() { + public void run() { + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. 
+ conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + // start and time + final long start = System.currentTimeMillis(); + final Object ignore = new Object(); + for (int i = 0; i < numThreads; i++) { + barrier.add(ignore); + } + e.shutdown(); + e.awaitTermination(100, TimeUnit.SECONDS); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + + @Test + public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int numThreads = 2; + final int totalNumMessages = numMessages * numThreads; + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec); + + final ExecutorService e = Executors.newCachedThreadPool(); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + for (int i = 0; i < numThreads; i++) { + e.submit(new Runnable() { + 
@Override + public void run() { + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + e.shutdown(); + e.awaitTermination(30, TimeUnit.SECONDS); + monitor.mwait(); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceBatchingRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + + final int batchSize = 1024 * 1024; + final int[] messageSizes = {32, 64, 512}; + + for (final int size : messageSizes) { + final int numMessages = 300 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + final long start = System.currentTimeMillis(); + try { + for (int i = 0; i < numMessages; i++) { + final StringBuilder sb = new StringBuilder(); + for (int j = 0; j < batchSize / size; j++) { + sb.append(message); + } + conn.open(); + conn.write(sb.toString()); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + 
throw new RuntimeException(e); + } + + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + final long numAppMessages = numMessages * batchSize / size; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java new file mode 100644 index 0000000..e5f6313 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkConnectionServiceTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.io.network; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.util.*; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Default Network connection service test. + */ +public class NetworkConnectionServiceTest { + private static final Logger LOG = Logger.getLogger(NetworkConnectionServiceTest.class.getName()); + + private final LocalAddressProvider localAddressProvider; + private final String localAddress; + private final Identifier groupCommClientId; + private final Identifier shuffleClientId; + + public NetworkConnectionServiceTest() throws InjectionException { + localAddressProvider = LocalAddressProviderFactory.getInstance(); + localAddress = localAddressProvider.getLocalAddress(); + + final IdentifierFactory idFac = new StringIdentifierFactory(); + this.groupCommClientId = idFac.getNewInstance("groupComm"); + this.shuffleClientId = idFac.getNewInstance("shuffle"); + } + + @Rule + public TestName name = new TestName(); + + private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception { + final int numMessages = 2000; + final Monitor monitor = new Monitor(); + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + try (final Connection<String> conn = 
messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + /** + * NetworkConnectionService messaging test. + */ + @Test + public void testMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StringCodec()); + } + + /** + * NetworkConnectionService streaming messaging test. + */ + @Test + public void testStreamingMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StreamingStringCodec()); + } + + public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec, + final Codec<Integer> integerCodec) + throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(5); + + final int groupcommMessages = 1000; + final Monitor monitor = new Monitor(); + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + + messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); + + final int shuffleMessages = 2000; + final Monitor monitor2 = new Monitor(); + messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + conn.open(); + for (int count = 0; count < groupcommMessages; ++count) { + // send messages to the receiver. 
+ conn.write("hello" + count); + } + monitor.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<Integer> conn = + messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) { + conn.open(); + for (int count = 0; count < shuffleMessages; ++count) { + // send messages to the receiver. + conn.write(count); + } + monitor2.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + monitor.mwait(); + monitor2.mwait(); + executor.shutdown(); + } + } + + /** + * Test NetworkService registering multiple connection factories. + */ + @Test + public void testMultipleConnectionFactoriesTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); + } + + /** + * Test NetworkService registering multiple connection factories with Streamingcodec. + */ + @Test + public void testMultipleConnectionFactoriesStreamingTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); + } + + /** + * NetworkService messaging rate benchmark. 
+ */ + @Test + public void testMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + final long end = System.currentTimeMillis(); + + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. 
+ */ + @Test + public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); + + final int numThreads = 4; + final int size = 2000; + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int totalNumMessages = numMessages * numThreads; + + final ExecutorService e = Executors.newCachedThreadPool(); + for (int t = 0; t < numThreads; t++) { + final int tt = t; + + e.submit(new Runnable() { + public void run() { + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. 
+ conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + // start and time + final long start = System.currentTimeMillis(); + final Object ignore = new Object(); + for (int i = 0; i < numThreads; i++) { + barrier.add(ignore); + } + e.shutdown(); + e.awaitTermination(100, TimeUnit.SECONDS); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + + @Test + public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int numThreads = 2; + final int totalNumMessages = numMessages * numThreads; + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec); + + final ExecutorService e = Executors.newCachedThreadPool(); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + for (int i = 0; i < numThreads; i++) { + e.submit(new Runnable() { + @Override + public void run() 
{ + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + e.shutdown(); + e.awaitTermination(30, TimeUnit.SECONDS); + monitor.mwait(); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceBatchingRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + + final int batchSize = 1024 * 1024; + final int[] messageSizes = {32, 64, 512}; + + for (final int size : messageSizes) { + final int numMessages = 300 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final NetworkMessagingTestService messagingTestService = new NetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + final long start = System.currentTimeMillis(); + try { + for (int i = 0; i < numMessages; i++) { + final StringBuilder sb = new StringBuilder(); + for (int j = 0; j < batchSize / size; j++) { + sb.append(message); + } + conn.open(); + conn.write(sb.toString()); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + final long end = 
System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + final long numAppMessages = numMessages * batchSize / size; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/3325748d/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java new file mode 100644 index 0000000..5f6f937 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/io/network/NetworkServiceTest.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.reef.io.network;

import org.apache.reef.exception.evaluator.NetworkException;
import org.apache.reef.io.network.impl.NetworkService;
import org.apache.reef.io.network.naming.NameResolver;
import org.apache.reef.io.network.naming.NameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServer;
import org.apache.reef.io.network.naming.NameServerParameters;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.io.network.util.Monitor;
import org.apache.reef.io.network.util.StringCodec;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Network service test.
 *
 * <p>Every test starts an in-process {@link NameServer}, creates one or more
 * sender/receiver {@link NetworkService} pairs that resolve each other through it,
 * pushes string messages from the sender to the receiver and blocks on a
 * {@link Monitor} until the receiver has counted the expected number of messages.
 * The "rate" tests additionally log the observed throughput at {@code FINEST}.
 */
public class NetworkServiceTest {
  private static final Logger LOG = Logger.getLogger(NetworkServiceTest.class.getName());

  /** Identifier of the sending service in the single-pair tests. */
  private static final String SENDER_NAME = "task1";
  /** Identifier of the receiving service in the single-pair tests. */
  private static final String RECEIVER_NAME = "task2";

  private final LocalAddressProvider localAddressProvider;
  private final String localAddress;

  @Rule
  public final TestName name = new TestName();

  public NetworkServiceTest() throws InjectionException {
    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
    this.localAddress = this.localAddressProvider.getLocalAddress();
  }

  /**
   * NetworkService messaging test: sends ten short messages over one connection.
   */
  @Test
  public void testMessagingNetworkService() throws Exception {
    LOG.log(Level.FINEST, name.getMethodName());

    final IdentifierFactory factory = new StringIdentifierFactory();
    final int numMessages = 10;
    final Monitor monitor = new Monitor();

    try (final NameServer server = newNameServer(factory)) {
      LOG.log(Level.FINEST, "=== Test network service receiver start");
      LOG.log(Level.FINEST, "=== Test network service sender start");
      try (final NameResolver nameResolver = newNameResolver(server.getPort());
           final NetworkService<String> receiver =
               newNetworkService(factory, nameResolver, RECEIVER_NAME, monitor, numMessages);
           final NetworkService<String> sender =
               newNetworkService(factory, nameResolver, SENDER_NAME, null, 0)) {

        register(server, factory, receiver, RECEIVER_NAME);
        register(server, factory, sender, SENDER_NAME);

        final Identifier destId = factory.getNewInstance(RECEIVER_NAME);
        try (final Connection<String> conn = sender.newConnection(destId)) {
          conn.open();
          for (int count = 0; count < numMessages; ++count) {
            conn.write("hello! " + count);
          }
          monitor.mwait();
        } catch (final NetworkException e) {
          throw new RuntimeException(e);
        }
      }
    }
  }

  /**
   * NetworkService messaging rate benchmark over a range of message sizes.
   */
  @Test
  public void testMessagingNetworkServiceRate() throws Exception {
    LOG.log(Level.FINEST, name.getMethodName());

    final IdentifierFactory factory = new StringIdentifierFactory();
    final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024};

    try (final NameServer server = newNameServer(factory)) {
      for (final int size : messageSizes) {
        final int numMessages = numMessagesFor(300000, size);
        final Monitor monitor = new Monitor();

        LOG.log(Level.FINEST, "=== Test network service receiver start");
        LOG.log(Level.FINEST, "=== Test network service sender start");
        try (final NameResolver nameResolver = newNameResolver(server.getPort());
             final NetworkService<String> receiver =
                 newNetworkService(factory, nameResolver, RECEIVER_NAME, monitor, numMessages);
             final NetworkService<String> sender =
                 newNetworkService(factory, nameResolver, SENDER_NAME, null, 0)) {

          register(server, factory, receiver, RECEIVER_NAME);
          register(server, factory, sender, SENDER_NAME);

          final Identifier destId = factory.getNewInstance(RECEIVER_NAME);
          final String message = buildMessage(size);

          final long start = System.currentTimeMillis();
          sendAndAwait(sender, destId, message, numMessages, monitor);
          final long end = System.currentTimeMillis();

          logThroughput(size, numMessages, start, end);
        }
      }
    }
  }

  /**
   * NetworkService messaging rate benchmark with several independent
   * sender/receiver pairs, each running on its own thread.
   */
  @Test
  public void testMessagingNetworkServiceRateDisjoint() throws Exception {
    LOG.log(Level.FINEST, name.getMethodName());

    final IdentifierFactory factory = new StringIdentifierFactory();
    final int numThreads = 4;
    final int size = 2000;
    final int numMessages = numMessagesFor(300000, size);
    final int totalNumMessages = numMessages * numThreads;

    try (final NameServer server = newNameServer(factory)) {
      final ExecutorService executor = Executors.newCachedThreadPool();
      final List<Future<Void>> workers = new ArrayList<Future<Void>>(numThreads);

      // The clock starts before the first pair is submitted so that all of the
      // work done by the workers is inside the measured interval.
      final long start = System.currentTimeMillis();
      try {
        for (int t = 0; t < numThreads; t++) {
          final int pairId = t;
          workers.add(executor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
              runDisjointPair(factory, server, pairId, size, numMessages);
              return null;
            }
          }));
        }

        executor.shutdown();
        if (!executor.awaitTermination(100, TimeUnit.SECONDS)) {
          throw new AssertionError("Sender/receiver pairs did not finish within 100 seconds");
        }
      } finally {
        // No-op after a clean termination; interrupts stragglers after a timeout or failure.
        executor.shutdownNow();
      }
      final long end = System.currentTimeMillis();

      // A failure inside a worker is only visible through its Future.
      for (final Future<Void> worker : workers) {
        worker.get();
      }

      logThroughput(size, totalNumMessages, start, end);
    }
  }

  /**
   * NetworkService messaging rate benchmark with several threads writing to
   * one shared connection.
   */
  @Test
  public void testMultithreadedSharedConnMessagingNetworkServiceRate() throws Exception {
    LOG.log(Level.FINEST, name.getMethodName());

    final IdentifierFactory factory = new StringIdentifierFactory();
    final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024};

    try (final NameServer server = newNameServer(factory)) {
      for (final int size : messageSizes) {
        final int numMessages = numMessagesFor(300000, size);
        final int numThreads = 2;
        final int totalNumMessages = numMessages * numThreads;
        final Monitor monitor = new Monitor();

        LOG.log(Level.FINEST, "=== Test network service receiver start");
        LOG.log(Level.FINEST, "=== Test network service sender start");
        try (final NameResolver nameResolver = newNameResolver(server.getPort());
             final NetworkService<String> receiver =
                 newNetworkService(factory, nameResolver, RECEIVER_NAME, monitor, totalNumMessages);
             final NetworkService<String> sender =
                 newNetworkService(factory, nameResolver, SENDER_NAME, null, 0)) {

          register(server, factory, receiver, RECEIVER_NAME);
          register(server, factory, sender, SENDER_NAME);

          final Identifier destId = factory.getNewInstance(RECEIVER_NAME);

          try (final Connection<String> conn = sender.newConnection(destId)) {
            conn.open();

            final String message = buildMessage(size);
            final ExecutorService executor = Executors.newCachedThreadPool();
            final List<Future<?>> senders = new ArrayList<Future<?>>(numThreads);

            final long start = System.currentTimeMillis();
            try {
              for (int t = 0; t < numThreads; t++) {
                senders.add(executor.submit(new Runnable() {
                  @Override
                  public void run() {
                    for (int sent = 0; sent < numMessages; sent++) {
                      conn.write(message);
                    }
                  }
                }));
              }
              executor.shutdown();

              // Wait for every sender and surface its failure, if any, before
              // blocking on the monitor: a failed sender would otherwise leave
              // the receiver waiting for messages that never arrive.
              for (final Future<?> senderTask : senders) {
                senderTask.get();
              }
            } finally {
              executor.shutdownNow();
            }
            monitor.mwait();
            final long end = System.currentTimeMillis();

            logThroughput(size, totalNumMessages, start, end);
          }
        }
      }
    }
  }

  /**
   * NetworkService messaging rate benchmark where many application messages
   * are concatenated into one large network message.
   */
  @Test
  public void testMessagingNetworkServiceBatchingRate() throws Exception {
    LOG.log(Level.FINEST, name.getMethodName());

    final IdentifierFactory factory = new StringIdentifierFactory();
    final int batchSize = 1024 * 1024;
    final int[] messageSizes = {32, 64, 512};

    try (final NameServer server = newNameServer(factory)) {
      for (final int size : messageSizes) {
        final int numMessages = numMessagesFor(300, size);
        final int messagesPerBatch = batchSize / size;
        final Monitor monitor = new Monitor();

        LOG.log(Level.FINEST, "=== Test network service receiver start");
        LOG.log(Level.FINEST, "=== Test network service sender start");
        try (final NameResolver nameResolver = newNameResolver(server.getPort());
             final NetworkService<String> receiver =
                 newNetworkService(factory, nameResolver, RECEIVER_NAME, monitor, numMessages);
             final NetworkService<String> sender =
                 newNetworkService(factory, nameResolver, SENDER_NAME, null, 0)) {

          register(server, factory, receiver, RECEIVER_NAME);
          register(server, factory, sender, SENDER_NAME);

          final Identifier destId = factory.getNewInstance(RECEIVER_NAME);
          final String message = buildMessage(size);

          final long start = System.currentTimeMillis();
          try (final Connection<String> conn = sender.newConnection(destId)) {
            for (int i = 0; i < numMessages; i++) {
              // Assembling the batch is deliberately inside the timed loop.
              final StringBuilder batch = new StringBuilder(messagesPerBatch * size);
              for (int j = 0; j < messagesPerBatch; j++) {
                batch.append(message);
              }
              conn.open();
              conn.write(batch.toString());
            }
            monitor.mwait();
          } catch (final NetworkException e) {
            throw new RuntimeException(e);
          }
          final long end = System.currentTimeMillis();

          // Widened to long before multiplying so larger parameters cannot overflow int.
          final long numAppMessages = (long) numMessages * messagesPerBatch;
          logThroughput(size, numAppMessages, start, end);
        }
      }
    }
  }

  /**
   * Starts a name server that uses the given identifier factory and this
   * test's local address provider.
   *
   * @param factory identifier factory bound into the name server
   * @return the running name server; the caller closes it
   * @throws InjectionException if Tang cannot instantiate the server
   */
  private NameServer newNameServer(final IdentifierFactory factory) throws InjectionException {
    final Injector injector = Tang.Factory.getTang().newInjector();
    injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory);
    injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
    return injector.getInstance(NameServer.class);
  }

  /**
   * Creates a name resolver that talks to the name server on the given port
   * of the local address.
   *
   * @param nameServerPort port the name server listens on
   * @return the resolver; the caller closes it
   * @throws InjectionException if Tang cannot instantiate the resolver
   */
  private NameResolver newNameResolver(final int nameServerPort) throws InjectionException {
    final Configuration nameResolverConf =
        Tang.Factory.getTang().newConfigurationBuilder(NameResolverConfiguration.CONF
            .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, this.localAddress)
            .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServerPort)
            .build())
            .build();
    return Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameResolver.class);
  }

  /**
   * Creates a string network service whose receive handler counts messages.
   *
   * @param factory      identifier factory handed to the service
   * @param nameResolver resolver the service uses for lookups
   * @param serviceName  name reported by the receive handler in its log output
   * @param monitor      notified once {@code expected} messages arrived; may be
   *                     {@code null} for a service that only sends
   * @param expected     number of messages after which the monitor is notified
   * @return the new service; the caller closes it
   * @throws Exception declared broadly because every caller already propagates {@code Exception}
   */
  private NetworkService<String> newNetworkService(final IdentifierFactory factory,
                                                   final NameResolver nameResolver,
                                                   final String serviceName,
                                                   final Monitor monitor,
                                                   final int expected) throws Exception {
    return new NetworkService<String>(factory, 0, nameResolver,
        new StringCodec(), new MessagingTransportFactory(localAddressProvider),
        new MessageHandler<String>(serviceName, monitor, expected),
        new ExceptionHandler(), localAddressProvider);
  }

  /**
   * Registers the service under the given name, both with the service itself
   * and with the name server (using the service's listening port).
   *
   * @throws Exception declared broadly because every caller already propagates {@code Exception}
   */
  private void register(final NameServer server, final IdentifierFactory factory,
                        final NetworkService<String> service, final String serviceName) throws Exception {
    service.registerId(factory.getNewInstance(serviceName));
    final int port = service.getTransport().getListeningPort();
    server.register(factory.getNewInstance(serviceName), new InetSocketAddress(this.localAddress, port));
  }

  /**
   * Runs one independent sender/receiver pair of the disjoint benchmark:
   * creates both services, registers them under names suffixed with
   * {@code pairId}, sends {@code numMessages} messages and waits for them.
   */
  private void runDisjointPair(final IdentifierFactory factory, final NameServer server,
                               final int pairId, final int size, final int numMessages) throws Exception {
    final Monitor monitor = new Monitor();
    final String receiverName = "task2-" + pairId;
    final String senderName = "task1-" + pairId;

    LOG.log(Level.FINEST, "=== Test network service receiver start");
    LOG.log(Level.FINEST, "=== Test network service sender start");
    try (final NameResolver nameResolver = newNameResolver(server.getPort());
         final NetworkService<String> receiver =
             newNetworkService(factory, nameResolver, receiverName, monitor, numMessages);
         final NetworkService<String> sender =
             newNetworkService(factory, nameResolver, senderName, null, 0)) {

      register(server, factory, receiver, receiverName);
      register(server, factory, sender, senderName);

      sendAndAwait(sender, factory.getNewInstance(receiverName), buildMessage(size), numMessages, monitor);
    }
  }

  /**
   * Opens a connection from {@code sender} to {@code destId}, writes
   * {@code message} {@code numMessages} times (calling {@code open()} before
   * every write, as the benchmark always did) and then waits on the monitor.
   *
   * @throws RuntimeException wrapping a {@link NetworkException} raised while sending
   */
  private static void sendAndAwait(final NetworkService<String> sender, final Identifier destId,
                                   final String message, final int numMessages,
                                   final Monitor monitor) throws Exception {
    try (final Connection<String> conn = sender.newConnection(destId)) {
      for (int i = 0; i < numMessages; i++) {
        conn.open();
        conn.write(message);
      }
      monitor.mwait();
    } catch (final NetworkException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Scales a message budget down for large messages: the budget is divided by
   * the number of whole 512-character units in {@code size} (at least one).
   */
  private static int numMessagesFor(final int budget, final int size) {
    return budget / Math.max(1, size / 512);
  }

  /**
   * Builds a message consisting of {@code size} copies of the character '1'.
   */
  private static String buildMessage(final int size) {
    final StringBuilder sb = new StringBuilder(size);
    for (int i = 0; i < size; i++) {
      sb.append('1');
    }
    return sb.toString();
  }

  /**
   * Logs message rate and bandwidth for a finished benchmark run.
   *
   * @param size        size of one application message in characters
   * @param messages    number of application messages transferred
   * @param startMillis wall-clock start of the run
   * @param endMillis   wall-clock end of the run
   */
  private static void logThroughput(final int size, final long messages,
                                    final long startMillis, final long endMillis) {
    final double runtime = ((double) endMillis - startMillis) / 1000;
    LOG.log(Level.FINEST, "size: " + size + "; messages/s: " + messages / runtime
        + " bandwidth(bytes/s): " + ((double) messages * 2 * size) / runtime); // x2 for unicode chars
  }

  /**
   * Test message handler: counts incoming messages and notifies the monitor
   * when the expected number has been reached.
   */
  static class MessageHandler<T> implements EventHandler<Message<T>> {

    private final String name;
    private final int expected;
    private final Monitor monitor;
    private final AtomicInteger count = new AtomicInteger(0);

    MessageHandler(final String name, final Monitor monitor, final int expected) {
      this.name = name;
      this.monitor = monitor;
      this.expected = expected;
    }

    @Override
    public void onNext(final Message<T> value) {
      // Use the value returned by the increment itself: re-reading the counter
      // later could observe increments made by other threads in between.
      final int received = count.incrementAndGet();

      if (LOG.isLoggable(Level.FINEST)) {
        LOG.log(Level.FINEST,
            "OUT: {0} received {1} from {2} to {3}",
            new Object[]{name, value.getData(), value.getSrcId(), value.getDestId()});

        for (final T obj : value.getData()) {
          LOG.log(Level.FINEST, "OUT: data: {0}", obj);
        }
      }

      if (received == expected && monitor != null) {
        monitor.mnotify();
      }
    }
  }

  /**
   * Test exception handler: reports transport errors through the test logger.
   */
  static class ExceptionHandler implements EventHandler<Exception> {
    @Override
    public void onNext(final Exception error) {
      LOG.log(Level.WARNING, "Exception reported by the network service", error);
    }
  }
}
package org.apache.reef.io.network.naming;

import org.apache.reef.io.network.naming.exception.NamingException;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.junit.Assert;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;

/**
 * Tests for the {@link NameResolver} produced by {@link LocalNameResolverConfiguration}.
 */
public class LocalNameResolverTest {

  /** Identifier registered by both tests. */
  private static final String TASK_ID = "Task1";
  /** Port of the address registered for {@link #TASK_ID}; nothing listens on it. */
  private static final int TASK_PORT = 7001;

  private final LocalAddressProvider localAddressProvider;

  public LocalNameResolverTest() throws InjectionException {
    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
  }

  /**
   * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#close()}.
   * Registers and unregisters one identifier, then lets try-with-resources close the resolver.
   *
   * @throws Exception
   */
  @Test
  public final void testClose() throws Exception {
    final String localAddress = localAddressProvider.getLocalAddress();
    final IdentifierFactory factory = new StringIdentifierFactory();
    try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 10000)
        .build()).getInstance(NameResolver.class)) {
      final Identifier id = factory.getNewInstance(TASK_ID);
      resolver.register(id, new InetSocketAddress(localAddress, TASK_PORT));
      resolver.unregister(id);
      Thread.sleep(100);
    }
  }

  /**
   * Test method for {@link org.apache.reef.io.network.naming.LocalNameResolverImpl#lookup(Identifier id)}.
   * To check caching behavior with expireAfterAccess & expireAfterWrite
   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
   *
   * @throws Exception
   */
  @Test
  public final void testLookup() throws Exception {
    final IdentifierFactory factory = new StringIdentifierFactory();
    final String localAddress = localAddressProvider.getLocalAddress();
    try (final NameResolver resolver = Tang.Factory.getTang().newInjector(LocalNameResolverConfiguration.CONF
        .set(LocalNameResolverConfiguration.CACHE_TIMEOUT, 150)
        .build()).getInstance(NameResolver.class)) {
      final Identifier id = factory.getNewInstance(TASK_ID);
      final InetSocketAddress socketAddr = new InetSocketAddress(localAddress, TASK_PORT);
      resolver.register(id, socketAddr);

      // caches the entry; assertEquals reports both addresses if they differ
      Assert.assertEquals(socketAddr, resolver.lookup(id));

      resolver.unregister(id);
      Thread.sleep(100);
      try {
        // Only the access matters here, not the returned value.
        resolver.lookup(id);
        Thread.sleep(100);
        //With expireAfterAccess, the previous lookup would reset expiry to 150ms
        //more and 100ms wait will not expire the item and will return the cached value
        //With expireAfterWrite, the extra wait of 100 ms will expire the item
        //resulting in NamingException and the test passes
        final InetSocketAddress lookupAddr = resolver.lookup(id);
        Assert.assertNull("resolver.lookup(id)", lookupAddr);
      } catch (final Exception e) {
        // Anything other than a failed lookup is a genuine test error.
        if (!(e instanceof ExecutionException)) {
          throw e;
        }
        Assert.assertTrue("Execution Exception cause is instanceof NamingException",
            e.getCause() instanceof NamingException);
      }
    }
  }
}
package org.apache.reef.io.network.naming;

import org.apache.reef.io.network.naming.exception.NamingException;
import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount;
import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout;
import org.apache.reef.io.network.util.StringIdentifierFactory;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.Identifier;
import org.apache.reef.wake.IdentifierFactory;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;

/**
 * Exercises {@link NameClient} against a {@link NameServer} started inside the test.
 */
public class NameClientTest {

  /** Default retry count, read from Tang once for the whole class. */
  private static final int RETRY_COUNT;
  /** Default retry timeout, read from Tang once for the whole class. */
  private static final int RETRY_TIMEOUT;

  static {
    final Tang tang = Tang.Factory.getTang();
    try {
      RETRY_COUNT = tang.newInjector().getNamedInstance(NameResolverRetryCount.class);
      RETRY_TIMEOUT = tang.newInjector().getNamedInstance(NameResolverRetryTimeout.class);
    } catch (final InjectionException cause) {
      throw new RuntimeException("Exception while trying to find default values for retryCount & Timeout", cause);
    }
  }

  private final LocalAddressProvider localAddressProvider;

  public NameClientTest() throws InjectionException {
    this.localAddressProvider = LocalAddressProviderFactory.getInstance();
  }

  /**
   * Class-level set-up hook; intentionally empty.
   *
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
  }

  /**
   * Class-level tear-down hook; intentionally empty.
   *
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
  }

  /**
   * Test method for {@link org.apache.reef.io.network.naming.NameClient#close()}.
   * Registers and unregisters one identifier, then lets try-with-resources
   * close the client and the server.
   *
   * @throws Exception
   */
  @Test
  public final void testClose() throws Exception {
    final String host = localAddressProvider.getLocalAddress();
    final IdentifierFactory idFactory = new StringIdentifierFactory();

    try (final NameServer nameServer = startNameServer(idFactory)) {
      final Configuration clientConf = clientConfiguration(host, nameServer.getPort(), 10000);

      try (final NameResolver nameClient = connect(clientConf)) {
        final Identifier taskId = idFactory.getNewInstance("Task1");
        nameClient.register(taskId, new InetSocketAddress(host, 7001));
        nameClient.unregister(taskId);
        Thread.sleep(100);
      }
    }
  }

  /**
   * Test method for {@link org.apache.reef.io.network.naming.NameClient#lookup(Identifier)}.
   * To check caching behavior with expireAfterAccess & expireAfterWrite
   * Changing NameCache's pattern to expireAfterAccess causes this test to fail
   *
   * @throws Exception
   */
  @Test
  public final void testLookup() throws Exception {
    final String host = localAddressProvider.getLocalAddress();
    final IdentifierFactory idFactory = new StringIdentifierFactory();

    try (final NameServer nameServer = startNameServer(idFactory)) {
      final Configuration clientConf = clientConfiguration(host, nameServer.getPort(), 150);

      try (final NameResolver nameClient = connect(clientConf)) {
        final Identifier taskId = idFactory.getNewInstance("Task1");
        nameClient.register(taskId, new InetSocketAddress(host, 7001));
        nameClient.lookup(taskId); // caches the entry
        nameClient.unregister(taskId);
        Thread.sleep(100);
        try {
          nameClient.lookup(taskId);
          Thread.sleep(100);
          //With expireAfterAccess, the previous lookup would reset expiry to 150ms
          //more and 100ms wait will not expire the item and will return the cached value
          //With expireAfterWrite, the extra wait of 100 ms will expire the item
          //resulting in NamingException and the test passes
          final InetSocketAddress resolved = nameClient.lookup(taskId);
          Assert.assertNull("client.lookup(id)", resolved);
        } catch (final Exception failure) {
          if (!(failure instanceof ExecutionException)) {
            throw failure;
          }
          Assert.assertTrue("Execution Exception cause is instanceof NamingException",
              failure.getCause() instanceof NamingException);
        }
      }
    }
  }

  /**
   * Starts a name server bound to the given identifier factory and to this
   * test's local address provider. The caller closes the returned server.
   */
  private NameServer startNameServer(final IdentifierFactory idFactory) throws InjectionException {
    final Injector serverInjector = Tang.Factory.getTang().newInjector();
    serverInjector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, idFactory);
    serverInjector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider);
    return serverInjector.getInstance(NameServer.class);
  }

  /**
   * Builds the name-resolver configuration for a server at {@code host:port}
   * with the given cache timeout and the default retry settings.
   */
  private static Configuration clientConfiguration(final String host, final int port, final int cacheTimeout) {
    return NameResolverConfiguration.CONF
        .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, host)
        .set(NameResolverConfiguration.NAME_SERVICE_PORT, port)
        .set(NameResolverConfiguration.CACHE_TIMEOUT, cacheTimeout)
        .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT)
        .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT)
        .build();
  }

  /**
   * Instantiates a {@link NameClient} from the given configuration.
   * The caller closes the returned client.
   */
  private static NameResolver connect(final Configuration clientConf) throws InjectionException {
    return Tang.Factory.getTang().newInjector(clientConf).getInstance(NameClient.class);
  }

}
+ */ +package org.apache.reef.io.network.naming; + +import org.apache.reef.io.naming.NameAssignment; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryCount; +import org.apache.reef.io.network.naming.parameters.NameResolverRetryTimeout; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Naming server and client test. 
+ */ +public class NamingTest { + + private static final Logger LOG = Logger.getLogger(NamingTest.class.getName()); + private static final int RETRY_COUNT; + private static final int RETRY_TIMEOUT; + + static { + try { + final Injector injector = Tang.Factory.getTang().newInjector(); + RETRY_COUNT = injector.getNamedInstance(NameResolverRetryCount.class); + RETRY_TIMEOUT = injector.getNamedInstance(NameResolverRetryTimeout.class); + } catch (final InjectionException ex) { + final String msg = "Exception while trying to find default values for retryCount & Timeout"; + LOG.log(Level.SEVERE, msg, ex); + throw new RuntimeException(msg, ex); + } + } + + private final LocalAddressProvider localAddressProvider; + @Rule + public final TestName name = new TestName(); + static final long TTL = 30000; + private final IdentifierFactory factory = new StringIdentifierFactory(); + private int port; + + public NamingTest() throws InjectionException { + this.localAddressProvider = LocalAddressProviderFactory.getInstance(); + } + + /** + * NameServer and NameLookupClient test. 
+ * + * @throws Exception + */ + @Test + public void testNamingLookup() throws Exception { + + final String localAddress = localAddressProvider.getLocalAddress(); + LOG.log(Level.FINEST, this.name.getMethodName()); + + // names + final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); + idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); + idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); + + // run a server + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + try (final NameServer server = injector.getInstance(NameServer.class)) { + this.port = server.getPort(); + for (final Identifier id : idToAddrMap.keySet()) { + server.register(id, idToAddrMap.get(id)); + } + + // run a client + try (final NameLookupClient client = new NameLookupClient(localAddress, this.port, + 10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) { + + final Identifier id1 = this.factory.getNewInstance("task1"); + final Identifier id2 = this.factory.getNewInstance("task2"); + + final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>(); + final InetSocketAddress addr1 = client.lookup(id1); + respMap.put(id1, addr1); + final InetSocketAddress addr2 = client.lookup(id2); + respMap.put(id2, addr2); + + for (final Identifier id : respMap.keySet()) { + LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); + } + + Assert.assertTrue(isEqual(idToAddrMap, respMap)); + } + } + } + + /** + * Test concurrent lookups (threads share a client). 
+ * + * @throws Exception + */ + @Test + public void testConcurrentNamingLookup() throws Exception { + + LOG.log(Level.FINEST, this.name.getMethodName()); + + final String localAddress = localAddressProvider.getLocalAddress(); + // test it 3 times to make failure likely + for (int i = 0; i < 3; i++) { + + LOG.log(Level.FINEST, "test {0}", i); + + // names + final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); + idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); + idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); + idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(localAddress, 7003)); + + // run a server + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + try (final NameServer server = injector.getInstance(NameServer.class)) { + this.port = server.getPort(); + for (final Identifier id : idToAddrMap.keySet()) { + server.register(id, idToAddrMap.get(id)); + } + + // run a client + try (final NameLookupClient client = new NameLookupClient(localAddress, this.port, + 10000, this.factory, RETRY_COUNT, RETRY_TIMEOUT, new NameCache(this.TTL), this.localAddressProvider)) { + + final Identifier id1 = this.factory.getNewInstance("task1"); + final Identifier id2 = this.factory.getNewInstance("task2"); + final Identifier id3 = this.factory.getNewInstance("task3"); + + final ExecutorService e = Executors.newCachedThreadPool(); + + final ConcurrentMap<Identifier, InetSocketAddress> respMap = + new ConcurrentHashMap<Identifier, InetSocketAddress>(); + + final Future<?> f1 = e.submit(new Runnable() { + @Override + public void run() { + InetSocketAddress addr = null; + try { + addr = client.lookup(id1); + } catch 
(final Exception e) { + LOG.log(Level.SEVERE, "Lookup failed", e); + Assert.fail(e.toString()); + } + respMap.put(id1, addr); + } + }); + final Future<?> f2 = e.submit(new Runnable() { + @Override + public void run() { + InetSocketAddress addr = null; + try { + addr = client.lookup(id2); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "Lookup failed", e); + Assert.fail(e.toString()); + } + respMap.put(id2, addr); + } + }); + final Future<?> f3 = e.submit(new Runnable() { + @Override + public void run() { + InetSocketAddress addr = null; + try { + addr = client.lookup(id3); + } catch (final Exception e) { + LOG.log(Level.SEVERE, "Lookup failed", e); + Assert.fail(e.toString()); + } + respMap.put(id3, addr); + } + }); + + f1.get(); + f2.get(); + f3.get(); + + for (final Identifier id : respMap.keySet()) { + LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); + } + + Assert.assertTrue(isEqual(idToAddrMap, respMap)); + } + } + } + } + + /** + * NameServer and NameRegistryClient test. 
+ * + * @throws Exception + */ + @Test + public void testNamingRegistry() throws Exception { + + LOG.log(Level.FINEST, this.name.getMethodName()); + + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + try (final NameServer server = injector.getInstance(NameServer.class)) { + this.port = server.getPort(); + final String localAddress = localAddressProvider.getLocalAddress(); + + // names to start with + final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); + idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); + idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); + + // registration + // invoke registration from the client side + try (final NameRegistryClient client = + new NameRegistryClient(localAddress, this.port, this.factory, this.localAddressProvider)) { + for (final Identifier id : idToAddrMap.keySet()) { + client.register(id, idToAddrMap.get(id)); + } + + // wait + final Set<Identifier> ids = idToAddrMap.keySet(); + busyWait(server, ids.size(), ids); + + // check the server side + Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>(); + Iterable<NameAssignment> nas = server.lookup(ids); + + for (final NameAssignment na : nas) { + LOG.log(Level.FINEST, "Mapping: {0} -> {1}", + new Object[]{na.getIdentifier(), na.getAddress()}); + serverMap.put(na.getIdentifier(), na.getAddress()); + } + + Assert.assertTrue(isEqual(idToAddrMap, serverMap)); + + // un-registration + for (final Identifier id : idToAddrMap.keySet()) { + client.unregister(id); + } + + // wait + busyWait(server, 0, ids); + + serverMap = new HashMap<Identifier, InetSocketAddress>(); + nas = server.lookup(ids); + for (final 
NameAssignment na : nas) { + serverMap.put(na.getIdentifier(), na.getAddress()); + } + + Assert.assertEquals(0, serverMap.size()); + } + } + } + + /** + * NameServer and NameClient test. + * + * @throws Exception + */ + @Test + public void testNameClient() throws Exception { + + LOG.log(Level.FINEST, this.name.getMethodName()); + + final String localAddress = localAddressProvider.getLocalAddress(); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + try (final NameServer server = injector.getInstance(NameServer.class)) { + this.port = server.getPort(); + + final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); + idToAddrMap.put(this.factory.getNewInstance("task1"), new InetSocketAddress(localAddress, 7001)); + idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); + + // registration + // invoke registration from the client side + final Configuration nameResolverConf = NameResolverConfiguration.CONF + .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) + .set(NameResolverConfiguration.NAME_SERVICE_PORT, this.port) + .set(NameResolverConfiguration.CACHE_TIMEOUT, this.TTL) + .set(NameResolverConfiguration.RETRY_TIMEOUT, RETRY_TIMEOUT) + .set(NameResolverConfiguration.RETRY_COUNT, RETRY_COUNT) + .build(); + + try (final NameResolver client + = Tang.Factory.getTang().newInjector(nameResolverConf).getInstance(NameClient.class)) { + for (final Identifier id : idToAddrMap.keySet()) { + client.register(id, idToAddrMap.get(id)); + } + + // wait + final Set<Identifier> ids = idToAddrMap.keySet(); + busyWait(server, ids.size(), ids); + + // lookup + final Identifier id1 = this.factory.getNewInstance("task1"); + final Identifier id2 = 
this.factory.getNewInstance("task2"); + + final Map<Identifier, InetSocketAddress> respMap = new HashMap<Identifier, InetSocketAddress>(); + InetSocketAddress addr1 = client.lookup(id1); + respMap.put(id1, addr1); + InetSocketAddress addr2 = client.lookup(id2); + respMap.put(id2, addr2); + + for (final Identifier id : respMap.keySet()) { + LOG.log(Level.FINEST, "Mapping: {0} -> {1}", new Object[]{id, respMap.get(id)}); + } + + Assert.assertTrue(isEqual(idToAddrMap, respMap)); + + // un-registration + for (final Identifier id : idToAddrMap.keySet()) { + client.unregister(id); + } + + // wait + busyWait(server, 0, ids); + + final Map<Identifier, InetSocketAddress> serverMap = new HashMap<Identifier, InetSocketAddress>(); + addr1 = server.lookup(id1); + if (addr1 != null) { + serverMap.put(id1, addr1); + } + addr2 = server.lookup(id1); + if (addr2 != null) { + serverMap.put(id2, addr2); + } + + Assert.assertEquals(0, serverMap.size()); + } + } + } + + private boolean isEqual(final Map<Identifier, InetSocketAddress> map1, + final Map<Identifier, InetSocketAddress> map2) { + + if (map1.size() != map2.size()) { + return false; + } + + for (final Identifier id : map1.keySet()) { + final InetSocketAddress addr1 = map1.get(id); + final InetSocketAddress addr2 = map2.get(id); + if (!addr1.equals(addr2)) { + return false; + } + } + + return true; + } + + private void busyWait(final NameServer server, final int expected, final Set<Identifier> ids) { + int count = 0; + for (;;) { + final Iterable<NameAssignment> nas = server.lookup(ids); + for (@SuppressWarnings("unused") final NameAssignment na : nas) { + ++count; + } + if (count == expected) { + break; + } + count = 0; + } + } +}
