Repository: incubator-reef
Updated Branches:
refs/heads/master 1dc1cf3a5 -> deedbe3cd
[REEF-556] Implement MapTaskHost, UpdateTaskHost and ConfigurationManager
required by IMRUDriver
This addressed the issue by implementing following classes
* `MapTaskHost` - Runs mapper iterations: receives input using group
communication, does map update, sends results back to update
function using group communications
* `UpdateTaskHost` - Runs update iterations: sends input to mappers
using group communications, receive aggregated output message from
mappers and then apply update function.
* `ConfigurationManager` - Takes serialized configuration specified by
client, deserializes it to get Map, update, codecs configuration
etc. from user. This class will be passed to IMRUDriver constructor.
JIRA:
[REEF-556](https://issues.apache.org/jira/browse/REEF-556)
Pull Request:
This closes #349
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/deedbe3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/deedbe3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/deedbe3c
Branch: refs/heads/master
Commit: deedbe3cdb74d387bafc7de3972da17ac43193d0
Parents: 1dc1cf3
Author: Dhruv <[email protected]>
Authored: Thu Aug 6 22:32:36 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Sun Aug 9 17:00:32 2015 -0700
----------------------------------------------------------------------
.../OnREEF/Driver/ConfigurationManager.cs | 177 +++++++++++++++++++
.../OnREEF/Driver/IMRUConstants.cs | 29 +++
.../OnREEF/IMRUTasks/MapTaskHost.cs | 81 +++++++++
.../OnREEF/IMRUTasks/UpdateTaskHost.cs | 98 ++++++++++
.../OnREEF/Parameters/CoresForUpdateTask.cs | 30 ++++
.../OnREEF/Parameters/CoresPerMapper.cs | 30 ++++
.../OnREEF/Parameters/MemoryForUpdateTask.cs | 30 ++++
.../OnREEF/Parameters/MemoryPerMapper.cs | 30 ++++
.../Parameters/SerializedMapConfiguration.cs | 26 +++
.../SerializedMapInputCodecConfiguration.cs | 26 +++
...apInputPipelineDataConverterConfiguration.cs | 30 ++++
...pOutputPipelineDataConverterConfiguration.cs | 30 ++++
.../Parameters/SerializedReduceConfiguration.cs | 30 ++++
.../Parameters/SerializedUpdateConfiguration.cs | 30 ++++
...rializedUpdateFunctionCodecsConfiguration.cs | 26 +++
.../Org.Apache.REEF.IMRU.csproj | 15 ++
16 files changed, 718 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
new file mode 100644
index 0000000..8c0188c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Helper class that deserializes the various user-provided configurations
+ /// </summary>
+ internal sealed class ConfigurationManager
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(ConfigurationManager));
+
+ private readonly IConfiguration _mapFunctionConfiguration;
+ private readonly IConfiguration _mapInputCodecConfiguration;
+ private readonly IConfiguration _updateFunctionCodecsConfiguration;
+ private readonly IConfiguration _reduceFunctionConfiguration;
+ private readonly IConfiguration _updateFunctionConfiguration;
+ private readonly IConfiguration
_mapOutputPipelineDataConverterConfiguration;
+ private readonly IConfiguration
_mapInputPipelineDataConverterConfiguration;
+
+ [Inject]
+ private ConfigurationManager(
+ AvroConfigurationSerializer configurationSerializer,
+ [Parameter(typeof(SerializedMapConfiguration))] string mapConfig,
+ [Parameter(typeof(SerializedReduceConfiguration))] string
reduceConfig,
+ [Parameter(typeof(SerializedUpdateConfiguration))] string
updateConfig,
+ [Parameter(typeof(SerializedMapInputCodecConfiguration))] string
mapInputCodecConfig,
+ [Parameter(typeof(SerializedUpdateFunctionCodecsConfiguration))]
string updateFunctionCodecsConfig,
+
[Parameter(typeof(SerializedMapOutputPipelineDataConverterConfiguration))]
string mapOutputPipelineDataConverterConfiguration,
+
[Parameter(typeof(SerializedMapInputPipelineDataConverterConfiguration))]
string mapInputPipelineDataConverterConfiguration)
+ {
+ try
+ {
+ _mapFunctionConfiguration =
configurationSerializer.FromString(mapConfig);
+ }
+ catch(Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize map function
configuration", Logger);
+ }
+
+ try
+ {
+ _reduceFunctionConfiguration =
configurationSerializer.FromString(reduceConfig);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize reduce function
configuration", Logger);
+ }
+
+ try
+ {
+ _updateFunctionConfiguration =
configurationSerializer.FromString(updateConfig);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize update function
configuration", Logger);
+ }
+
+ try
+ {
+ _updateFunctionCodecsConfiguration =
configurationSerializer.FromString(updateFunctionCodecsConfig);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize update function
codec configuration", Logger);
+ }
+
+ try
+ {
+ _mapInputCodecConfiguration =
configurationSerializer.FromString(mapInputCodecConfig);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize map input codec
configuration", Logger);
+ }
+
+ try
+ {
+ _mapOutputPipelineDataConverterConfiguration =
+
configurationSerializer.FromString(mapOutputPipelineDataConverterConfiguration);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize map output pipeline
data converter configuration", Logger);
+ }
+
+ try
+ {
+ _mapInputPipelineDataConverterConfiguration =
+
configurationSerializer.FromString(mapInputPipelineDataConverterConfiguration);
+ }
+ catch (Exception e)
+ {
+ Exceptions.Throw(e, "Unable to deserialize map input pipeline
data converter configuration", Logger);
+ }
+ }
+
+ /// <summary>
+ /// Configuration of map function
+ /// </summary>
+ internal IConfiguration MapFunctionConfiguration
+ {
+ get { return _mapFunctionConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of Map input codec
+ /// </summary>
+ internal IConfiguration MapInputCodecConfiguration
+ {
+ get { return _mapInputCodecConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of codecs required in the Update function
+ /// Union of Map input, Map output and Result codecs
+ /// </summary>
+ internal IConfiguration UpdateFunctionCodecsConfiguration
+ {
+ get { return _updateFunctionCodecsConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of reduce function
+ /// </summary>
+ internal IConfiguration ReduceFunctionConfiguration
+ {
+ get { return _reduceFunctionConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of Update function
+ /// </summary>
+ internal IConfiguration UpdateFunctionConfiguration
+ {
+ get { return _updateFunctionConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of PipelineDataConverter for chunking and dechunking
Map input
+ /// </summary>
+ internal IConfiguration MapOutputPipelineDataConverterConfiguration
+ {
+ get { return _mapOutputPipelineDataConverterConfiguration; }
+ }
+
+ /// <summary>
+ /// Configuration of PipelineDataConverter for chunking and dechunking
Map output
+ /// </summary>
+ internal IConfiguration MapInputPipelineDataConverterConfiguration
+ {
+ get { return _mapInputPipelineDataConverterConfiguration; }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
new file mode 100644
index 0000000..dd9edf2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ internal static class IMRUConstants
+ {
+ internal const string CommunicationGroupName =
"IMRUCommunicationGroup";
+ internal const string BroadcastOperatorName = "IMRUBroadcast";
+ internal const string ReduceOperatorName = "IMRUReduce";
+ internal const int TreeFanout = 2;
+ internal const string MapTaskPrefix = "IMRUMap";
+ internal const string UpdateTaskName = "IMRUMaster";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
new file mode 100644
index 0000000..b1af870
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+ /// <summary>
+ /// Hosts the IMRU MapTask in a REEF Task.
+ /// </summary>
+ /// <typeparam name="TMapInput">Map input</typeparam>
+ /// <typeparam name="TMapOutput">Map output</typeparam>
+ internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(MapTaskHost<TMapInput, TMapOutput>));
+
+ private readonly
IBroadcastReceiver<MapInputWithControlMessage<TMapInput>>
_dataAndMessageReceiver;
+ private readonly IReduceSender<TMapOutput> _dataReducer;
+ private readonly IMapFunction<TMapInput, TMapOutput> _mapTask;
+
+ /// <summary>
+ /// </summary>
+ /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
+ /// <param name="groupCommunicationsClient">Used to setup the
communications.</param>
+ [Inject]
+ private MapTaskHost(IMapFunction<TMapInput, TMapOutput> mapTask,
IGroupCommClient groupCommunicationsClient)
+ {
+ _mapTask = mapTask;
+ var cg =
groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
+ _dataAndMessageReceiver =
cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
+ _dataReducer =
cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
+ }
+
+ /// <summary>
+ /// Performs IMRU iterations on map side
+ /// </summary>
+ /// <param name="memento"></param>
+ /// <returns></returns>
+ public byte[] Call(byte[] memento)
+ {
+ MapInputWithControlMessage<TMapInput> mapInput =
_dataAndMessageReceiver.Receive();
+
+ while (mapInput.ControlMessage == MapControlMessage.AnotherRound)
+ {
+ var result = _mapTask.Map(mapInput.Message);
+ _dataReducer.Send(result);
+ mapInput = _dataAndMessageReceiver.Receive();
+ }
+
+ return null;
+ }
+
+ /// <summary>
+ /// Dispose function
+ /// </summary>
+ public void Dispose()
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
new file mode 100644
index 0000000..f858eaf
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+ /// <summary>
+ /// Hosts the IMRU UpdateTask in a REEF task
+ /// </summary>
+ /// <typeparam name="TMapInput">Map input</typeparam>
+ /// <typeparam name="TMapOutput">Map output</typeparam>
+ /// <typeparam name="TResult">Final result</typeparam>
+ public sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : ITask
+ {
+ private static readonly Logger Logger =
Logger.GetLogger(typeof(UpdateTaskHost<TMapInput, TMapOutput, TResult>));
+
+ private readonly IReduceReceiver<TMapOutput> _dataReceiver;
+ private readonly
IBroadcastSender<MapInputWithControlMessage<TMapInput>>
_dataAndControlMessageSender;
+ private readonly IUpdateFunction<TMapInput, TMapOutput, TResult>
_updateTask;
+
+ /// <summary>
+ /// </summary>
+ /// <param name="updateTask">The UpdateTask hosted in this REEF
Task.</param>
+ /// <param name="groupCommunicationsClient">Used to setup the
communications.</param>
+ [Inject]
+ private UpdateTaskHost(
+ IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
+ IGroupCommClient groupCommunicationsClient)
+ {
+ _updateTask = updateTask;
+ var cg =
groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
+ _dataAndControlMessageSender =
cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
+ _dataReceiver =
cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
+ }
+
+ /// <summary>
+ /// Performs IMRU iterations on update side
+ /// </summary>
+ /// <param name="memento"></param>
+ /// <returns></returns>
+ public byte[] Call(byte[] memento)
+ {
+ var updateResult = _updateTask.Initialize();
+ MapInputWithControlMessage<TMapInput> message =
+ new
MapInputWithControlMessage<TMapInput>(MapControlMessage.AnotherRound);
+
+ while (updateResult.HasMapInput)
+ {
+ message.Message = updateResult.MapInput;
+ _dataAndControlMessageSender.Send(message);
+ updateResult = _updateTask.Update(_dataReceiver.Reduce());
+ if (updateResult.HasResult)
+ {
+ // TODO[REEF-576]: Emit output somewhere.
+ }
+ }
+
+ message.ControlMessage = MapControlMessage.Stop;
+ _dataAndControlMessageSender.Send(message);
+
+ if (updateResult.HasResult)
+ {
+ // TODO[REEF-576]: Emit output somewhere.
+ }
+
+ return null;
+ }
+
+ /// <summary>
+ /// Dispose function
+ /// </summary>
+ public void Dispose()
+ {
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresForUpdateTask.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresForUpdateTask.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresForUpdateTask.cs
new file mode 100644
index 0000000..7439cc1
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresForUpdateTask.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("Number of cores for Update task in IMRU.",
"coresforupdatetask", "1")]
+ internal sealed class CoresForUpdateTask : Name<int>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresPerMapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresPerMapper.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresPerMapper.cs
new file mode 100644
index 0000000..fcd47e4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CoresPerMapper.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("Number of cores per Mapper in IMRU.",
"corespermappertask", "1")]
+ internal sealed class CoresPerMapper : Name<int>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryForUpdateTask.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryForUpdateTask.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryForUpdateTask.cs
new file mode 100644
index 0000000..8bee0f7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryForUpdateTask.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("Memory in MB for Update task in IMRU.",
"updatetaskmemory", "512")]
+ internal sealed class MemoryForUpdateTask : Name<int>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryPerMapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryPerMapper.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryPerMapper.cs
new file mode 100644
index 0000000..2403db4
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MemoryPerMapper.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("Memory in MB per Mapper in IMRU.", "maptaskmemory",
"512")]
+ internal sealed class MemoryPerMapper : Name<int>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapConfiguration.cs
new file mode 100644
index 0000000..a1424d4
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for mappers.")]
+ internal sealed class SerializedMapConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputCodecConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputCodecConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputCodecConfiguration.cs
new file mode 100644
index 0000000..be08ce5
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputCodecConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for the map input codec.")]
+ internal sealed class SerializedMapInputCodecConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputPipelineDataConverterConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputPipelineDataConverterConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputPipelineDataConverterConfiguration.cs
new file mode 100644
index 0000000..d6917f4
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapInputPipelineDataConverterConfiguration.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for specifying DataConverter
for pipelinig for MapInput.")]
+ internal sealed class SerializedMapInputPipelineDataConverterConfiguration
: Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapOutputPipelineDataConverterConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapOutputPipelineDataConverterConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapOutputPipelineDataConverterConfiguration.cs
new file mode 100644
index 0000000..0a4ea68
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapOutputPipelineDataConverterConfiguration.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for specifying DataConverter
for pipelinig for MapOutput.")]
+ internal sealed class
SerializedMapOutputPipelineDataConverterConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedReduceConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedReduceConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedReduceConfiguration.cs
new file mode 100644
index 0000000..f1e9b1c
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedReduceConfiguration.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for the Reduce function.")]
+ internal sealed class SerializedReduceConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateConfiguration.cs
new file mode 100644
index 0000000..103fdc8
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateConfiguration.cs
@@ -0,0 +1,30 @@
+#region LICENSE
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#endregion
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for the update function.")]
+ internal sealed class SerializedUpdateConfiguration : Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateFunctionCodecsConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateFunctionCodecsConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateFunctionCodecsConfiguration.cs
new file mode 100644
index 0000000..5649ee4
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateFunctionCodecsConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for the codecs required in
Update function.")]
+ internal sealed class SerializedUpdateFunctionCodecsConfiguration :
Name<string>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/deedbe3c/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 5ae900f..52bc9b3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -61,10 +61,25 @@ under the License.
<Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
<Compile Include="InProcess\MapFunctions.cs" />
<Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
+ <Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
+ <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
+ <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
+ <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
<Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs"
/>
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs"
/>
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
/>
+ <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />
+ <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
+ <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
+ <Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
+ <Compile Include="OnREEF\Parameters\SerializedMapConfiguration.cs" />
+ <Compile
Include="OnREEF\Parameters\SerializedMapInputCodecConfiguration.cs" />
+ <Compile
Include="OnREEF\Parameters\SerializedMapInputPipelineDataConverterConfiguration.cs"
/>
+ <Compile
Include="OnREEF\Parameters\SerializedMapOutputPipelineDataConverterConfiguration.cs"
/>
+ <Compile Include="OnREEF\Parameters\SerializedReduceConfiguration.cs" />
+ <Compile Include="OnREEF\Parameters\SerializedUpdateConfiguration.cs" />
+ <Compile
Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>