Repository: incubator-reef
Updated Branches:
  refs/heads/master 85ce1ee69 -> 6bbc775fb


[REEF-437]: Fix DefaultRemoteManagerImplementation to use TcpPortProvider

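This addresses the issue by adding a new constructor that takes a TcpPortProvider
and passes it to the TransportFactory; the existing constructor now delegates to
it with RangeTcpPortProvider.Default.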

JIRA:
  [REEF-437](https://issues.apache.org/jira/browse/REEF-437)

Pull Request:
  This closes #267


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/6bbc775f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/6bbc775f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/6bbc775f

Branch: refs/heads/master
Commit: 6bbc775fb3f783439b9ca9c18023d7fcaad7ba7e
Parents: 85ce1ee
Author: Beysim Sezgin <[email protected]>
Authored: Wed Jul 1 11:29:07 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Jul 1 17:51:48 2015 -0700

----------------------------------------------------------------------
 .../DefaultRemoteManagerImplementation.java     | 67 +++++++++++++-------
 1 file changed, 43 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/6bbc775f/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index 6dd7c98..2ba322a 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -25,6 +25,8 @@ import org.apache.reef.wake.impl.StageManager;
 import org.apache.reef.wake.remote.*;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.address.LocalAddressProviderFactory;
+import org.apache.reef.wake.remote.ports.RangeTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.TransportFactory;
 import org.apache.reef.wake.remote.transport.netty.MessagingTransportFactory;
@@ -107,32 +109,49 @@ public class DefaultRemoteManagerImplementation 
implements RemoteManager {
       @Parameter(RemoteConfiguration.RetryTimeout.class) final int 
retryTimeout,
       final LocalAddressProvider localAddressProvider,
       final TransportFactory tpFactory) {
+      this(name, hostAddress, listeningPort, codec, errorHandler, 
orderingGuarantee, numberOfTries, retryTimeout,
+              localAddressProvider, tpFactory, RangeTcpPortProvider.Default);
+  }
 
-    this.name = name;
-    this.handlerContainer = new HandlerContainer<>(name, codec);
-
-    this.reRecvStage = orderingGuarantee ?
-        new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) :
-        new RemoteReceiverStage(this.handlerContainer, errorHandler, 10);
-
-    this.transport = tpFactory.newInstance(
-        hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, 
numberOfTries, retryTimeout);
-
-    this.handlerContainer.setTransport(this.transport);
-
-    this.myIdentifier = new SocketRemoteIdentifier(
-        (InetSocketAddress) this.transport.getLocalAddress());
-
-    this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
-
-    StageManager.instance().register(this);
+    @Inject
+    private <T> DefaultRemoteManagerImplementation(
+            @Parameter(RemoteConfiguration.ManagerName.class) final String 
name,
+            @Parameter(RemoteConfiguration.HostAddress.class) final String 
hostAddress,
+            @Parameter(RemoteConfiguration.Port.class) final int listeningPort,
+            @Parameter(RemoteConfiguration.MessageCodec.class) final Codec<T> 
codec,
+            @Parameter(RemoteConfiguration.ErrorHandler.class) final 
EventHandler<Throwable> errorHandler,
+            @Parameter(RemoteConfiguration.OrderingGuarantee.class) final 
boolean orderingGuarantee,
+            @Parameter(RemoteConfiguration.NumberOfTries.class) final int 
numberOfTries,
+            @Parameter(RemoteConfiguration.RetryTimeout.class) final int 
retryTimeout,
+            final LocalAddressProvider localAddressProvider,
+            final TransportFactory tpFactory,
+            final TcpPortProvider tcpPortProvider) {
+
+        this.name = name;
+        this.handlerContainer = new HandlerContainer<>(name, codec);
+
+        this.reRecvStage = orderingGuarantee ?
+                new OrderedRemoteReceiverStage(this.handlerContainer, 
errorHandler) :
+                new RemoteReceiverStage(this.handlerContainer, errorHandler, 
10);
+
+        this.transport = tpFactory.newInstance(
+                hostAddress, listeningPort, this.reRecvStage, 
this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider);
+
+        this.handlerContainer.setTransport(this.transport);
+
+        this.myIdentifier = new SocketRemoteIdentifier(
+                (InetSocketAddress) this.transport.getLocalAddress());
+
+        this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
+
+        StageManager.instance().register(this);
+        LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter 
{2} listening on {3}:{4}. Binding address provided by {5}",
+                new Object[]{this.name, this.myIdentifier, 
COUNTER.incrementAndGet(),
+                        this.transport.getLocalAddress().toString(),
+                        this.transport.getListeningPort(), 
localAddressProvider}
+        );
+    }
 
-    LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} 
listening on {3}:{4}. Binding address provided by {5}",
-        new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(),
-            this.transport.getLocalAddress().toString(),
-            this.transport.getListeningPort(), localAddressProvider}
-    );
-  }
 
   /**
    * Returns a proxy event handler for a remote identifier and a message type.

Reply via email to