Repository: incubator-reef
Updated Branches:
refs/heads/master 2c00d8910 -> 2a453f64a
[REEF-608] Implement IMRU Driver and Client
This adds
* the REEF IMRU Driver that actually runs Map and Update tasks given the job
definition given by user and
* the REEF IMRU Client.
Also, this introduces a StreamingCodecConfigurationMinusMessage configuration
module in REEF.Network.Group.Config that gives internal codec configurations
used within group communication to the external applications like IMRU.
JIRA:
[REEF-608](https://issues.apache.org/jira/browse/REEF-608)
Pull Request:
This closes #385
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2a453f64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2a453f64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2a453f64
Branch: refs/heads/master
Commit: 2a453f64a278d68ea0b578be4beb618c6e6410fb
Parents: 2c00d89
Author: Dhruv <[email protected]>
Authored: Wed Aug 19 10:49:13 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Aug 19 19:32:08 2015 -0700
----------------------------------------------------------------------
.../API/IMRUJobDefinition.cs | 2 +-
.../OnREEF/Client/REEFIMRUClient.cs | 119 ++++++
.../Client/REEFIMRUClientConfiguration.cs | 41 +++
.../OnREEF/Driver/IMRUDriver.cs | 360 +++++++++++++++++++
.../Org.Apache.REEF.IMRU.csproj | 8 +
.../StreamingCodecConfigurationMinusMessage.cs | 47 +++
.../Org.Apache.REEF.Network.csproj | 1 +
7 files changed, 577 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 2f2bf26..6a955c2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -146,7 +146,7 @@ namespace Org.Apache.REEF.IMRU.API
/// <summary>
/// Configuration of partitioned dataset
/// </summary>
- internal IConfiguration PartitionedDatasetConfgiuration
+ internal IConfiguration PartitionedDatasetConfiguration
{
get { return _partitionedDatasetConfiguration; }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
new file mode 100644
index 0000000..99f5313
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Client
+{
+ /// <summary>
+ /// Implements the IMRU client API on REEF.
+ /// </summary>
+ /// <typeparam name="TMapInput">The type of the side information provided
to the Map function</typeparam>
+ /// <typeparam name="TMapOutput">The return type of the Map
function</typeparam>
+ /// <typeparam name="TResult">The return type of the
computation.</typeparam>
+ internal sealed class REEFIMRUClient<TMapInput, TMapOutput, TResult> :
IIMRUClient<TMapInput, TMapOutput, TResult>
+ {
+ private readonly IREEFClient _reefClient;
+ private readonly JobSubmissionBuilderFactory
_jobSubmissionBuilderFactory;
+ private readonly AvroConfigurationSerializer _configurationSerializer;
+
+ [Inject]
+ private REEFIMRUClient(IREEFClient reefClient,
AvroConfigurationSerializer configurationSerializer,
+ JobSubmissionBuilderFactory jobSubmissionBuilderFactory)
+ {
+ _reefClient = reefClient;
+ _configurationSerializer = configurationSerializer;
+ _jobSubmissionBuilderFactory = jobSubmissionBuilderFactory;
+ }
+
+ /// <summary>
+ /// Submits the job to reefClient
+ /// </summary>
+ /// <param name="jobDefinition">IMRU job definition given by the
user</param>
+ /// <returns>Null as results will be later written to some
directory</returns>
+ IEnumerable<TResult> IIMRUClient<TMapInput, TMapOutput,
TResult>.Submit(IMRUJobDefinition jobDefinition)
+ {
+ string driverId = string.Format("IMRU-{0}-Driver",
jobDefinition.JobName);
+
+ // The driver configuration contains all the needed bindings.
+ var imruDriverConfiguration =
TangFactory.GetTang().NewConfigurationBuilder(new[]
+ {
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
+ GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
+ .Set(DriverConfiguration.OnDriverStarted,
+ GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
+ .Set(DriverConfiguration.OnContextActive,
+ GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
+ .Set(DriverConfiguration.OnTaskCompleted,
+ GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
+ .Build(),
+ TangFactory.GetTang().NewConfigurationBuilder()
+
.BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId)
+
.BindStringNamedParam<GroupCommConfigurationOptions.MasterTaskId>(IMRUConstants.UpdateTaskName)
+
.BindStringNamedParam<GroupCommConfigurationOptions.GroupName>(IMRUConstants.CommunicationGroupName)
+ .BindIntNamedParam<GroupCommConfigurationOptions.FanOut>(
+
IMRUConstants.TreeFanout.ToString(CultureInfo.InvariantCulture)
+ .ToString(CultureInfo.InvariantCulture))
+
.BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(
+ (jobDefinition.NumberOfMappers +
1).ToString(CultureInfo.InvariantCulture))
+ .BindImplementation(GenericType<IGroupCommDriver>.Class,
GenericType<GroupCommDriver>.Class)
+ .Build(),
+ jobDefinition.PartitionedDatasetConfiguration
+ })
+ .BindNamedParameter(typeof (SerializedMapConfiguration),
+
_configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
+ .BindNamedParameter(typeof (SerializedUpdateConfiguration),
+
_configurationSerializer.ToString(jobDefinition.UpdateFunctionConfiguration))
+ .BindNamedParameter(typeof
(SerializedMapInputCodecConfiguration),
+
_configurationSerializer.ToString(jobDefinition.MapInputCodecConfiguration))
+ .BindNamedParameter(typeof
(SerializedMapInputPipelineDataConverterConfiguration),
+
_configurationSerializer.ToString(jobDefinition.MapInputPipelineDataConverterConfiguration))
+ .BindNamedParameter(typeof
(SerializedUpdateFunctionCodecsConfiguration),
+
_configurationSerializer.ToString(jobDefinition.UpdateFunctionCodecsConfiguration))
+ .BindNamedParameter(typeof
(SerializedMapOutputPipelineDataConverterConfiguration),
+
_configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
+ .BindNamedParameter(typeof (SerializedReduceConfiguration),
+
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+ .Build();
+
+ // The JobSubmission contains the Driver configuration as well as
the files needed on the Driver.
+ var imruJobSubmission =
_jobSubmissionBuilderFactory.GetJobSubmissionBuilder()
+ .AddDriverConfiguration(imruDriverConfiguration)
+ .AddGlobalAssemblyForType(typeof (IMRUDriver<TMapInput,
TMapOutput, TResult>))
+ .SetJobIdentifier(jobDefinition.JobName)
+ .Build();
+
+ _reefClient.Submit(imruJobSubmission);
+
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
new file mode 100644
index 0000000..ac3bef3
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClientConfiguration.cs
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Client
+{
+ /// <summary>
+ /// A configuration module for specifying REEFIMRUClient
+ /// </summary>
+ public sealed class REEFIMRUClientConfiguration<TMapInput, TMapOutput,
TResult> : ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// Configuration module
+ /// </summary>
+ public static ConfigurationModule ConfigurationModule =
+ new REEFIMRUClientConfiguration<TMapInput, TMapOutput, TResult>()
+ .BindImplementation(GenericType<IIMRUClient<TMapInput,
TMapOutput, TResult>>.Class,
+ GenericType<REEFIMRUClient<TMapInput, TMapOutput,
TResult>>.Class)
+ .Build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
new file mode 100644
index 0000000..026563c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -0,0 +1,360 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Concurrent;
+using System.Globalization;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Parameters;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Implements the IMRU driver on REEF
+ /// </summary>
+ /// <typeparam name="TMapInput">Map Input</typeparam>
+ /// <typeparam name="TMapOutput">Map output</typeparam>
+ /// <typeparam name="TResult">Result</typeparam>
+ internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> :
IObserver<IDriverStarted>,
+ IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>,
IObserver<ICompletedTask>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof
(IMRUDriver<TMapInput, TMapOutput, TResult>));
+
+ private readonly ConfigurationManager _configurationManager;
+ private readonly IPartitionedDataSet _dataSet;
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
+ private ICommunicationGroupDriver _commGroup;
+ private readonly IGroupCommDriver _groupCommDriver;
+ private readonly TaskStarter _groupCommTaskStarter;
+ private IConfiguration _tcpPortProviderConfig;
+ private readonly ConcurrentStack<string> _taskIdStack;
+ private readonly ConcurrentStack<IPartitionDescriptor>
_partitionDescriptorStack;
+ private readonly int _coresPerMapper;
+ private readonly int _coresForUpdateTask;
+ private readonly int _memoryPerMapper;
+ private readonly int _memoryForUpdateTask;
+ private bool _allocatedUpdateTaskEvaluator;
+ private readonly ConcurrentBag<ICompletedTask> _completedTasks;
+
+ [Inject]
+ private IMRUDriver(IPartitionedDataSet dataSet,
+ ConfigurationManager configurationManager,
+ IEvaluatorRequestor evaluatorRequestor,
+ [Parameter(typeof (TcpPortRangeStart))] int startingPort,
+ [Parameter(typeof (TcpPortRangeCount))] int portRange,
+ [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
+ [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
+ [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
+ [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
+ IGroupCommDriver groupCommDriver)
+ {
+ _dataSet = dataSet;
+ _configurationManager = configurationManager;
+ _evaluatorRequestor = evaluatorRequestor;
+ _groupCommDriver = groupCommDriver;
+ _coresPerMapper = coresPerMapper;
+ _coresForUpdateTask = coresForUpdateTask;
+ _memoryPerMapper = memoryPerMapper;
+ _memoryForUpdateTask = memoryForUpdateTask;
+ _allocatedUpdateTaskEvaluator = false;
+ _completedTasks = new ConcurrentBag<ICompletedTask>();
+
+ AddGroupCommunicationOperators();
+
+ //TODO[REEF-600]: Once the configuration module for
TcpPortProvider
+ //TODO[REEF-600]: will be provided, the configuraiton will be
automatically
+ //TODO[REEF-600]: carried over to evaluators and below function
will be obsolete.
+ ConstructTcpPortProviderConfig(startingPort, portRange);
+
+ _groupCommTaskStarter = new TaskStarter(_groupCommDriver,
_dataSet.Count + 1);
+
+ _taskIdStack = new ConcurrentStack<string>();
+ _partitionDescriptorStack = new
ConcurrentStack<IPartitionDescriptor>();
+ ConstructTaskIdAndPartitionDescriptorStack();
+ }
+
+ /// <summary>
+ /// Requests for evaluator for update task
+ /// </summary>
+ /// <param name="value">Event fired when driver started</param>
+ public void OnNext(IDriverStarted value)
+ {
+ _evaluatorRequestor.Submit(new EvaluatorRequest(1,
_memoryForUpdateTask, _coresForUpdateTask));
+ //TODO[REEF-598]: Set a timeout for this request to be satisfied.
If it is not within that time, exit the Driver.
+ }
+
+ /// <summary>
+ /// Specifies context and service configuration for evaluator depending
+ /// on whether it is for Update function or for map function
+ /// </summary>
+ /// <param name="allocatedEvaluator">The allocated evaluator</param>
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+ {
+ IConfiguration contextConf =
_groupCommDriver.GetContextConfiguration();
+ IConfiguration serviceConf =
_groupCommDriver.GetServiceConfiguration();
+
+ if (!_allocatedUpdateTaskEvaluator)
+ {
+ var codecConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(
+ new[]
+ {
+
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+
_configurationManager.UpdateFunctionCodecsConfiguration
+ }
+ ).Build();
+
+ serviceConf = Configurations.Merge(serviceConf, codecConfig,
_tcpPortProviderConfig);
+ _allocatedUpdateTaskEvaluator = true;
+
+ _evaluatorRequestor.Submit(new
EvaluatorRequest(_dataSet.Count, _memoryPerMapper, _coresPerMapper));
+ //TODO[REEF-598]: Set a timeout for this request to be
satisfied. If it is not within that time, exit the Driver.
+ }
+ else
+ {
+ IPartitionDescriptor partitionDescriptor;
+
+ if (!_partitionDescriptorStack.TryPop(out partitionDescriptor))
+ {
+ Logger.Log(Level.Warning, "partition descriptor exist for
the context of evaluator");
+ allocatedEvaluator.Dispose();
+ return;
+ }
+
+ var codecConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(
+ new[]
+ {
+
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+
_configurationManager.MapInputCodecConfiguration
+ }
+ ).Build();
+
+ contextConf = Configurations.Merge(contextConf,
partitionDescriptor.GetPartitionConfiguration());
+ serviceConf = Configurations.Merge(serviceConf, codecConfig,
+ _tcpPortProviderConfig);
+ }
+
+ allocatedEvaluator.SubmitContextAndService(contextConf,
serviceConf);
+ }
+
+ /// <summary>
+ /// Specfies the Map or Update task to run on the active context
+ /// </summary>
+ /// <param name="activeContext"></param>
+ public void OnNext(IActiveContext activeContext)
+ {
+ Logger.Log(Level.Verbose, string.Format("Received Active Context
{0}", activeContext.Id));
+
+ if (_groupCommDriver.IsMasterTaskContext(activeContext))
+ {
+ var partialTaskConf =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(new[]
+ {
+ TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier,
+ IMRUConstants.UpdateTaskName)
+ .Set(TaskConfiguration.Task,
+ GenericType<UpdateTaskHost<TMapInput,
TMapOutput, TResult>>.Class)
+ .Build(),
+ _configurationManager.UpdateFunctionConfiguration
+ })
+ .Build();
+
+ _commGroup.AddTask(IMRUConstants.UpdateTaskName);
+ _groupCommTaskStarter.QueueTask(partialTaskConf,
activeContext);
+ }
+ else
+ {
+ string taskId;
+
+ if (!_taskIdStack.TryPop(out taskId))
+ {
+ Logger.Log(Level.Warning, "No task Ids exist for the
active context {0}. Disposing the context.",
+ activeContext.Id);
+ activeContext.Dispose();
+ return;
+ }
+
+ var partialTaskConf =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(new[]
+ {
+ TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, taskId)
+ .Set(TaskConfiguration.Task,
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
+ .Build(),
+ _configurationManager.MapFunctionConfiguration
+ })
+ .Build();
+
+ _commGroup.AddTask(taskId);
+ _groupCommTaskStarter.QueueTask(partialTaskConf,
activeContext);
+ }
+ }
+
+ /// <summary>
+ /// Specfies what to do when the task is completed
+ /// In this case just disposes off the task
+ /// </summary>
+ /// <param name="completedTask">The link to the completed task</param>
+ public void OnNext(ICompletedTask completedTask)
+ {
+ _completedTasks.Add(completedTask);
+
+ if (_completedTasks.Count != _dataSet.Count + 1) return;
+
+ foreach (var task in _completedTasks)
+ {
+ Logger.Log(Level.Verbose, String.Format("Disposing task: {0}",
task.Id));
+ task.ActiveContext.Dispose();
+ }
+ }
+
+ /// <summary>
+ /// Specfies how to handle exception or error
+ /// </summary>
+ /// <param name="error">Kind of exception</param>
+ public void OnError(Exception error)
+ {
+ Logger.Log(Level.Error,"Cannot currently handle the Exception in
OnError function");
+ throw new NotImplementedException("Cannot currently handle
exception in OneError", error);
+ }
+
+ /// <summary>
+ /// Specfies what to do when driver is done
+ /// In this case do nothing
+ /// </summary>
+ public void OnCompleted()
+ {
+ }
+
+ private void AddGroupCommunicationOperators()
+ {
+ var reduceFunctionConfig =
_configurationManager.ReduceFunctionConfiguration;
+ var mapOutputPipelineDataConverterConfig =
_configurationManager.MapOutputPipelineDataConverterConfiguration;
+ var mapInputPipelineDataConverterConfig =
_configurationManager.MapInputPipelineDataConverterConfiguration;
+
+ try
+ {
+ TangFactory.GetTang()
+ .NewInjector(mapInputPipelineDataConverterConfig)
+ .GetInstance<IPipelineDataConverter<TMapInput>>();
+
+ mapInputPipelineDataConverterConfig =
+ TangFactory.GetTang()
+
.NewConfigurationBuilder(mapInputPipelineDataConverterConfig)
+ .BindImplementation(
+
GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
+
GenericType<MapInputwithControlMessagePipelineDataConverter<TMapInput>>.Class)
+ .Build();
+ }
+ catch (Exception e)
+ {
+ mapInputPipelineDataConverterConfig = TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindImplementation(
+
GenericType<IPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class,
+
GenericType<DefaultPipelineDataConverter<MapInputWithControlMessage<TMapInput>>>.Class)
+ .Build();
+ }
+
+ try
+ {
+ TangFactory.GetTang()
+ .NewInjector(mapInputPipelineDataConverterConfig)
+ .GetInstance<IPipelineDataConverter<TMapOutput>>();
+ }
+ catch (Exception e)
+ {
+ mapOutputPipelineDataConverterConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder()
+
.BindImplementation(GenericType<IPipelineDataConverter<TMapOutput>>.Class,
+
GenericType<DefaultPipelineDataConverter<TMapOutput>>.Class)
+ .Build();
+ }
+
+ _commGroup =
+ _groupCommDriver.DefaultGroup
+ .AddBroadcast<MapInputWithControlMessage<TMapInput>>(
+ IMRUConstants.BroadcastOperatorName,
+ IMRUConstants.UpdateTaskName,
+ TopologyTypes.Tree,
+ mapInputPipelineDataConverterConfig)
+ .AddReduce<TMapOutput>(
+ IMRUConstants.ReduceOperatorName,
+ IMRUConstants.UpdateTaskName,
+ TopologyTypes.Tree,
+ reduceFunctionConfig,
+ mapOutputPipelineDataConverterConfig)
+ .Build();
+ }
+
+ private void ConstructTcpPortProviderConfig(int startingPort, int
portRange)
+ {
+ _tcpPortProviderConfig =
TangFactory.GetTang().NewConfigurationBuilder()
+ .BindNamedParameter<TcpPortRangeStart,
int>(GenericType<TcpPortRangeStart>.Class,
+ startingPort.ToString(CultureInfo.InvariantCulture))
+ .BindNamedParameter<TcpPortRangeCount,
int>(GenericType<TcpPortRangeCount>.Class,
+ portRange.ToString(CultureInfo.InvariantCulture))
+ .Build();
+ }
+
+ private void ConstructTaskIdAndPartitionDescriptorStack()
+ {
+ int counter = 0;
+
+ foreach (var partitionDescriptor in _dataSet)
+ {
+ string id = IMRUConstants.MapTaskPrefix + "-Id" + counter +
"-Version0";
+ _taskIdStack.Push(id);
+ _partitionDescriptorStack.Push(partitionDescriptor);
+ counter++;
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 52bc9b3..997e4bf 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -61,8 +61,11 @@ under the License.
<Compile Include="InProcess\InProcessIMRUConfiguration.cs" />
<Compile Include="InProcess\MapFunctions.cs" />
<Compile Include="InProcess\Parameters\NumberOfMappers.cs" />
+ <Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />
+ <Compile Include="OnREEF\Client\REEFIMRUClient.cs" />
<Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
<Compile Include="OnREEF\Driver\IMRUConstants.cs" />
+ <Compile Include="OnREEF\Driver\IMRUDriver.cs" />
<Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
<Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
<Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs"
/>
@@ -115,8 +118,13 @@ under the License.
<Project>{dec0f0a8-dbef-4ebf-b69c-e2369c15abf1}</Project>
<Name>Org.Apache.REEF.IO</Name>
</ProjectReference>
+ <ProjectReference
Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+ <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
+ <Name>Org.Apache.REEF.Client</Name>
+ </ProjectReference>
</ItemGroup>
<ItemGroup>
<None Include="Org.Apache.REEF.IMRU.nuspec" />
</ItemGroup>
+ <ItemGroup />
</Project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
new file mode 100644
index 0000000..fb0b6d9
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.Network/Group/Config/StreamingCodecConfigurationMinusMessage.cs
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.StreamingCodec;
+
+namespace Org.Apache.REEF.Network.Group.Config
+{
+ /// <summary>
+ /// Defines configuration for internal streaming codecs and internal
pipeline message
+ /// streaming codecs required for group communication
+ /// </summary>
+ /// <typeparam name="T">Generic type of message</typeparam>
+ public sealed class StreamingCodecConfigurationMinusMessage<T> :
ConfigurationModuleBuilder
+ {
+ /// <summary>
+ /// Configuration Module for Codec
+ /// </summary>
+ public static ConfigurationModule Conf = new
StreamingCodecConfigurationMinusMessage<T>()
+
.BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class,
+ GenericType<StreamingPipelineMessageCodec<T>>.Class)
+
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<T>>>.Class,
+ GenericType<GroupCommunicationMessageStreamingCodec<T>>.Class)
+
.BindImplementation(GenericType<IStreamingCodec<GroupCommunicationMessage<PipelineMessage<T>>>>.Class,
+
GenericType<GroupCommunicationMessageStreamingCodec<PipelineMessage<T>>>.Class)
+ .Build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2a453f64/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 2d62cb4..884001f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -52,6 +52,7 @@ under the License.
</ItemGroup>
<ItemGroup>
<Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" />
+ <Compile Include="Group\Config\StreamingCodecConfigurationMinusMessage.cs"
/>
<Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" />
<Compile Include="Group\Config\CodecConfiguration.cs" />
<Compile Include="Group\Config\GroupCommConfigurationOptions.cs" />