Repository: incubator-reef
Updated Branches:
  refs/heads/master 8382114b0 -> 528d8ebbc


[REEF-658] Introduce retry logic for failed evaluators

This addressed the issue by

  * introducing an extra parameter, AllowedFailedEvaluatorsFraction, that
    determines what fraction of evaluators is allowed to fail.
  * storing the context and service configurations of allocated evaluators, as
    well as the IDs of failed evaluators.
  * submitting a request for a new evaluator when one fails, and resubmitting
    the failed evaluator's stored context and service configuration (looked up
    by its ID) to the replacement evaluator.

JIRA:
  [REEF-658](https://issues.apache.org/jira/browse/REEF-658)

Pull Request:
  This closes #474


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/528d8ebb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/528d8ebb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/528d8ebb

Branch: refs/heads/master
Commit: 528d8ebbc0066c774a925c123aa7c720730ee2c1
Parents: 8382114
Author: Dhruv <[email protected]>
Authored: Fri Sep 4 15:19:47 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Sep 14 16:41:33 2015 -0700

----------------------------------------------------------------------
 .../OnREEF/Client/REEFIMRUClient.cs             |   2 +
 .../Driver/ContextAndServiceConfiguration.cs    |  46 +++++
 .../OnREEF/Driver/IMRUDriver.cs                 | 163 ++++++++--------
 .../ServiceAndContextConfigurationProvider.cs   | 186 +++++++++++++++++++
 .../AllowedFailedEvaluatorsFraction.cs          |  26 +++
 .../Org.Apache.REEF.IMRU.csproj                 |   3 +
 6 files changed, 346 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 752aa42..9c492d0 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -93,6 +93,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                         GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
                     .Set(DriverConfiguration.OnTaskCompleted,
                         GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
                     .Build(),
                 TangFactory.GetTang().NewConfigurationBuilder()
                     
.BindStringNamedParam<GroupCommConfigurationOptions.DriverId>(driverId)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
new file mode 100644
index 0000000..e280f72
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ContextAndServiceConfiguration.cs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Interface;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    internal class ContextAndServiceConfiguration
+    {
+        /// <summary>
+        /// The context configuration
+        /// </summary>
+        internal IConfiguration Context { get; private set; }
+
+        /// <summary>
+        /// The service configuration
+        /// </summary>
+        internal IConfiguration Service { get; private set; }
+
+        /// <summary>
+        /// internal constructor
+        /// </summary>
+        /// <param name="context">Context configuration</param>
+        /// <param name="service">Service configuration</param>
+        internal ContextAndServiceConfiguration(IConfiguration context, 
IConfiguration service)
+        {
+            Context = context;
+            Service = service;
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 452298c..7322e4e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -20,6 +20,7 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Globalization;
 using System.Linq;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
@@ -30,7 +31,6 @@ using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.IO.PartitionedData;
-using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
@@ -41,6 +41,7 @@ using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote.Parameters;
 
@@ -53,7 +54,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// <typeparam name="TMapOutput">Map output</typeparam>
     /// <typeparam name="TResult">Result</typeparam>
     internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> : 
IObserver<IDriverStarted>,
-        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, 
IObserver<ICompletedTask>
+        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, 
IObserver<ICompletedTask>, IObserver<IFailedEvaluator>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof 
(IMRUDriver<TMapInput, TMapOutput, TResult>));
 
@@ -66,26 +67,32 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private IConfiguration _tcpPortProviderConfig;
         private readonly ConcurrentStack<string> _taskIdStack;
         private readonly ConcurrentStack<IConfiguration> 
_perMapperConfiguration;
-        private readonly ConcurrentStack<IPartitionDescriptor> 
_partitionDescriptorStack;
+        private readonly Stack<IPartitionDescriptor> _partitionDescriptorStack;
         private readonly int _coresPerMapper;
         private readonly int _coresForUpdateTask;
         private readonly int _memoryPerMapper;
         private readonly int _memoryForUpdateTask;
         private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
-        private bool _allocatedUpdateTaskEvaluator;
         private readonly ConcurrentBag<ICompletedTask> _completedTasks;
-            
+        private readonly int _allowedFailedEvaluators;
+        private int _currentFailedEvaluators = 0;
+        private bool _reachedUpdateTaskActiveContext = false;
+
+        private readonly ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>
+            _serviceAndContextConfigurationProvider;
+
         [Inject]
         private IMRUDriver(IPartitionedDataSet dataSet,
-            [Parameter(typeof(PerMapConfigGeneratorSet))] 
ISet<IPerMapperConfigGenerator> perMapperConfigs,
+            [Parameter(typeof (PerMapConfigGeneratorSet))] 
ISet<IPerMapperConfigGenerator> perMapperConfigs,
             ConfigurationManager configurationManager,
             IEvaluatorRequestor evaluatorRequestor,
             [Parameter(typeof (TcpPortRangeStart))] int startingPort,
             [Parameter(typeof (TcpPortRangeCount))] int portRange,
-            [Parameter(typeof(CoresPerMapper))] int coresPerMapper,
-            [Parameter(typeof(CoresForUpdateTask))] int coresForUpdateTask,
-            [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper,
-            [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask,
+            [Parameter(typeof (CoresPerMapper))] int coresPerMapper,
+            [Parameter(typeof (CoresForUpdateTask))] int coresForUpdateTask,
+            [Parameter(typeof (MemoryPerMapper))] int memoryPerMapper,
+            [Parameter(typeof (MemoryForUpdateTask))] int memoryForUpdateTask,
+            [Parameter(typeof (AllowedFailedEvaluatorsFraction))] double 
failedEvaluatorsFraction,
             IGroupCommDriver groupCommDriver)
         {
             _dataSet = dataSet;
@@ -97,11 +104,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             _memoryPerMapper = memoryPerMapper;
             _memoryForUpdateTask = memoryForUpdateTask;
             _perMapperConfigs = perMapperConfigs;
-            _allocatedUpdateTaskEvaluator = false;
             _completedTasks = new ConcurrentBag<ICompletedTask>();
+            _allowedFailedEvaluators = (int) 
(failedEvaluatorsFraction*dataSet.Count);
 
             AddGroupCommunicationOperators();
-            
+
             //TODO[REEF-600]: Once the configuration module for 
TcpPortProvider 
             //TODO[REEF-600]: will be provided, the configuraiton will be 
automatically
             //TODO[REEF-600]: carried over to evaluators and below function 
will be obsolete.
@@ -111,8 +118,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             _taskIdStack = new ConcurrentStack<string>();
             _perMapperConfiguration = new ConcurrentStack<IConfiguration>();
-            _partitionDescriptorStack = new 
ConcurrentStack<IPartitionDescriptor>();
+            _partitionDescriptorStack = new Stack<IPartitionDescriptor>();
             ConstructTaskIdAndPartitionDescriptorStack();
+            _serviceAndContextConfigurationProvider =
+                new ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>(dataSet.Count + 1, groupCommDriver,
+                    _configurationManager, _tcpPortProviderConfig, 
_partitionDescriptorStack);
         }
 
         /// <summary>
@@ -121,83 +131,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="value">Event fired when driver started</param>
         public void OnNext(IDriverStarted value)
         {
-            var request =
-                _evaluatorRequestor.NewBuilder()
-                    .SetCores(_coresForUpdateTask)
-                    .SetMegabytes(_memoryForUpdateTask)
-                    .SetNumber(1)
-                    .Build();
-            _evaluatorRequestor.Submit(request);
+            RequestUpdateEvaluator();
             //TODO[REEF-598]: Set a timeout for this request to be satisfied. 
If it is not within that time, exit the Driver.
         }
 
         /// <summary>
         /// Specifies context and service configuration for evaluator depending
         /// on whether it is for Update function or for map function
+        /// Also handles evaluator failures
         /// </summary>
         /// <param name="allocatedEvaluator">The allocated evaluator</param>
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
-            IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
-            IConfiguration serviceConf = 
_groupCommDriver.GetServiceConfiguration();
-
-            if (!_allocatedUpdateTaskEvaluator)
-            {
-                var codecConfig =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder(
-                            new[]
-                            {
-                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-                                    
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-                                    
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-                                
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-                                
_configurationManager.UpdateFunctionCodecsConfiguration
-                            }
-                        ).Build();
-               
-                serviceConf = Configurations.Merge(serviceConf, codecConfig, 
_tcpPortProviderConfig);
-                _allocatedUpdateTaskEvaluator = true;
-
-                var request =
-                    _evaluatorRequestor.NewBuilder()
-                        .SetMegabytes(_memoryForUpdateTask)
-                        .SetNumber(_dataSet.Count)
-                        .SetCores(_coresPerMapper)
-                        .Build();
-                _evaluatorRequestor.Submit(request);
-                //TODO[REEF-598]: Set a timeout for this request to be 
satisfied. If it is not within that time, exit the Driver.
-            }
-            else
-            {
-                IPartitionDescriptor partitionDescriptor;
-
-                if (!_partitionDescriptorStack.TryPop(out partitionDescriptor))
-                {
-                    Logger.Log(Level.Warning, "partition descriptor exist for 
the context of evaluator");
-                    allocatedEvaluator.Dispose();
-                    return;
-                }
-
-                var codecConfig =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder(
-                            new[]
-                            {
-                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-                                    
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-                                    
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-                                
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-                                
_configurationManager.MapInputCodecConfiguration
-                            }
-                        ).Build();
-
-                contextConf = Configurations.Merge(contextConf, 
partitionDescriptor.GetPartitionConfiguration());
-                serviceConf = Configurations.Merge(serviceConf, codecConfig,
-                    _tcpPortProviderConfig);
-            }
-            
-            allocatedEvaluator.SubmitContextAndService(contextConf, 
serviceConf);
+            var configs = 
_serviceAndContextConfigurationProvider.GetNextConfiguration(allocatedEvaluator.Id);
+            allocatedEvaluator.SubmitContextAndService(configs.Context, 
configs.Service);
         }
 
         /// <summary>
@@ -210,6 +157,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             if (_groupCommDriver.IsMasterTaskContext(activeContext))
             {
+                _reachedUpdateTaskActiveContext = true;
+                RequestMapEvaluators(_dataSet.Count);
+
                 var partialTaskConf =
                     TangFactory.GetTang()
                         .NewConfigurationBuilder(new[]
@@ -286,6 +236,39 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             }
         }
 
+        public void OnNext(IFailedEvaluator value)
+        {
+            Logger.Log(Level.Info, "An evaluator failed, checking if it failed 
before context and service was submitted");
+            int currFailedEvaluators = Interlocked.Increment(ref 
_currentFailedEvaluators);
+
+            if (value.FailedContexts != null && value.FailedContexts.Count != 
0)
+            {
+                Logger.Log(Level.Info, "Some active context failed, cannot 
continue IMRU task");        
+                Exceptions.Throw(new Exception(), Logger);
+            }
+
+            if (currFailedEvaluators > _allowedFailedEvaluators)
+            {
+                Exceptions.Throw(new Exception("Cannot continue IMRU job, 
Failed evaluators reach maximum limit"),
+                    Logger);
+            }
+
+            Logger.Log(Level.Info, "Requesting for the failed evaluator 
again");
+
+            _serviceAndContextConfigurationProvider.EvaluatorFailed(value.Id);
+
+            //If active context stage is reached for Update Task then assume 
that failed
+            //evaluator belongs to mapper
+            if (_reachedUpdateTaskActiveContext)
+            {
+                RequestMapEvaluators(1);
+            }
+            else
+            {
+                RequestUpdateEvaluator();
+            }
+        }
+
         /// <summary>
         /// Specfies how to handle exception or error
         /// </summary>
@@ -392,5 +375,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 counter++;
             }
         }
+
+        private void RequestMapEvaluators(int numEvaluators)
+        {
+            _evaluatorRequestor.Submit(
+                _evaluatorRequestor.NewBuilder()
+                    .SetMegabytes(_memoryPerMapper)
+                    .SetNumber(numEvaluators)
+                    .SetCores(_coresPerMapper)
+                    .Build());
+        }
+
+        private void RequestUpdateEvaluator()
+        {
+            _evaluatorRequestor.Submit(
+                _evaluatorRequestor.NewBuilder()
+                    .SetCores(_coresForUpdateTask)
+                    .SetMegabytes(_memoryForUpdateTask)
+                    .SetNumber(1)
+                    .Build());
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
new file mode 100644
index 0000000..40faa9c
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Class that handles failed evaluators and also provides Service 
+    /// and Context configuration
+    /// </summary>
+    /// <typeparam name="TMapInput"></typeparam>
+    /// <typeparam name="TMapOutput"></typeparam>
+    internal class ServiceAndContextConfigurationProvider<TMapInput,TMapOutput>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput,TMapOutput>));
+
+        private readonly Dictionary<string, ContextAndServiceConfiguration> 
_configurationProvider;
+        private readonly ISet<string> _failedEvaluators;
+        private readonly ISet<string> _submittedEvaluators; 
+        private readonly object _lock;
+        private readonly int _numNodes;
+        private int _assignedPartitionDescriptors;
+        private readonly IGroupCommDriver _groupCommDriver;
+        private readonly ConfigurationManager _configurationManager;
+        private readonly IConfiguration _tcpPortProviderConfig;
+        private readonly Stack<IPartitionDescriptor> _partitionDescriptors;
+
+        internal ServiceAndContextConfigurationProvider(int numNodes, 
IGroupCommDriver groupCommDriver,
+            ConfigurationManager configurationManager, IConfiguration 
tcpPortProviderConfig, Stack<IPartitionDescriptor> partitionDescriptors)
+        {
+            _configurationProvider = new Dictionary<string, 
ContextAndServiceConfiguration>();
+            _failedEvaluators = new HashSet<string>();
+            _submittedEvaluators = new HashSet<string>();
+            _numNodes = numNodes;
+            _groupCommDriver = groupCommDriver;
+            _configurationManager = configurationManager;
+            _tcpPortProviderConfig = tcpPortProviderConfig;
+            _assignedPartitionDescriptors = 0;
+            _partitionDescriptors = partitionDescriptors;
+            _lock = new object();
+        }
+
+        /// <summary>
+        /// Handles failed evaluator. Moves the id from 
+        /// submitted evaluator to failed evaluator
+        /// </summary>
+        /// <param name="evaluatorId"></param>
+        internal void EvaluatorFailed(string evaluatorId)
+        {
+            lock (_lock)
+            {
+                if (!_submittedEvaluators.Contains(evaluatorId))
+                {
+                    Exceptions.Throw(new Exception("Failed evaluator was never 
submitted"), Logger);
+                }
+
+                _failedEvaluators.Add(evaluatorId);
+                _submittedEvaluators.Remove(evaluatorId);
+            }
+        }
+
+        /// <summary>
+        /// Gives context and service configuration for next evaluator either 
from failed 
+        /// evaluator or new configuration
+        /// </summary>
+        /// <param name="evaluatorId"></param>
+        /// <returns></returns>
+        internal ContextAndServiceConfiguration GetNextConfiguration(string 
evaluatorId)
+        {
+            lock (_lock)
+            {
+                if (_submittedEvaluators.Contains(evaluatorId))
+                {
+                    Exceptions.Throw(new Exception("The evaluator is already 
submitted"), Logger);
+                }
+
+                if (_failedEvaluators.Count == 0 && 
_assignedPartitionDescriptors >= _numNodes)
+                {
+                    Exceptions.Throw(new Exception("No more configuration can 
be provided"), Logger);
+                }
+
+                //If some failed id exists return that configuration
+                if (_failedEvaluators.Count != 0)
+                {
+                    string failedEvaluatorId = _failedEvaluators.First();
+                    _failedEvaluators.Remove(failedEvaluatorId);
+                    var config = _configurationProvider[failedEvaluatorId];
+                    _configurationProvider.Remove(failedEvaluatorId);
+                    _configurationProvider[evaluatorId] = config;
+                }
+                else
+                {
+                    _assignedPartitionDescriptors++;
+
+                    if (_configurationProvider.ContainsKey(evaluatorId))
+                    {
+                        Exceptions.Throw(
+                            new Exception(
+                                "Evaluator Id already present in configuration 
cache, they have to be unique"),
+                            Logger);
+                    }
+
+                    //Checks whether to put update task configuration or map 
task configuration
+                    if (_assignedPartitionDescriptors == 1)
+                    {
+                        _configurationProvider[evaluatorId] = 
GetUpdateTaskContextAndServiceConfiguration();
+                    }
+                    else
+                    {
+                        _configurationProvider[evaluatorId] =
+                            
GetMapTaskContextAndServiceConfiguration(_partitionDescriptors.Pop());
+                    }
+                }
+
+                _submittedEvaluators.Add(evaluatorId);
+                return _configurationProvider[evaluatorId];
+            }
+        }
+
+        private ContextAndServiceConfiguration 
GetMapTaskContextAndServiceConfiguration(IPartitionDescriptor 
partitionDescriptor)
+        {
+            var codecConfig =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder(
+                        
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+                            
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+                            
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+                        
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+                        _configurationManager.MapInputCodecConfiguration
+                    ).Build();
+
+            var contextConf = 
Configurations.Merge(_groupCommDriver.GetContextConfiguration(), 
partitionDescriptor.GetPartitionConfiguration());
+            var serviceConf = 
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
+                            _tcpPortProviderConfig);
+
+            return new ContextAndServiceConfiguration(contextConf, 
serviceConf);
+        }
+
+        private ContextAndServiceConfiguration 
GetUpdateTaskContextAndServiceConfiguration()
+        {
+            var codecConfig =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder(
+                        new[]
+                        {
+                            
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+                                
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+                            
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+                            
_configurationManager.UpdateFunctionCodecsConfiguration
+                        }
+                    ).Build();
+
+            var serviceConf = 
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig,
+                _tcpPortProviderConfig);
+            return new 
ContextAndServiceConfiguration(_groupCommDriver.GetContextConfiguration(), 
serviceConf);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
new file mode 100644
index 0000000..5a5b9c3
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("Determines number of failed evaluators 
(AllowedFailedEvaluators * Number of mappers) tolerated before throwing 
exception", "failedevaluators", "2.0")]
+    internal sealed class AllowedFailedEvaluatorsFraction : Name<double>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/528d8ebb/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 332137b..8459279 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -65,14 +65,17 @@ under the License.
     <Compile Include="OnREEF\Client\REEFIMRUClientConfiguration.cs" />
     <Compile Include="OnREEF\Client\REEFIMRUClient.cs" />
     <Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
+    <Compile Include="OnREEF\Driver\ContextAndServiceConfiguration.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
     <Compile Include="OnREEF\Driver\IMRUDriver.cs" />
+    <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" 
/>
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" 
/>
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" 
/>
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
 />
+    <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
     <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />

Reply via email to