Repository: incubator-reef
Updated Branches:
refs/heads/master 8382114b0 -> 528d8ebbc
[REEF-658] Introduce retry logic for failed evaluators
This addresses the issue by:
* introducing an extra parameter, AllowedFailedEvaluatorsFraction, that determines
how many evaluator failures are tolerated, expressed relative to the number of mappers;
* storing the context and service configuration of each allocated evaluator, as well as
the ids of failed evaluators;
* requesting a new evaluator for each failed one and resubmitting the stored
context and service configuration of the failed evaluator (looked up by its id) to the replacement.
JIRA:
[REEF-658](https://issues.apache.org/jira/browse/REEF-658)
Pull Request:
This closes #474
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/528d8ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/528d8ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/528d8ebb
Branch: refs/heads/master
Commit: 528d8ebbc0066c774a925c123aa7c720730ee2c1
Parents: 8382114
Author: Dhruv <[email protected]>
Authored: Fri Sep 4 15:19:47 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Sep 14 16:41:33 2015 -0700
----------------------------------------------------------------------
.../OnREEF/Client/REEFIMRUClient.cs | 2 +
.../Driver/ContextAndServiceConfiguration.cs | 46 +++++
.../OnREEF/Driver/IMRUDriver.cs | 163 ++++++++--------
.../ServiceAndContextConfigurationProvider.cs | 186 +++++++++++++++++++
.../AllowedFailedEvaluatorsFraction.cs | 26 +++
.../Org.Apache.REEF.IMRU.csproj | 3 +
6 files changed, 346 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 752aa42..9c492d0 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -93,6 +93,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
.Set(DriverConfiguration.OnTaskCompleted,
GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
+ GenericType<IMRUDriver<TMapInput, TMapOutput,
TResult>>.Class)
.Build(),
TangFactory.GetTang().NewConfigurationBuilder()
.BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
new file mode 100644
index 0000000..e280f72
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Immutable pair of the context configuration and the service configuration
+ /// that are submitted together to a single evaluator.
+ /// </summary>
+ internal class ContextAndServiceConfiguration
+ {
+ private readonly IConfiguration _context;
+ private readonly IConfiguration _service;
+
+ /// <summary>
+ /// Creates the pair; the arguments are stored as given, without validation or copying.
+ /// </summary>
+ /// <param name="context">Context configuration</param>
+ /// <param name="service">Service configuration</param>
+ internal ContextAndServiceConfiguration(IConfiguration context, IConfiguration service)
+ {
+ _context = context;
+ _service = service;
+ }
+
+ /// <summary>
+ /// The context configuration
+ /// </summary>
+ internal IConfiguration Context
+ {
+ get { return _context; }
+ }
+
+ /// <summary>
+ /// The service configuration
+ /// </summary>
+ internal IConfiguration Service
+ {
+ get { return _service; }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 452298c..7322e4e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -20,6 +20,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
+using System.Threading;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
@@ -30,7 +31,6 @@ using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
using Org.Apache.REEF.IMRU.OnREEF.Parameters;
using Org.Apache.REEF.IO.PartitionedData;
-using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Pipelining;
@@ -41,6 +41,7 @@ using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote.Parameters;
@@ -53,7 +54,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <typeparam name="TMapOutput">Map output</typeparam>
/// <typeparam name="TResult">Result</typeparam>
internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> :
IObserver<IDriverStarted>,
- IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>,
IObserver<ICompletedTask>
+ IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>,
IObserver<ICompletedTask>, IObserver<IFailedEvaluator>
{
private static readonly Logger Logger = Logger.GetLogger(typeof
(IMRUDriver<TMapInput, TMapOutput, TResult>));
@@ -66,26 +67,32 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
private IConfiguration _tcpPortProviderConfig;
private readonly ConcurrentStack<string> _taskIdStack;
private readonly ConcurrentStack<IConfiguration>
_perMapperConfiguration;
- private readonly ConcurrentStack<IPartitionDescriptor>
_partitionDescriptorStack;
+ private readonly Stack<IPartitionDescriptor> _partitionDescriptorStack;
private readonly int _coresPerMapper;
private readonly int _coresForUpdateTask;
private readonly int _memoryPerMapper;
private readonly int _memoryForUpdateTask;
private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
- private bool _allocatedUpdateTaskEvaluator;
private readonly ConcurrentBag<ICompletedTask> _completedTasks;
-
+ private readonly int _allowedFailedEvaluators;
+ private int _currentFailedEvaluators = 0;
+ private bool _reachedUpdateTaskActiveContext = false;
+
+ private readonly ServiceAndContextConfigurationProvider<TMapInput,
TMapOutput>
+ _serviceAndContextConfigurationProvider;
+
[Inject]
private IMRUDriver(IPartitionedDataSet dataSet,
- [Parameter(typeof(PerMapConfigGeneratorSet))]
ISet<IPerMapperConfigGenerator> perMapperConfigs,
+ [Parameter(typeof (PerMapConfigGeneratorSet))]
ISet<IPerMapperConfigGenerator> perMapperConfigs,
ConfigurationManager configurationManager,
IEvaluatorRequestor evaluatorRequestor,
[Parameter(typeof (TcpPortRangeStart))] int startingPort,
[Parameter(typeof (TcpPortRangeCount))] int portRange,
- [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
- [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
- [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
- [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
+ [Parameter(typeof (CoresPerMapper))] int coresPerMapper,
+ [Parameter(typeof (CoresForUpdateTask))] int coresForUpdateTask,
+ [Parameter(typeof (MemoryPerMapper))] int memoryPerMapper,
+ [Parameter(typeof (MemoryForUpdateTask))] int memoryForUpdateTask,
+ [Parameter(typeof (AllowedFailedEvaluatorsFraction))] double
failedEvaluatorsFraction,
IGroupCommDriver groupCommDriver)
{
_dataSet = dataSet;
@@ -97,11 +104,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
_memoryPerMapper = memoryPerMapper;
_memoryForUpdateTask = memoryForUpdateTask;
_perMapperConfigs = perMapperConfigs;
- _allocatedUpdateTaskEvaluator = false;
_completedTasks = new ConcurrentBag<ICompletedTask>();
+ _allowedFailedEvaluators = (int)
(failedEvaluatorsFraction*dataSet.Count);
AddGroupCommunicationOperators();
-
+
//TODO[REEF-600]: Once the configuration module for
TcpPortProvider
//TODO[REEF-600]: will be provided, the configuraiton will be
automatically
//TODO[REEF-600]: carried over to evaluators and below function
will be obsolete.
@@ -111,8 +118,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
_taskIdStack = new ConcurrentStack<string>();
_perMapperConfiguration = new ConcurrentStack<IConfiguration>();
- _partitionDescriptorStack = new
ConcurrentStack<IPartitionDescriptor>();
+ _partitionDescriptorStack = new Stack<IPartitionDescriptor>();
ConstructTaskIdAndPartitionDescriptorStack();
+ _serviceAndContextConfigurationProvider =
+ new ServiceAndContextConfigurationProvider<TMapInput,
TMapOutput>(dataSet.Count + 1, groupCommDriver,
+ _configurationManager, _tcpPortProviderConfig,
_partitionDescriptorStack);
}
/// <summary>
@@ -121,83 +131,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
/// <param name="value">Event fired when driver started</param>
public void OnNext(IDriverStarted value)
{
- var request =
- _evaluatorRequestor.NewBuilder()
- .SetCores(_coresForUpdateTask)
- .SetMegabytes(_memoryForUpdateTask)
- .SetNumber(1)
- .Build();
- _evaluatorRequestor.Submit(request);
+ // Only the Update-task evaluator is requested at startup; map evaluators are
+ // requested once the Update task's context becomes active.
+ RequestUpdateEvaluator();
//TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver.
}
/// <summary>
/// Specifies context and service configuration for evaluator depending
/// on whether it is for Update function or for map function
+ /// If a previously submitted evaluator has failed, its stored configuration
+ /// is resubmitted to this evaluator instead of a fresh one.
/// </summary>
/// <param name="allocatedEvaluator">The allocated evaluator</param>
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
- IConfiguration contextConf =
_groupCommDriver.GetContextConfiguration();
- IConfiguration serviceConf =
_groupCommDriver.GetServiceConfiguration();
-
- if (!_allocatedUpdateTaskEvaluator)
- {
- var codecConfig =
- TangFactory.GetTang()
- .NewConfigurationBuilder(
- new[]
- {
-
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-
_configurationManager.UpdateFunctionCodecsConfiguration
- }
- ).Build();
-
- serviceConf = Configurations.Merge(serviceConf, codecConfig,
_tcpPortProviderConfig);
- _allocatedUpdateTaskEvaluator = true;
-
- var request =
- _evaluatorRequestor.NewBuilder()
- .SetMegabytes(_memoryForUpdateTask)
- .SetNumber(_dataSet.Count)
- .SetCores(_coresPerMapper)
- .Build();
- _evaluatorRequestor.Submit(request);
- //TODO[REEF-598]: Set a timeout for this request to be
satisfied. If it is not within that time, exit the Driver.
- }
- else
- {
- IPartitionDescriptor partitionDescriptor;
-
- if (!_partitionDescriptorStack.TryPop(out partitionDescriptor))
- {
- Logger.Log(Level.Warning, "partition descriptor exist for
the context of evaluator");
- allocatedEvaluator.Dispose();
- return;
- }
-
- var codecConfig =
- TangFactory.GetTang()
- .NewConfigurationBuilder(
- new[]
- {
-
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-
_configurationManager.MapInputCodecConfiguration
- }
- ).Build();
-
- contextConf = Configurations.Merge(contextConf,
partitionDescriptor.GetPartitionConfiguration());
- serviceConf = Configurations.Merge(serviceConf, codecConfig,
- _tcpPortProviderConfig);
- }
-
- allocatedEvaluator.SubmitContextAndService(contextConf,
serviceConf);
+ // The provider decides between reusing a failed evaluator's configuration and
+ // building a fresh Update-task or Map-task configuration.
+ var configs =
_serviceAndContextConfigurationProvider.GetNextConfiguration(allocatedEvaluator.Id);
+ allocatedEvaluator.SubmitContextAndService(configs.Context,
configs.Service);
}
/// <summary>
@@ -210,6 +157,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
if (_groupCommDriver.IsMasterTaskContext(activeContext))
{
+ _reachedUpdateTaskActiveContext = true;
+ RequestMapEvaluators(_dataSet.Count);
+
var partialTaskConf =
TangFactory.GetTang()
.NewConfigurationBuilder(new[]
@@ -286,6 +236,39 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
}
}
+ /// <summary>
+ /// Handles a failed evaluator. The job is aborted if the evaluator reported any failed
+ /// context, or if the number of failed evaluators exceeds the allowed limit. Otherwise the
+ /// failure is recorded with the configuration provider and a replacement evaluator is
+ /// requested; the replacement receives the configuration the failed evaluator held.
+ /// </summary>
+ /// <param name="value">The failed evaluator</param>
+ public void OnNext(IFailedEvaluator value)
+ {
+ Logger.Log(Level.Info, "An evaluator failed, checking if it failed before context and service was submitted");
+ int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators);
+
+ if (value.FailedContexts != null && value.FailedContexts.Count != 0)
+ {
+ // Recovery below only resubmits context/service configuration; a context that
+ // was already active cannot be restored, so this is fatal.
+ Logger.Log(Level.Error, "Some active context failed, cannot continue IMRU task");
+ Exceptions.Throw(new InvalidOperationException("Some active context failed, cannot continue IMRU task"), Logger);
+ }
+
+ if (currFailedEvaluators > _allowedFailedEvaluators)
+ {
+ Exceptions.Throw(new InvalidOperationException("Cannot continue IMRU job, Failed evaluators reach maximum limit"),
+ Logger);
+ }
+
+ Logger.Log(Level.Info, "Requesting for the failed evaluator again");
+
+ _serviceAndContextConfigurationProvider.EvaluatorFailed(value.Id);
+
+ // Map evaluators are only requested after the Update task's context is active,
+ // so from that point on a failed evaluator is assumed to belong to a mapper.
+ if (_reachedUpdateTaskActiveContext)
+ {
+ RequestMapEvaluators(1);
+ }
+ else
+ {
+ RequestUpdateEvaluator();
+ }
+ }
+
/// <summary>
/// Specfies how to handle exception or error
/// </summary>
@@ -392,5 +375,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
counter++;
}
}
+
+ /// <summary>
+ /// Submits a request for map evaluators, each sized with the per-mapper memory and core settings.
+ /// </summary>
+ /// <param name="numEvaluators">Number of map evaluators to request</param>
+ private void RequestMapEvaluators(int numEvaluators)
+ {
+ var mapEvaluatorRequest = _evaluatorRequestor.NewBuilder()
+ .SetMegabytes(_memoryPerMapper)
+ .SetNumber(numEvaluators)
+ .SetCores(_coresPerMapper)
+ .Build();
+ _evaluatorRequestor.Submit(mapEvaluatorRequest);
+ }
+
+ /// <summary>
+ /// Submits a request for exactly one evaluator sized with the Update-task core and memory settings.
+ /// </summary>
+ private void RequestUpdateEvaluator()
+ {
+ var updateEvaluatorRequest = _evaluatorRequestor.NewBuilder()
+ .SetCores(_coresForUpdateTask)
+ .SetMegabytes(_memoryForUpdateTask)
+ .SetNumber(1)
+ .Build();
+ _evaluatorRequestor.Submit(updateEvaluatorRequest);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
new file mode 100644
index 0000000..40faa9c
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+ /// <summary>
+ /// Hands out the context and service configuration for each allocated evaluator and
+ /// remembers which configuration went to which evaluator, so that the configuration of a
+ /// failed evaluator can be given to the evaluator allocated in its place.
+ /// The first freshly built configuration is the Update-task configuration; every later
+ /// fresh one is a Map-task configuration built from the next partition descriptor.
+ /// EvaluatorFailed and GetNextConfiguration are serialized by a single lock.
+ /// </summary>
+ /// <typeparam name="TMapInput">Map input type; selects the MapInputWithControlMessage codec</typeparam>
+ /// <typeparam name="TMapOutput">Map output type; forwarded to StreamingCodecConfigurationMinusMessage</typeparam>
+ internal class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput>));
+
+ // Configuration handed to each evaluator, keyed by evaluator id.
+ private readonly Dictionary<string, ContextAndServiceConfiguration> _configurationProvider;
+ // Failed evaluators whose configuration has not yet been handed to a replacement.
+ private readonly ISet<string> _failedEvaluators;
+ // Evaluators that currently hold a configuration and have not been reported as failed.
+ private readonly ISet<string> _submittedEvaluators;
+ private readonly object _lock;
+ // Total number of fresh configurations that may be built (Update task + mappers).
+ private readonly int _numNodes;
+ // Fresh configurations built so far; the Update-task configuration is counted too.
+ private int _assignedPartitionDescriptors;
+ private readonly IGroupCommDriver _groupCommDriver;
+ private readonly ConfigurationManager _configurationManager;
+ private readonly IConfiguration _tcpPortProviderConfig;
+ private readonly Stack<IPartitionDescriptor> _partitionDescriptors;
+
+ /// <summary>
+ /// Creates the provider.
+ /// </summary>
+ /// <param name="numNodes">Maximum number of fresh configurations (one Update task plus the mappers)</param>
+ /// <param name="groupCommDriver">Source of the group-communication context and service configuration</param>
+ /// <param name="configurationManager">Source of the Update-function and map-input codec configurations</param>
+ /// <param name="tcpPortProviderConfig">TCP port provider configuration merged into every service configuration</param>
+ /// <param name="partitionDescriptors">Partitions popped, one per fresh Map-task configuration</param>
+ internal ServiceAndContextConfigurationProvider(int numNodes, IGroupCommDriver groupCommDriver,
+ ConfigurationManager configurationManager, IConfiguration tcpPortProviderConfig, Stack<IPartitionDescriptor> partitionDescriptors)
+ {
+ _configurationProvider = new Dictionary<string, ContextAndServiceConfiguration>();
+ _failedEvaluators = new HashSet<string>();
+ _submittedEvaluators = new HashSet<string>();
+ _numNodes = numNodes;
+ _groupCommDriver = groupCommDriver;
+ _configurationManager = configurationManager;
+ _tcpPortProviderConfig = tcpPortProviderConfig;
+ _assignedPartitionDescriptors = 0;
+ _partitionDescriptors = partitionDescriptors;
+ _lock = new object();
+ }
+
+ /// <summary>
+ /// Handles failed evaluator. Moves the id from
+ /// submitted evaluator to failed evaluator, so that its configuration
+ /// is reused by the next call to GetNextConfiguration.
+ /// </summary>
+ /// <param name="evaluatorId">Id of the failed evaluator</param>
+ /// <exception cref="InvalidOperationException">The evaluator was never given a configuration</exception>
+ internal void EvaluatorFailed(string evaluatorId)
+ {
+ lock (_lock)
+ {
+ if (!_submittedEvaluators.Remove(evaluatorId))
+ {
+ Exceptions.Throw(new InvalidOperationException("Failed evaluator was never submitted"), Logger);
+ }
+
+ _failedEvaluators.Add(evaluatorId);
+ }
+ }
+
+ /// <summary>
+ /// Gives context and service configuration for next evaluator, either the
+ /// configuration of a failed evaluator or a newly built one.
+ /// </summary>
+ /// <param name="evaluatorId">Id of the newly allocated evaluator</param>
+ /// <returns>The configuration now associated with <paramref name="evaluatorId"/></returns>
+ /// <exception cref="InvalidOperationException">
+ /// The id is already known, or no failed configuration is pending and all fresh configurations were handed out
+ /// </exception>
+ internal ContextAndServiceConfiguration GetNextConfiguration(string evaluatorId)
+ {
+ lock (_lock)
+ {
+ if (_submittedEvaluators.Contains(evaluatorId))
+ {
+ Exceptions.Throw(new InvalidOperationException("The evaluator is already submitted"), Logger);
+ }
+
+ // Validate before mutating any state, so a rejected request leaves the
+ // failed set and the fresh-configuration counter untouched. This also covers
+ // the failed-reuse path, which previously overwrote an existing entry silently.
+ if (_configurationProvider.ContainsKey(evaluatorId))
+ {
+ Exceptions.Throw(
+ new InvalidOperationException(
+ "Evaluator Id already present in configuration cache, they have to be unique"),
+ Logger);
+ }
+
+ if (_failedEvaluators.Count == 0 && _assignedPartitionDescriptors >= _numNodes)
+ {
+ Exceptions.Throw(new InvalidOperationException("No more configuration can be provided"), Logger);
+ }
+
+ ContextAndServiceConfiguration config;
+ if (_failedEvaluators.Count != 0)
+ {
+ // Hand the configuration of a failed evaluator to its replacement.
+ string failedEvaluatorId = _failedEvaluators.First();
+ _failedEvaluators.Remove(failedEvaluatorId);
+ config = _configurationProvider[failedEvaluatorId];
+ _configurationProvider.Remove(failedEvaluatorId);
+ }
+ else
+ {
+ // The first fresh configuration is the Update task; every later one is a Map task.
+ config = _assignedPartitionDescriptors == 0
+ ? GetUpdateTaskContextAndServiceConfiguration()
+ : GetMapTaskContextAndServiceConfiguration(_partitionDescriptors.Pop());
+
+ // Count the configuration only after it was built successfully.
+ _assignedPartitionDescriptors++;
+ }
+
+ _configurationProvider[evaluatorId] = config;
+ _submittedEvaluators.Add(evaluatorId);
+ return config;
+ }
+ }
+
+ /// <summary>
+ /// Builds a Map-task configuration: the group-communication context configuration merged
+ /// with the partition configuration, and the group-communication service configuration
+ /// merged with the map-side codecs and the TCP port provider configuration.
+ /// </summary>
+ /// <param name="partitionDescriptor">Partition whose configuration is merged into the context configuration</param>
+ private ContextAndServiceConfiguration GetMapTaskContextAndServiceConfiguration(IPartitionDescriptor partitionDescriptor)
+ {
+ var codecConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(
+ StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+ StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+ GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+ StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+ _configurationManager.MapInputCodecConfiguration
+ ).Build();
+
+ var contextConf = Configurations.Merge(_groupCommDriver.GetContextConfiguration(), partitionDescriptor.GetPartitionConfiguration());
+ var serviceConf = Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
+ _tcpPortProviderConfig);
+
+ return new ContextAndServiceConfiguration(contextConf, serviceConf);
+ }
+
+ /// <summary>
+ /// Builds the Update-task configuration: the plain group-communication context configuration,
+ /// and the group-communication service configuration merged with the Update-function codecs
+ /// and the TCP port provider configuration.
+ /// </summary>
+ private ContextAndServiceConfiguration GetUpdateTaskContextAndServiceConfiguration()
+ {
+ var codecConfig =
+ TangFactory.GetTang()
+ .NewConfigurationBuilder(
+ new[]
+ {
+ StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+ StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+ GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+ StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+ _configurationManager.UpdateFunctionCodecsConfiguration
+ }
+ ).Build();
+
+ var serviceConf = Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
+ _tcpPortProviderConfig);
+ return new ContextAndServiceConfiguration(_groupCommDriver.GetContextConfiguration(), serviceConf);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
new file mode 100644
index 0000000..5a5b9c3
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ /// <summary>
+ /// Multiplier applied to the number of mappers to obtain the maximum number of evaluator
+ /// failures the IMRU driver tolerates. The driver computes the limit as
+ /// (int)(value * dataSet.Count) and aborts once the failure count exceeds it; failures of
+ /// the Update-task evaluator count toward the same limit. Despite the name, nothing restricts
+ /// the value to at most 1: the default of 2.0 tolerates twice as many failures as mappers.
+ /// </summary>
+ [NamedParameter("Determines number of failed evaluators
(AllowedFailedEvaluators * Number of mappers) tolerated before throwing
exception", "failedevaluators", "2.0")]
+ internal sealed class AllowedFailedEvaluatorsFraction : Name<double>
+ {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 332137b..8459279 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -65,14 +65,17 @@ under the License.
<Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />
<Compile Include="OnREEF\Client\REEFIMRUClient.cs" />
<Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
+ <Compile Include="OnREEF\Driver\ContextAndServiceConfiguration.cs" />
<Compile Include="OnREEF\Driver\IMRUConstants.cs" />
<Compile Include="OnREEF\Driver\IMRUDriver.cs" />
+ <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs"
/>
<Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
<Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
<Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs"
/>
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs"
/>
<Compile
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
/>
+ <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
<Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />
<Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
<Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />