[REEF-256] Rename MPI to Group Communication Some of the classes in Group Communication are names as Mpi when it was created initially. We need to rename them to group communication. This PR renamed classes, variables and tests to use GroupComm or GroupCommunication
JIRA: [REEF-256](https://issues.apache.org/jira/browse/REEF-256) Pull Request: This closes #148 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c330dcff Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c330dcff Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c330dcff Branch: refs/heads/master Commit: c330dcff5157f53cedaf8db4c5cbdda7570a0c48 Parents: e1be758 Author: Julia Wang <[email protected]> Authored: Thu Apr 16 13:12:38 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Apr 16 14:33:27 2015 -0700 ---------------------------------------------------------------------- .../KMeans/KMeansDriverHandlers.cs | 22 +- .../MachineLearning/KMeans/KMeansMasterTask.cs | 4 +- .../MachineLearning/KMeans/KMeansSlaveTask.cs | 10 +- .../MachineLearning/KMeans/LegacyKMeansTask.cs | 2 +- .../MachineLearning/KMeans/PartialMean.cs | 2 +- .../BroadcastAndReduceClient.cs | 14 +- .../PipelineBroadcastAndReduceClient.cs | 14 +- .../BroadcastReduceDriver.cs | 22 +- .../BroadcastReduceDriverAndTasks/MasterTask.cs | 10 +- .../BroadcastReduceDriverAndTasks/SlaveTask.cs | 10 +- .../PipelinedBroadcastReduceDriver.cs | 22 +- .../PipelinedMasterTask.cs | 10 +- .../PipelinedSlaveTask.cs | 10 +- .../ScatterReduceDriverAndTasks/MasterTask.cs | 10 +- .../ScatterReduceDriver.cs | 24 +- .../ScatterReduceDriverAndTasks/SlaveTask.cs | 10 +- .../GroupCommunicationTests.cs | 100 +++---- .../GroupCommunicationTreeTopologyTests.cs | 54 ++-- .../Config/GroupCommConfigurationOptions.cs | 102 +++++++ .../Group/Config/MpiConfigurationOptions.cs | 102 ------- .../Group/Driver/ICommunicationGroupDriver.cs | 18 +- .../Group/Driver/IGroupCommDriver.cs | 96 +++++++ .../Group/Driver/IMpiDriver.cs | 96 ------- .../Driver/Impl/CommunicationGroupDriver.cs | 34 +-- .../Group/Driver/Impl/GroupCommDriver.cs | 285 +++++++++++++++++++ .../Driver/Impl/GroupCommunicationMessage.cs | 8 +- .../Group/Driver/Impl/MessageType.cs | 2 +- .../Group/Driver/Impl/MpiDriver.cs | 285 ------------------- .../Group/Driver/Impl/TaskStarter.cs | 16 +- .../Group/Operators/IBroadcastReceiver.cs | 4 +- .../Group/Operators/IBroadcastSender.cs | 4 +- .../Group/Operators/IGroupCommOperator.cs | 43 +++ .../Group/Operators/IMpiOperator.cs | 43 --- .../Group/Operators/IReduceReceiver.cs | 4 +- .../Group/Operators/IReduceSender.cs | 4 +- .../Group/Operators/IScatterReceiver.cs | 4 +- .../Group/Operators/IScatterSender.cs | 4 +- .../Group/Operators/Impl/BroadcastReceiver.cs | 6 +- .../Group/Operators/Impl/BroadcastSender.cs | 8 +- .../Group/Operators/Impl/ReduceOperatorSpec.cs | 2 +- .../Group/Operators/Impl/ReduceReceiver.cs | 6 +- .../Group/Operators/Impl/ReduceSender.cs | 6 +- .../Group/Operators/Impl/ScatterOperatorSpec.cs | 2 +- .../Group/Operators/Impl/ScatterReceiver.cs | 6 +- .../Group/Operators/Impl/ScatterSender.cs | 6 +- .../Group/Operators/Impl/Sender.cs | 2 +- .../Group/Task/ICommunicationGroupClient.cs | 2 +- .../Group/Task/IGroupCommClient.cs | 39 +++ .../Group/Task/IGroupCommNetworkObserver.cs | 44 +++ .../Group/Task/IMpiClient.cs | 39 --- .../Group/Task/IMpiNetworkObserver.cs | 44 --- .../Group/Task/Impl/CommunicationGroupClient.cs | 32 +-- .../Impl/CommunicationGroupNetworkObserver.cs | 4 +- .../Group/Task/Impl/GroupCommClient.cs | 107 +++++++ .../Group/Task/Impl/GroupCommNetworkObserver.cs | 108 +++++++ .../Group/Task/Impl/MpiClient.cs | 107 ------- .../Group/Task/Impl/MpiNetworkObserver.cs | 108 ------- .../Group/Task/Impl/OperatorTopology.cs | 16 +- .../Group/Topology/FlatTopology.cs | 22 +- .../Group/Topology/ITopology.cs | 2 +- .../Group/Topology/TreeTopology.cs | 20 +- .../Org.Apache.REEF.Network.csproj | 16 +- .../Functional/Group/BroadcastReduceTest.cs | 14 +- .../Group/PipelinedBroadcastReduceTest.cs | 14 +- .../Functional/Group/ScatterReduceTest.cs | 14 +- .../Functional/ML/KMeans/TestKMeans.cs | 14 +- .../Org.Apache.REEF.Tests.csproj | 4 - 67 files changed, 1157 insertions(+), 1161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs index 358e3e1..08173cc 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs @@ -60,12 +60,12 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans private readonly int _clustersNumber = 3; private readonly int _totalEvaluators; private int _partitionInex = 0; - private readonly IMpiDriver _mpiDriver; + private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; + private readonly TaskStarter _groupCommTaskStarter; [Inject] - public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int numPartitions, MpiDriver mpiDriver) + public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int numPartitions, GroupCommDriver groupCommDriver) { Identifier = "KMeansDriverId"; _executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4)); @@ -79,15 +79,15 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _totalEvaluators = numPartitions + 1; - _mpiDriver = mpiDriver; + _groupCommDriver = groupCommDriver; - _commGroup = _mpiDriver.DefaultGroup + _commGroup = _groupCommDriver.DefaultGroup .AddBroadcast<Centroids, CentroidsCodec>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId) .AddBroadcast<ControlMessage, ControlMessageCodec>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId) .AddReduce<ProcessedResults, ProcessedResultsCodec>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, new KMeansMasterTask.AggregateMeans()) .Build(); - _mpiTaskStarter = new TaskStarter(_mpiDriver, _totalEvaluators); + _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalEvaluators); CreateClassHierarchy(); } @@ -105,10 +105,10 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - IConfiguration contextConfiguration = _mpiDriver.GetContextConfiguration(); + IConfiguration contextConfiguration = _groupCommDriver.GetContextConfiguration(); int partitionNum; - if (_mpiDriver.IsMasterContextConfiguration(contextConfiguration)) + if (_groupCommDriver.IsMasterContextConfiguration(contextConfiguration)) { partitionNum = -1; } @@ -121,7 +121,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans } } - IConfiguration gcServiceConfiguration = _mpiDriver.GetServiceConfiguration(); + IConfiguration gcServiceConfiguration = _groupCommDriver.GetServiceConfiguration(); IConfiguration commonServiceConfiguration = TangFactory.GetTang().NewConfigurationBuilder(gcServiceConfiguration) .BindNamedParameter<DataPartitionCache.PartitionIndex, int>(GenericType<DataPartitionCache.PartitionIndex>.Class, partitionNum.ToString(CultureInfo.InvariantCulture)) @@ -141,7 +141,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { IConfiguration taskConfiguration; - if (_mpiDriver.IsMasterTaskContext(activeContext)) + if (_groupCommDriver.IsMasterTaskContext(activeContext)) { // Configure Master Task taskConfiguration = TaskConfiguration.ConfigurationModule @@ -162,7 +162,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _commGroup.AddTask(slaveTaskId); } - _mpiTaskStarter.QueueTask(taskConfiguration, activeContext); + _groupCommTaskStarter.QueueTask(taskConfiguration, activeContext); } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs index fd096ea..375dbc6 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs @@ -49,7 +49,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public KMeansMasterTask( [Parameter(typeof(KMeansConfiguratioinOptions.TotalNumEvaluators))] int totalNumEvaluators, [Parameter(Value = typeof(KMeansConfiguratioinOptions.ExecutionDirectory))] string executionDirectory, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { using (_logger.LogFunction("KMeansMasterTask")) { @@ -57,7 +57,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { throw new ArgumentException("There must be more than 1 Evaluators in total, but the total evaluators number provided is " + totalNumEvaluators); } - _commGroup = mpiClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); + _commGroup = groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); _dataBroadcastSender = _commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName); _meansReducerReceiver = _commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName); _controlBroadcastSender = _commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs index 8d45245..6e037fe 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs @@ -32,7 +32,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { private static readonly Logger _logger = Logger.GetLogger(typeof(KMeansSlaveTask)); private readonly int _clustersNum; - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastReceiver<Centroids> _dataBroadcastReceiver; private readonly IBroadcastReceiver<ControlMessage> _controlBroadcastReceiver; @@ -43,14 +43,14 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public KMeansSlaveTask( DataPartitionCache dataPartition, [Parameter(typeof(KMeansConfiguratioinOptions.TotalNumEvaluators))] int clustersNumber, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { using (_logger.LogFunction("KMeansSlaveTask::KMeansSlaveTask")) { _dataPartition = dataPartition; - _mpiClient = mpiClient; + _groupCommClient = groupCommClient; _clustersNum = clustersNumber; - _commGroup = _mpiClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); + _commGroup = _groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); _dataBroadcastReceiver = _commGroup.GetBroadcastReceiver<Centroids>(Constants.CentroidsBroadcastOperatorName); _partialMeansSender = _commGroup.GetReduceSender<ProcessedResults>(Constants.MeansReduceOperatorName); _controlBroadcastReceiver = _commGroup.GetBroadcastReceiver<ControlMessage>(Constants.ControlMessageBroadcastOperatorName); @@ -80,7 +80,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private List<PartialMean> ComputePartialMeans() http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs index c6c3ffa..6a6634a 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/LegacyKMeansTask.cs @@ -74,7 +74,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _dataPartition.LabelData(_centroids); _partialMeans = ComputePartialMeans(); - // should be replaced with MPI + // should be replaced with Group Communication using (StreamWriter writer = new StreamWriter( File.OpenWrite(Path.Combine(_kMeansExecutionDirectory, Constants.DataDirectory, Constants.PartialMeanFilePrefix + _dataPartition.Partition)))) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs index 6f44167..3a6ece0 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/PartialMean.cs @@ -74,7 +74,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans List<PartialMean> partialMeans = new List<PartialMean>(); for (int i = 0; i < partitionsNum; i++) { - // should be replaced with MPI + // should be replaced with Group Communication string path = Path.Combine(executionDirectory, Constants.DataDirectory, Constants.PartialMeanFilePrefix + i.ToString(CultureInfo.InvariantCulture)); FileStream file = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read); using (StreamReader reader = new StreamReader(file)) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs index 0db6ce0..559d936 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs @@ -62,15 +62,15 @@ namespace Org.Apache.REEF.Network.Examples.Client numTasks.ToString(CultureInfo.InvariantCulture)) .Build(); - IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId) - .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId) - .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName) - .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) - .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId) + .BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(masterTaskId) + .BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(groupName) + .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(fanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) .Build(); - IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + IConfiguration merged = Configurations.Merge(driverConfig, groupCommDriverConfig); HashSet<string> appDlls = new HashSet<string>(); appDlls.Add(typeof(IDriver).Assembly.GetName().Name); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs index 570b227..c86697b 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs @@ -63,15 +63,15 @@ namespace Org.Apache.REEF.Network.Examples.Client GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture)) .Build(); - IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId) - .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) - .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName) - .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) - .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + IConfiguration groupCommDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(GroupTestConstants.DriverId) + .BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) + .BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(GroupTestConstants.GroupName) + .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) .Build(); - IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + IConfiguration merged = Configurations.Merge(driverConfig, groupCommDriverConfig); HashSet<string> appDlls = new HashSet<string>(); appDlls.Add(typeof(IDriver).Assembly.GetName().Name); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs index c6b8578..760d3b8 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs @@ -47,21 +47,21 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri private readonly int _numEvaluators; private readonly int _numIterations; - private readonly IMpiDriver _mpiDriver; + private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; + private readonly TaskStarter _groupCommTaskStarter; [Inject] public BroadcastReduceDriver( [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations, - MpiDriver mpiDriver) + GroupCommDriver groupCommDriver) { Identifier = "BroadcastStartHandler"; _numEvaluators = numEvaluators; _numIterations = numIterations; - _mpiDriver = mpiDriver; - _commGroup = _mpiDriver.DefaultGroup + _groupCommDriver = groupCommDriver; + _commGroup = _groupCommDriver.DefaultGroup .AddBroadcast<int, IntCodec>( GroupTestConstants.BroadcastOperatorName, GroupTestConstants.MasterTaskId) @@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri new SumFunction()) .Build(); - _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators); CreateClassHierarchy(); } @@ -86,14 +86,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); - IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); + IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); } public void OnNext(IActiveContext activeContext) { - if (_mpiDriver.IsMasterTaskContext(activeContext)) + if (_groupCommDriver.IsMasterTaskContext(activeContext)) { // Configure Master Task IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( @@ -110,7 +110,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri .Build(); _commGroup.AddTask(GroupTestConstants.MasterTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } else { @@ -130,7 +130,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri .Build(); _commGroup.AddTask(slaveTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs index 1ca2353..21d23f8 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs @@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri private readonly int _numIters; private readonly int _numReduceSenders; - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastSender<int> _broadcastSender; private readonly IReduceReceiver<int> _sumReducer; @@ -43,14 +43,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public MasterTask( [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { _logger.Log(Level.Info, "Hello from master task"); _numIters = numIters; _numReduceSenders = numEvaluators - 1; - _mpiClient = mpiClient; + _groupCommClient = groupCommClient; - _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _commGroup = groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _broadcastSender = _commGroup.GetBroadcastSender<int>(GroupTestConstants.BroadcastOperatorName); _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName); } @@ -78,7 +78,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private int TriangleNumber(int n) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs index c09bb80..6db3a5c 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs @@ -31,7 +31,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); private readonly int _numIterations; - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastReceiver<int> _broadcastReceiver; private readonly IReduceSender<int> _triangleNumberSender; @@ -39,13 +39,13 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri [Inject] public SlaveTask( [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { _logger.Log(Level.Info, "Hello from slave task"); _numIterations = numIters; - _mpiClient = mpiClient; - _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _groupCommClient = groupCommClient; + _commGroup = _groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(GroupTestConstants.BroadcastOperatorName); _triangleNumberSender = _commGroup.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName); } @@ -69,7 +69,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private int TriangleNumber(int n) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs index d108d68..5d00fb1 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs @@ -50,16 +50,16 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR private readonly int _numIterations; private readonly int _chunkSize; - private readonly IMpiDriver _mpiDriver; + private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; + private readonly TaskStarter _groupCommTaskStarter; [Inject] public PipelinedBroadcastReduceDriver( [Parameter(typeof (GroupTestConfig.NumEvaluators))] int numEvaluators, [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations, [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize, - MpiDriver mpiDriver) + GroupCommDriver groupCommDriver) { Logger.Log(Level.Info, "*******entering the driver code " + chunkSize); @@ -68,9 +68,9 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR _numIterations = numIterations; _chunkSize = chunkSize; - _mpiDriver = mpiDriver; + _groupCommDriver = groupCommDriver; - _commGroup = _mpiDriver.DefaultGroup + _commGroup = _groupCommDriver.DefaultGroup .AddBroadcast<int[], IntArrayCodec>( GroupTestConstants.BroadcastOperatorName, GroupTestConstants.MasterTaskId, @@ -84,7 +84,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR new PipelineIntDataConverter(_chunkSize)) .Build(); - _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators); CreateClassHierarchy(); } @@ -99,14 +99,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); - IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); + IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); } public void OnNext(IActiveContext activeContext) { - if (_mpiDriver.IsMasterTaskContext(activeContext)) + if (_groupCommDriver.IsMasterTaskContext(activeContext)) { Logger.Log(Level.Info, "******* Master ID " + activeContext.Id ); @@ -128,7 +128,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR .Build(); _commGroup.AddTask(GroupTestConstants.MasterTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } else { @@ -151,7 +151,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR .Build(); _commGroup.AddTask(slaveTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs index 0e4ccee..416fe41 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs @@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR private readonly int _numReduceSenders; private readonly int _arraySize; - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastSender<int[]> _broadcastSender; private readonly IReduceReceiver<int[]> _sumReducer; @@ -45,15 +45,15 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { Logger.Log(Level.Info, "Hello from master task"); _numIters = numIters; _numReduceSenders = numEvaluators - 1; _arraySize = arraySize; - _mpiClient = mpiClient; + _groupCommClient = groupCommClient; - _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _commGroup = groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _broadcastSender = _commGroup.GetBroadcastSender<int[]>(GroupTestConstants.BroadcastOperatorName); _sumReducer = _commGroup.GetReduceReceiver<int[]>(GroupTestConstants.ReduceOperatorName); Logger.Log(Level.Info, "finished master task constructor"); @@ -91,7 +91,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private int TriangleNumber(int n) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs index 503e6e3..547df28 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs @@ -31,7 +31,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask)); private readonly int _numIterations; - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastReceiver<int[]> _broadcastReceiver; private readonly IReduceSender<int[]> _triangleNumberSender; @@ -39,13 +39,13 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR [Inject] public PipelinedSlaveTask( [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, - IMpiClient mpiClient) + IGroupCommClient groupCommClient) { Logger.Log(Level.Info, "Hello from slave task"); _numIterations = numIters; - _mpiClient = mpiClient; - _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _groupCommClient = groupCommClient; + _commGroup = _groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(GroupTestConstants.BroadcastOperatorName); _triangleNumberSender = _commGroup.GetReduceSender<int[]>(GroupTestConstants.ReduceOperatorName); } @@ -78,7 +78,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private int TriangleNumber(int n) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs index a3d17d7..b949d56 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs @@ -31,18 +31,18 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive { private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IScatterSender<int> _scatterSender; private readonly IReduceReceiver<int> _sumReducer; [Inject] - public MasterTask(IMpiClient mpiClient) + public MasterTask(IGroupCommClient groupCommClient) { _logger.Log(Level.Info, "Hello from master task"); - _mpiClient = mpiClient; + _groupCommClient = groupCommClient; - _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _commGroup = groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _scatterSender = _commGroup.GetScatterSender<int>(GroupTestConstants.ScatterOperatorName); _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName); } @@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } private List<string> GetScatterOrder() http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs index fab12e5..88dac3d 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs @@ -45,19 +45,19 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive private readonly int _numEvaluators; - private readonly IMpiDriver _mpiDriver; + private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; - private readonly TaskStarter _mpiTaskStarter; + private readonly TaskStarter _groupCommTaskStarter; [Inject] public ScatterReduceDriver( [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, - MpiDriver mpiDriver) + GroupCommDriver groupCommDriver) { Identifier = "BroadcastStartHandler"; _numEvaluators = numEvaluators; - _mpiDriver = mpiDriver; - _commGroup = _mpiDriver.DefaultGroup + _groupCommDriver = groupCommDriver; + _commGroup = _groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>( GroupTestConstants.ScatterOperatorName, GroupTestConstants.MasterTaskId, @@ -68,7 +68,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive new SumFunction()) .Build(); - _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + _groupCommTaskStarter = new TaskStarter(_groupCommDriver, numEvaluators); CreateClassHierarchy(); } @@ -83,14 +83,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); - IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); + IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); } public void OnNext(IActiveContext activeContext) { - if (_mpiDriver.IsMasterTaskContext(activeContext)) + if (_groupCommDriver.IsMasterTaskContext(activeContext)) { // Configure Master Task IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule @@ -99,13 +99,13 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive .Build(); _commGroup.AddTask(GroupTestConstants.MasterTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } else { // Configure Slave Task string slaveTaskId = GroupTestConstants.SlaveTaskId + - _mpiDriver.GetContextNum(activeContext); + _groupCommDriver.GetContextNum(activeContext); IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, slaveTaskId) @@ -113,7 +113,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive .Build(); _commGroup.AddTask(slaveTaskId); - _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs index f2b1c4a..27906b4 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs @@ -31,18 +31,18 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive { private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); - private readonly IMpiClient _mpiClient; + private readonly IGroupCommClient _groupCommClient; private readonly ICommunicationGroupClient _commGroup; private readonly IScatterReceiver<int> _scatterReceiver; private readonly IReduceSender<int> _sumSender; [Inject] - public SlaveTask(IMpiClient mpiClient) + public SlaveTask(IGroupCommClient groupCommClient) { _logger.Log(Level.Info, "Hello from slave task"); - _mpiClient = mpiClient; - _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _groupCommClient = groupCommClient; + _commGroup = _groupCommClient.GetCommunicationGroup(GroupTestConstants.GroupName); _scatterReceiver = _commGroup.GetScatterReceiver<int>(GroupTestConstants.ScatterOperatorName); _sumSender = _commGroup.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName); } @@ -61,7 +61,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive public void Dispose() { - _mpiClient.Dispose(); + _groupCommClient.Dispose(); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs index c0808d8..cb73cd6 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs @@ -99,9 +99,9 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 3; int fanOut = 2; - var mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommunicationDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommunicationDriver.DefaultGroup .AddBroadcast<int, IntCodec>( broadcastOperatorName, masterTaskId) @@ -111,7 +111,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication new SumFunction()) .Build(); - var commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup); //for master task IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName); @@ -154,9 +154,9 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>( scatterOperatorName, masterTaskId) @@ -166,7 +166,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication new SumFunction()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName); IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName); @@ -217,13 +217,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int value = 1337; int fanOut = 3; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddBroadcast<int, IntCodec>(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -251,13 +251,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int value = 1337; int fanOut = 3; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddBroadcast(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -285,13 +285,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int value3 = 99; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddBroadcast<int, IntCodec>(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -324,13 +324,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication string masterTaskId = "task0"; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -359,13 +359,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication string masterTaskId = "task0"; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddReduce<int, IntCodec>(operatorName, "task0", new SumFunction()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -403,13 +403,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddScatter(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -442,13 +442,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddScatter(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -481,13 +481,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -531,13 +531,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 4; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -578,13 +578,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 4; int fanOut = 2; - IMpiDriver mpiDriver = GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.DefaultGroup + var commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName); @@ -641,25 +641,25 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication Assert.AreEqual(10, reduceFunction.Reduce(new int[] { 1, 2, 3, 4 })); } - public static IMpiDriver GetInstanceOfMpiDriver(string driverId, string masterTaskId, string groupName, int fanOut, int numTasks) + public static IGroupCommDriver GetInstanceOfGroupCommDriver(string driverId, string masterTaskId, string groupName, int fanOut, int numTasks) { var c = TangFactory.GetTang().NewConfigurationBuilder() - .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId) - .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId) - .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName) - .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString()) - .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString()) + .BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId) + .BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(masterTaskId) + .BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(groupName) + .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(fanOut.ToString()) + .BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(numTasks.ToString()) .BindImplementation(GenericType<IConfigurationSerializer>.Class, GenericType<AvroConfigurationSerializer>.Class) .Build(); - IMpiDriver mpiDriver = TangFactory.GetTang().NewInjector(c).GetInstance<MpiDriver>(); - return mpiDriver; + IGroupCommDriver groupCommDriver = TangFactory.GetTang().NewInjector(c).GetInstance<GroupCommDriver>(); + return groupCommDriver; } - public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IMpiDriver mpiDriver, ICommunicationGroupDriver commGroup) + public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IGroupCommDriver groupCommDriver, ICommunicationGroupDriver commGroup) { List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); + IConfiguration serviceConfig = groupCommDriver.GetServiceConfiguration(); List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) @@ -678,12 +678,12 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication for (int i = 0; i < numTasks; i++) { string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); + IConfiguration groupCommTaskConfig = groupCommDriver.GetGroupCommTaskConfiguration(taskId); + IConfiguration mergedConf = Configurations.Merge(groupCommTaskConfig, partialConfigs[i], serviceConfig); IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); + IGroupCommClient groupCommClient = injector.GetInstance<IGroupCommClient>(); + commGroups.Add(groupCommClient.GetCommunicationGroup(groupName)); } return commGroups; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs index b244f33..b159153 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs @@ -58,13 +58,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication string masterTaskId = "task0"; int fanOut = 3; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddReduce<int, IntCodec>(operatorName, masterTaskId, new SumFunction(), TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -114,13 +114,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int value3 = 99; int fanOut = 3; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddBroadcast<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -190,9 +190,9 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 10; int fanOut = 3; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddBroadcast<int, IntCodec>( broadcastOperatorName, masterTaskId, @@ -204,7 +204,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); //for master task IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName); @@ -303,13 +303,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -348,13 +348,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -402,13 +402,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 4; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -453,13 +453,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 4; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -505,13 +505,13 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 6; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>(operatorName, masterTaskId, TopologyTypes.Tree) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -569,9 +569,9 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication int numTasks = 5; int fanOut = 2; - var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); + var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup .AddScatter<int, IntCodec>( scatterOperatorName, masterTaskId, @@ -582,7 +582,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication new SumFunction(), TopologyTypes.Tree).Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName); IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs new file mode 100644 index 0000000..f86c546 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Network.Group.Config +{ + public class GroupCommConfigurationOptions + { + [NamedParameter("Name of the communication group")] + public class CommunicationGroupName : Name<string> + { + } + + [NamedParameter("Name of the Group Communication operator")] + public class OperatorName : Name<string> + { + } + + [NamedParameter("Driver identifier")] + public class DriverId : Name<string> + { + } + + [NamedParameter("Timeout for receiving data", defaultValue: "50000")] + public class Timeout : Name<int> + { + } + + [NamedParameter("Retry times", defaultValue: "5")] + public class RetryCount : Name<int> + { + } + + [NamedParameter("sleep time to wait for handlers to be registered", defaultValue: "500")] + public class SleepTimeWaitingForHandler : Name<int> + { + } + + [NamedParameter("Retry times to wait for handlers to be registered", defaultValue: "5")] + public class RetryCountWaitingForHanler : Name<int> + { + } + + [NamedParameter("Master task identifier")] + public class MasterTaskId : Name<string> + { + } + + [NamedParameter("Group name", defaultValue: "Group1")] + public class GroupName : Name<string> + { + } + + [NamedParameter("Number of tasks", defaultValue: "5")] + public class NumberOfTasks : Name<int> + { + } + + [NamedParameter("with of the tree in topology", defaultValue:"2")] + public class FanOut : Name<int> + { + } + + [NamedParameter("Serialized communication group configuration")] + public class SerializedGroupConfigs : Name<ISet<string>> + { + } + + [NamedParameter("Serialized operator configuration")] + public class SerializedOperatorConfigs : Name<ISet<string>> + { + } + + [NamedParameter("Id of root task in operator topology")] + public class TopologyRootTaskId : Name<string> + { + } + + [NamedParameter("Ids of child tasks in operator topology")] + public class TopologyChildTaskIds : Name<ISet<string>> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c330dcff/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs deleted file mode 100644 index 7ef575b..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.Collections.Generic; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Network.Group.Config -{ - public class MpiConfigurationOptions - { - [NamedParameter("Name of the communication group")] - public class CommunicationGroupName : Name<string> - { - } - - [NamedParameter("Name of the MPI operator")] - public class OperatorName : Name<string> - { - } - - [NamedParameter("Driver identifier")] - public class DriverId : Name<string> - { - } - - [NamedParameter("Timeout for receiving data", defaultValue: "50000")] - public class Timeout : Name<int> - { - } - - [NamedParameter("Retry times", defaultValue: "5")] - public class RetryCount : Name<int> - { - } - - [NamedParameter("sleep time to wait for handlers to be registered", defaultValue: "500")] - public class SleepTimeWaitingForHandler : Name<int> - { - } - - [NamedParameter("Retry times to wait for handlers to be registered", defaultValue: "5")] - public class RetryCountWaitingForHanler : Name<int> - { - } - - [NamedParameter("Master task identifier")] - public class MasterTaskId : Name<string> - { - } - - [NamedParameter("Group name", defaultValue: "Group1")] - public class GroupName : Name<string> - { - } - - [NamedParameter("Number of tasks", defaultValue: "5")] - public class NumberOfTasks : Name<int> - { - } - - [NamedParameter("with of the tree in topology", defaultValue:"2")] - public class FanOut : Name<int> - { - } - - [NamedParameter("Serialized communication group configuration")] - public class SerializedGroupConfigs : Name<ISet<string>> - { - } - - [NamedParameter("Serialized operator configuration")] - public class SerializedOperatorConfigs : Name<ISet<string>> - { - } - - [NamedParameter("Id of root task in operator topology")] - public class TopologyRootTaskId : Name<string> - { - } - - [NamedParameter("Ids of child tasks in operator topology")] - public class TopologyChildTaskIds : Name<ISet<string>> - { - } - } -}
