Repository: incubator-reef Updated Branches: refs/heads/master 1d2003346 -> 474760f78
[REEF-264] Make the NameService injectable JIRA: [REEF-264](https://issues.apache.org/jira/browse/REEF-264) Pull Request: This closes #247 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/474760f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/474760f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/474760f7 Branch: refs/heads/master Commit: 474760f7800cee7f0d00f1dde29aebf8c74eecf0 Parents: 1d20033 Author: taegeonum <[email protected]> Authored: Fri Jun 26 16:15:54 2015 +0900 Committer: Markus Weimer <[email protected]> Committed: Sat Jun 27 15:42:34 2015 -0700 ---------------------------------------------------------------------- .../group/impl/driver/GroupCommDriverImpl.java | 21 +++++++++++-- .../reef/io/network/naming/NameServer.java | 2 ++ .../reef/io/network/naming/NameServerImpl.java | 17 +++++----- .../reef/services/network/NameClientTest.java | 19 ++++++++--- .../reef/services/network/NamingTest.java | 22 ++++++++++--- .../services/network/NetworkServiceTest.java | 33 +++++++++++++++----- 6 files changed, 85 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java index 2ce98a3..5ac4fa8 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/GroupCommDriverImpl.java @@ -56,8 +56,8 @@ import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; -import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.wake.remote.transport.TransportFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import javax.inject.Inject; import java.util.HashMap; @@ -139,10 +139,25 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { @Parameter(TreeTopologyFanOut.class) final int fanOut, final LocalAddressProvider localAddressProvider, final TransportFactory tpFactory) { + this(confSerializer, driverId, fanOut, localAddressProvider, tpFactory, + new NameServerImpl(0, new StringIdentifierFactory())); + } + + /** + * @deprecated in 0.12. Use Tang to obtain an instance of this instead. + */ + @Deprecated + @Inject + public GroupCommDriverImpl(final ConfigurationSerializer confSerializer, + @Parameter(DriverIdentifier.class) final String driverId, + @Parameter(TreeTopologyFanOut.class) final int fanOut, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory, + final NameServer nameService) { assert (SingletonAsserter.assertSingleton(getClass())); this.driverId = driverId; this.fanOut = fanOut; - this.nameService = new NameServerImpl(0, idFac, localAddressProvider); + this.nameService = nameService; this.nameServiceAddr = localAddressProvider.getLocalAddress(); this.nameServicePort = nameService.getPort(); this.confSerializer = confSerializer; @@ -280,4 +295,4 @@ public class GroupCommDriverImpl implements GroupCommServiceDriver { return groupCommFailedEvaluatorStage; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java index 0625b4c..7b79539 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServer.java @@ -20,6 +20,7 @@ package org.apache.reef.io.network.naming; import org.apache.reef.io.naming.NameAssignment; +import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.Stage; @@ -29,6 +30,7 @@ import java.util.List; /** * Naming server interface. */ +@DefaultImplementation(NameServerImpl.class) public interface NameServer extends Stage { /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java index 9149b3c..d990aca 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java @@ -50,7 +50,7 @@ import java.util.logging.Logger; /** * Naming server implementation. */ -public class NameServerImpl implements NameServer { +public final class NameServerImpl implements NameServer { private static final Logger LOG = Logger.getLogger(NameServer.class.getName()); @@ -63,14 +63,15 @@ public class NameServerImpl implements NameServer { /** * @param port a listening port number * @param factory an identifier factory - * @deprecated inject the NameServer instead of new it up + * @param localAddressProvider a local address provider * Constructs a name server + * @deprecated in 0.12. Use Tang to obtain an instance of this or, better, NameServer, instead. */ - // TODO: All existing NameServer usage is currently new-up, need to make them injected as well. @Deprecated + @Inject public NameServerImpl( - final int port, - final IdentifierFactory factory, + @Parameter(NameServerParameters.NameServerPort.class) final int port, + @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory, final LocalAddressProvider localAddressProvider) { Injector injector = Tang.Factory.getTang().newInjector(); @@ -102,7 +103,6 @@ public class NameServerImpl implements NameServer { @Deprecated public NameServerImpl(final int port, final IdentifierFactory factory) { this(port, factory, LocalAddressProviderFactory.getInstance()); - } /** @@ -134,7 +134,6 @@ public class NameServerImpl implements NameServer { this(port, factory, reefEventStateManager, localAddressProvider, new MessagingTransportFactory()); } - /** * Constructs a name server. * @@ -143,7 +142,9 @@ public class NameServerImpl implements NameServer { * @param reefEventStateManager the event state manager used to register name server info * @param localAddressProvider a local address provider * @param tpFactory a transport factory + * @deprecated in 0.12. Use Tang to obtain an instance of this or, better, NameServer, instead. */ + @Deprecated @Inject public NameServerImpl( @Parameter(NameServerParameters.NameServerPort.class) final int port, @@ -344,4 +345,4 @@ class NamingUnregisterRequestHandler implements EventHandler<NamingUnregisterReq public void onNext(final NamingUnregisterRequest value) { server.unregister(value.getIdentifier()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java index 18787db..571b8b4 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NameClientTest.java @@ -21,6 +21,7 @@ package org.apache.reef.services.network; import org.apache.reef.io.network.naming.*; import org.apache.reef.io.network.naming.exception.NamingException; import org.apache.reef.io.network.util.StringIdentifierFactory; +import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.Identifier; @@ -77,8 +78,12 @@ public class NameClientTest { @Test public final void testClose() throws Exception { final String localAddress = localAddressProvider.getLocalAddress(); - IdentifierFactory factory = new StringIdentifierFactory(); - try (NameServer server = new NameServerImpl(0, factory, this.localAddressProvider)) { + final IdentifierFactory factory = new StringIdentifierFactory(); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + + try (final NameServer server = injector.getInstance(NameServer.class)) { int serverPort = server.getPort(); try (NameClient client = new NameClient(localAddress, serverPort, factory, retryCount, retryTimeout, new NameCache(10000), localAddressProvider)) { @@ -99,9 +104,13 @@ public class NameClientTest { */ @Test public final void testLookup() throws Exception { - IdentifierFactory factory = new StringIdentifierFactory(); final String localAddress = localAddressProvider.getLocalAddress(); - try (NameServer server = new NameServerImpl(0, factory, this.localAddressProvider)) { + final IdentifierFactory factory = new StringIdentifierFactory(); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + + try (final NameServer server = injector.getInstance(NameServer.class)) { int serverPort = server.getPort(); try (NameClient client = new NameClient(localAddress, serverPort, factory, retryCount, retryTimeout, new NameCache(150), localAddressProvider)) { @@ -129,4 +138,4 @@ public class NameClientTest { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java index 139595b..ef6f30a 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NamingTest.java @@ -90,7 +90,10 @@ public class NamingTest { idToAddrMap.put(this.factory.getNewInstance("task2"), new InetSocketAddress(localAddress, 7002)); // run a server - final NameServer server = new NameServerImpl(0, this.factory, this.localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); this.port = server.getPort(); for (final Identifier id : idToAddrMap.keySet()) { server.register(id, idToAddrMap.get(id)); @@ -142,7 +145,10 @@ public class NamingTest { idToAddrMap.put(this.factory.getNewInstance("task3"), new InetSocketAddress(localAddress, 7003)); // run a server - final NameServer server = new NameServerImpl(0, this.factory, this.localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); this.port = server.getPort(); for (final Identifier id : idToAddrMap.keySet()) { server.register(id, idToAddrMap.get(id)); @@ -225,7 +231,10 @@ public class NamingTest { LOG.log(Level.FINEST, this.name.getMethodName()); - final NameServer server = new NameServerImpl(0, this.factory, this.localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); this.port = server.getPort(); final String localAddress = localAddressProvider.getLocalAddress(); @@ -287,7 +296,10 @@ public class NamingTest { LOG.log(Level.FINEST, this.name.getMethodName()); final String localAddress = localAddressProvider.getLocalAddress(); - final NameServer server = new NameServerImpl(0, this.factory, this.localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, this.factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); this.port = server.getPort(); final Map<Identifier, InetSocketAddress> idToAddrMap = new HashMap<Identifier, InetSocketAddress>(); @@ -373,4 +385,4 @@ public class NamingTest { count = 0; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/474760f7/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java index 9976026..3c5f0b8 100644 --- a/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java +++ b/lang/java/reef-io/src/test/java/org/apache/reef/services/network/NetworkServiceTest.java @@ -21,19 +21,21 @@ package org.apache.reef.services.network; import org.apache.reef.exception.evaluator.NetworkException; import org.apache.reef.io.network.Connection; import org.apache.reef.io.network.Message; -import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.apache.reef.io.network.impl.NetworkService; import org.apache.reef.io.network.naming.NameServer; -import org.apache.reef.io.network.naming.NameServerImpl; +import org.apache.reef.io.network.naming.NameServerParameters; import org.apache.reef.io.network.util.StringIdentifierFactory; import org.apache.reef.services.network.util.Monitor; import org.apache.reef.services.network.util.StringCodec; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.Identifier; import org.apache.reef.wake.IdentifierFactory; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -70,7 +72,10 @@ public class NetworkServiceTest { IdentifierFactory factory = new StringIdentifierFactory(); - NameServer server = new NameServerImpl(0, factory, localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); int nameServerPort = server.getPort(); final int numMessages = 10; @@ -125,7 +130,10 @@ public class NetworkServiceTest { IdentifierFactory factory = new StringIdentifierFactory(); - NameServer server = new NameServerImpl(0, factory, localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); int nameServerPort = server.getPort(); final int[] messageSizes = {1, 16, 32, 64, 512, 64 * 1024, 1024 * 1024}; @@ -196,7 +204,10 @@ public class NetworkServiceTest { final IdentifierFactory factory = new StringIdentifierFactory(); - final NameServer server = new NameServerImpl(0, factory, localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); final int nameServerPort = server.getPort(); BlockingQueue<Object> barrier = new LinkedBlockingQueue<Object>(); @@ -288,7 +299,10 @@ public class NetworkServiceTest { IdentifierFactory factory = new StringIdentifierFactory(); - NameServer server = new NameServerImpl(0, factory, localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); int nameServerPort = server.getPort(); final int[] messageSizes = {2000};// {1,16,32,64,512,64*1024,1024*1024}; @@ -377,7 +391,10 @@ public class NetworkServiceTest { IdentifierFactory factory = new StringIdentifierFactory(); - NameServer server = new NameServerImpl(0, factory, localAddressProvider); + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(NameServerParameters.NameServerIdentifierFactory.class, factory); + injector.bindVolatileInstance(LocalAddressProvider.class, this.localAddressProvider); + final NameServer server = injector.getInstance(NameServer.class); int nameServerPort = server.getPort(); final int batchSize = 1024 * 1024; @@ -489,4 +506,4 @@ public class NetworkServiceTest { System.err.println(error); } } -} +} \ No newline at end of file
