Repository: incubator-reef
Updated Branches:
  refs/heads/master db4c1b55b -> e742d7ebc


[REEF-731] Improve DriverRestart Example

This addressed the issue by
  * Adding more checks and keeping more state in DriverRestart example.
  * Printing a success message so that the job submitter knows whether
    the example has passed.
  * Binding ``DriverRestartEvaluatorFailedHandler`` properly.

JIRA:
  [REEF-731](https://issues.apache.org/jira/browse/REEF-731)

Pull Request:
  This closes #480


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e742d7eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e742d7eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e742d7eb

Branch: refs/heads/master
Commit: e742d7ebca72cd250957b7e2cd10e6adf07d8779
Parents: db4c1b5
Author: Andrew Chung <[email protected]>
Authored: Wed Sep 9 16:54:30 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Sep 11 14:20:16 2015 -0700

----------------------------------------------------------------------
 .../DriverRestart.cs                            |   1 +
 .../DriverRestart/HelloRestartDriver.cs         | 187 +++++++++++++++----
 2 files changed, 152 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e742d7eb/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs 
b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
index 2db7eb3..6924c44 100644
--- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
@@ -63,6 +63,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
                 .Set(DriverConfiguration.OnDriverRestartTaskRunning, 
GenericType<HelloRestartDriver>.Class)
                 .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<HelloRestartDriver>.Class)
                 .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<HelloRestartDriver>.Class)
+                .Set(DriverConfiguration.OnDriverRestartEvaluatorFailed, 
GenericType<HelloRestartDriver>.Class)
                 .Set(DriverConfiguration.OnDriverReconnect, 
GenericType<DefaultYarnClusterHttpDriverConnection>.Class)
                 
.Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, (5 * 
60).ToString())
                 .Set(DriverConfiguration.MaxApplicationSubmissions, 
2.ToString())

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e742d7eb/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 00240f1..36d6178 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -20,7 +20,9 @@
 using System;
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Linq;
 using System.Text;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
@@ -42,29 +44,31 @@ namespace Org.Apache.REEF.Examples.DriverRestart
     /// Once all tasks are running, the driver kills itself and expects the RM 
to restart it.
     /// On restart, it expects all of the running task(s) to report back to it.
     /// </summary>
-    public sealed class HelloRestartDriver : 
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, 
IObserver<IDriverStarted>, 
+    public sealed class HelloRestartDriver : 
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>, 
IObserver<IDriverStarted>,
         IObserver<IDriverRestarted>, IObserver<IActiveContext>, 
IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>,
         IObserver<IFailedEvaluator>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(HelloRestartDriver));
         private const int NumberOfTasksToSubmit = 1;
+        private const int NumberOfTasksToSubmitOnRestart = 1;
 
         private readonly IEvaluatorRequestor _evaluatorRequestor;
-        private readonly ISet<string> _receivedEvaluators = new 
HashSet<string>(StringComparer.OrdinalIgnoreCase);
-        private readonly object _lockObj;
-        
-        private int _runningTaskCount;
-        private int _finishedTaskCount;
-        private bool _restarted;
+
+        private bool _isRestart;
+        private readonly IDictionary<string, EvaluatorState> _evaluators = new 
Dictionary<string, EvaluatorState>(StringComparer.OrdinalIgnoreCase);
+
+        private readonly object _lockObj = new object();
+        private readonly Timer _exceptionTimer;
 
         [Inject]
         private HelloRestartDriver(IEvaluatorRequestor evaluatorRequestor)
         {
-            _finishedTaskCount = 0;
-            _runningTaskCount = 0;
+            _exceptionTimer = new Timer(obj =>
+            {
+                throw new Exception("Expected driver to be finished by now.");
+            }, new object(), TimeSpan.FromMinutes(10), 
TimeSpan.FromMinutes(10));
+
             _evaluatorRequestor = evaluatorRequestor;
-            _restarted = false;
-            _lockObj = new object();
         }
 
         /// <summary>
@@ -74,7 +78,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         {
             lock (_lockObj)
             {
-                _receivedEvaluators.Add(allocatedEvaluator.Id);
+                _evaluators.Add(allocatedEvaluator.Id, 
EvaluatorState.NewAllocated);
             }
 
             var taskConfiguration = TaskConfiguration.ConfigurationModule
@@ -91,6 +95,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         /// </summary>
         public void OnNext(IDriverStarted driverStarted)
         {
+            _isRestart = false;
             Logger.Log(Level.Info, "HelloRestartDriver started at {0}", 
driverStarted.StartTime);
             
_evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder().SetNumber(NumberOfTasksToSubmit).SetMegabytes(64).Build());
         }
