Repository: incubator-reef
Updated Branches:
  refs/heads/master 2c00d8910 -> 2a453f64a


[REEF-608] Implement IMRU Driver and Client

This adds

  * the REEF IMRU Driver that actually runs Map and Update tasks given the job
    definition given by user and
  * the REEF IMRU Client.

Also, this introduces a StreamingCodecConfigurationMinusMessage configuration
module in REEF.Network.Group.Config that gives internal codec configurations
used within group communication to the external applications like IMRU.

JIRA:
  [REEF-608](https://issues.apache.org/jira/browse/REEF-608)

Pull Request:
  This closes #385


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a453f64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a453f64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a453f64

Branch: refs/heads/master
Commit: 2a453f64a278d68ea0b578be4beb618c6e6410fb
Parents: 2c00d89
Author: Dhruv <[email protected]>
Authored: Wed Aug 19 10:49:13 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 19 19:32:08 2015 -0700

----------------------------------------------------------------------
 .../API/IMRUJobDefinition.cs                    |   2 +-
 .../OnREEF/Client/REEFIMRUClient.cs             | 119 ++++++
 .../Client/REEFIMRUClientConfiguration.cs       |  41 +++
 .../OnREEF/Driver/IMRUDriver.cs                 | 360 +++++++++++++++++++
 .../Org.Apache.REEF.IMRU.csproj                 |   8 +
 .../StreamingCodecConfigurationMinusMessage.cs  |  47 +++
 .../Org.Apache.REEF.Network.csproj              |   1 +
 7 files changed, 577 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 2f2bf26..6a955c2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -146,7 +146,7 @@ namespace Org.Apache.REEF.IMRU.API
         /// <summary>
         /// Configuration of partitioned dataset
         /// </summary>
-        internal IConfiguration PartitionedDatasetConfgiuration
+        internal IConfiguration PartitionedDatasetConfiguration
         {
             get { return _partitionedDatasetConfiguration; }
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
new file mode 100644
index 0000000..99f5313
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Client
+{
+    /// <summary>
+    /// Implements the IMRU client API on REEF.
+    /// </summary>
+    /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
+    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
+    /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+    internal sealed class REEFIMRUClient<TMapInput, TMapOutput, TResult> : 
IIMRUClient<TMapInput, TMapOutput, TResult>
+    {
+        private readonly IREEFClient _reefClient;
+        private readonly JobSubmissionBuilderFactory 
_jobSubmissionBuilderFactory;
+        private readonly AvroConfigurationSerializer _configurationSerializer;
+
+        [Inject]
+        private REEFIMRUClient(IREEFClient reefClient, 
AvroConfigurationSerializer configurationSerializer,
+            JobSubmissionBuilderFactory jobSubmissionBuilderFactory)
+        {
+            _reefClient = reefClient;
+            _configurationSerializer = configurationSerializer;
+            _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory;
+        }
+
+        /// <summary>
+        /// Submits the job to reefClient
+        /// </summary>
+        /// <param name="jobDefinition">IMRU job definition given by the 
user</param>
+        /// <returns>Null as results will be later written to some 
directory</returns>
+        IEnumerable<TResult> IIMRUClient<TMapInput, TMapOutput, 
TResult>.Submit(IMRUJobDefinition jobDefinition)
+        {
+            string driverId = string.Format("IMRU-{0}-Driver", 
jobDefinition.JobName);
+
+            // The driver configuration contains all the needed bindings.
+            var imruDriverConfiguration = 
TangFactory.GetTang().NewConfigurationBuilder(new[]
+            {
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnEvaluatorAllocated,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                    .Set(DriverConfiguration.OnDriverStarted,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                    .Set(DriverConfiguration.OnContextActive,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                    .Set(DriverConfiguration.OnTaskCompleted,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                    .Build(),
+                TangFactory.GetTang().NewConfigurationBuilder()
+                    
.BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId)
+                    
.BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(IMRUConstants.UpdateTaskName)
+                    
.BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(IMRUConstants.CommunicationGroupName)
+                    .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(
+                        
IMRUConstants.TreeFanout.ToString(CultureInfo.InvariantCulture)
+                            .ToString(CultureInfo.InvariantCulture))
+                    
.BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(
+                        (jobDefinition.NumberOfMappers + 
1).ToString(CultureInfo.InvariantCulture))
+                    .BindImplementation(GenericType<IGroupCommDriver>.Class, 
GenericType<GroupCommDriver>.Class)
+                    .Build(),
+                jobDefinition.PartitionedDatasetConfiguration
+            })
+                .BindNamedParameter(typeof (SerializedMapConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
+                .BindNamedParameter(typeof (SerializedUpdateConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.UpdateFunctionConfiguration))
+                .BindNamedParameter(typeof 
(SerializedMapInputCodecConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.MapInputCodecConfiguration))
+                .BindNamedParameter(typeof 
(SerializedMapInputPipelineDataConverterConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.MapInputPipelineDataConverterConfiguration))
+                .BindNamedParameter(typeof 
(SerializedUpdateFunctionCodecsConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.UpdateFunctionCodecsConfiguration))
+                .BindNamedParameter(typeof 
(SerializedMapOutputPipelineDataConverterConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
+                .BindNamedParameter(typeof (SerializedReduceConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+                .Build();
+
+            // The JobSubmission contains the Driver configuration as well as 
the files needed on the Driver.
+            var imruJobSubmission = 
_jobSubmissionBuilderFactory.GetJobSubmissionBuilder()
+                .AddDriverConfiguration(imruDriverConfiguration)
+                .AddGlobalAssemblyForType(typeof (IMRUDriver<TMapInput, 
TMapOutput, TResult>))
+                .SetJobIdentifier(jobDefinition.JobName)
+                .Build();
+
+            _reefClient.Submit(imruJobSubmission);
+
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
new file mode 100644
index 0000000..ac3bef3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Client
+{
+    /// <summary>
+    /// A configuration module for specifying REEFIMRUClient
+    /// </summary>
+    public sealed class REEFIMRUClientConfiguration<TMapInput, TMapOutput, 
TResult> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new REEFIMRUClientConfiguration<TMapInput, TMapOutput, TResult>()
+                .BindImplementation(GenericType<IIMRUClient<TMapInput, 
TMapOutput, TResult>>.Class,
+                    GenericType<REEFIMRUClient<TMapInput, TMapOutput, 
TResult>>.Class)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
new file mode 100644
index 0000000..026563c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -0,0 +1,360 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Implements the IMRU driver on REEF
+    /// </summary>
+    /// <typeparam name="TMapInput">Map Input</typeparam>
+    /// <typeparam name="TMapOutput">Map output</typeparam>
+    /// <typeparam name="TResult">Result</typeparam>
+    internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> : 
IObserver<IDriverStarted>,
+        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, 
IObserver<ICompletedTask>
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(IMRUDriver<TMapInput, TMapOutput, TResult>));
+
+        private readonly ConfigurationManager _configurationManager;
+        private readonly IPartitionedDataSet _dataSet;
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+        private ICommunicationGroupDriver _commGroup;
+        private readonly IGroupCommDriver _groupCommDriver;
+        private readonly TaskStarter _groupCommTaskStarter;
+        private IConfiguration _tcpPortProviderConfig;
+        private readonly ConcurrentStack<string> _taskIdStack;
+        private readonly ConcurrentStack<IPartitionDescriptor> 
_partitionDescriptorStack;
+        private readonly int _coresPerMapper;
+        private readonly int _coresForUpdateTask;
+        private readonly int _memoryPerMapper;
+        private readonly int _memoryForUpdateTask;
+        private bool _allocatedUpdateTaskEvaluator;
+        private readonly ConcurrentBag<ICompletedTask> _completedTasks;
+            
+        [Inject]
+        private IMRUDriver(IPartitionedDataSet dataSet,
+            ConfigurationManager configurationManager,
+            IEvaluatorRequestor evaluatorRequestor,
+            [Parameter(typeof (TcpPortRangeStart))] int startingPort,
+            [Parameter(typeof (TcpPortRangeCount))] int portRange,
+            [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
+            [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
+            [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
+            [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
+            IGroupCommDriver groupCommDriver)
+        {
+            _dataSet = dataSet;
+            _configurationManager = configurationManager;
+            _evaluatorRequestor = evaluatorRequestor;
+            _groupCommDriver = groupCommDriver;
+            _coresPerMapper = coresPerMapper;
+            _coresForUpdateTask = coresForUpdateTask;
+            _memoryPerMapper = memoryPerMapper;
+            _memoryForUpdateTask = memoryForUpdateTask;
+            _allocatedUpdateTaskEvaluator = false;
+            _completedTasks = new ConcurrentBag<ICompletedTask>();
+
+            AddGroupCommunicationOperators();
+            
+            //TODO[REEF-600]: Once the configuration module for 
TcpPortProvider 
+            //TODO[REEF-600]: will be provided, the configuraiton will be 
automatically
+            //TODO[REEF-600]: carried over to evaluators and below function 
will be obsolete.
+            ConstructTcpPortProviderConfig(startingPort, portRange);
+
+            _groupCommTaskStarter = new TaskStarter(_groupCommDriver, 
_dataSet.Count + 1);
+
+            _taskIdStack = new ConcurrentStack<string>();
+            _partitionDescriptorStack = new 
ConcurrentStack<IPartitionDescriptor>();
+            ConstructTaskIdAndPartitionDescriptorStack();
+        }
+
+        /// <summary>
+        /// Requests for evaluator for update task
+        /// </summary>
+        /// <param name="value">Event fired when driver started</param>
+        public void OnNext(IDriverStarted value)
+        {
+            _evaluatorRequestor.Submit(new EvaluatorRequest(1, 
_memoryForUpdateTask, _coresForUpdateTask));
+            //TODO[REEF-598]: Set a timeout for this request to be satisfied. 
If it is not within that time, exit the Driver.
+        }
+
+        /// <summary>
+        /// Specifies context and service configuration for evaluator depending
+        /// on whether it is for Update function or for map function
+        /// </summary>
+        /// <param name="allocatedEvaluator">The allocated evaluator</param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
+            IConfiguration serviceConf = 
_groupCommDriver.GetServiceConfiguration();
+
+            if (!_allocatedUpdateTaskEvaluator)
+            {
+                var codecConfig =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder(
+                            new[]
+                            {
+                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+                                    
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+                                    
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+                                
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+                                
_configurationManager.UpdateFunctionCodecsConfiguration
+                            }
+                        ).Build();
+               
+                serviceConf = Configurations.Merge(serviceConf, codecConfig, 
_tcpPortProviderConfig);
+                _allocatedUpdateTaskEvaluator = true;
+
+                _evaluatorRequestor.Submit(new 
EvaluatorRequest(_dataSet.Count, _memoryPerMapper, _coresPerMapper));
+                //TODO[REEF-598]: Set a timeout for this request to be 
satisfied. If it is not within that time, exit the Driver.
+            }
+            else
+            {
+                IPartitionDescriptor partitionDescriptor;
+
+                if (!_partitionDescriptorStack.TryPop(out partitionDescriptor))
+                {
+                    Logger.Log(Level.Warning, "partition descriptor exist for 
the context of evaluator");
+                    allocatedEvaluator.Dispose();
+                    return;
+                }
+
+                var codecConfig =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder(
+                            new[]
+                            {
+                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+                                    
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+                                    
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+                                
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+                                
_configurationManager.MapInputCodecConfiguration
+                            }
+                        ).Build();
+
+                contextConf = Configurations.Merge(contextConf, 
partitionDescriptor.GetPartitionConfiguration());
+                serviceConf = Configurations.Merge(serviceConf, codecConfig,
+                    _tcpPortProviderConfig);
+            }
+            
+            allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
+        }
+
+        /// <summary>
+        /// Specfies the Map or Update task to run on the active context
+        /// </summary>
+        /// <param name="activeContext"></param>
+        public void OnNext(IActiveContext activeContext)
+        {
+            Logger.Log(Level.Verbose, string.Format("Received Active Context 
{0}", activeContext.Id));
+
+            if (_groupCommDriver.IsMasterTaskContext(activeContext))
+            {
+                var partialTaskConf =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder(new[]
+                        {
+                            TaskConfiguration.ConfigurationModule
+                                .Set(TaskConfiguration.Identifier,
+                                    IMRUConstants.UpdateTaskName)
+                                .Set(TaskConfiguration.Task,
+                                    GenericType<UpdateTaskHost<TMapInput, 
TMapOutput, TResult>>.Class)
+                                .Build(),
+                            _configurationManager.UpdateFunctionConfiguration
+                        })
+                        .Build();
+
+                _commGroup.AddTask(IMRUConstants.UpdateTaskName);
+                _groupCommTaskStarter.QueueTask(partialTaskConf, 
activeContext);
+            }
+            else
+            {
+                string taskId;
+
+                if (!_taskIdStack.TryPop(out taskId))
+                {
+                    Logger.Log(Level.Warning, "No task Ids exist for the 
active context {0}. Disposing the context.",
+                        activeContext.Id);
+                    activeContext.Dispose();
+                    return;
+                }
+
+                var partialTaskConf =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder(new[]
+                        {
+                            TaskConfiguration.ConfigurationModule
+                                .Set(TaskConfiguration.Identifier, taskId)
+                                .Set(TaskConfiguration.Task, 
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
+                                .Build(),
+                            _configurationManager.MapFunctionConfiguration
+                        })
+                        .Build();
+
+                _commGroup.AddTask(taskId);
+                _groupCommTaskStarter.QueueTask(partialTaskConf, 
activeContext);
+            }
+        }
+
+        /// <summary>
+        /// Specfies what to do when the task is completed
+        /// In this case just disposes off the task
+        /// </summary>
+        /// <param name="completedTask">The link to the completed task</param>
+        public void OnNext(ICompletedTask completedTask)
+        {
+            _completedTasks.Add(completedTask);
+
+            if (_completedTasks.Count != _dataSet.Count + 1) return;
+            
+            foreach (var task in _completedTasks)
+            {
+                Logger.Log(Level.Verbose, String.Format("Disposing task: {0}", 
task.Id));
+                task.ActiveContext.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Specfies how to handle exception or error
+        /// </summary>
+        /// <param name="error">Kind of exception</param>
+        public void OnError(Exception error)
+        {
+            Logger.Log(Level.Error,"Cannot currently handle the Exception in 
OnError function");
+            throw new NotImplementedException("Cannot currently handle 
exception in OneError", error);
+        }
+
+        /// <summary>
+        /// Specfies what to do when driver is done
+        /// In this case do nothing
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        private void AddGroupCommunicationOperators()
+        {
+            var reduceFunctionConfig = 
_configurationManager.ReduceFunctionConfiguration;
+            var mapOutputPipelineDataConverterConfig = 
_configurationManager.MapOutputPipelineDataConverterConfiguration;
+            var mapInputPipelineDataConverterConfig = 
_configurationManager.MapInputPipelineDataConverterConfiguration;
+
+            try
+            {
+                TangFactory.GetTang()
+                    .NewInjector(mapInputPipelineDataConverterConfig)
+                    .GetInstance<IPipelineDataConverter<TMapInput>>();
+
+                mapInputPipelineDataConverterConfig =
+                    TangFactory.GetTang()
+                        
.NewConfigurationBuilder(mapInputPipelineDataConverterConfig)
+                        .BindImplementation(
+                            
GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
+                            
GenericType<MapInputwithControlMessagePipelineDataConverter<TMapInput>>.Class)
+                        .Build();
+            }
+            catch (Exception e)
+            {
+                mapInputPipelineDataConverterConfig = TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    .BindImplementation(
+                        
GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
+                        
GenericType<DefaultPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class)
+                    .Build();
+            }
+
+            try
+            {
+                TangFactory.GetTang()
+                    .NewInjector(mapInputPipelineDataConverterConfig)
+                    .GetInstance<IPipelineDataConverter<TMapOutput>>();
+            }
+            catch (Exception e)
+            {
+                mapOutputPipelineDataConverterConfig =
+                    TangFactory.GetTang()
+                        .NewConfigurationBuilder()
+                        
.BindImplementation(GenericType<IPipelineDataConverter<TMapOutput>>.Class,
+                            
GenericType<DefaultPipelineDataConverter<TMapOutput>>.Class)
+                        .Build();
+            }
+
+            _commGroup =
+                _groupCommDriver.DefaultGroup
+                    .AddBroadcast<MapInputWithControlMessage<TMapInput>>(
+                        IMRUConstants.BroadcastOperatorName,
+                        IMRUConstants.UpdateTaskName,
+                        TopologyTypes.Tree,
+                        mapInputPipelineDataConverterConfig)
+                    .AddReduce<TMapOutput>(
+                        IMRUConstants.ReduceOperatorName,
+                        IMRUConstants.UpdateTaskName,
+                        TopologyTypes.Tree,
+                        reduceFunctionConfig,
+                        mapOutputPipelineDataConverterConfig)
+                    .Build();
+        }
+
+        private void ConstructTcpPortProviderConfig(int startingPort, int 
portRange)
+        {
+            _tcpPortProviderConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<TcpPortRangeStart, 
int>(GenericType<TcpPortRangeStart>.Class,
+                    startingPort.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<TcpPortRangeCount, 
int>(GenericType<TcpPortRangeCount>.Class,
+                    portRange.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        private void ConstructTaskIdAndPartitionDescriptorStack()
+        {
+            int counter = 0;
+
+            foreach (var partitionDescriptor in _dataSet)
+            {
+                string id = IMRUConstants.MapTaskPrefix + "-Id" + counter + 
"-Version0";
+                _taskIdStack.Push(id);
+                _partitionDescriptorStack.Push(partitionDescriptor);
+                counter++;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 52bc9b3..997e4bf 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -61,8 +61,11 @@ under the License.
     <Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
     <Compile Include="InProcess\MapFunctions.cs" />
     <Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
+    <Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />
+    <Compile Include="OnREEF\Client\REEFIMRUClient.cs" />
     <Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
+    <Compile Include="OnREEF\Driver\IMRUDriver.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" 
/>
@@ -115,8 +118,13 @@ under the License.
       <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project>
       <Name>Org.Apache.REEF.IO</Name>
     </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+      <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
+      <Name>Org.Apache.REEF.Client</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <None Include="Org.Apache.REEF.IMRU.nuspec" />
   </ItemGroup>
+  <ItemGroup />
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
new file mode 100644
index 0000000..fb0b6d9
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+    /// <summary>
+    /// Defines configuration for internal streaming codecs and internal 
pipeline message 
+    /// streaming codecs required for group communication
+    /// </summary>
+    /// <typeparam name="T">Generic type of message</typeparam>
+    public sealed class StreamingCodecConfigurationMinusMessage<T> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// Configuration Module for Codec
+        /// </summary>
+        public static ConfigurationModule Conf = new 
StreamingCodecConfigurationMinusMessage<T>()
+            
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+                GenericType<StreamingPipelineMessageCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+                GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+            
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+                
GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
+            .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 2d62cb4..884001f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -52,6 +52,7 @@ under the License.
   </ItemGroup>
   <ItemGroup>
     <Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" />
+    <Compile Include="Group\Config\StreamingCodecConfigurationMinusMessage.cs" 
/>
     <Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" />
     <Compile Include="Group\Config\CodecConfiguration.cs" />
     <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />

Reply via email to