Repository: incubator-reef
Updated Branches:
  refs/heads/master 3670eabfc -> 2f0cf34d8


[REEF-554] Update IMRU APIs to enable writing of REEF IMRU

This addresses the issue by
  * modifying the APIs
  * modifying the MapperCount example to be compatible with the new API
  * modifying InProcessIMRU to be compatible with the current API

JIRA:
  [REEF-554](https://issues.apache.org/jira/browse/REEF-554)

Pull Request:
  This closes #334


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2f0cf34d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2f0cf34d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2f0cf34d

Branch: refs/heads/master
Commit: 2f0cf34d85c988c9f4340f1f7a702ec588f22790
Parents: 3670eab
Author: Dhruv <[email protected]>
Authored: Mon Aug 3 15:52:57 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 7 08:55:18 2015 -0700

----------------------------------------------------------------------
 .../MapperCountTest.cs                          |   2 +-
 lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs |   2 +-
 .../API/IMRUCodecConfiguration.cs               |  46 +++++
 .../API/IMRUConfiguration.cs                    |  82 ---------
 .../API/IMRUJobDefinition.cs                    | 130 ++++++++++++-
 .../API/IMRUJobDefinitionBuilder.cs             | 184 +++++++++++++++++--
 .../API/IMRUMapConfiguration.cs                 |  46 +++++
 .../IMRUPipelineDataConverterConfiguration.cs   |  49 +++++
 .../API/IMRUReduceFunctionConfiguration.cs      |  46 +++++
 .../API/IMRUUpdateConfiguration.cs              |  47 +++++
 .../cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs |   2 +-
 .../Org.Apache.REEF.IMRU/API/IUpdateFunction.cs |   2 +-
 .../API/Parameters/MapInputCodec.cs             |  33 ----
 .../API/Parameters/MapOutputCodec.cs            |  33 ----
 .../API/Parameters/ResultCodec.cs               |  33 ----
 .../Examples/MapperCount/IdentityMapFunction.cs |   5 +
 .../MapperCount/IntSumReduceFunction.cs         |   5 +
 .../Examples/MapperCount/MapperCount.cs         |  46 +++--
 .../MapperCount/MapperCountUpdateFunction.cs    |   9 +
 .../InProcess/IMRURunner.cs                     |   6 +-
 .../InProcess/InProcessIMRUClient.cs            |  19 +-
 .../InProcess/InProcessIMRUConfiguration.cs     |   3 +
 .../Org.Apache.REEF.IMRU.csproj                 |  13 +-
 23 files changed, 612 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
index 42f165f..ad0e477 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs
@@ -46,7 +46,7 @@ namespace Org.Apache.REEF.IMRU.Tests
                             .Build()
                     )
                     .GetInstance<MapperCount>();
