Repository: incubator-reef
Updated Branches:
refs/heads/master ed3d8a40e -> 26209180f
[REEF-764] Use TCPConfigurationProviderModule in IMRU
This addresses the issue by:
* removing any explicit setting of the TcpPortProvider configuration in the IMRU
Driver and in ServiceAndContextConfigurationProvider.
* using TcpPortConfigurationModule (called TCPConfigurationProviderModule in the
title above) to set the TcpPortProvider configuration in REEF.IMRU.Examples, so
that it is automatically passed to the driver and the other evaluators.
JIRA:
[REEF-764](https://issues.apache.org/jira/browse/REEF-764)
Pull Request:
This closes #500
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/26209180
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/26209180
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/26209180
Branch: refs/heads/master
Commit: 26209180fa3996cbf40919a7e8dfb624e2d8b66a
Parents: ed3d8a4
Author: Dhruv <[email protected]>
Authored: Wed Sep 16 23:49:36 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Sep 17 09:31:39 2015 -0700
----------------------------------------------------------------------
lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 7 ++++---
.../OnREEF/Driver/IMRUDriver.cs | 21 +-------------------
.../ServiceAndContextConfigurationProvider.cs | 10 +++-------
3 files changed, 8 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/26209180/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index b1554d5..9018473 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -20,6 +20,7 @@
using System;
using System.Globalization;
using System.Linq;
+using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
@@ -142,10 +143,10 @@ namespace Org.Apache.REEF.IMRU.Examples
}
}
- var tcpPortConfig = TangFactory.GetTang().NewConfigurationBuilder()
- .BindNamedParameter(typeof(TcpPortRangeStart),
+ var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
+ .Set(TcpPortConfigurationModule.PortRangeStart,
startPort.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter(typeof(TcpPortRangeCount),
+ .Set(TcpPortConfigurationModule.PortRangeCount,
portRange.ToString(CultureInfo.InvariantCulture))
.Build();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/26209180/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 7322e4e..958364a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -64,7 +64,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private ICommunicationGroupDriver _commGroup;
private readonly IGroupCommDriver _groupCommDriver;
private readonly TaskStarter _groupCommTaskStarter;
- private IConfiguration _tcpPortProviderConfig;
private readonly ConcurrentStack<string> _taskIdStack;
private readonly ConcurrentStack<IConfiguration>
_perMapperConfiguration;
private readonly Stack<IPartitionDescriptor> _partitionDescriptorStack;
@@ -86,8 +85,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
[Parameter(typeof (PerMapConfigGeneratorSet))]
ISet<IPerMapperConfigGenerator> perMapperConfigs,
ConfigurationManager configurationManager,
IEvaluatorRequestor evaluatorRequestor,
- [Parameter(typeof (TcpPortRangeStart))] int startingPort,
- [Parameter(typeof (TcpPortRangeCount))] int portRange,
[Parameter(typeof (CoresPerMapper))] int coresPerMapper,
[Parameter(typeof (CoresForUpdateTask))] int coresForUpdateTask,
[Parameter(typeof (MemoryPerMapper))] int memoryPerMapper,
@@ -108,12 +105,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
_allowedFailedEvaluators = (int)
(failedEvaluatorsFraction*dataSet.Count);
AddGroupCommunicationOperators();
-
- //TODO[REEF-600]: Once the configuration module for
TcpPortProvider
- //TODO[REEF-600]: will be provided, the configuraiton will be
automatically
- //TODO[REEF-600]: carried over to evaluators and below function
will be obsolete.
- ConstructTcpPortProviderConfig(startingPort, portRange);
-
_groupCommTaskStarter = new TaskStarter(_groupCommDriver,
_dataSet.Count + 1);
_taskIdStack = new ConcurrentStack<string>();
@@ -122,7 +113,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
ConstructTaskIdAndPartitionDescriptorStack();
_serviceAndContextConfigurationProvider =
new ServiceAndContextConfigurationProvider<TMapInput,
TMapOutput>(dataSet.Count + 1, groupCommDriver,
- _configurationManager, _tcpPortProviderConfig,
_partitionDescriptorStack);
+ _configurationManager, _partitionDescriptorStack);
}
/// <summary>
@@ -349,16 +340,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
.Build();
}
- private void ConstructTcpPortProviderConfig(int startingPort, int
portRange)
- {
- _tcpPortProviderConfig =
TangFactory.GetTang().NewConfigurationBuilder()
- .BindNamedParameter<TcpPortRangeStart,
int>(GenericType<TcpPortRangeStart>.Class,
- startingPort.ToString(CultureInfo.InvariantCulture))
- .BindNamedParameter<TcpPortRangeCount,
int>(GenericType<TcpPortRangeCount>.Class,
- portRange.ToString(CultureInfo.InvariantCulture))
- .Build();
- }
-
private void ConstructTaskIdAndPartitionDescriptorStack()
{
int counter = 0;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/26209180/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 40faa9c..26cbb79 100644
---
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -49,11 +49,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private int _assignedPartitionDescriptors;
private readonly IGroupCommDriver _groupCommDriver;
private readonly ConfigurationManager _configurationManager;
- private readonly IConfiguration _tcpPortProviderConfig;
private readonly Stack<IPartitionDescriptor> _partitionDescriptors;
internal ServiceAndContextConfigurationProvider(int numNodes,
IGroupCommDriver groupCommDriver,
- ConfigurationManager configurationManager, IConfiguration
tcpPortProviderConfig, Stack<IPartitionDescriptor> partitionDescriptors)
+ ConfigurationManager configurationManager,
Stack<IPartitionDescriptor> partitionDescriptors)
{
_configurationProvider = new Dictionary<string,
ContextAndServiceConfiguration>();
_failedEvaluators = new HashSet<string>();
@@ -61,7 +60,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
_numNodes = numNodes;
_groupCommDriver = groupCommDriver;
_configurationManager = configurationManager;
- _tcpPortProviderConfig = tcpPortProviderConfig;
_assignedPartitionDescriptors = 0;
_partitionDescriptors = partitionDescriptors;
_lock = new object();
@@ -157,8 +155,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
).Build();
var contextConf =
Configurations.Merge(_groupCommDriver.GetContextConfiguration(),
partitionDescriptor.GetPartitionConfiguration());
- var serviceConf =
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
- _tcpPortProviderConfig);
+ var serviceConf =
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig);
return new ContextAndServiceConfiguration(contextConf,
serviceConf);
}
@@ -178,8 +175,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
).Build();
- var serviceConf =
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
- _tcpPortProviderConfig);
+ var serviceConf =
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig);
return new
ContextAndServiceConfiguration(_groupCommDriver.GetContextConfiguration(),
serviceConf);
}
}