[REEF-273]: Port ranges need to be configurable in REEF.NET. This addresses the issue by adding support for port range configuration.
JIRA: [REEF-273](https://issues.apache.org/jira/browse/REEF-273) Pull Request: This closes #175 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/fffee854 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/fffee854 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/fffee854 Branch: refs/heads/master Commit: fffee854502b5b462c8a284956a17f56c3c5f863 Parents: 7a07abc Author: Beysim Sezgin <[email protected]> Authored: Tue May 5 16:59:32 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu May 7 18:10:32 2015 -0700 ---------------------------------------------------------------------- .../Local/LocalRuntimeClientConfiguration.cs | 25 +++++ .../YARN/YARNClientConfiguration.cs | 26 +++++ .../Io/NamingConfigurationOptions.cs | 3 +- .../Io/TcpPortConfigurationProvider.cs | 54 +++++++++ .../Org.Apache.REEF.Common.csproj | 1 + .../Protobuf/ReefProtocol/REEFMessageCodec.cs | 1 + .../Bridge/Events/AllocatedEvaluator.cs | 17 +-- lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 3 +- .../HelloREEF.cs | 4 - .../GroupCommunicationTests.cs | 18 ++- .../NamingService/NameServerTests.cs | 22 ++-- .../NetworkService/NetworkServiceTests.cs | 5 +- .../Group/Driver/Impl/GroupCommDriver.cs | 38 +------ .../Naming/NameServer.cs | 13 ++- .../NetworkService/NetworkService.cs | 1 + .../Util/ReflectionUtilities.cs | 41 +++---- .../RemoteManagerTest.cs | 41 ++++--- .../Org.Apache.REEF.Wake.Tests/TransportTest.cs | 45 +++++--- .../WritableRemoteManagerTest.cs | 2 +- .../WritableTransportTest.cs | 39 ++++--- .../Org.Apache.REEF.Wake.csproj | 6 + .../Remote/IRemoteManagerFactory.cs | 11 +- .../Remote/ITcpPortProvider.cs | 32 ++++++ .../Remote/Impl/DefaultRemoteManager.cs | 17 ++- .../Remote/Impl/DefaultRemoteManagerFactory.cs | 14 ++- .../Remote/Impl/TransportServer.cs | 60 +++++++--- .../Remote/Impl/WritableRemoteManager.cs | 4 +- 
.../Remote/Impl/WritableRemoteManagerFactory.cs | 6 +- .../Remote/Impl/WritableTransportServer.cs | 55 ++++++++- .../Remote/Parameters/TcpPortRangeCount.cs | 29 +++++ .../Remote/Parameters/TcpPortRangeSeed.cs | 28 +++++ .../Remote/Parameters/TcpPortRangeStart.cs | 28 +++++ .../Remote/Parameters/TcpPortRangeTryCount.cs | 28 +++++ .../Remote/TcpPortProvider.cs | 112 +++++++++++++++++++ 34 files changed, 658 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs index e24d2ae..2504a5d 100644 --- a/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Client/Local/LocalRuntimeClientConfiguration.cs @@ -22,6 +22,7 @@ using Org.Apache.REEF.Client.Local.Parameters; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.Local { @@ -52,11 +53,35 @@ namespace Org.Apache.REEF.Client.Local public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider = new OptionalImpl<IConfigurationProvider>(); + /// <summary> + /// Start of the tcp port range for listening. + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>(); + + /// <summary> + /// Number of port for the tcp port range for listening. 
+ /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>(); + + /// <summary> + /// Max number of times we will deliver a port from the tcp port range. + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>(); + + /// <summary> + /// Seed for the number number for determining which particular port to deliver from the range + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>(); + public static ConfigurationModule ConfigurationModule = new LocalRuntimeClientConfiguration() .BindImplementation(GenericType<IREEFClient>.Class, GenericType<LocalClient>.Class) .BindNamedParameter(GenericType<LocalRuntimeDirectory>.Class, RuntimeFolder) .BindNamedParameter(GenericType<NumberOfEvaluators>.Class, NumberOfEvaluators) .BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider) + .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter) + .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter) + .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter) + .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter) .Build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs index 94105a0..69c4b29 100644 --- a/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Client/YARN/YARNClientConfiguration.cs @@ -23,6 +23,7 @@ using Org.Apache.REEF.Tang.Formats; using 
Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Client.YARN { @@ -36,9 +37,34 @@ namespace Org.Apache.REEF.Client.YARN /// </summary> public static readonly OptionalImpl<IConfigurationProvider> DriverConfigurationProvider = new OptionalImpl<IConfigurationProvider>(); + /// <summary> + /// Start of the tcp port range for listening. + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeStartParameter = new OptionalParameter<int>(); + + /// <summary> + /// Number of port for the tcp port range for listening. + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeCountParameter = new OptionalParameter<int>(); + + /// <summary> + /// Max number of times we will deliver a port from the tcp port range. + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeTryCountParameter = new OptionalParameter<int>(); + + /// <summary> + /// Seed for the number number for determining which particular port to deliver from the range + /// </summary> + public static readonly OptionalParameter<int> TcpPortRangeSeedParameter = new OptionalParameter<int>(); + + public static ConfigurationModule ConfigurationModule = new YARNClientConfiguration() .BindImplementation(GenericType<IREEFClient>.Class, GenericType<YARNClient>.Class) .BindSetEntry(GenericType<DriverConfigurationProviders>.Class, DriverConfigurationProvider) + .BindNamedParameter(GenericType<TcpPortRangeStart>.Class, TcpPortRangeStartParameter) + .BindNamedParameter(GenericType<TcpPortRangeCount>.Class, TcpPortRangeCountParameter) + .BindNamedParameter(GenericType<TcpPortRangeTryCount>.Class, TcpPortRangeTryCountParameter) + .BindNamedParameter(GenericType<TcpPortRangeSeed>.Class, TcpPortRangeSeedParameter) .Build(); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs index 8e7e91d..6f396eb 100644 --- a/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Common/Io/NamingConfigurationOptions.cs @@ -17,6 +17,7 @@ * under the License. */ +using System.ComponentModel; using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Common.Io @@ -28,7 +29,7 @@ namespace Org.Apache.REEF.Common.Io { } - [NamedParameter("Port of NameServer")] + [NamedParameter("Port of NameServer", DefaultValue = "0")] public class NameServerPort : Name<int> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs new file mode 100644 index 0000000..16bc014 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Io/TcpPortConfigurationProvider.cs @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Common.Evaluator.Parameters; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Wake.Remote.Parameters; + +namespace Org.Apache.REEF.Common.Io +{ + public class TcpPortConfigurationProvider : IConfigurationProvider + { + private readonly IConfiguration _configuration; + [Inject] + private TcpPortConfigurationProvider( + [Parameter(typeof(TcpPortRangeStart))] int tcpPortRangeStart, + [Parameter(typeof(TcpPortRangeCount))] int tcpPortRangeCount, + [Parameter(typeof(TcpPortRangeTryCount))] int tcpPortRangeTryCount, + [Parameter(typeof(TcpPortRangeSeed))] int tcpPortRangeTrySeed) + { + _configuration = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<TcpPortRangeStart>(tcpPortRangeStart.ToString()) + .BindIntNamedParam<TcpPortRangeCount>(tcpPortRangeCount.ToString()) + .BindIntNamedParam<TcpPortRangeTryCount>(tcpPortRangeTryCount.ToString()) + .BindIntNamedParam<TcpPortRangeSeed>(tcpPortRangeTrySeed.ToString()) + .BindSetEntry<EvaluatorConfigurationProviders, TcpPortConfigurationProvider, IConfigurationProvider>() + .Build(); + } + + IConfiguration IConfigurationProvider.GetConfiguration() + { + return _configuration; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- 
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 77da0f9..7bd929a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -100,6 +100,7 @@ under the License. <Compile Include="Io\NameAssignment.cs" /> <Compile Include="Io\NamingConfiguration.cs" /> <Compile Include="Io\NamingConfigurationOptions.cs" /> + <Compile Include="Io\TcpPortConfigurationProvider.cs" /> <Compile Include="ITaskSubmittable.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Protobuf\ReefProtocol\ClientRuntime.pb.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs index e8c7515..697ea1c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs +++ b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs @@ -17,6 +17,7 @@ * under the License. 
*/ +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs index 05f22ba..3655593 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs @@ -81,7 +81,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events { LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndTask"); - contextConfiguration = MergeContextConfiguration(contextConfiguration); + //TODO: Change this to service configuration when REEF-289(https://issues.apache.org/jira/browse/REEF-289) is fixed. + taskConfiguration = MergeWithConfigurationProviders(taskConfiguration); string context = _serializer.ToString(contextConfiguration); string task = _serializer.ToString(taskConfiguration); @@ -94,8 +95,7 @@ namespace Org.Apache.REEF.Driver.Bridge.Events public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) { LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndService"); - - contextConfiguration = MergeContextConfiguration(contextConfiguration); + string context = _serializer.ToString(contextConfiguration); string service = _serializer.ToString(serviceConfiguration); @@ -109,7 +109,8 @@ namespace Org.Apache.REEF.Driver.Bridge.Events { LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndServiceAndTask"); - contextConfiguration = MergeContextConfiguration(contextConfiguration); + //TODO: Change this to service configuration when REEF-289(https://issues.apache.org/jira/browse/REEF-289) is fixed. 
+ taskConfiguration = MergeWithConfigurationProviders(taskConfiguration); string context = _serializer.ToString(contextConfiguration); string service = _serializer.ToString(serviceConfiguration); string task = _serializer.ToString(taskConfiguration); @@ -179,17 +180,17 @@ namespace Org.Apache.REEF.Driver.Bridge.Events } } - private IConfiguration MergeContextConfiguration(IConfiguration contextConfiguration) + private IConfiguration MergeWithConfigurationProviders(IConfiguration configuration) { - IConfiguration contextConfig = contextConfiguration; + IConfiguration config = configuration; if (_configurationProviders != null) { foreach (var configurationProvider in _configurationProviders) { - contextConfig = Configurations.Merge(contextConfig, configurationProvider.GetConfiguration()); + config = Configurations.Merge(config, configurationProvider.GetConfiguration()); } } - return contextConfig; + return config; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs index 5e094f6..f97438d 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs @@ -45,6 +45,7 @@ using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.Time.Runtime; using Org.Apache.REEF.Wake.Time.Runtime.Event; +using Org.Apache.REEF.Wake.Util; namespace Org.Apache.REEF.Evaluator { @@ -109,7 +110,7 @@ namespace Org.Apache.REEF.Evaluator Optional<ServiceConfiguration> rootServiceConfig = _evaluatorConfig.RootServiceConfiguration; // remoteManager used as client-only in evaluator - IRemoteManager<REEFMessage> remoteManager = _injector.GetInstance<IRemoteManagerFactory>().GetInstance((new REEFMessageCodec())); + IRemoteManager<REEFMessage> remoteManager = 
_injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec()); IRemoteIdentifier remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId)); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs index 1e74c9f..87a48e3 100644 --- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs +++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs @@ -18,9 +18,6 @@ */ using System; -using System.Linq; -using System.Net; -using System.Net.Sockets; using Org.Apache.REEF.Client.API; using Org.Apache.REEF.Client.Local; using Org.Apache.REEF.Client.YARN; @@ -30,7 +27,6 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Examples.HelloREEF { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs index 26fa16c..7a6b5c1 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs @@ -25,6 +25,7 @@ using System.Net; using System.Reactive; using System.Text; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Examples.MachineLearning.KMeans; using 
Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs; @@ -40,6 +41,7 @@ using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Network.NetworkService.Codec; +using Org.Apache.REEF.Network.Tests.NamingService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; @@ -58,7 +60,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication [TestMethod] public void TestSender() { - using (NameServer nameServer = new NameServer(0)) + using (var nameServer = NameServerTests.BuildNameServer()) { IPEndPoint endpoint = nameServer.LocalEndpoint; BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>(); @@ -261,8 +263,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication [TestMethod] public void TestBroadcastOperator() { - NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "broadcast"; string masterTaskId = "task0"; @@ -295,7 +295,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication [TestMethod] public void TestBroadcastOperatorWithDefaultCodec() { - NameServer nameServer = new NameServer(0); + INameServer nameServer = NameServerTests.BuildNameServer(); string groupName = "group1"; string operatorName = "broadcast"; @@ -745,9 +745,15 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication public static NetworkService<GroupCommunicationMessage> BuildNetworkService( IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler) { - var remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>(); + var injector = TangFactory.GetTang().NewInjector(); + var remoteManagerFactory = injector.GetInstance<IRemoteManagerFactory>(); return new NetworkService<GroupCommunicationMessage>( - 0, handler, new 
StringIdentifierFactory(), new GroupCommunicationMessageCodec(), new NameClient(nameServerEndpoint.Address.ToString(), nameServerEndpoint.Port), remoteManagerFactory); + 0, handler, new StringIdentifierFactory(), + new GroupCommunicationMessageCodec(), + new NameClient(nameServerEndpoint.Address.ToString(), + nameServerEndpoint.Port), + remoteManagerFactory); + } private GroupCommunicationMessage CreateGcm(string message, string from, string to) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs index fd3002c..6523e84 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/NamingService/NameServerTests.cs @@ -28,6 +28,7 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote.Parameters; namespace Org.Apache.REEF.Network.Tests.NamingService { @@ -37,7 +38,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService [TestMethod] public void TestNameServerNoRequests() { - using (var server = new NameServer(0)) + using (var server = BuildNameServer()) { } } @@ -45,7 +46,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService [TestMethod] public void TestNameServerNoRequestsTwoClients() { - using (var server = new NameServer(0)) + using (var server = BuildNameServer()) { var nameClient = new NameClient(server.LocalEndpoint); var nameClient2 = new NameClient(server.LocalEndpoint); @@ -57,7 +58,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService [TestMethod] public void TestNameServerNoRequestsTwoClients2() { - using (var server = new 
NameServer(0)) + using (var server = BuildNameServer()) { var nameClient = new NameClient(server.LocalEndpoint); var nameClient2 = new NameClient(server.LocalEndpoint); @@ -69,7 +70,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService [TestMethod] public void TestNameServerMultipleRequestsTwoClients() { - using (var server = new NameServer(0)) + using (var server = BuildNameServer()) { var nameClient = new NameClient(server.LocalEndpoint); var nameClient2 = new NameClient(server.LocalEndpoint); @@ -192,7 +193,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService { int oldPort = 6666; int newPort = 6662; - INameServer server = new NameServer(oldPort); + INameServer server = BuildNameServer(oldPort); using (INameClient client = BuildNameClient(server.LocalEndpoint)) { @@ -203,7 +204,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService server.Dispose(); - server = new NameServer(newPort); + server = BuildNameServer(newPort); client.Restart(server.LocalEndpoint); client.Register("b", endpoint); @@ -217,7 +218,7 @@ namespace Org.Apache.REEF.Network.Tests.NamingService public void TestConstructorInjection() { int port = 6666; - using (INameServer server = new NameServer(port)) + using (INameServer server = BuildNameServer(port)) { IConfiguration nameClientConfiguration = NamingConfiguration.ConfigurationModule .Set(NamingConfiguration.NameServerAddress, server.LocalEndpoint.Address.ToString()) @@ -232,12 +233,11 @@ namespace Org.Apache.REEF.Network.Tests.NamingService } } - private INameServer BuildNameServer() + public static INameServer BuildNameServer(int listenPort = 0) { var builder = TangFactory.GetTang() - .NewConfigurationBuilder() - .BindNamedParameter<NamingConfigurationOptions.NameServerPort, int>( - GenericType<NamingConfigurationOptions.NameServerPort>.Class, "0"); + .NewConfigurationBuilder() + .BindIntNamedParam<NamingConfigurationOptions.NameServerPort>(listenPort.ToString()); return 
TangFactory.GetTang().NewInjector(builder.Build()).GetInstance<INameServer>(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs index e52a082..762cc48 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/NetworkService/NetworkServiceTests.cs @@ -26,6 +26,7 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Network.Tests.NamingService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Util; @@ -47,7 +48,7 @@ namespace Org.Apache.REEF.Network.Tests.NetworkService BlockingCollection<string> queue = new BlockingCollection<string>(); - using (INameServer nameServer = new NameServer(0)) + using (var nameServer = NameServerTests.BuildNameServer()) { IPEndPoint endpoint = nameServer.LocalEndpoint; int nameServerPort = endpoint.Port; @@ -84,7 +85,7 @@ namespace Org.Apache.REEF.Network.Tests.NetworkService BlockingCollection<string> queue1 = new BlockingCollection<string>(); BlockingCollection<string> queue2 = new BlockingCollection<string>(); - using (INameServer nameServer = new NameServer(0)) + using (var nameServer = NameServerTests.BuildNameServer()) { IPEndPoint endpoint = nameServer.LocalEndpoint; int nameServerPort = endpoint.Port; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs index 42394ab..a1a2548 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs @@ -60,37 +60,9 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups; private readonly AvroConfigurationSerializer _configSerializer; - private readonly NameServer _nameServer; - - /// <summary> - /// Create a new GroupCommunicationDriver object. - /// </summary> - /// <param name="driverId">Identifer for the REEF driver</param> - /// <param name="masterTaskId">Identifer for Group Communication master task</param> - /// <param name="fanOut">fanOut for tree topology</param> - /// <param name="configSerializer">Used to serialize task configuration</param> - [System.Obsolete("user the other constructor")] - [Inject] - public GroupCommDriver( - [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId, - [Parameter(typeof(GroupCommConfigurationOptions.MasterTaskId))] string masterTaskId, - [Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int fanOut, - AvroConfigurationSerializer configSerializer) - { - _driverId = driverId; - _contextIds = -1; - _fanOut = fanOut; - MasterTaskId = masterTaskId; - - _configSerializer = configSerializer; - _commGroups = new Dictionary<string, ICommunicationGroupDriver>(); - _nameServer = new NameServer(0); - - IPEndPoint localEndpoint = _nameServer.LocalEndpoint; - _nameServerAddr = localEndpoint.Address.ToString(); - _nameServerPort = localEndpoint.Port; - } + private readonly INameServer _nameServer; + /// <summary> /// Create a new GroupCommunicationDriver object. 
/// </summary> @@ -100,6 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="groupName">default communication group name</param> /// <param name="numberOfTasks">Number of tasks in the default group</param> /// <param name="configSerializer">Used to serialize task configuration</param> + /// <param name="nameServer">Used to map names to ip adresses</param> [Inject] public GroupCommDriver( [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId, @@ -107,7 +80,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl [Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int fanOut, [Parameter(typeof(GroupCommConfigurationOptions.GroupName))] string groupName, [Parameter(typeof(GroupCommConfigurationOptions.NumberOfTasks))] int numberOfTasks, - AvroConfigurationSerializer configSerializer) + AvroConfigurationSerializer configSerializer, + INameServer nameServer) { _driverId = driverId; _contextIds = -1; @@ -117,7 +91,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl _configSerializer = configSerializer; _commGroups = new Dictionary<string, ICommunicationGroupDriver>(); - _nameServer = new NameServer(0); + _nameServer = nameServer; IPEndPoint localEndpoint = _nameServer.LocalEndpoint; _nameServerAddr = localEndpoint.Address.ToString(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs index 0175e1b..47d17f5 100644 --- a/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs +++ b/lang/cs/Org.Apache.REEF.Network/Naming/NameServer.cs @@ -32,6 +32,7 @@ using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.RX; using Org.Apache.REEF.Wake.RX.Impl; +using Org.Apache.REEF.Wake.Util; namespace 
Org.Apache.REEF.Network.Naming { @@ -50,8 +51,14 @@ namespace Org.Apache.REEF.Network.Naming /// Create a new NameServer to run on the specified port. /// </summary> /// <param name="port">The port to listen for incoming connections on.</param> + /// <param name="tcpPortProvider">If port is 0, this interface provides + /// a port range to try. + /// </param> + [Obsolete("Please use TANG injection instead.")] [Inject] - public NameServer([Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port) + public NameServer( + [Parameter(typeof(NamingConfigurationOptions.NameServerPort))] int port, + ITcpPortProvider tcpPortProvider) { IObserver<TransportEvent<NamingEvent>> handler = CreateServerHandler(); _idToAddrMap = new Dictionary<string, IPEndPoint>(); @@ -59,7 +66,9 @@ namespace Org.Apache.REEF.Network.Naming // Start transport server, get listening IP endpoint _logger.Log(Level.Info, "Starting naming server"); - _server = new TransportServer<NamingEvent>(port, handler, codec); + _server = new TransportServer<NamingEvent>( + new IPEndPoint(NetworkUtils.LocalIPAddress, port), handler, + codec, tcpPortProvider); _server.Run(); LocalEndpoint = _server.LocalEndpoint; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs index 43e55c1..1b796a3 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs @@ -58,6 +58,7 @@ namespace Org.Apache.REEF.Network.NetworkService /// <param name="idFactory">The factory used to create IIdentifiers</param> /// <param name="codec">The codec used for serialization</param> /// <param name="remoteManagerFactory">Used to instantiate remote manager 
instances.</param> + /// <param name="tcpPortProvider">Provides ports for tcp listeners.</param> [Inject] public NetworkService( [Parameter(typeof(NetworkServiceOptions.NetworkServicePort))] int nsPort, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs b/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs index 29bd754..569a564 100644 --- a/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs +++ b/lang/cs/Org.Apache.REEF.Tang/Util/ReflectionUtilities.cs @@ -170,7 +170,7 @@ namespace Org.Apache.REEF.Tang.Util if (c.IsInterface) { - workQueue.Add(typeof (object)); + workQueue.Add(typeof(object)); } return workQueue; @@ -209,39 +209,39 @@ namespace Org.Apache.REEF.Tang.Util /// <exception cref="System.NotSupportedException">Encountered unknown primitive type!</exception> public static Type BoxClass(Type c) { - if (c.IsPrimitive && c != typeof (Type)) + if (c.IsPrimitive && c != typeof(Type)) { - if (c == typeof (bool)) + if (c == typeof(bool)) { - return typeof (Boolean); + return typeof(Boolean); } - else if (c == typeof (byte)) + else if (c == typeof(byte)) { - return typeof (Byte); + return typeof(Byte); } - else if (c == typeof (char)) + else if (c == typeof(char)) { - return typeof (Char); + return typeof(Char); } - else if (c == typeof (short)) + else if (c == typeof(short)) { - return typeof (Int16); + return typeof(Int16); } - else if (c == typeof (int)) + else if (c == typeof(int)) { - return typeof (Int32); + return typeof(Int32); } - else if (c == typeof (long)) + else if (c == typeof(long)) { - return typeof (Int64); + return typeof(Int64); } - else if (c == typeof (float)) + else if (c == typeof(float)) { - return typeof (Single); + return typeof(Single); } - else if (c == typeof (double)) + else if (c == typeof(double)) { - return 
typeof (Double); + return typeof(Double); } else { @@ -371,7 +371,8 @@ namespace Org.Apache.REEF.Tang.Util } if (t == null) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ApplicationException("Not able to get Type from the name provided: " + name), LOGGER); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw( + new ApplicationException("Not able to get Type from the name provided: " + name), LOGGER); } return t; @@ -480,7 +481,7 @@ namespace Org.Apache.REEF.Tang.Util Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); } - return args[0]; + return args[0]; } if (ImplementName(type)) //Implement Name<> but no [NamedParameter] attribute http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs index f24eb29..c0ffa11 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/RemoteManagerTest.cs @@ -42,8 +42,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) { var observer = Observer.Create<string>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); @@ -65,14 +65,13 @@ namespace Org.Apache.REEF.Wake.Tests [TestMethod] public void TestOneWayCommunicationClientOnly() { - int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000); IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); 
BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); using (var remoteManager1 = _remoteManagerFactory.GetInstance(new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, listeningPort, new StringCodec())) + using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new StringCodec())) { IPEndPoint remoteEndpoint = new IPEndPoint(listeningAddress, 0); var observer = Observer.Create<string>(queue.Add); @@ -101,8 +100,8 @@ namespace Org.Apache.REEF.Wake.Tests List<string> events1 = new List<string>(); List<string> events2 = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) { // Register observers for remote manager 1 and remote manager 2 var remoteEndpoint = new IPEndPoint(listeningAddress, 0); @@ -146,9 +145,9 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager3 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) + using (var remoteManager3 = GetRemoteManager()) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); var observer = Observer.Create<string>(queue.Add); @@ -184,9 +183,9 @@ namespace Org.Apache.REEF.Wake.Tests List<string> events2 = new List<string>(); List<string> events3 = new List<string>(); - 
using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager3 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) + using (var remoteManager3 = GetRemoteManager()) { var remoteEndpoint = new IPEndPoint(listeningAddress, 0); @@ -244,8 +243,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) { // Register handler for when remote manager 2 receives events; respond // with an ack @@ -285,8 +284,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) { // RemoteManager2 listens and records events of type IRemoteEvent<string> var observer = Observer.Create<IRemoteMessage<string>>(message => queue.Add(message.Message)); @@ -314,8 +313,8 @@ namespace Org.Apache.REEF.Wake.Tests BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new 
StringCodec())) - using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, 0, new StringCodec())) + using (var remoteManager1 = GetRemoteManager()) + using (var remoteManager2 = GetRemoteManager()) { var observer = Observer.Create<string>(queue.Add); IPEndPoint endpoint1 = new IPEndPoint(listeningAddress, 0); @@ -337,5 +336,11 @@ namespace Org.Apache.REEF.Wake.Tests Assert.AreEqual(4, events.Count); } + + private IRemoteManager<string> GetRemoteManager() + { + IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); + return _remoteManagerFactory.GetInstance(listeningAddress, new StringCodec()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs index 3e67e0d..c1c65b2 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/TransportTest.cs @@ -23,8 +23,10 @@ using System.Net; using System.Reactive; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Remote.Parameters; using Org.Apache.REEF.Wake.Util; namespace Org.Apache.REEF.Wake.Tests @@ -32,23 +34,25 @@ namespace Org.Apache.REEF.Wake.Tests [TestClass] public class TransportTest { + private readonly IPAddress _localIpAddress = IPAddress.Parse("127.0.0.1"); + private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940); [TestMethod] public void TestTransportServer() { ICodec<string> codec = new StringCodec(); - int port = NetworkUtils.GenerateRandomPort(6000, 7000); BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); - 
IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + + IPEndPoint endpoint = new IPEndPoint(_localIpAddress, 0); var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new TransportServer<string>(endpoint, remoteHandler, codec)) + using (var server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider)) { server.Run(); - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); using (var client = new TransportClient<string>(remoteEndpoint, codec)) { client.Send("Hello"); @@ -68,19 +72,18 @@ namespace Org.Apache.REEF.Wake.Tests public void TestTransportServerEvent() { ICodec<TestEvent> codec = new TestEventCodec(); - int port = NetworkUtils.GenerateRandomPort(6000, 7000); BlockingCollection<TestEvent> queue = new BlockingCollection<TestEvent>(); List<TestEvent> events = new List<TestEvent>(); - IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + IPEndPoint endpoint = new IPEndPoint(_localIpAddress, 0); var remoteHandler = Observer.Create<TransportEvent<TestEvent>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec)) + using (var server = new TransportServer<TestEvent>(endpoint, remoteHandler, codec, _tcpPortProvider)) { server.Run(); - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); using (var client = new TransportClient<TestEvent>(remoteEndpoint, codec)) { client.Send(new TestEvent("Hello")); @@ -100,8 +103,7 @@ namespace Org.Apache.REEF.Wake.Tests public void TestTransportSenderStage() { ICodec<string> codec = new StringCodec(); - int port = NetworkUtils.GenerateRandomPort(6000, 7000); - IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + IPEndPoint 
endpoint = new IPEndPoint(_localIpAddress, 0); List<string> events = new List<string>(); BlockingCollection<string> queue = new BlockingCollection<string>(); @@ -109,12 +111,12 @@ namespace Org.Apache.REEF.Wake.Tests // Server echoes the message back to the client var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => tEvent.Link.Write(tEvent.Data)); - using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec)) + using (TransportServer<string> server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider)) { server.Run(); var clientHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); using (var client = new TransportClient<string>(remoteEndpoint, codec, clientHandler)) { client.Send("Hello"); @@ -134,8 +136,7 @@ namespace Org.Apache.REEF.Wake.Tests public void TestRaceCondition() { ICodec<string> codec = new StringCodec(); - int port = NetworkUtils.GenerateRandomPort(6000, 7000); - + var port = 0; BlockingCollection<string> queue = new BlockingCollection<string>(); List<string> events = new List<string>(); int numEventsExpected = 150; @@ -143,7 +144,7 @@ namespace Org.Apache.REEF.Wake.Tests IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); var remoteHandler = Observer.Create<TransportEvent<string>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new TransportServer<string>(endpoint, remoteHandler, codec)) + using (var server = new TransportServer<string>(endpoint, remoteHandler, codec, _tcpPortProvider)) { server.Run(); @@ -151,7 +152,7 @@ namespace Org.Apache.REEF.Wake.Tests { Task.Run(() => { - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(_localIpAddress, server.LocalEndpoint.Port); using 
(var client = new TransportClient<string>(remoteEndpoint, codec)) { client.Send("Hello"); @@ -197,5 +198,17 @@ namespace Org.Apache.REEF.Wake.Tests return new TestEvent(new StringCodec().Decode(data)); } } + + + private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd) + { + var configuration = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation<ITcpPortProvider, TcpPortProvider>() + .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString()) + .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString()) + .Build(); + return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs index 6f7baf9..80bb78b 100644 --- a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableRemoteManagerTest.cs @@ -79,7 +79,7 @@ namespace Org.Apache.REEF.Wake.Tests [TestMethod] public void TestWritableOneWayCommunicationClientOnly() { - int listeningPort = NetworkUtils.GenerateRandomPort(6000, 7000); + int listeningPort = NetworkUtils.GenerateRandomPort(8900, 8940); IPAddress listeningAddress = IPAddress.Parse("127.0.0.1"); BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs index 03de24e..914a2aa 100644 --- 
a/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs +++ b/lang/cs/Org.Apache.REEF.Wake.Tests/WritableTransportTest.cs @@ -26,8 +26,11 @@ using System.Reactive; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; +using Org.Apache.REEF.Wake.Remote.Parameters; using Org.Apache.REEF.Wake.Util; namespace Org.Apache.REEF.Wake.Tests @@ -40,6 +43,8 @@ namespace Org.Apache.REEF.Wake.Tests [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] public class WritableTransportTest { + private readonly ITcpPortProvider _tcpPortProvider = GetTcpProvider(8900, 8940); + /// <summary> /// Tests whether WritableTransportServer receives /// string messages from WritableTransportClient @@ -47,19 +52,17 @@ namespace Org.Apache.REEF.Wake.Tests [TestMethod] public void TestWritableTransportServer() { - int port = NetworkUtils.GenerateRandomPort(6000, 7000); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); - IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler)) + using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) { server.Run(); - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); using (var client = new WritableTransportClient<WritableString>(remoteEndpoint)) { client.Send(new WritableString("Hello")); @@ 
-85,8 +88,8 @@ namespace Org.Apache.REEF.Wake.Tests [TestMethod] public void TestWritableTransportSenderStage() { - int port = NetworkUtils.GenerateRandomPort(6000, 7000); - IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + + IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); List<string> events = new List<string>(); BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); @@ -94,12 +97,12 @@ namespace Org.Apache.REEF.Wake.Tests // Server echoes the message back to the client var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => tEvent.Link.Write(tEvent.Data)); - using (WritableTransportServer<WritableString> server = new WritableTransportServer<WritableString>(endpoint, remoteHandler)) + using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) { server.Run(); var clientHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); using (var client = new WritableTransportClient<WritableString>(remoteEndpoint, clientHandler)) { client.Send(new WritableString("Hello")); @@ -126,16 +129,14 @@ namespace Org.Apache.REEF.Wake.Tests [TestMethod] public void TestWritableRaceCondition() { - int port = NetworkUtils.GenerateRandomPort(6000, 7000); - BlockingCollection<WritableString> queue = new BlockingCollection<WritableString>(); List<string> events = new List<string>(); int numEventsExpected = 150; - IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, port); + IPEndPoint endpoint = new IPEndPoint(IPAddress.Any, 0); var remoteHandler = Observer.Create<TransportEvent<WritableString>>(tEvent => queue.Add(tEvent.Data)); - using (var server = new WritableTransportServer<WritableString>(endpoint, remoteHandler)) + using (var server = new 
WritableTransportServer<WritableString>(endpoint, remoteHandler, _tcpPortProvider)) { server.Run(); @@ -143,7 +144,7 @@ namespace Org.Apache.REEF.Wake.Tests { Task.Run(() => { - IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port); + IPEndPoint remoteEndpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), server.LocalEndpoint.Port); using (var client = new WritableTransportClient<WritableString>(remoteEndpoint)) { client.Send(new WritableString("Hello")); @@ -162,5 +163,15 @@ namespace Org.Apache.REEF.Wake.Tests Assert.AreEqual(numEventsExpected, events.Count); } + + private static ITcpPortProvider GetTcpProvider(int portRangeStart, int portRangeEnd) + { + var configuration = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation<ITcpPortProvider, TcpPortProvider>() + .BindIntNamedParam<TcpPortRangeStart>(portRangeStart.ToString()) + .BindIntNamedParam<TcpPortRangeCount>((portRangeEnd - portRangeStart + 1).ToString()) + .Build(); + return TangFactory.GetTang().NewInjector(configuration).GetInstance<ITcpPortProvider>(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj index a62d524..53ffd65 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj +++ b/lang/cs/Org.Apache.REEF.Wake/Org.Apache.REEF.Wake.csproj @@ -62,6 +62,11 @@ under the License. 
<Compile Include="IObserverFactory.cs" /> <Compile Include="IStage.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="Remote\ITcpPortProvider.cs" /> + <Compile Include="Remote\Parameters\TcpPortRangeCount.cs" /> + <Compile Include="Remote\Parameters\TcpPortRangeSeed.cs" /> + <Compile Include="Remote\Parameters\TcpPortRangeStart.cs" /> + <Compile Include="Remote\Parameters\TcpPortRangeTryCount.cs" /> <Compile Include="Remote\IDataReader.cs" /> <Compile Include="Remote\IDataWriter.cs" /> <Compile Include="Remote\Impl\StreamDataReader.cs" /> @@ -114,6 +119,7 @@ under the License. <Compile Include="Remote\Proto\WakeRemoteProtos.cs" /> <Compile Include="Remote\RemoteConfiguration.cs" /> <Compile Include="Remote\RemoteRuntimeException.cs" /> + <Compile Include="Remote\TcpPortProvider.cs" /> <Compile Include="Remote\TypeCache.cs" /> <Compile Include="RX\AbstractObserver.cs" /> <Compile Include="RX\AbstractRxStage.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs index 7d8041b..36a1adc 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/IRemoteManagerFactory.cs @@ -24,7 +24,7 @@ using Org.Apache.REEF.Wake.Impl; namespace Org.Apache.REEF.Wake.Remote { /// <summary> - /// Creates new intsances of IRemoteManager. + /// Creates new instances of IRemoteManager. 
/// </summary> [DefaultImplementation(typeof(DefaultRemoteManagerFactory))] public interface IRemoteManagerFactory @@ -39,6 +39,15 @@ namespace Org.Apache.REEF.Wake.Remote IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec); /// <summary> + /// Constructs a DefaultRemoteManager listening on the specified address and any + /// available port. + /// </summary> + /// <param name="localAddress">The address to listen on</param> + /// <param name="codec">The codec used for serializing messages</param> + IRemoteManager<T> GetInstance<T>(IPAddress localAddress, ICodec<T> codec); + + + /// <summary> /// Constructs a DefaultRemoteManager. Does not listen for incoming messages. /// </summary> /// <param name="codec">The codec used for serializing messages</param> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs new file mode 100644 index 0000000..8783f01 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/ITcpPortProvider.cs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Wake.Remote +{ + /// <summary> + /// Provides port numbers for tcp listeners + /// </summary> + [DefaultImplementation(typeof(TcpPortProvider))] + public interface ITcpPortProvider : IEnumerable<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs index 210ebcf..de577d1 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManager.cs @@ -20,6 +20,7 @@ using System; using System.Collections.Generic; using System.Net; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -43,8 +44,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="localAddress">The address to listen on</param> /// <param name="codec">The codec used for serializing messages</param> + /// <param name="tcpPortProvider">provides port numbers to listen</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec) : this(localAddress, 0, codec) + public DefaultRemoteManager(IPAddress localAddress, ICodec<T> codec, ITcpPortProvider tcpPortProvider) : + this(localAddress, 0, codec, tcpPortProvider) { } @@ -53,8 +56,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> /// <param name="localEndpoint">The endpoint to listen on</param> /// <param name="codec">The codec used for serializing 
messages</param> + /// <param name="tcpPortProvider">provides port numbers to listen</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec) + public DefaultRemoteManager(IPEndPoint localEndpoint, ICodec<T> codec, ITcpPortProvider tcpPortProvider) { if (localEndpoint == null) { @@ -74,7 +78,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl _cachedClients = new Dictionary<IPEndPoint, ProxyObserver>(); // Begin to listen for incoming messages - _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec); + _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec, + tcpPortProvider); _server.Run(); LocalEndpoint = _server.LocalEndpoint; @@ -88,8 +93,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="localAddress">The address to listen on</param> /// <param name="port">The port to listen on</param> /// <param name="codec">The codec used for serializing messages</param> + /// <param name="tcpPortProvider">provides port numbers to listen</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec) + public DefaultRemoteManager(IPAddress localAddress, int port, ICodec<T> codec, ITcpPortProvider tcpPortProvider) { if (localAddress == null) { @@ -111,7 +117,8 @@ namespace Org.Apache.REEF.Wake.Remote.Impl IPEndPoint localEndpoint = new IPEndPoint(localAddress, port); // Begin to listen for incoming messages - _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec); + _server = new TransportServer<IRemoteEvent<T>>(localEndpoint, _observerContainer, _codec, + tcpPortProvider); _server.Run(); LocalEndpoint = _server.LocalEndpoint; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs index 38a020f..54728fc 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/DefaultRemoteManagerFactory.cs @@ -29,16 +29,26 @@ namespace Org.Apache.REEF.Wake.Impl /// </summary> internal sealed class DefaultRemoteManagerFactory : IRemoteManagerFactory { + private readonly ITcpPortProvider _tcpPortProvider; [Inject] - private DefaultRemoteManagerFactory() + private DefaultRemoteManagerFactory(ITcpPortProvider tcpPortProvider) { + _tcpPortProvider = tcpPortProvider; } public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port, ICodec<T> codec) { #pragma warning disable 618 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. - return new DefaultRemoteManager<T>(localAddress, port, codec); + return new DefaultRemoteManager<T>(localAddress, port, codec, _tcpPortProvider); +#pragma warning restore 618 + } + + public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, ICodec<T> codec) + { +#pragma warning disable 618 + // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. 
+ return new DefaultRemoteManager<T>(localAddress, 0, codec, _tcpPortProvider); #pragma warning restore 618 } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs index 743bac5..8cd350e 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/TransportServer.cs @@ -22,6 +22,7 @@ using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -34,9 +35,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { private static readonly Logger LOGGER = Logger.GetLogger(typeof(TransportServer<>)); - private readonly TcpListener _listener; + private TcpListener _listener; private readonly CancellationTokenSource _cancellationSource; private readonly IObserver<TransportEvent<T>> _remoteObserver; + private readonly ITcpPortProvider _tcpPortProvider; private readonly ICodec<T> _codec; private bool _disposed; private Task _serverTask; @@ -46,33 +48,22 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// Listens on the specified remote endpoint. When it recieves a remote /// event, it will envoke the specified remote handler. 
/// </summary> - /// <param name="port">Port to listen on</param> - /// <param name="remoteHandler">The handler to invoke when receiving incoming - /// remote messages</param> - /// <param name="codec">The codec to encode/decode"</param> - public TransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ICodec<T> codec) - : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, codec) - { - } - - /// <summary> - /// Constructs a TransportServer to listen for remote events. - /// Listens on the specified remote endpoint. When it recieves a remote - /// event, it will envoke the specified remote handler. - /// </summary> /// <param name="localEndpoint">Endpoint to listen on</param> /// <param name="remoteHandler">The handler to invoke when receiving incoming /// remote messages</param> /// <param name="codec">The codec to encode/decode"</param> + /// <param name="tcpPortProvider">provides port numbers to listen</param> public TransportServer(IPEndPoint localEndpoint, IObserver<TransportEvent<T>> remoteHandler, - ICodec<T> codec) + ICodec<T> codec, + ITcpPortProvider tcpPortProvider) { _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port); _remoteObserver = remoteHandler; _cancellationSource = new CancellationTokenSource(); _cancellationSource.Token.ThrowIfCancellationRequested(); _codec = codec; + _tcpPortProvider = tcpPortProvider; _disposed = false; } @@ -89,10 +80,45 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> public void Run() { - _listener.Start(); + if (LocalEndpoint.Port == 0) + { + FindAPortAndStartListener(); + } + else + { + _listener.Start(); + } + _serverTask = Task.Run(() => StartServer()); } + private void FindAPortAndStartListener() + { + var foundAPort = false; + var exception = new SocketException((int)SocketError.AddressAlreadyInUse); + for (var enumerator = _tcpPortProvider.GetEnumerator(); + !foundAPort && enumerator.MoveNext(); + ) + { + _listener = new 
TcpListener(LocalEndpoint.Address, enumerator.Current); + try + { + _listener.Start(); + foundAPort = true; + } + catch (SocketException e) + { + exception = e; + } + } + if (!foundAPort) + { + Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER); + } + LOGGER.Log(Level.Info, + String.Format("Listening on {0}", _listener.LocalEndpoint.ToString())); + } + /// <summary> /// Close the TransportServer and all open connections /// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs index 285db71..0a9ead3 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManager.cs @@ -45,7 +45,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="localAddress">The address to listen on</param> /// <param name="port">The port to listen on</param> [Obsolete("Use IRemoteManagerFactory.GetInstance() instead.", false)] - public WritableRemoteManager(IPAddress localAddress, int port) + public WritableRemoteManager(IPAddress localAddress, int port, ITcpPortProvider tcpPortProvider) { if (localAddress == null) { @@ -62,7 +62,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl IPEndPoint localEndpoint = new IPEndPoint(localAddress, port); // Begin to listen for incoming messages - _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer); + _server = new WritableTransportServer<IWritableRemoteEvent<T>>(localEndpoint, _observerContainer, tcpPortProvider); _server.Run(); LocalEndpoint = _server.LocalEndpoint; 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs index 6d3c4ad..4beb844 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableRemoteManagerFactory.cs @@ -31,16 +31,18 @@ namespace Org.Apache.REEF.Wake.Impl [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] public sealed class WritableRemoteManagerFactory { + private readonly ITcpPortProvider _tcpPortProvider; [Inject] - private WritableRemoteManagerFactory() + private WritableRemoteManagerFactory(ITcpPortProvider tcpPortProvider) { + _tcpPortProvider = tcpPortProvider; } public IRemoteManager<T> GetInstance<T>(IPAddress localAddress, int port) where T : IWritable { #pragma warning disable 618 // This is the one place allowed to call this constructor. Hence, disabling the warning is OK. 
- return new WritableRemoteManager<T>(localAddress, port); + return new WritableRemoteManager<T>(localAddress, port, _tcpPortProvider); #pragma warning disable 618 } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs index 05a520d..90cfdd7 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/WritableTransportServer.cs @@ -22,6 +22,7 @@ using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Util; @@ -36,9 +37,10 @@ namespace Org.Apache.REEF.Wake.Remote.Impl { private static readonly Logger LOGGER = Logger.GetLogger(typeof (TransportServer<>)); - private readonly TcpListener _listener; + private TcpListener _listener; private readonly CancellationTokenSource _cancellationSource; private readonly IObserver<TransportEvent<T>> _remoteObserver; + private readonly ITcpPortProvider _tcpPortProvider; private bool _disposed; private Task _serverTask; @@ -50,8 +52,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="port">Port to listen on</param> /// <param name="remoteHandler">The handler to invoke when receiving incoming /// remote messages</param> - public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler) - : this(new IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler) + /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> + public WritableTransportServer(int port, IObserver<TransportEvent<T>> remoteHandler, ITcpPortProvider tcpPortProvider) + : this(new 
IPEndPoint(NetworkUtils.LocalIPAddress, port), remoteHandler, tcpPortProvider) { } @@ -63,11 +66,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// <param name="localEndpoint">Endpoint to listen on</param> /// <param name="remoteHandler">The handler to invoke when receiving incoming /// remote messages</param> - public WritableTransportServer(IPEndPoint localEndpoint, - IObserver<TransportEvent<T>> remoteHandler) + /// <param name="tcpPortProvider">Find port numbers if listenport is 0</param> + public WritableTransportServer( + IPEndPoint localEndpoint, + IObserver<TransportEvent<T>> remoteHandler, + ITcpPortProvider tcpPortProvider) { _listener = new TcpListener(localEndpoint.Address, localEndpoint.Port); _remoteObserver = remoteHandler; + _tcpPortProvider = tcpPortProvider; _cancellationSource = new CancellationTokenSource(); _cancellationSource.Token.ThrowIfCancellationRequested(); _disposed = false; @@ -86,10 +93,46 @@ namespace Org.Apache.REEF.Wake.Remote.Impl /// </summary> public void Run() { - _listener.Start(); + if (LocalEndpoint.Port == 0) + { + FindAPortAndStartListener(); + } + else + { + _listener.Start(); + } + _serverTask = Task.Run(() => StartServer()); } + private void FindAPortAndStartListener() + { + var foundAPort = false; + var exception = new SocketException((int)SocketError.AddressAlreadyInUse); + for (var enumerator = _tcpPortProvider.GetEnumerator(); + !foundAPort && enumerator.MoveNext(); + ) + { + _listener = new TcpListener(LocalEndpoint.Address, enumerator.Current); + try + { + _listener.Start(); + foundAPort = true; + } + catch (SocketException e) + { + exception = e; + } + } + if (!foundAPort) + { + Exceptions.Throw(exception, "Could not find a port to listen on", LOGGER); + } + LOGGER.Log(Level.Info, + String.Format("Listening on {0}", _listener.LocalEndpoint.ToString())); + } + + /// <summary> /// Close the TransportServer and all open connections /// </summary> 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs new file mode 100644 index 0000000..164e2bd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeCount.cs @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Wake.Remote.Parameters +{ + [NamedParameter(Documentation = "Port number count in the range for listening on tcp ports", DefaultValue = "1000") + ] + public class TcpPortRangeCount : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/fffee854/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs new file mode 100644 index 0000000..f60f169 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/TcpPortRangeSeed.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Wake.Remote.Parameters +{ + [NamedParameter(Documentation = "Seed for the random port number generator", DefaultValue = "0")] + public class TcpPortRangeSeed: Name<int> + { + } +}
