Repository: incubator-reef Updated Branches: refs/heads/master 85ce1ee69 -> 6bbc775fb
[REEF-437]:Fix DefaultRemoteManagerImplementation to use TcpPortProvider This addressed the issue by adding a new constructor that takes TcpPortProvider. JIRA: [REEF-437](https://issues.apache.org/jira/browse/REEF-437) Pull Request: This closes #267 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6bbc775f Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6bbc775f Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6bbc775f Branch: refs/heads/master Commit: 6bbc775fb3f783439b9ca9c18023d7fcaad7ba7e Parents: 85ce1ee Author: Beysim Sezgin <[email protected]> Authored: Wed Jul 1 11:29:07 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Jul 1 17:51:48 2015 -0700 ---------------------------------------------------------------------- .../DefaultRemoteManagerImplementation.java | 67 +++++++++++++------- 1 file changed, 43 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6bbc775f/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index 6dd7c98..2ba322a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -25,6 +25,8 @@ import org.apache.reef.wake.impl.StageManager; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.TransportFactory; import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory; @@ -107,32 +109,49 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, final LocalAddressProvider localAddressProvider, final TransportFactory tpFactory) { + this(name, hostAddress, listeningPort, codec, errorHandler, orderingGuarantee, numberOfTries, retryTimeout, + localAddressProvider, tpFactory, RangeTcpPortProvider.Default); + } - this.name = name; - this.handlerContainer = new HandlerContainer<>(name, codec); - - this.reRecvStage = orderingGuarantee ? - new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : - new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); - - this.transport = tpFactory.newInstance( - hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); - - this.handlerContainer.setTransport(this.transport); - - this.myIdentifier = new SocketRemoteIdentifier( - (InetSocketAddress) this.transport.getLocalAddress()); - - this.reSendStage = new RemoteSenderStage(codec, this.transport, 10); - - StageManager.instance().register(this); + @Inject + private <T> DefaultRemoteManagerImplementation( + @Parameter(RemoteConfiguration.ManagerName.class) final String name, + @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, + @Parameter(RemoteConfiguration.Port.class) final int listeningPort, + @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> codec, + @Parameter(RemoteConfiguration.ErrorHandler.class) final EventHandler<Throwable> errorHandler, + @Parameter(RemoteConfiguration.OrderingGuarantee.class) final boolean orderingGuarantee, + @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, + @Parameter(RemoteConfiguration.RetryTimeout.class) final int retryTimeout, + final LocalAddressProvider localAddressProvider, + final TransportFactory tpFactory, + final TcpPortProvider tcpPortProvider) { + + this.name = name; + this.handlerContainer = new HandlerContainer<>(name, codec); + + this.reRecvStage = orderingGuarantee ? + new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : + new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); + + this.transport = tpFactory.newInstance( + hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); + + this.handlerContainer.setTransport(this.transport); + + this.myIdentifier = new SocketRemoteIdentifier( + (InetSocketAddress) this.transport.getLocalAddress()); + + this.reSendStage = new RemoteSenderStage(codec, this.transport, 10); + + StageManager.instance().register(this); + LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. Binding address provided by {5}", + new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(), + this.transport.getLocalAddress().toString(), + this.transport.getListeningPort(), localAddressProvider} + ); + } - LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. Binding address provided by {5}", - new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(), - this.transport.getLocalAddress().toString(), - this.transport.getListeningPort(), localAddressProvider} - ); - } /** * Returns a proxy event handler for a remote identifier and a message type.