@@ -100,40 +105,64 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         /// </summary>
         public void OnNext(IDriverRestarted value)
         {
-            _restarted = true;
+            _isRestart = true;
             Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted! 
Expecting these Evaluator IDs [{0}]", string.Join(", ", 
value.ExpectedEvaluatorIds));
+            foreach (var expectedEvaluatorId in value.ExpectedEvaluatorIds)
+            {
+                _evaluators.Add(expectedEvaluatorId, EvaluatorState.Expected);
+            }
+
+            Logger.Log(Level.Info, "Requesting {0} new Evaluators on 
restart.", NumberOfTasksToSubmitOnRestart);
+            
_evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder().SetNumber(NumberOfTasksToSubmitOnRestart).SetMegabytes(64).Build());
         }
 
         public void OnNext(IActiveContext value)
         {
-            Logger.Log(Level.Info, "Received active context {0} from evaluator 
with ID [{1}].", value.Id, value.EvaluatorId);
+            if (!_evaluators.ContainsKey(value.EvaluatorId))
+            {
+                throw new Exception("Received active context from unexpected 
Evaluator " + value.EvaluatorId);
+            }
+
+            Logger.Log(Level.Info, "{0} active context {1} from evaluator with 
ID [{2}].", _evaluators[value.EvaluatorId], value.Id, value.EvaluatorId);
         }
 
         public void OnNext(IRunningTask value)
         {
             lock (_lockObj)
             {
-                _runningTaskCount++;
-                
-                Logger.Log(Level.Info, "Received running task with ID [{0}] " +
-                              " with restart set to {1} from evaluator with ID 
[{2}]",
-                              value.Id, _restarted, 
value.ActiveContext.EvaluatorId);
+                var evaluatorId = value.ActiveContext.EvaluatorId;
+                if (!_evaluators.ContainsKey(evaluatorId))
+                {
+                    throw new Exception("Unexpected Running Task from 
Evaluator " + evaluatorId);
+                }
+
+                Logger.Log(Level.Info, "{0} running task with ID [{1}] from 
evaluator with ID [{2}]",
+                    _evaluators[evaluatorId], value.Id, evaluatorId);
+
 
-                if (_restarted)
+                if (_evaluators[evaluatorId] == EvaluatorState.Expected)
                 {
                     value.Send(Encoding.UTF8.GetBytes("Hello from driver!"));
+                    _evaluators[evaluatorId] = EvaluatorState.RecoveredRunning;
                 }
-
-                // Kill itself in order for the driver to restart it.
-                if (_runningTaskCount == NumberOfTasksToSubmit)
+                else if (_evaluators[value.ActiveContext.EvaluatorId] == 
EvaluatorState.NewAllocated)
                 {
-                    if (_restarted)
+                    _evaluators[evaluatorId] = EvaluatorState.NewRunning;
+
+                    var newRunningCount = 
CountState(EvaluatorState.NewRunning);
+                    // Kill itself in order for the driver to restart it.
+                    if (!_isRestart && newRunningCount == 
NumberOfTasksToSubmit)
                     {
-                        Logger.Log(Level.Info, "Retrieved all running tasks 
from the previous instance.");
+                        Process.GetCurrentProcess().Kill();
                     }
-                    else
+
+                    if (_isRestart)
                     {
-                        Process.GetCurrentProcess().Kill();
+                        value.Send(Encoding.UTF8.GetBytes("Hello from 
driver!"));
+                        if (newRunningCount == NumberOfTasksToSubmitOnRestart)
+                        {
+                            Logger.Log(Level.Info, "Received all requested new 
running tasks.");
+                        }
                     }
                 }
             }
@@ -157,41 +186,127 @@ namespace Org.Apache.REEF.Examples.DriverRestart
 
         public void OnNext(IFailedEvaluator value)
         {
-            bool restart;
+            string action;
+            var evaluatorId = value.Id;
             lock (_lockObj)
             {
-                restart = !this._receivedEvaluators.Contains(value.Id);
+                if (!_evaluators.ContainsKey(evaluatorId))
+                {
+                    _evaluators[evaluatorId] = 
EvaluatorState.FailedAtRestartInit;
+                    action = "Restart initialization ";
+                }
+                else
+                {
+                    var state = _evaluators[evaluatorId];
+                    switch (state)
+                    {
+                        case EvaluatorState.Expected:
+                            _evaluators[evaluatorId] = EvaluatorState.Expired;
+                            action = "Expired on restart ";
+                            break;
+                        case EvaluatorState.RecoveredFinished:
+                        case EvaluatorState.NewFinished:
+                            // Note: this can be a result of REEF-61 as well, 
so we ignore Finished tasks and don't mark them as UnexpectedFailed.
+                            action = "Finished (REEF-61) ";
+                            break;
+                        default:
+                            _evaluators[evaluatorId] = 
EvaluatorState.UnexpectedFailed;
+                            action = "Unexpectedly failed (with original state 
" + state + ") ";
+                            break;
+                    }
+                }
             }
 
-            var append = restart ? "Restarted recovered " : string.Empty;
+            Logger.Log(Level.Info, action + "Evaluator [" + evaluatorId + "] 
has failed!");
 
-            Logger.Log(Level.Info, append + "Evaluator [" + value + "] has 
failed!");
+            CheckSuccess();
         }
 
         public void OnError(Exception error)
         {
+            _exceptionTimer.Dispose();
             throw error;
         }
 
         public void OnCompleted()
         {
+            _exceptionTimer.Dispose();
         }
 
         private void IncrementFinishedTask(Optional<IActiveContext> 
activeContext)
         {
             lock (_lockObj)
             {
-                _finishedTaskCount++;
-                if (_finishedTaskCount == NumberOfTasksToSubmit)
+                if (activeContext.IsPresent())
                 {
-                    Logger.Log(Level.Info, "All tasks are done! Driver should 
exit now...");
+                    var evaluatorId = activeContext.Value.EvaluatorId;
+                    if (!_evaluators.ContainsKey(evaluatorId))
+                    {
+                        throw new Exception("Unexpected finished/completed 
Task from Evaluator " + evaluatorId + ".");
+                    }
+
+                    if (_evaluators[evaluatorId] == 
EvaluatorState.RecoveredRunning)
+                    {
+                        Logger.Log(Level.Info, "Task on recovered Evaluator 
[{0}] has finished.", evaluatorId);
+                        _evaluators[evaluatorId] = 
EvaluatorState.RecoveredFinished;
+                    }
+                    else
+                    {
+                        Logger.Log(Level.Info, "Newly allocated task on 
Evaluator [{0}] has finished.", evaluatorId);
+                        _evaluators[evaluatorId] = EvaluatorState.NewFinished;
+
+                        if (_isRestart)
+                        {
+                            if (CountState(EvaluatorState.NewFinished) == 
NumberOfTasksToSubmitOnRestart)
+                            {
+                                Logger.Log(Level.Info, "All newly submitted 
tasks have finished.");
+                            }
+                        }
+                    }
+
+                    activeContext.Value.Dispose();
+
+                    CheckSuccess();
+                }
+                else
+                {
+                    throw new Exception("Active context is expected to be 
present.");
                 }
+            }
+        }
 
-                if (activeContext.IsPresent())
+        private void CheckSuccess()
+        {
+            lock (_lockObj)
+            {
+                if (CountState(EvaluatorState.Expected, 
EvaluatorState.NewRunning, EvaluatorState.RecoveredRunning,
+                        EvaluatorState.NewAllocated) == 0 &&
+                    _evaluators.Count == NumberOfTasksToSubmitOnRestart + 
NumberOfTasksToSubmit)
                 {
-                    activeContext.Value.Dispose();
+                    var append = CountState(EvaluatorState.UnexpectedFailed) > 
0 ? " However, there are evaluators that have unexpectedly failed " +
+                        "in this trial. Please re-run or read through the logs 
to make sure that such evaluators are expected." : string.Empty;
+                    Logger.Log(Level.Info, "SUCCESS!" + append);
                 }
             }
         }
+
+        private int CountState(params EvaluatorState[] states)
+        {
+            var set = new HashSet<EvaluatorState>(states);
+            return _evaluators.Count(kv => set.Contains(kv.Value));
+        }
+
+        private enum EvaluatorState
+        {
+            NewAllocated,
+            NewRunning,
+            Expected,
+            RecoveredRunning,
+            NewFinished,
+            RecoveredFinished,
+            UnexpectedFailed,
+            FailedAtRestartInit,
+            Expired
+        }
     }
 }
\ No newline at end of file

Reply via email to