Repository: incubator-reef
Updated Branches:
refs/heads/master db4c1b55b -> e742d7ebc
[REEF-731] Improve DriverRestart Example
This addresses the issue by:
* Adding more checks and tracking more per-Evaluator state in the DriverRestart example.
* Printing a success message so that the job submitter knows
whether the example has passed.
* Binding ``DriverRestartEvaluatorFailedHandler`` properly (to ``HelloRestartDriver`` via ``OnDriverRestartEvaluatorFailed``).
JIRA:
[REEF-731](https://issues.apache.org/jira/browse/REEF-731)
Pull Request:
This closes #480
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e742d7eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e742d7eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e742d7eb
Branch: refs/heads/master
Commit: e742d7ebca72cd250957b7e2cd10e6adf07d8779
Parents: db4c1b5
Author: Andrew Chung <[email protected]>
Authored: Wed Sep 9 16:54:30 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Sep 11 14:20:16 2015 -0700
----------------------------------------------------------------------
.../DriverRestart.cs | 1 +
.../DriverRestart/HelloRestartDriver.cs | 187 +++++++++++++++----
2 files changed, 152 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e742d7eb/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
index 2db7eb3..6924c44 100644
--- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/DriverRestart.cs
@@ -63,6 +63,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
.Set(DriverConfiguration.OnDriverRestartTaskRunning,
GenericType<HelloRestartDriver>.Class)
.Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<HelloRestartDriver>.Class)
.Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<HelloRestartDriver>.Class)
+ .Set(DriverConfiguration.OnDriverRestartEvaluatorFailed,
GenericType<HelloRestartDriver>.Class)
.Set(DriverConfiguration.OnDriverReconnect,
GenericType<DefaultYarnClusterHttpDriverConnection>.Class)
.Set(DriverConfiguration.DriverRestartEvaluatorRecoverySeconds, (5 *
60).ToString())
.Set(DriverConfiguration.MaxApplicationSubmissions,
2.ToString())
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e742d7eb/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 00240f1..36d6178 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -20,7 +20,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.Linq;
using System.Text;
+using System.Threading;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
@@ -42,29 +44,31 @@ namespace Org.Apache.REEF.Examples.DriverRestart
/// Once all tasks are running, the driver kills itself and expects the RM
to restart it.
/// On restart, it expects all of the running task(s) to report back to it.
/// </summary>
- public sealed class HelloRestartDriver :
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>,
IObserver<IDriverStarted>,
+ public sealed class HelloRestartDriver :
IObserver<IDriverRestartCompleted>, IObserver<IAllocatedEvaluator>,
IObserver<IDriverStarted>,
IObserver<IDriverRestarted>, IObserver<IActiveContext>,
IObserver<IRunningTask>, IObserver<ICompletedTask>, IObserver<IFailedTask>,
IObserver<IFailedEvaluator>
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(HelloRestartDriver));
private const int NumberOfTasksToSubmit = 1;
+ private const int NumberOfTasksToSubmitOnRestart = 1;
private readonly IEvaluatorRequestor _evaluatorRequestor;
- private readonly ISet<string> _receivedEvaluators = new
HashSet<string>(StringComparer.OrdinalIgnoreCase);
- private readonly object _lockObj;
-
- private int _runningTaskCount;
- private int _finishedTaskCount;
- private bool _restarted;
+
+ private bool _isRestart;
+ private readonly IDictionary<string, EvaluatorState> _evaluators = new
Dictionary<string, EvaluatorState>(StringComparer.OrdinalIgnoreCase);
+
+ private readonly object _lockObj = new object();
+ private readonly Timer _exceptionTimer;
[Inject]
private HelloRestartDriver(IEvaluatorRequestor evaluatorRequestor)
{
- _finishedTaskCount = 0;
- _runningTaskCount = 0;
+ _exceptionTimer = new Timer(obj =>
+ {
+ throw new Exception("Expected driver to be finished by now.");
+ }, new object(), TimeSpan.FromMinutes(10),
TimeSpan.FromMinutes(10));
+
_evaluatorRequestor = evaluatorRequestor;
- _restarted = false;
- _lockObj = new object();
}
/// <summary>
@@ -74,7 +78,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
{
lock (_lockObj)
{
- _receivedEvaluators.Add(allocatedEvaluator.Id);
+ _evaluators.Add(allocatedEvaluator.Id,
EvaluatorState.NewAllocated);
}
var taskConfiguration = TaskConfiguration.ConfigurationModule
@@ -91,6 +95,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
/// </summary>
public void OnNext(IDriverStarted driverStarted)
{
+ _isRestart = false;
Logger.Log(Level.Info, "HelloRestartDriver started at {0}",
driverStarted.StartTime);
_evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder().SetNumber(NumberOfTasksToSubmit).SetMegabytes(64).Build());
}
@@ -100,40 +105,64 @@ namespace Org.Apache.REEF.Examples.DriverRestart
/// </summary>
public void OnNext(IDriverRestarted value)
{
- _restarted = true;
+ _isRestart = true;
Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted!
Expecting these Evaluator IDs [{0}]", string.Join(", ",
value.ExpectedEvaluatorIds));
+ foreach (var expectedEvaluatorId in value.ExpectedEvaluatorIds)
+ {
+ _evaluators.Add(expectedEvaluatorId, EvaluatorState.Expected);
+ }
+
+ Logger.Log(Level.Info, "Requesting {0} new Evaluators on
restart.", NumberOfTasksToSubmitOnRestart);
+
_evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder().SetNumber(NumberOfTasksToSubmitOnRestart).SetMegabytes(64).Build());
}
public void OnNext(IActiveContext value)
{
- Logger.Log(Level.Info, "Received active context {0} from evaluator
with ID [{1}].", value.Id, value.EvaluatorId);
+ if (!_evaluators.ContainsKey(value.EvaluatorId))
+ {
+ throw new Exception("Received active context from unexpected
Evaluator " + value.EvaluatorId);
+ }
+
+ Logger.Log(Level.Info, "{0} active context {1} from evaluator with
ID [{2}].", _evaluators[value.EvaluatorId], value.Id, value.EvaluatorId);
}
public void OnNext(IRunningTask value)
{
lock (_lockObj)
{
- _runningTaskCount++;
-
- Logger.Log(Level.Info, "Received running task with ID [{0}] " +
- " with restart set to {1} from evaluator with ID
[{2}]",
- value.Id, _restarted,
value.ActiveContext.EvaluatorId);
+ var evaluatorId = value.ActiveContext.EvaluatorId;
+ if (!_evaluators.ContainsKey(evaluatorId))
+ {
+ throw new Exception("Unexpected Running Task from
Evaluator " + evaluatorId);
+ }
+
+ Logger.Log(Level.Info, "{0} running task with ID [{1}] from
evaluator with ID [{2}]",
+ _evaluators[evaluatorId], value.Id, evaluatorId);
+
- if (_restarted)
+ if (_evaluators[evaluatorId] == EvaluatorState.Expected)
{
value.Send(Encoding.UTF8.GetBytes("Hello from driver!"));
+ _evaluators[evaluatorId] = EvaluatorState.RecoveredRunning;
}
-
- // Kill itself in order for the driver to restart it.
- if (_runningTaskCount == NumberOfTasksToSubmit)
+ else if (_evaluators[value.ActiveContext.EvaluatorId] ==
EvaluatorState.NewAllocated)
{
- if (_restarted)
+ _evaluators[evaluatorId] = EvaluatorState.NewRunning;
+
+ var newRunningCount =
CountState(EvaluatorState.NewRunning);
+ // Kill itself in order for the driver to restart it.
+ if (!_isRestart && newRunningCount ==
NumberOfTasksToSubmit)
{
- Logger.Log(Level.Info, "Retrieved all running tasks
from the previous instance.");
+ Process.GetCurrentProcess().Kill();
}
- else
+
+ if (_isRestart)
{
- Process.GetCurrentProcess().Kill();
+ value.Send(Encoding.UTF8.GetBytes("Hello from
driver!"));
+ if (newRunningCount == NumberOfTasksToSubmitOnRestart)
+ {
+ Logger.Log(Level.Info, "Received all requested new
running tasks.");
+ }
}
}
}
@@ -157,41 +186,127 @@ namespace Org.Apache.REEF.Examples.DriverRestart
public void OnNext(IFailedEvaluator value)
{
- bool restart;
+ string action;
+ var evaluatorId = value.Id;
lock (_lockObj)
{
- restart = !this._receivedEvaluators.Contains(value.Id);
+ if (!_evaluators.ContainsKey(evaluatorId))
+ {
+ _evaluators[evaluatorId] =
EvaluatorState.FailedAtRestartInit;
+ action = "Restart initialization ";
+ }
+ else
+ {
+ var state = _evaluators[evaluatorId];
+ switch (state)
+ {
+ case EvaluatorState.Expected:
+ _evaluators[evaluatorId] = EvaluatorState.Expired;
+ action = "Expired on restart ";
+ break;
+ case EvaluatorState.RecoveredFinished:
+ case EvaluatorState.NewFinished:
+ // Note: this can be a result of REEF-61 as well,
so we ignore Finished tasks and don't mark them as UnexpectedFailed.
+ action = "Finished (REEF-61) ";
+ break;
+ default:
+ _evaluators[evaluatorId] =
EvaluatorState.UnexpectedFailed;
+ action = "Unexpectedly failed (with original state
" + state + ") ";
+ break;
+ }
+ }
}
- var append = restart ? "Restarted recovered " : string.Empty;
+ Logger.Log(Level.Info, action + "Evaluator [" + evaluatorId + "]
has failed!");
- Logger.Log(Level.Info, append + "Evaluator [" + value + "] has
failed!");
+ CheckSuccess();
}
public void OnError(Exception error)
{
+ _exceptionTimer.Dispose();
throw error;
}
public void OnCompleted()
{
+ _exceptionTimer.Dispose();
}
private void IncrementFinishedTask(Optional<IActiveContext>
activeContext)
{
lock (_lockObj)
{
- _finishedTaskCount++;
- if (_finishedTaskCount == NumberOfTasksToSubmit)
+ if (activeContext.IsPresent())
{
- Logger.Log(Level.Info, "All tasks are done! Driver should
exit now...");
+ var evaluatorId = activeContext.Value.EvaluatorId;
+ if (!_evaluators.ContainsKey(evaluatorId))
+ {
+ throw new Exception("Unexpected finished/completed
Task from Evaluator " + evaluatorId + ".");
+ }
+
+ if (_evaluators[evaluatorId] ==
EvaluatorState.RecoveredRunning)
+ {
+ Logger.Log(Level.Info, "Task on recovered Evaluator
[{0}] has finished.", evaluatorId);
+ _evaluators[evaluatorId] =
EvaluatorState.RecoveredFinished;
+ }
+ else
+ {
+ Logger.Log(Level.Info, "Newly allocated task on
Evaluator [{0}] has finished.", evaluatorId);
+ _evaluators[evaluatorId] = EvaluatorState.NewFinished;
+
+ if (_isRestart)
+ {
+ if (CountState(EvaluatorState.NewFinished) ==
NumberOfTasksToSubmitOnRestart)
+ {
+ Logger.Log(Level.Info, "All newly submitted
tasks have finished.");
+ }
+ }
+ }
+
+ activeContext.Value.Dispose();
+
+ CheckSuccess();
+ }
+ else
+ {
+ throw new Exception("Active context is expected to be
present.");
}
+ }
+ }
- if (activeContext.IsPresent())
+ private void CheckSuccess()
+ {
+ lock (_lockObj)
+ {
+ if (CountState(EvaluatorState.Expected,
EvaluatorState.NewRunning, EvaluatorState.RecoveredRunning,
+ EvaluatorState.NewAllocated) == 0 &&
+ _evaluators.Count == NumberOfTasksToSubmitOnRestart +
NumberOfTasksToSubmit)
{
- activeContext.Value.Dispose();
+ var append = CountState(EvaluatorState.UnexpectedFailed) >
0 ? " However, there are evaluators that have unexpectedly failed " +
+ "in this trial. Please re-run or read through the logs
to make sure that such evaluators are expected." : string.Empty;
+ Logger.Log(Level.Info, "SUCCESS!" + append);
}
}
}
+
+ private int CountState(params EvaluatorState[] states)
+ {
+ var set = new HashSet<EvaluatorState>(states);
+ return _evaluators.Count(kv => set.Contains(kv.Value));
+ }
+
+ private enum EvaluatorState
+ {
+ NewAllocated,
+ NewRunning,
+ Expected,
+ RecoveredRunning,
+ NewFinished,
+ RecoveredFinished,
+ UnexpectedFailed,
+ FailedAtRestartInit,
+ Expired
+ }
}
}
\ No newline at end of file