Repository: incubator-reef Updated Branches: refs/heads/master a9919be4a -> 8c494adf9
[REEF-641] NetworkConnectionService should look up a correct end point id with deprecated methods This addressed the issue by: * Make NetworkConnectionFactory open a link without connection factory id if it was registered by deprecated method * Change NetworkConnectionServiceImpl#unregisterConnectionFactory not to unregister the local end point id for the NCF which was registered by deprecated method * Add DeprecatedNetworkConnectionServiceTest which tests the deprecated methods of NetworkConnectionService JIRA: [REEF-641](https://issues.apache.org/jira/browse/REEF-641) Pull Request: Closes #411 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8c494adf Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8c494adf Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8c494adf Branch: refs/heads/master Commit: 8c494adf9ae902fcb0a5d6e62f88a9fe15866cb1 Parents: a9919be Author: Kim_Geon_Woo <[email protected]> Authored: Tue Aug 25 14:47:27 2015 +0900 Committer: Brian Cho <[email protected]> Committed: Fri Aug 28 14:00:45 2015 +0900 ---------------------------------------------------------------------- .../io/network/NetworkConnectionService.java | 5 + .../BindNetworkConnectionServiceToTask.java | 1 + .../network/impl/NetworkConnectionFactory.java | 15 + .../impl/NetworkConnectionServiceImpl.java | 40 +- .../UnbindNetworkConnectionServiceFromTask.java | 2 + .../DeprecatedNetworkConnectionServiceTest.java | 407 +++++++++++++++++++ .../DeprecatedNetworkMessagingTestService.java | 150 +++++++ 7 files changed, 617 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java index 448b941..f87490b 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/NetworkConnectionService.java @@ -26,6 +26,7 @@ import org.apache.reef.wake.Identifier; import org.apache.reef.wake.remote.Codec; import org.apache.reef.wake.remote.transport.LinkListener; +// TODO[JIRA REEF-637] Annotate the class as @Unstable. /** * NetworkConnectionService. * @@ -49,6 +50,7 @@ import org.apache.reef.wake.remote.transport.LinkListener; @DefaultImplementation(NetworkConnectionServiceImpl.class) public interface NetworkConnectionService extends AutoCloseable { + // TODO[JIRA REEF-637] Remove the deprecated method. /** * Registers an instance of ConnectionFactory corresponding to the connectionFactoryId. * Binds Codec, EventHandler and LinkListener to the ConnectionFactory. @@ -109,6 +111,7 @@ public interface NetworkConnectionService extends AutoCloseable { */ void close() throws Exception; + // TODO[JIRA REEF-637] Remove the deprecated method. /** * Registers a network connection service identifier. * This can be used for destination identifier @@ -119,6 +122,7 @@ public interface NetworkConnectionService extends AutoCloseable { @Deprecated void registerId(final Identifier ncsId); + // TODO[JIRA REEF-637] Remove the deprecated method. /** * Unregister a network connection service identifier. * @param ncsId network connection service identifier @@ -127,6 +131,7 @@ public interface NetworkConnectionService extends AutoCloseable { @Deprecated void unregisterId(final Identifier ncsId); + // TODO[JIRA REEF-637] Remove the deprecated method. /** * Gets a network connection service client id which is equal to the registered id. * @deprecated in 0.13. Use ConnectionFactory.getLocalEndPointId instead. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java index ad2f47f..e7d58e3 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/BindNetworkConnectionServiceToTask.java @@ -27,6 +27,7 @@ import org.apache.reef.wake.IdentifierFactory; import javax.inject.Inject; +// TODO[JIRA REEF-637] Remove the deprecated class. /** * TaskStart event handler for registering NetworkConnectionService. * Users have to bind this handler into ServiceConfiguration.ON_TASK_STARTED. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java index fc7c11e..4224ec8 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionFactory.java @@ -77,9 +77,23 @@ final class NetworkConnectionFactory<T> implements ConnectionFactory<T> { } Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteId) throws NetworkException { + // TODO[JIRA REEF-637] : Remove below if statement. + if (isRegisteredByDeprecatedMethod()) { + return networkService.openLink(remoteId); + } + return networkService.openLink(connectionFactoryId, remoteId); } + // TODO[JIRA REEF-637] Remove the deprecated method. + /** + * @deprecated in 0.13. + */ + @Deprecated + boolean isRegisteredByDeprecatedMethod() { + return localEndPointId == null; + } + @Override public Identifier getConnectionFactoryId() { return connectionFactoryId; @@ -90,6 +104,7 @@ final class NetworkConnectionFactory<T> implements ConnectionFactory<T> { return localEndPointId; } + // TODO[JIRA REEF-637] Remove the deprecated method. /** * @deprecated in 0.13. Use getLocalEndPointId() instead. */ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java index 7ad86c8..c11f8ea 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/NetworkConnectionServiceImpl.java @@ -72,6 +72,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ * A map of (id of connection factory, a connection factory instance). */ private final ConcurrentMap<String, NetworkConnectionFactory> connFactoryMap; + // TODO[JIRA REEF-637] Remove the deprecated field. /** * A network connection service identifier. * @deprecated in 0.13. Use ConnectionFactory.getLocalEndPointId instead. @@ -152,6 +153,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ this.isClosed = new AtomicBoolean(); } + // TODO[JIRA REEF-637] Remove the deprecated method. /** * @deprecated in 0.13. Use registerConnectionFactory(Identifier, Codec, EventHandler, LinkListener, Identifier) * instead. @@ -188,6 +190,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ "The ConnectionFactoryId " + connectionFactoryId + " should not contain " + DELIMITER); } } + @Override public <T> ConnectionFactory<T> registerConnectionFactory( final Identifier connectionFactoryId, @@ -207,21 +210,29 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ throw new NetworkRuntimeException("ConnectionFactory " + connectionFactoryId + " was already registered."); } + LOG.log(Level.INFO, "ConnectionFactory {0} was registered", id); + return connectionFactory; } @Override public void unregisterConnectionFactory(final Identifier connFactoryId) { final String id = connFactoryId.toString(); - final ConnectionFactory connFactory = connFactoryMap.remove(id); + final NetworkConnectionFactory connFactory = connFactoryMap.remove(id); if (connFactory != null) { - final Identifier localId = getEndPointIdWithConnectionFactoryId(connFactoryId, connFactory.getLocalEndPointId()); - nameServiceUnregisteringStage.onNext(localId); + LOG.log(Level.INFO, "ConnectionFactory {0} was unregistered", id); + + if (!connFactory.isRegisteredByDeprecatedMethod()) { // TODO[JIRA REEF-637] : Remove the redundant check. + final Identifier localId = getEndPointIdWithConnectionFactoryId( + connFactoryId, connFactory.getLocalEndPointId()); + nameServiceUnregisteringStage.onNext(localId); + } } else { LOG.log(Level.WARNING, "ConnectionFactory of {0} is null", id); } } + // TODO[JIRA REEF-637] Remove the deprecated method. /** * Registers a source identifier of NetworkConnectionService. * @param ncsId @@ -243,6 +254,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ /** * Open a channel for destination identifier of NetworkConnectionService. + * @param connectionFactoryId * @param remoteEndPointId * @throws NetworkException */ @@ -260,6 +272,26 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ } } + // TODO[JIRA REEF-637] Remove the deprecated method. + /** + * Open a channel for destination identifier of NetworkConnectionService. + * @param remoteEndPointId + * @throws NetworkException + * @deprecated in 0.13. Use openLink(Identifier, Identifier) instead. + */ + @Deprecated + <T> Link<NetworkConnectionServiceMessage<T>> openLink(final Identifier remoteEndPointId) throws NetworkException { + try { + final SocketAddress address = nameResolver.lookup(remoteEndPointId); + if (address == null) { + throw new NetworkException("Lookup " + remoteEndPointId + " is null"); + } + return transport.open(address, nsCodec, nsLinkListener); + } catch(final Exception e) { + throw new NetworkException(e); + } + } + private Identifier getEndPointIdWithConnectionFactoryId( final Identifier connectionFactoryId, final Identifier endPointId) { final String identifier = connectionFactoryId.toString() + DELIMITER + endPointId.toString(); @@ -279,6 +311,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ return connFactory; } + // TODO[JIRA REEF-637] Remove the deprecated method. /** * @param ncsId network connection service identifier * @deprecated in 0.13. @@ -292,6 +325,7 @@ public final class NetworkConnectionServiceImpl implements NetworkConnectionServ this.nameServiceUnregisteringStage.onNext(ncsId); } + // TODO[JIRA REEF-637] Remove the deprecated method. /** * @return the identifier of this NetworkConnectionService * @deprecated in 0.13. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java index 6b94a8b..b9a31fb 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/UnbindNetworkConnectionServiceFromTask.java @@ -26,6 +26,8 @@ import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.IdentifierFactory; import javax.inject.Inject; + +// TODO[JIRA REEF-637] Remove the deprecated class. /** * TaskStop event handler for unregistering NetworkConnectionService. * Users have to bind this handler into ServiceConfiguration.ON_TASK_STOP. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java new file mode 100644 index 0000000..76ccbc9 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/DeprecatedNetworkConnectionServiceTest.java @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.services.network; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.services.network.util.*; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +// TODO[JIRA REEF-637] Remove the deprecated class. +/** + * Test for deprecated methods, which are deprecated in 0.13, of NetworkConnectionService. + */ +@Deprecated +public final class DeprecatedNetworkConnectionServiceTest { + private static final Logger LOG = Logger.getLogger(DeprecatedNetworkConnectionServiceTest.class.getName()); + + private final LocalAddressProvider localAddressProvider; + private final String localAddress; + private final Identifier groupCommClientId; + private final Identifier shuffleClientId; + + public DeprecatedNetworkConnectionServiceTest() throws InjectionException { + localAddressProvider = LocalAddressProviderFactory.getInstance(); + localAddress = localAddressProvider.getLocalAddress(); + + final IdentifierFactory idFac = new StringIdentifierFactory(); + this.groupCommClientId = idFac.getNewInstance("groupComm"); + this.shuffleClientId = idFac.getNewInstance("shuffle"); + } + + @Rule + public TestName name = new TestName(); + + private void runMessagingNetworkConnectionService(final Codec<String> codec) throws Exception { + final int numMessages = 2000; + final Monitor monitor = new Monitor(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + try (final Connection<String> conn = messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + /** + * NetworkConnectionService messaging test. + */ + @Test + public void testMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StringCodec()); + } + + /** + * NetworkConnectionService streaming messaging test. + */ + @Test + public void testStreamingMessagingNetworkConnectionService() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runMessagingNetworkConnectionService(new StreamingStringCodec()); + } + + public void runNetworkConnServiceWithMultipleConnFactories(final Codec<String> stringCodec, + final Codec<Integer> integerCodec) + throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(5); + + final int groupcommMessages = 1000; + final Monitor monitor = new Monitor(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + + messagingTestService.registerTestConnectionFactory(groupCommClientId, groupcommMessages, monitor, stringCodec); + + final int shuffleMessages = 2000; + final Monitor monitor2 = new Monitor(); + messagingTestService.registerTestConnectionFactory(shuffleClientId, shuffleMessages, monitor2, integerCodec); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + conn.open(); + for (int count = 0; count < groupcommMessages; ++count) { + // send messages to the receiver. + conn.write("hello" + count); + } + monitor.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + executor.submit(new Runnable() { + @Override + public void run() { + try (final Connection<Integer> conn = + messagingTestService.getConnectionFromSenderToReceiver(shuffleClientId)) { + conn.open(); + for (int count = 0; count < shuffleMessages; ++count) { + // send messages to the receiver. + conn.write(count); + } + monitor2.mwait(); + } catch (final Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + }); + + monitor.mwait(); + monitor2.mwait(); + executor.shutdown(); + } + } + + /** + * Test NetworkService registering multiple connection factories. + */ + @Test + public void testMultipleConnectionFactoriesTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StringCodec(), new ObjectSerializableCodec<Integer>()); + } + + /** + * Test NetworkService registering multiple connection factories with Streamingcodec. + */ + @Test + public void testMultipleConnectionFactoriesStreamingTest() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + runNetworkConnServiceWithMultipleConnFactories(new StreamingStringCodec(), new StreamingIntegerCodec()); + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + final long end = System.currentTimeMillis(); + + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numMessages / runtime + + " bandwidth(bytes/s): " + ((double) numMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceRateDisjoint() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); + + final int numThreads = 4; + final int size = 2000; + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int totalNumMessages = numMessages * numThreads; + + final ExecutorService e = Executors.newCachedThreadPool(); + for (int t = 0; t < numThreads; t++) { + final int tt = t; + + e.submit(new Runnable() { + public void run() { + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + // start and time + final long start = System.currentTimeMillis(); + final Object ignore = new Object(); + for (int i = 0; i < numThreads; i++) { + barrier.add(ignore); + } + e.shutdown(); + e.awaitTermination(100, TimeUnit.SECONDS); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + + @Test + public void testMultithreadedSharedConnMessagingNetworkConnServiceRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + final int[] messageSizes = {2000}; // {1,16,32,64,512,64*1024,1024*1024}; + + for (final int size : messageSizes) { + final int numMessages = 300000 / (Math.max(1, size / 512)); + final int numThreads = 2; + final int totalNumMessages = numMessages * numThreads; + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, totalNumMessages, monitor, codec); + + final ExecutorService e = Executors.newCachedThreadPool(); + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + final long start = System.currentTimeMillis(); + for (int i = 0; i < numThreads; i++) { + e.submit(new Runnable() { + @Override + public void run() { + + try { + conn.open(); + for (int count = 0; count < numMessages; ++count) { + // send messages to the receiver. + conn.write(message); + } + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + }); + } + + e.shutdown(); + e.awaitTermination(30, TimeUnit.SECONDS); + monitor.mwait(); + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + totalNumMessages / runtime + + " bandwidth(bytes/s): " + ((double) totalNumMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } + + /** + * NetworkService messaging rate benchmark. + */ + @Test + public void testMessagingNetworkConnServiceBatchingRate() throws Exception { + LOG.log(Level.FINEST, name.getMethodName()); + + final int batchSize = 1024 * 1024; + final int[] messageSizes = {32, 64, 512}; + + for (final int size : messageSizes) { + final int numMessages = 300 / (Math.max(1, size / 512)); + final Monitor monitor = new Monitor(); + final Codec<String> codec = new StringCodec(); + try (final DeprecatedNetworkMessagingTestService messagingTestService + = new DeprecatedNetworkMessagingTestService(localAddress)) { + messagingTestService.registerTestConnectionFactory(groupCommClientId, numMessages, monitor, codec); + try (final Connection<String> conn = + messagingTestService.getConnectionFromSenderToReceiver(groupCommClientId)) { + + // build the message + final StringBuilder msb = new StringBuilder(); + for (int i = 0; i < size; i++) { + msb.append("1"); + } + final String message = msb.toString(); + + final long start = System.currentTimeMillis(); + try { + for (int i = 0; i < numMessages; i++) { + final StringBuilder sb = new StringBuilder(); + for (int j = 0; j < batchSize / size; j++) { + sb.append(message); + } + conn.open(); + conn.write(sb.toString()); + } + monitor.mwait(); + } catch (final NetworkException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + + final long end = System.currentTimeMillis(); + final double runtime = ((double) end - start) / 1000; + final long numAppMessages = numMessages * batchSize / size; + LOG.log(Level.INFO, "size: " + size + "; messages/s: " + numAppMessages / runtime + + " bandwidth(bytes/s): " + ((double) numAppMessages * 2 * size) / runtime); // x2 for unicode chars + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c494adf/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java new file mode 100644 index 0000000..4c4bd69 --- /dev/null +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/util/DeprecatedNetworkMessagingTestService.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.services.network.util; + +import org.apache.reef.exception.evaluator.NetworkException; +import org.apache.reef.io.network.Connection; +import org.apache.reef.io.network.Message; +import org.apache.reef.io.network.NetworkConnectionService; +import org.apache.reef.io.network.impl.config.NetworkConnectionServiceIdFactory; +import org.apache.reef.io.network.naming.NameResolver; +import org.apache.reef.io.network.naming.NameResolverConfiguration; +import org.apache.reef.io.network.naming.NameServer; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.Identifier; +import org.apache.reef.wake.IdentifierFactory; +import org.apache.reef.wake.remote.Codec; +import org.apache.reef.wake.remote.transport.LinkListener; + +import java.net.SocketAddress; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +// TODO[JIRA REEF-637] Remove the deprecated class. +/** + * Helper class for DeprecatedNetworkConnectionService test, deprecated in 0.13. + */ +@Deprecated +public final class DeprecatedNetworkMessagingTestService implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(DeprecatedNetworkMessagingTestService.class.getName()); + + private final IdentifierFactory factory; + private final NetworkConnectionService receiverNetworkConnService; + private final NetworkConnectionService senderNetworkConnService; + private final String receiver; + private final String sender; + private final NameServer nameServer; + private final NameResolver receiverResolver; + private final NameResolver senderResolver; + + public DeprecatedNetworkMessagingTestService(final String localAddress) throws InjectionException { + // name server + final Injector injector = Tang.Factory.getTang().newInjector(); + this.nameServer = injector.getInstance(NameServer.class); + final Configuration netConf = NameResolverConfiguration.CONF + .set(NameResolverConfiguration.NAME_SERVER_HOSTNAME, localAddress) + .set(NameResolverConfiguration.NAME_SERVICE_PORT, nameServer.getPort()) + .build(); + + LOG.log(Level.FINEST, "=== Test network connection service receiver start"); + // network service for receiver + this.receiver = "receiver"; + final Injector injectorReceiver = injector.forkInjector(netConf); + this.receiverNetworkConnService = injectorReceiver.getInstance(NetworkConnectionService.class); + this.receiverResolver = injectorReceiver.getInstance(NameResolver.class); + this.factory = injectorReceiver.getNamedInstance(NetworkConnectionServiceIdFactory.class); + this.receiverNetworkConnService.registerId(this.factory.getNewInstance(receiver)); + + // network service for sender + this.sender = "sender"; + LOG.log(Level.FINEST, "=== Test network connection service sender start"); + final Injector injectorSender = injector.forkInjector(netConf); + senderNetworkConnService = injectorSender.getInstance(NetworkConnectionService.class); + senderNetworkConnService.registerId(this.factory.getNewInstance(this.sender)); + this.senderResolver = injectorSender.getInstance(NameResolver.class); + } + + public <T> void registerTestConnectionFactory(final Identifier connFactoryId, + final int numMessages, final Monitor monitor, + final Codec<T> codec) throws NetworkException { + receiverNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); + senderNetworkConnService.registerConnectionFactory(connFactoryId, codec, + new MessageHandler<T>(monitor, numMessages), new TestListener<T>()); + } + + public <T> Connection<T> getConnectionFromSenderToReceiver(final Identifier connFactoryId) { + final Identifier destId = factory.getNewInstance(receiver); + return (Connection<T>)senderNetworkConnService.getConnectionFactory(connFactoryId).newConnection(destId); + } + + public void close() throws Exception { + senderNetworkConnService.close(); + receiverNetworkConnService.close(); + nameServer.close(); + receiverResolver.close(); + senderResolver.close(); + } + + public static final class MessageHandler<T> implements EventHandler<Message<T>> { + private final int expected; + private final Monitor monitor; + private AtomicInteger count = new AtomicInteger(0); + + public MessageHandler(final Monitor monitor, + final int expected) { + this.monitor = monitor; + this.expected = expected; + } + + @Override + public void onNext(final Message<T> value) { + count.incrementAndGet(); + LOG.log(Level.FINE, "Count: {0}", count.get()); + LOG.log(Level.FINE, + "OUT: {0} received {1} from {2} to {3}", + new Object[]{value, value.getSrcId(), value.getDestId()}); + + for (final T obj : value.getData()) { + LOG.log(Level.FINE, "OUT: data: {0}", obj); + } + + if (count.get() == expected) { + monitor.mnotify(); + } + } + } + + public static final class TestListener<T> implements LinkListener<Message<T>> { + @Override + public void onSuccess(final Message<T> message) { + LOG.log(Level.FINE, "success: " + message); + } + @Override + public void onException(final Throwable cause, final SocketAddress remoteAddress, final Message<T> message) { + LOG.log(Level.WARNING, "exception: " + cause + message); + throw new RuntimeException(cause); + } + } +}