-            var result = tested.Run();
+            var result = tested.Run(NumberOfMappers);
             Assert.AreEqual(NumberOfMappers, result, "The result of the run 
should be the number of Mappers.");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
index 29c45cf..247d721 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
@@ -27,7 +27,7 @@ namespace Org.Apache.REEF.IMRU.API
     /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
     /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
     /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
-    public interface IIMRUClient<TMapInput, TMapOutput, TResult>
+    public interface IIMRUClient<TMapInput, TMapOutput, out TResult>
     {
         /// <summary>
         /// Submit the given job for execution.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs
new file mode 100644
index 0000000..00df10c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUCodecConfiguration.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+   /// <summary>
+   /// A configuraton module for specifying codecs for IMRU
+   /// </summary>
+   /// <typeparam name="T">Generic type</typeparam>
+    public sealed class IMRUCodecConfiguration<T> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The codec to be used
+        /// </summary>
+        public static readonly RequiredImpl<IStreamingCodec<T>> Codec =
+            new RequiredImpl<IStreamingCodec<T>>();
+
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new IMRUCodecConfiguration<T>()
+                .BindImplementation(GenericType<IStreamingCodec<T>>.Class, 
Codec)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs
deleted file mode 100644
index 595f64a..0000000
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.IMRU.API.Parameters;
-using Org.Apache.REEF.Network.Group.Operators;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.IMRU.API
-{
-    /// <summary>
-    /// A configuration module for IMRU.
-    /// </summary>
-    /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
-    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
-    /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
-    public sealed class IMRUConfiguration<TMapInput, TMapOutput, TResult> : 
ConfigurationModuleBuilder
-    {
-        /// <summary>
-        /// The codec to be used for the map input.
-        /// </summary>
-        public static readonly RequiredImpl<IStreamingCodec<TMapInput>> 
MapInputCodec =
-            new RequiredImpl<IStreamingCodec<TMapInput>>();
-
-        /// <summary>
-        /// The codec to be used for the map output.
-        /// </summary>
-        public static readonly RequiredImpl<IStreamingCodec<TMapOutput>> 
MapOutputCodec =
-            new RequiredImpl<IStreamingCodec<TMapOutput>>();
-
-        /// <summary>
-        /// The codec to be used for the result.
-        /// </summary>
-        public static readonly RequiredImpl<IStreamingCodec<TResult>> 
ResultCodec =
-            new RequiredImpl<IStreamingCodec<TResult>>();
-
-        /// <summary>
-        /// The IReduceFunction type to use.
-        /// </summary>
-        public static readonly RequiredImpl<IReduceFunction<TMapOutput>> 
ReduceFunction =
-            new RequiredImpl<IReduceFunction<TMapOutput>>();
-
-        /// <summary>
-        /// The IUpdateFunction type to use.
-        /// </summary>
-        public static readonly RequiredImpl<IUpdateFunction<TMapInput, 
TMapOutput, TResult>> UpdateFunction =
-            new RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, 
TResult>>();
-
-        /// <summary>
-        /// The IMapFunction type to use.
-        /// </summary>
-        public static readonly RequiredImpl<IMapFunction<TMapInput, 
TMapOutput>> MapFunction =
-            new RequiredImpl<IMapFunction<TMapInput, TMapOutput>>();
-
-        public static ConfigurationModule ConfigurationModule =
-            new IMRUConfiguration<TMapInput, TMapOutput, TResult>()
-                
.BindNamedParameter(GenericType<MapInputCodec<TMapInput>>.Class, MapInputCodec)
-                
.BindNamedParameter(GenericType<MapOutputCodec<TMapOutput>>.Class, 
MapOutputCodec)
-                .BindNamedParameter(GenericType<ResultCodec<TResult>>.Class, 
ResultCodec)
-                
.BindImplementation(GenericType<IReduceFunction<TMapOutput>>.Class, 
ReduceFunction)
-                .BindImplementation(GenericType<IUpdateFunction<TMapInput, 
TMapOutput, TResult>>.Class, UpdateFunction)
-                .BindImplementation(GenericType<IMapFunction<TMapInput, 
TMapOutput>>.Class, MapFunction)
-                .Build();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 5744be8..2f2bf26 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -27,23 +27,139 @@ namespace Org.Apache.REEF.IMRU.API
     /// <seealso cref="IMRUJobDefinitionBuilder" />
     public sealed class IMRUJobDefinition
     {
-        private readonly IConfiguration _configuration;
         private readonly string _jobName;
+        private readonly IConfiguration _mapFunctionConfiguration;
+        private readonly IConfiguration _mapInputCodecConfiguration;
+        private readonly IConfiguration _updateFunctionCodecsConfiguration;
+        private readonly IConfiguration _reduceFunctionConfiguration;
+        private readonly IConfiguration _updateFunctionConfiguration;
+        private readonly IConfiguration 
_mapOutputPipelineDataConverterConfiguration;
+        private readonly IConfiguration 
_mapInputPipelineDataConverterConfiguration;
+        private readonly IConfiguration _partitionedDatasetConfiguration;
+        private readonly int _numberOfMappers;
 
-        internal IMRUJobDefinition(IConfiguration configuration, string 
jobName)
+        /// <summary>
+        /// Constructor
+        /// </summary>
+        /// <param name="mapFunctionConfiguration">Map function 
configuration</param>
+        /// <param name="mapInputCodecConfiguration">Map input codec 
configuration</param>
+        /// <param name="updateFunctionCodecsConfiguration">codec 
configuration for update 
+        /// function. It is union of TMapInput, TMapOutput and TResult 
configuration</param>
+        /// <param name="reduceFunctionConfiguration">Reduce function 
configuration</param>
+        /// <param name="updateFunctionConfiguration">Update function 
configuration</param>
+        /// <param 
name="mapOutputPipelineDataConverterConfiguration">Configuration of 
+        /// PipelineDataConverter for TMapOutput</param>
+        /// <param 
name="mapInputPipelineDataConverterConfiguration">Configuration of 
+        /// PipelineDataConverter for TMapInput</param>
+        /// <param name="partitionedDatasetConfiguration">Configuration of 
partitioned 
+        /// dataset</param>
+        /// <param name="numberOfMappers">Number of mappers</param>
+        /// <param name="jobName">Job name</param>
+        internal IMRUJobDefinition(
+            IConfiguration mapFunctionConfiguration,
+            IConfiguration mapInputCodecConfiguration,
+            IConfiguration updateFunctionCodecsConfiguration,
+            IConfiguration reduceFunctionConfiguration,
+            IConfiguration updateFunctionConfiguration,
+            IConfiguration mapOutputPipelineDataConverterConfiguration,
+            IConfiguration mapInputPipelineDataConverterConfiguration,
+            IConfiguration partitionedDatasetConfiguration,
+            int numberOfMappers,
+            string jobName)
         {
-            _configuration = configuration;
+            _mapFunctionConfiguration = mapFunctionConfiguration;
+            _mapInputCodecConfiguration = mapInputCodecConfiguration;
+            _updateFunctionCodecsConfiguration = 
updateFunctionCodecsConfiguration;
+            _reduceFunctionConfiguration = reduceFunctionConfiguration;
+            _updateFunctionConfiguration = updateFunctionConfiguration;
+            _mapOutputPipelineDataConverterConfiguration = 
mapOutputPipelineDataConverterConfiguration;
+            _mapInputPipelineDataConverterConfiguration = 
mapInputPipelineDataConverterConfiguration;
+            _partitionedDatasetConfiguration = partitionedDatasetConfiguration;
+            _numberOfMappers = numberOfMappers;
             _jobName = jobName;
         }
 
-        internal IConfiguration Configuration
+        /// <summary>
+        /// Name of the job
+        /// </summary>
+        internal string JobName
         {
-            get { return _configuration; }
+            get { return _jobName; }
         }
 
-        internal string JobName
+        /// <summary>
+        /// Configuration of map function
+        /// </summary>
+        internal IConfiguration MapFunctionConfiguration
         {
-            get { return _jobName; }
+            get { return _mapFunctionConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of codec for TMapInput
+        /// </summary>
+        internal IConfiguration MapInputCodecConfiguration
+        {
+            get { return _mapInputCodecConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of codecs needed by Update function
+        /// </summary>
+        internal IConfiguration UpdateFunctionCodecsConfiguration
+        {
+            get { return _updateFunctionCodecsConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of reduce function
+        /// </summary>
+        internal IConfiguration ReduceFunctionConfiguration
+        {
+            get { return _reduceFunctionConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of update function
+        /// </summary>
+        internal IConfiguration UpdateFunctionConfiguration
+        {
+            get { return _updateFunctionConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of PipelineDataConverter for Map outout
+        /// </summary>
+        internal IConfiguration MapOutputPipelineDataConverterConfiguration
+        {
+            get { return _mapOutputPipelineDataConverterConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of PipelineDataConverter for Map Input
+        /// </summary>
+        internal IConfiguration MapInputPipelineDataConverterConfiguration
+        {
+            get { return _mapInputPipelineDataConverterConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of partitioned dataset
+        /// </summary>
+        internal IConfiguration PartitionedDatasetConfgiuration
+        {
+            get { return _partitionedDatasetConfiguration; }
+        }
+
+        /// <summary>
+        /// Number of mappers
+        /// </summary>
+        /// TODO: This is duplicate in a sense that it can be determined 
+        /// TODO: automatically from IPartitionedDataset. However, right now 
+        /// TODO: GroupComm. instantiated in IMRUDriver needs this parameter 
+        /// TODO: in constructor. This will be removed once we remove it from 
GroupComm. 
+        internal int NumberOfMappers {
+            get { return _numberOfMappers; }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 2427016..7f54459 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -18,7 +18,11 @@
  */
 
 using System;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.API
 {
@@ -28,19 +32,30 @@ namespace Org.Apache.REEF.IMRU.API
     /// <seealso cref="IMRUJobDefinition" />
     public sealed class IMRUJobDefinitionBuilder
     {
-        private IConfiguration _configuration;
+        private static readonly Logger Logger = Logger.GetLogger(typeof 
(IMRUJobDefinitionBuilder));
+
         private string _jobName;
+        private int _numberOfMappers;
+        private IConfiguration _mapFunctionConfiguration;
+        private IConfiguration _mapInputCodecConfiguration;
+        private IConfiguration _updateFunctionCodecsConfiguration;
+        private IConfiguration _reduceFunctionConfiguration;
+        private IConfiguration _updateFunctionConfiguration;
+        private IConfiguration _mapOutputPipelineDataConverterConfiguration;
+        private IConfiguration _mapInputPipelineDataConverterConfiguration;
+        private IConfiguration _partitionedDatasetConfiguration;
+
+        private static readonly IConfiguration EmptyConfiguration =
+            TangFactory.GetTang().NewConfigurationBuilder().Build();
 
         /// <summary>
-        /// Set the Configuration used to instantiate the IMapFunction, 
IReduceFunction, IUpdateFunction and all codec instances
+        /// Constructor
         /// </summary>
-        /// <param name="configuration">The Configuration used to instantiate 
the IMapFunction instance.</param>
-        /// <seealso cref="IMRUConfiguration{TMapInput,TMapOutput,TResult}" />
-        /// <returns>this</returns>
-        public IMRUJobDefinitionBuilder SetConfiguration(IConfiguration 
configuration)
+        public IMRUJobDefinitionBuilder()
         {
-            _configuration = configuration;
-            return this;
+            _mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
+            _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
+            _partitionedDatasetConfiguration = EmptyConfiguration;
         }
 
         /// <summary>
@@ -55,21 +70,162 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Sets configuration of map function
+        /// </summary>
+        /// <param name="mapFunctionConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetMapFunctionConfiguration(IConfiguration mapFunctionConfiguration)
+        {
+            _mapFunctionConfiguration = mapFunctionConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of codec for TMapInput
+        /// </summary>
+        /// <param name="mapInputCodecConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetMapInputCodecConfiguration(IConfiguration mapInputCodecConfiguration)
+        {
+            _mapInputCodecConfiguration = mapInputCodecConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of codecs needed by Update function
+        /// </summary>
+        /// <param 
name="updateFunctionCodecsConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder SetUpdateFunctionCodecsConfiguration(
+            IConfiguration updateFunctionCodecsConfiguration)
+        {
+            _updateFunctionCodecsConfiguration = 
updateFunctionCodecsConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of reduce function
+        /// </summary>
+        /// <param name="reduceFunctionConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetReduceFunctionConfiguration(IConfiguration reduceFunctionConfiguration)
+        {
+            _reduceFunctionConfiguration = reduceFunctionConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of update function
+        /// </summary>
+        /// <param name="updateFunctionConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetUpdateFunctionConfiguration(IConfiguration updateFunctionConfiguration)
+        {
+            _updateFunctionConfiguration = updateFunctionConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of PipelineDataConverter for Map output
+        /// </summary>
+        /// <param 
name="mapOutputPipelineDataConverterConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetMapOutputPipelineDataConverterConfiguration(
+            IConfiguration mapOutputPipelineDataConverterConfiguration)
+        {
+            _mapOutputPipelineDataConverterConfiguration = 
mapOutputPipelineDataConverterConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of PipelineDataConverter for Map Input
+        /// </summary>
+        /// <param 
name="mapInputPipelineDataConverterConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetMapInputPipelineDataConverterConfiguration(
+            IConfiguration mapInputPipelineDataConverterConfiguration)
+        {
+            _mapInputPipelineDataConverterConfiguration = 
mapInputPipelineDataConverterConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of partitioned dataset
+        /// </summary>
+        /// <param name="partitionedDatasetConfiguration">Configuration</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder SetPartitionedDatasetConfiguration(
+            IConfiguration partitionedDatasetConfiguration)
+        {
+            _partitionedDatasetConfiguration = partitionedDatasetConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets Number of mappers
+        /// </summary>
+        /// <param name="numberOfMappers">Number of mappers</param>
+        /// <returns>this</returns>
+        /// TODO: This is duplicate in a sense that it can be determined 
+        /// TODO: automatically from IPartitionedDataset. However, right now 
+        /// TODO: GroupComm. instantiated in IMRUDriver needs this parameter 
+        /// TODO: in constructor. This will be removed once we remove it from 
GroupComm. 
+        public IMRUJobDefinitionBuilder SetNumberOfMappers(int numberOfMappers)
+        {
+            _numberOfMappers = numberOfMappers;
+            return this;
+        }
+
+        /// <summary>
         /// Instantiate the IMRUJobDefinition.
         /// </summary>
         /// <returns>The IMRUJobDefintion configured.</returns>
-        /// <exception cref="NullReferenceException">If any of the required 
paremeters is not set.</exception>
+        /// <exception cref="NullReferenceException">If any of the required 
parameters is not set.</exception>
         public IMRUJobDefinition Build()
         {
-            if (null == _configuration)
+            if (null == _jobName)
             {
-                throw new NullReferenceException("Configuration can't be 
null.");
+                Exceptions.Throw(new NullReferenceException("Job name cannot 
be null"),
+                    Logger);
             }
-            if (null == _jobName)
+
+            if (null == _mapFunctionConfiguration)
+            {
+                Exceptions.Throw(new NullReferenceException("Map function 
configuration cannot be null"), Logger);
+            }
+
+            if (null == _mapInputCodecConfiguration)
             {
-                throw new NullReferenceException("JobName can't be null.");
+                Exceptions.Throw(new NullReferenceException("Map input codec 
configuration cannot be null"), Logger);
             }
-            return new IMRUJobDefinition(_configuration, _jobName);
+
+            if (null == _updateFunctionCodecsConfiguration)
+            {
+                Exceptions.Throw(new NullReferenceException("Update function 
codecs configuration cannot be null"),
+                    Logger);
+            }
+
+            if (null == _reduceFunctionConfiguration)
+            {
+                Exceptions.Throw(new NullReferenceException("Reduce function 
configuration cannot be null"), Logger);
+            }
+
+            if (null == _updateFunctionConfiguration)
+            {
+                Exceptions.Throw(new NullReferenceException("Update function 
configuration cannot be null"), Logger);
+            }
+
+            return new IMRUJobDefinition(
+                _mapFunctionConfiguration,
+                _mapInputCodecConfiguration,
+                _updateFunctionCodecsConfiguration,
+                _reduceFunctionConfiguration,
+                _updateFunctionConfiguration,
+                _mapOutputPipelineDataConverterConfiguration,
+                _mapInputPipelineDataConverterConfiguration,
+                _partitionedDatasetConfiguration,
+                _numberOfMappers,
+                _jobName);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs
new file mode 100644
index 0000000..7925d63
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUMapConfiguration.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+    /// <summary>
+    /// A configuration module for IMRU IMapFunction.
+    /// </summary>
+    /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
+    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
+    public sealed class IMRUMapConfiguration<TMapInput, TMapOutput> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The IMapFunction type to use.
+        /// </summary>
+        public static readonly RequiredImpl<IMapFunction<TMapInput, 
TMapOutput>> MapFunction =
+            new RequiredImpl<IMapFunction<TMapInput, TMapOutput>>();
+
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new IMRUMapConfiguration<TMapInput, TMapOutput>()
+                .BindImplementation(GenericType<IMapFunction<TMapInput, 
TMapOutput>>.Class, MapFunction)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs
new file mode 100644
index 0000000..fb63f2b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUPipelineDataConverterConfiguration.cs
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+    /// <summary>
+    /// A configuration module for the PipelineDataConverter used for chunking
+    /// and dechunking TMapInput and TMapOutput.
+    /// </summary>
+    /// <typeparam name="T">Generic type</typeparam>
+    public sealed class IMRUPipelineDataConverterConfiguration<T> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The PipelineDataConverter used for chunking and
+        /// dechunking TMapInput and TMapOutput.
+        /// </summary>
+        public static readonly
+            RequiredImpl<IPipelineDataConverter<T>> 
MapInputPiplelineDataConverter =
+                new RequiredImpl<IPipelineDataConverter<T>>();
+
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new IMRUPipelineDataConverterConfiguration<T>()
+                
.BindImplementation(GenericType<IPipelineDataConverter<T>>.Class, 
MapInputPiplelineDataConverter)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs
new file mode 100644
index 0000000..033ed69
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUReduceFunctionConfiguration.cs
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+    /// <summary>
+    /// A configuration module for IMRU Reduce function.
+    /// </summary>
+    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
+    public sealed class IMRUReduceFunctionConfiguration<TMapOutput> : 
ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The IReduceFunction type to use.
+        /// </summary>
+        public static readonly RequiredImpl<IReduceFunction<TMapOutput>> 
ReduceFunction =
+            new RequiredImpl<IReduceFunction<TMapOutput>>();
+
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new IMRUReduceFunctionConfiguration<TMapOutput>()
+                
.BindImplementation(GenericType<IReduceFunction<TMapOutput>>.Class, 
ReduceFunction)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs
new file mode 100644
index 0000000..de999ce
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUUpdateConfiguration.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+    /// <summary>
+    /// A configuration module for IMRU Update function.
+    /// </summary>
+    /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
+    /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
+    /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+    public sealed class IMRUUpdateConfiguration<TMapInput, TMapOutput, 
TResult> : ConfigurationModuleBuilder
+    {
+        /// <summary>
+        /// The IUpdateFunction type to use.
+        /// </summary>
+        public static readonly RequiredImpl<IUpdateFunction<TMapInput, 
TMapOutput, TResult>> UpdateFunction =
+            new RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, 
TResult>>();
+
+        /// <summary>
+        /// Configuration module
+        /// </summary>
+        public static ConfigurationModule ConfigurationModule =
+            new IMRUUpdateConfiguration<TMapInput, TMapOutput, TResult>()
+                .BindImplementation(GenericType<IUpdateFunction<TMapInput, 
TMapOutput, TResult>>.Class, UpdateFunction)
+                .Build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs
index 9c6479c..58c6088 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs
@@ -29,7 +29,7 @@ namespace Org.Apache.REEF.IMRU.API
     /// </remarks>
     /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
     /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
-    public interface IMapFunction<TMapInput, TMapOutput>
+    public interface IMapFunction<in TMapInput, out TMapOutput>
     {
         /// <summary>
         /// Computes new output based on the given side information and data.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs
index bae1448..ce23085 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs
@@ -28,7 +28,7 @@ namespace Org.Apache.REEF.IMRU.API
     /// <typeparam name="TMapInput">The type of the side information provided 
to the Map function</typeparam>
     /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
     /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
-    public interface IUpdateFunction<TMapInput, TMapOutput, TResult>
+    public interface IUpdateFunction<TMapInput, in TMapOutput, TResult>
     {
         /// <summary>
         /// The Update task for IMRU.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs
deleted file mode 100644
index fc21e79..0000000
--- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.IMRU.API.Parameters
-{
-    /// <summary>
-    /// The codec to be used for the map input.
-    /// </summary>
-    /// <typeparam name="TMapInput"></typeparam>
-    [NamedParameter("The codec to be used for the map input.")]
-    public sealed class MapInputCodec<TMapInput> : 
Name<IStreamingCodec<TMapInput>>
-    {
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs
deleted file mode 100644
index 095415d..0000000
--- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.IMRU.API.Parameters
-{
-    /// <summary>
-    /// The codec to be used for the map output.
-    /// </summary>
-    /// <typeparam name="TMapOutput"></typeparam>
-    [NamedParameter("The codec to be used for the map output.")]
-    public sealed class MapOutputCodec<TMapOutput> : 
Name<IStreamingCodec<TMapOutput>>
-    {
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs
deleted file mode 100644
index d6a35a5..0000000
--- a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Wake.StreamingCodec;
-
-namespace Org.Apache.REEF.IMRU.API.Parameters
-{
-    /// <summary>
-    /// The codec to be used for the result.
-    /// </summary>
-    /// <typeparam name="TResult"></typeparam>
-    [NamedParameter("The codec to be used for the result.")]
-    public class ResultCodec<TResult> : Name<IStreamingCodec<TResult>>
-    {
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs
index 366881b..19eaeb5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs
@@ -32,6 +32,11 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
         {
         }
 
+        /// <summary>
+        /// Identity map function
+        /// </summary>
+        /// <param name="mapInput">The map input, which is returned unchanged</param>
+        /// <returns>mapInput itself</returns>
         public int Map(int mapInput)
         {
             return mapInput;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs
index 03d9605..94c4332 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs
@@ -34,6 +34,11 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
         {
         }
 
+        /// <summary>
+        /// Reduce function that returns the sum of elements 
+        /// </summary>
+        /// <param name="elements">List of elements</param>
+        /// <returns>The sum of elements</returns>
         public int Reduce(IEnumerable<int> elements)
         {
             return elements.Aggregate((x, y) => x + y);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs
index 9b5b247..ae57e48 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs
@@ -19,6 +19,7 @@
 
 using System.Linq;
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
@@ -39,29 +40,44 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
         }
 
         /// <summary>
+        /// Runs the actual mapper count job
         /// </summary>
         /// <returns>The number of MapFunction instances that are part of the 
job.</returns>
-        public int Run()
+        public int Run(int numberofMappers)
         {
             var results = _imruClient.Submit(
                 new IMRUJobDefinitionBuilder()
-                    .SetConfiguration(
-                        IMRUConfiguration<int, int, int>.ConfigurationModule
-                            .Set(IMRUConfiguration<int, int, int>.MapFunction, 
GenericType<IdentityMapFunction>.Class)
-                            .Set(IMRUConfiguration<int, int, 
int>.ReduceFunction,
-                                GenericType<IntSumReduceFunction>.Class)
-                            .Set(IMRUConfiguration<int, int, 
int>.UpdateFunction,
+                    .SetMapFunctionConfiguration(IMRUMapConfiguration<int, 
int>.ConfigurationModule
+                        .Set(IMRUMapConfiguration<int, int>.MapFunction, 
GenericType<IdentityMapFunction>.Class)
+                        .Build())
+                    .SetUpdateFunctionConfiguration(
+                        IMRUUpdateConfiguration<int, int, 
int>.ConfigurationModule
+                            .Set(IMRUUpdateConfiguration<int, int, 
int>.UpdateFunction,
                                 GenericType<MapperCountUpdateFunction>.Class)
-                            .Set(IMRUConfiguration<int, int, 
int>.MapInputCodec,
-                                GenericType<IntStreamingCodec>.Class)
-                            .Set(IMRUConfiguration<int, int, 
int>.MapOutputCodec,
-                                GenericType<IntStreamingCodec>.Class)
-                            .Set(IMRUConfiguration<int, int, int>.ResultCodec, 
GenericType<IntStreamingCodec>.Class)
                             .Build())
+                    
.SetMapInputCodecConfiguration(IMRUCodecConfiguration<int>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
+                        .Build())
+                    
.SetUpdateFunctionCodecsConfiguration(IMRUCodecConfiguration<int>.ConfigurationModule
+                        .Set(IMRUCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
+                        .Build())
+                    
.SetReduceFunctionConfiguration(IMRUReduceFunctionConfiguration<int>.ConfigurationModule
+                        
.Set(IMRUReduceFunctionConfiguration<int>.ReduceFunction,
+                            GenericType<IntSumReduceFunction>.Class)
+                        .Build())
+                    .SetPartitionedDatasetConfiguration(
+                        
RandomDataConfiguration.ConfigurationModule.Set(RandomDataConfiguration.NumberOfPartitions,
+                            numberofMappers.ToString()).Build())
                     .SetJobName("MapperCount")
-                    .Build()
-                );
-            return results.First();
+                    .SetNumberOfMappers(numberofMappers)
+                    .Build());
+
+            if (results != null)
+            {
+                return results.First();
+            }
+
+            return -1;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs
index f69605f..72f25ec 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs
@@ -36,11 +36,20 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
         {
         }
 
+        /// <summary>
+        /// Update function
+        /// </summary>
+        /// <param name="input">Input containing the sum of all mapper outputs</param>
+        /// <returns>An UpdateResult marked as done that carries only the final result</returns>
         public UpdateResult<int, int> Update(int input)
         {
             return UpdateResult<int, int>.Done(input);
         }
 
+        /// <summary>
+        /// Initialize function. Sends 1 to all mappers
+        /// </summary>
+        /// <returns>Map input</returns>
         public UpdateResult<int, int> Initialize()
         {
             return UpdateResult<int, int>.AnotherRound(1);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
index 8ff36ab..2009d4c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs
@@ -29,9 +29,9 @@ namespace Org.Apache.REEF.IMRU.InProcess
     /// <summary>
     /// Simple, single-threaded executor for IMRU Jobs.
     /// </summary>
-    /// <typeparam name="TMapInput"></typeparam>
-    /// <typeparam name="TMapOutput"></typeparam>
-    /// <typeparam name="TResult"></typeparam>
+    /// <typeparam name="TMapInput">Input to map function</typeparam>
+    /// <typeparam name="TMapOutput">Output of map function</typeparam>
+    /// <typeparam name="TResult">Final result</typeparam>
     internal sealed class IMRURunner<TMapInput, TMapOutput, TResult>
     {
         private readonly ISet<IMapFunction<TMapInput, TMapOutput>> 
_mapfunctions;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
index 4207bd4..bb22945 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
@@ -17,11 +17,13 @@
  * under the License.
  */
 
+using System;
 using System.Collections.Generic;
 using System.Diagnostics;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.InProcess.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
@@ -52,19 +54,30 @@ namespace Org.Apache.REEF.IMRU.InProcess
             _numberOfMappers = numberOfMappers;
         }
 
+        /// <summary>
+        /// Submits the IMRU job and runs it in-process
+        /// </summary>
+        /// <param name="jobDefinition">Job definition given by the 
user</param>
+        /// <returns>The result of the job</returns>
         public IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition)
         {
-            var injector = 
TangFactory.GetTang().NewInjector(jobDefinition.Configuration);
+            var mergedConfig = Configurations.Merge(
+                jobDefinition.ReduceFunctionConfiguration,
+                jobDefinition.UpdateFunctionConfiguration);
+
+            var injector = TangFactory.GetTang().NewInjector(mergedConfig);
 
             injector.BindVolatileInstance(GenericType<MapFunctions<TMapInput, 
TMapOutput>>.Class,
-                MakeMapFunctions(injector));
+                MakeMapFunctions(jobDefinition.MapFunctionConfiguration));
 
             var runner = injector.GetInstance<IMRURunner<TMapInput, 
TMapOutput, TResult>>();
             return runner.Run();
         }
 
-        private MapFunctions<TMapInput, TMapOutput> MakeMapFunctions(IInjector 
injector)
+        private MapFunctions<TMapInput, TMapOutput> 
MakeMapFunctions(IConfiguration configuration)
         {
+            var injector = TangFactory.GetTang().NewInjector(configuration);
+
             ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new 
HashSet<IMapFunction<TMapInput, TMapOutput>>();
             for (var i = 0; i < _numberOfMappers; ++i)
             {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs
index 75346ea..6a2f01b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs
@@ -37,6 +37,9 @@ namespace Org.Apache.REEF.IMRU.InProcess
         /// </summary>
         public static readonly OptionalParameter<int> NumberOfMappers = new 
OptionalParameter<int>();
 
+        /// <summary>
+        /// Configuration module
+        /// </summary>
         public static ConfigurationModule ConfigurationModule =
             new InProcessIMRUConfiguration<TMapInput, TMapOutput, TResult>()
                 .BindImplementation(GenericType<IIMRUClient<TMapInput, 
TMapOutput, TResult>>.Class,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2f0cf34d/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index ca0ae8e..5ae900f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -41,15 +41,16 @@ under the License.
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
-    <Compile Include="API\IMRUConfiguration.cs" />
     <Compile Include="API\IIMRUClient.cs" />
     <Compile Include="API\IMapFunction.cs" />
+    <Compile Include="API\IMRUCodecConfiguration.cs" />
+    <Compile Include="API\IMRUPipelineDataConverterConfiguration.cs" />
     <Compile Include="API\IMRUJobDefinition.cs" />
     <Compile Include="API\IMRUJobDefinitionBuilder.cs" />
+    <Compile Include="API\IMRUMapConfiguration.cs" />
+    <Compile Include="API\IMRUReduceFunctionConfiguration.cs" />
+    <Compile Include="API\IMRUUpdateConfiguration.cs" />
     <Compile Include="API\IUpdateFunction.cs" />
-    <Compile Include="API\Parameters\MapInputCodec.cs" />
-    <Compile Include="API\Parameters\MapOutputCodec.cs" />
-    <Compile Include="API\Parameters\ResultCodec.cs" />
     <Compile Include="API\UpdateResult.cs" />
     <Compile Include="Examples\MapperCount\MapperCount.cs" />
     <Compile Include="Examples\MapperCount\IdentityMapFunction.cs" />
@@ -95,6 +96,10 @@ under the License.
       <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
       <Name>Org.Apache.REEF.Wake</Name>
     </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.IO\Org.Apache.REEF.IO.csproj">
+      <Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project>
+      <Name>Org.Apache.REEF.IO</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <None Include="Org.Apache.REEF.IMRU.nuspec" />

Reply via email to