Repository: incubator-reef Updated Branches: refs/heads/master 47fbccdaf -> e243535d7
[REEF-247]: Port ranges should to be configurable for listening Defined a port range provider and a default implementation. JIRA: [REEF-247](https://issues.apache.org/jira/browse/REEF-247) Pull Request: This closes #157 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e243535d Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e243535d Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e243535d Branch: refs/heads/master Commit: e243535d762411134bfcc58b773713f25b994329 Parents: 47fbccd Author: Beysim Sezgin <[email protected]> Authored: Mon Apr 20 17:31:54 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Apr 23 10:33:38 2015 -0700 ---------------------------------------------------------------------- .../network/impl/MessagingTransportFactory.java | 3 +- .../local/client/LocalRuntimeConfiguration.java | 9 ++ .../runtime/mesos/util/MesosRemoteManager.java | 3 +- .../yarn/client/YarnClientConfiguration.java | 9 ++ lang/java/reef-wake/wake/pom.xml | 4 + .../DefaultRemoteManagerImplementation.java | 18 ++-- .../wake/remote/ports/RandomRangeIterator.java | 63 +++++++++++++ .../wake/remote/ports/RangeTcpPortProvider.java | 90 ++++++++++++++++++ .../reef/wake/remote/ports/TcpPortProvider.java | 46 ++++++++++ .../ports/parameters/TcpPortRangeBegin.java | 30 ++++++ .../ports/parameters/TcpPortRangeCount.java | 30 ++++++ .../ports/parameters/TcpPortRangeTryCount.java | 30 ++++++ .../netty/NettyMessagingTransport.java | 96 ++++++++++++-------- .../remote/RemoteIdentifierFactoryTest.java | 3 +- .../wake/test/remote/RemoteManagerTest.java | 16 +++- .../reef/wake/test/remote/TestRemote.java | 4 +- 16 files changed, 400 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java index 82fd720..f9391bd 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/impl/MessagingTransportFactory.java @@ -24,6 +24,7 @@ import org.apache.reef.wake.impl.SyncStage; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -68,7 +69,7 @@ public class MessagingTransportFactory implements TransportFactory { final EventHandler<Exception> exHandler) { final Transport transport = new NettyMessagingTransport(this.localAddress, - port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000); + port, new SyncStage<>(clientHandler), new SyncStage<>(serverHandler), 3, 10000, RangeTcpPortProvider.Default); transport.registerErrorHandler(exHandler); return transport; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java index cbcdf66..accb6b0 100644 --- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java +++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalRuntimeConfiguration.java @@ -32,6 +32,7 @@ import org.apache.reef.tang.formats.ConfigurationModuleBuilder; import org.apache.reef.tang.formats.OptionalImpl; import org.apache.reef.tang.formats.OptionalParameter; import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; import java.util.concurrent.ExecutorService; @@ -74,6 +75,13 @@ public class LocalRuntimeConfiguration extends ConfigurationModuleBuilder { public static final OptionalImpl<LocalAddressProvider> LOCAL_ADDRESS_PROVIDER = new OptionalImpl<>(); /** + * The class used to restrict tcp port ranges for listening + * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that + * the Driver (and the Evaluators) also use it. + */ + public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new OptionalImpl<>(); + + /** * The ConfigurationModule for the local resourcemanager. */ public static final ConfigurationModule CONF = new LocalRuntimeConfiguration() @@ -89,6 +97,7 @@ public class LocalRuntimeConfiguration extends ConfigurationModuleBuilder { .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) // Bind LocalAddressProvider .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER) + .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER) .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java index 3c3bfd9..9a4f974 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/util/MesosRemoteManager.java @@ -24,6 +24,7 @@ import org.apache.reef.wake.remote.RemoteIdentifierFactory; import org.apache.reef.wake.remote.RemoteManager; import org.apache.reef.wake.remote.RemoteMessage; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import javax.inject.Inject; @@ -43,7 +44,7 @@ public final class MesosRemoteManager { final LocalAddressProvider localAddressProvider) { this.factory = factory; this.raw = new DefaultRemoteManagerImplementation("MESOS_EXECUTOR", "##UNKNOWN##", 0, - codec, mesosErrorHandler, false, 3, 10000, localAddressProvider); + codec, mesosErrorHandler, false, 3, 10000, localAddressProvider, RangeTcpPortProvider.Default); } public <T> EventHandler<T> getHandler( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java index 5b46da6..115484b 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnClientConfiguration.java @@ -36,6 +36,7 @@ import org.apache.reef.tang.formats.OptionalImpl; import org.apache.reef.tang.formats.OptionalParameter; import org.apache.reef.util.logging.LoggingSetup; import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; /** * A ConfigurationModule for the YARN resourcemanager. @@ -58,6 +59,13 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder { public static final OptionalImpl<ConfigurationProvider> DRIVER_CONFIGURATION_PROVIDERS = new OptionalImpl<>(); /** + * The class used to restrict tcp port ranges for listening + * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that + * the Driver (and the Evaluators) also use it. + */ + public static final OptionalImpl<TcpPortProvider> TCP_PORT_PROVIDER = new OptionalImpl<>(); + + /** * The class used to resolve the local address for Wake and HTTP to bind to. * Note that you will likely want to bind the same class also to DRIVER_CONFIGURATION_PROVIDERS to make sure that * the Driver (and the Evaluators) also use it. @@ -78,6 +86,7 @@ public class YarnClientConfiguration extends ConfigurationModuleBuilder { .bindSetEntry(DriverConfigurationProviders.class, DRIVER_CONFIGURATION_PROVIDERS) // Bind LocalAddressProvider .bindImplementation(LocalAddressProvider.class, LOCAL_ADDRESS_PROVIDER) + .bindImplementation(TcpPortProvider.class, TCP_PORT_PROVIDER) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/pom.xml b/lang/java/reef-wake/wake/pom.xml index 75fc953..bb762ec 100644 --- a/lang/java/reef-wake/wake/pom.xml +++ b/lang/java/reef-wake/wake/pom.xml @@ -118,6 +118,10 @@ under the License. <groupId>${project.groupId}</groupId> <artifactId>tang</artifactId> </dependency> + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index ff1679c..f5d7a9c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -25,6 +25,8 @@ import org.apache.reef.wake.impl.StageManager; import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; import org.apache.reef.wake.remote.transport.Transport; import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport; @@ -82,7 +84,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { orderingGuarantee, numberOfTries, retryTimeout, - LocalAddressProviderFactory.getInstance()); + LocalAddressProviderFactory.getInstance(), + RangeTcpPortProvider.Default); } @@ -100,7 +103,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { final @Parameter(RemoteConfiguration.OrderingGuarantee.class) boolean orderingGuarantee, final @Parameter(RemoteConfiguration.NumberOfTries.class) int numberOfTries, final @Parameter(RemoteConfiguration.RetryTimeout.class) int retryTimeout, - final LocalAddressProvider localAddressProvider) { + final LocalAddressProvider localAddressProvider, + final TcpPortProvider tcpPortProvider) { this.name = name; this.handlerContainer = new HandlerContainer<>(name, codec); @@ -109,13 +113,9 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { new OrderedRemoteReceiverStage(this.handlerContainer, errorHandler) : new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); - if ("##UNKNOWN##".equals(hostAddress)) { - this.transport = new NettyMessagingTransport( - localAddressProvider.getLocalAddress(), listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); - } else { - this.transport = new NettyMessagingTransport( - hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout); - } + final String host = "##UNKNOWN##".equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; + this.transport = new NettyMessagingTransport( + host, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); this.handlerContainer.setTransport(this.transport); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java new file mode 100644 index 0000000..dbb47c4 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports; + +import net.jcip.annotations.ThreadSafe; + +import java.util.Iterator; +import java.util.Random; + +/** + * This class will give out random port numbers between tcpPortRangeBegin and tcpPortRangeBegin+tcpPortRangeCount + * Max number of ports given is tryCount + */ +@ThreadSafe +final class RandomRangeIterator implements Iterator<Integer> { + private final int tcpPortRangeBegin; + private final int tcpPortRangeCount; + private final int tryCount; + private int currentRetryCount; + private final Random random = new Random(System.currentTimeMillis()); + + RandomRangeIterator(final int tcpPortRangeBegin, final int tcpPortRangeCount, int tryCount) { + this.tcpPortRangeBegin = tcpPortRangeBegin; + this.tcpPortRangeCount = tcpPortRangeCount; + this.tryCount = tryCount; + } + + @Override + public synchronized boolean hasNext() { + return currentRetryCount++ < tryCount; + } + + @Override + public synchronized Integer next() { + return random.nextInt(tcpPortRangeCount) + tcpPortRangeBegin; + } + + /** + * always throws + * @throws UnsupportedOperationException always. + */ + @Override + public void remove() { + throw new UnsupportedOperationException (); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java new file mode 100644 index 0000000..64b8de4 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RangeTcpPortProvider.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports; + + +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.ConfigurationProvider; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount; + +import javax.inject.Inject; +import java.util.Iterator; +import java.util.logging.Logger; + +/** + * A TcpPortProvider which gives out random ports in a range + */ +public final class RangeTcpPortProvider implements TcpPortProvider, ConfigurationProvider { + private final int portRangeBegin; + private final int portRangeCount; + private final int portRangeTryCount; + private static final Logger LOG = Logger.getLogger(RangeTcpPortProvider.class.getName()); + + @Inject + public RangeTcpPortProvider(final @Parameter(TcpPortRangeBegin.class) int portRangeBegin, + final @Parameter(TcpPortRangeCount.class) int portRangeCount, + final @Parameter(TcpPortRangeTryCount.class) int portRangeTryCount) { + this.portRangeBegin = portRangeBegin; + this.portRangeCount = portRangeCount; + this.portRangeTryCount = portRangeTryCount; + } + + /** + * Returns an iterator over a set of tcp ports + * + * @return an Iterator. + */ + @Override + public Iterator<Integer> iterator() { + return new RandomRangeIterator(portRangeBegin, portRangeCount, portRangeTryCount); + } + + /** + * @deprecated have an instance injected instead. + */ + @Deprecated + public static final RangeTcpPortProvider Default = new RangeTcpPortProvider( + Integer.parseInt(TcpPortRangeBegin.default_value), + Integer.parseInt(TcpPortRangeCount.default_value), + Integer.parseInt(TcpPortRangeTryCount.default_value)); + + + @Override + public Configuration getConfiguration() { + return Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(TcpPortRangeBegin.class, String.valueOf(portRangeBegin)) + .bindNamedParameter(TcpPortRangeCount.class, String.valueOf(portRangeCount)) + .bindNamedParameter(TcpPortRangeTryCount.class, String.valueOf(portRangeTryCount)) + .bindImplementation(TcpPortProvider.class, RangeTcpPortProvider.class) + .build(); + } + + @Override + public String toString() { + return "RangeTcpPortProvider{" + + "portRangeBegin=" + portRangeBegin + + ", portRangeCount=" + portRangeCount + + ", portRangeTryCount=" + portRangeTryCount + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java new file mode 100644 index 0000000..566b62b --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/TcpPortProvider.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports; + +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.DefaultImplementation; + +import java.util.Iterator; + +/** + * Provides an iterator that returns port numbers. +*/ +@DefaultImplementation(RangeTcpPortProvider.class) +public interface TcpPortProvider extends Iterable<Integer> { + /** + * Returns an iterator over a set of tcp ports + * + * @return an Iterator. + */ + @Override + Iterator<Integer> iterator(); + + /** + * returns a configuration for the class that implements TcpPortProvider so that class can be instantiated + * somewhere else + * + * @return Configuration. + */ + Configuration getConfiguration(); +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java new file mode 100644 index 0000000..83bf9a1 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeBegin.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * First tcp port number to try + */ +@NamedParameter(doc = "First tcp port number to try", default_value = TcpPortRangeBegin.default_value) +public class TcpPortRangeBegin implements Name<Integer> { + public static final String default_value = "10000"; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java new file mode 100644 index 0000000..e5727e2 --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeCount.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Number of tcp ports in the range + */ +@NamedParameter(doc = "Number of tcp ports in the range", default_value = TcpPortRangeCount.default_value) +public class TcpPortRangeCount implements Name<Integer> { + public static final String default_value = "10000"; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java new file mode 100644 index 0000000..7aaa4af --- /dev/null +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortRangeTryCount.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.remote.ports.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * Max number tries for port numbers + */ +@NamedParameter(doc = "Max number tries for port numbers", default_value = TcpPortRangeTryCount.default_value) +public class TcpPortRangeTryCount implements Name<Integer> { + public static final String default_value = "1000"; +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index f9999ae..a1f0886 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -30,30 +30,31 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.reef.wake.EStage; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.DefaultThreadFactory; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.exception.RemoteRuntimeException; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; +import org.apache.reef.wake.remote.ports.TcpPortProvider; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; import java.io.IOException; import java.net.BindException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Random; +import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.reef.wake.EStage; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.DefaultThreadFactory; -import org.apache.reef.wake.remote.Encoder; -import org.apache.reef.wake.remote.exception.RemoteRuntimeException; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.LinkListener; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException; - /** * Messaging transport implementation with Netty */ @@ -65,9 +66,6 @@ public class NettyMessagingTransport implements Transport { private static final int SERVER_BOSS_NUM_THREADS = 3; private static final int SERVER_WORKER_NUM_THREADS = 20; private static final int CLIENT_WORKER_NUM_THREADS = 10; - private static final int PORT_START = 10000; - private static final int PORT_RANGE = 10000; - private static final Random randPort = new Random(); private final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap = new ConcurrentHashMap<>(); @@ -100,12 +98,15 @@ public class NettyMessagingTransport implements Transport { * @param serverStage the server-side stage that handles transport events * @param numberOfTries the number of tries of connection * @param retryTimeout the timeout of reconnection + * @param tcpPortProvider gives an iterator that produces random tcp ports in a range + * */ public NettyMessagingTransport(final String hostAddress, int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, - final int retryTimeout) { + final int retryTimeout, + final TcpPortProvider tcpPortProvider) { if (port < 0) { throw new RemoteRuntimeException("Invalid server port: " + port); @@ -139,29 +140,31 @@ public class NettyMessagingTransport implements Transport { LOG.log(Level.FINE, "Binding to {0}", port); - Channel acceptor = null; - try { - if (port > 0) { - acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); - } else { - while (acceptor == null) { - port = randPort.nextInt(PORT_START) + PORT_RANGE; - LOG.log(Level.FINEST, "Try port {0}", port); - try { - acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); - } catch (final Exception ex) { - if (ex instanceof BindException) { - LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); - } else { - throw ex; - } + Channel acceptor = null; + try { + if (port > 0) { + acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); + } else { + Iterator<Integer> ports = tcpPortProvider.iterator(); + while (acceptor == null) { + if (!ports.hasNext()) break; + port = ports.next(); + LOG.log(Level.FINEST, "Try port {0}", port); + try { + acceptor = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel(); + } catch (final Exception ex) { + if (ex instanceof BindException) { + LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); + } else { + throw ex; } } } - } catch (final Exception ex) { - final RuntimeException transportException = - new TransportRuntimeException("Cannot bind to port " + port); - LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex); + } + } catch (final Exception ex) { + final RuntimeException transportException = + new TransportRuntimeException("Cannot bind to port " + port); + LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex); this.clientWorkerGroup.shutdownGracefully(); this.serverBossGroup.shutdownGracefully(); @@ -177,6 +180,27 @@ public class NettyMessagingTransport implements Transport { } /** + * Constructs a messaging transport + * + * @param hostAddress the server host address + * @param port the server listening port; when it is 0, randomly assign a port number + * @param clientStage the client-side stage that handles transport events + * @param serverStage the server-side stage that handles transport events + * @param numberOfTries the number of tries of connection + * @param retryTimeout the timeout of reconnection + * @deprecated use the constructor that takes a TcpProvider instead + */ + @Deprecated + public NettyMessagingTransport(final String hostAddress, int port, + final EStage<TransportEvent> clientStage, + final EStage<TransportEvent> serverStage, + final int numberOfTries, + final int retryTimeout) { + this(hostAddress, port, clientStage, serverStage, numberOfTries, retryTimeout, + RangeTcpPortProvider.Default); + } + + /** * Closes all channels and releases all resources */ @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java index c003cb5..48f2c25 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteIdentifierFactoryTest.java @@ -28,6 +28,7 @@ import org.apache.reef.wake.remote.*; import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -70,7 +71,7 @@ public class RemoteIdentifierFactoryTest { try (final RemoteManager rm = new DefaultRemoteManagerImplementation("TestRemoteManager", localAddressProvider.getLocalAddress(), port, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, - localAddressProvider)) { + localAddressProvider, RangeTcpPortProvider.Default)) { final RemoteIdentifier id = rm.getMyIdentifier(); final IdentifierFactory factory = new DefaultIdentifierFactory(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java index 888574f..ad17964 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/RemoteManagerTest.java @@ -30,6 +30,7 @@ import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementa import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; import org.apache.reef.wake.remote.impl.MultiCodec; import org.apache.reef.wake.remote.impl.ObjectSerializableCodec; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import org.apache.reef.wake.test.util.Monitor; import org.apache.reef.wake.test.util.TimeoutHandler; import org.junit.Assert; @@ -80,7 +81,8 @@ public class RemoteManagerTest { final String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -182,7 +184,8 @@ public class RemoteManagerTest { final String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, localAddressProvider); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), true, 3, 10000, + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -225,7 +228,8 @@ public class RemoteManagerTest { String hostAddress = localAddressProvider.getLocalAddress(); final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, localAddressProvider); + "name", hostAddress, port, codec, new LoggingEventHandler<Throwable>(), false, 3, 10000, + localAddressProvider, RangeTcpPortProvider.Default); RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -264,7 +268,8 @@ public class RemoteManagerTest { ExceptionHandler errorHandler = new ExceptionHandler(monitor); try (final RemoteManager rm = new DefaultRemoteManagerImplementation( - "name", hostAddress, port, codec, errorHandler, false, 3, 10000, localAddressProvider)) { + "name", hostAddress, port, codec, errorHandler, false, 3, 10000, localAddressProvider, + RangeTcpPortProvider.Default)) { RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + port); @@ -292,7 +297,8 @@ public class RemoteManagerTest { String hostAddress = localAddressProvider.getLocalAddress(); return new DefaultRemoteManagerImplementation(name, hostAddress, localPort, - codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, localAddressProvider); + codec, new LoggingEventHandler<Throwable>(), false, retry, retryTimeout, + localAddressProvider, RangeTcpPortProvider.Default); } private class SendingRemoteManagerThread implements Callable<Integer> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e243535d/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java index e12a517..3280bd6 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/remote/TestRemote.java @@ -27,6 +27,7 @@ import org.apache.reef.wake.remote.address.LocalAddressProvider; import org.apache.reef.wake.remote.address.LocalAddressProviderFactory; import org.apache.reef.wake.remote.impl.DefaultRemoteIdentifierFactoryImplementation; import org.apache.reef.wake.remote.impl.DefaultRemoteManagerImplementation; +import org.apache.reef.wake.remote.ports.RangeTcpPortProvider; import javax.inject.Inject; import java.net.UnknownHostException; @@ -46,7 +47,8 @@ public class TestRemote implements Runnable { int remotePort = 10001; Codec<TestEvent> codec = new TestEventCodec(); try (RemoteManager rm = new DefaultRemoteManagerImplementation("name", hostAddress, - myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, localAddressProvider)) { + myPort, codec, new LoggingEventHandler<Throwable>(), false, 1, 10000, localAddressProvider, + RangeTcpPortProvider.Default)) { // proxy handler RemoteIdentifierFactory factory = new DefaultRemoteIdentifierFactoryImplementation(); RemoteIdentifier remoteId = factory.getNewInstance("socket://" + hostAddress + ":" + remotePort);
