Repository: incubator-reef
Updated Branches:
refs/heads/master ad431b097 -> e374351a9
[REEF-691] Add DriverRestartEvaluatorFailedHandler
This addressed the issue by
* Adding a separate `FailedEvaluatorHandler` for Driver restart, i.e.
`DriverRestartFailedEvaluatorHandlers`.
* Wiring up the new handler through to .NET.
* Additionally, making small bug fixes: using the right handler check
for `DriverRestartCompleted` in `JobDriver` and enabling the transition of
the Restart State from `EXPECTED` to `EXPIRED`.
JIRA:
[REEF-691](https://issues.apache.org/jira/browse/REEF-691)
Pull Request:
This closes #451
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e374351a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e374351a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e374351a
Branch: refs/heads/master
Commit: e374351a957c437f29fe9b61f3cf9f703e40ee33
Parents: ad431b0
Author: Andrew Chung <[email protected]>
Authored: Mon Aug 31 16:52:10 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 31 18:08:12 2015 -0700
----------------------------------------------------------------------
.../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 18 +++
.../Bridge/ClrSystemHandlerWrapper.cs | 9 ++
.../Bridge/DriverBridge.cs | 15 ++
.../Bridge/DriverBridgeConfiguration.cs | 7 +
.../Bridge/DriverBridgeConfigurationOptions.cs | 5 +
lang/cs/Org.Apache.REEF.Driver/Constants.cs | 10 +-
.../DriverConfiguration.cs | 8 ++
.../bridge/client/YarnJobSubmissionClient.java | 2 +
.../apache/reef/javabridge/NativeInterop.java | 10 +-
.../reef/javabridge/generic/JobDriver.java | 137 ++++++++++++-------
.../reef/client/DriverRestartConfiguration.java | 8 ++
.../DriverRestartFailedEvaluatorHandlers.java | 37 +++++
...iceDriverRestartFailedEvaluatorHandlers.java | 35 +++++
.../driver/restart/EvaluatorRestartState.java | 1 +
.../driver/evaluator/EvaluatorManager.java | 10 +-
.../evaluator/EvaluatorMessageDispatcher.java | 20 ++-
16 files changed, 274 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index e2f1523..33b6571 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -510,3 +510,21 @@ JNIEXPORT void JNICALL
Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr
ManagedLog::LOGGER->LogError(errorMessage, ex);
}
}
+
+/*
+* Class: org_apache_reef_javabridge_NativeInterop
+* Method: clrSystemDriverRestartFailedEvaluatorHandlerOnNext
+* Signature:
(JLorg/apache/reef/javabridge/FailedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V
+*/
+JNIEXPORT void JNICALL
Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext
+(JNIEnv * env, jclass cls, jlong handler, jobject jfailedEvaluator, jobject
jlogger) {
+
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext");
+ FailedEvaluatorClr2Java^ failedEvaluatorBridge = gcnew
FailedEvaluatorClr2Java(env, jfailedEvaluator);
+ try {
+
ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartFailedEvaluator_OnNext(handler,
failedEvaluatorBridge);
+ }
+ catch (System::Exception^ ex) {
+ String^ errorMessage = "Exception in
Call_ClrSystemDriverRestartFailedEvaluator_OnNext";
+ ManagedLog::LOGGER->LogError(errorMessage, ex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
index 03e2746..39a8919 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -220,6 +220,15 @@ namespace Org.Apache.REEF.Driver.Bridge
}
}
+ public static void
Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handle,
IFailedEvaluatorClr2Java clr2Java)
+ {
+ using
(LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartFailedEvaluator_OnNext"))
+ {
+ GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+ ClrSystemHandler<IFailedEvaluator> obj =
(ClrSystemHandler<IFailedEvaluator>)gc.Target;
+ obj.OnNext(new FailedEvaluator(clr2Java));
+ }
+ }
//Deprecate, remove after both Java and C# code gets checked in
public static ulong[] Call_ClrSystemStartHandler_OnStart(
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
index d7e9c9e..449f4cf 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -59,6 +59,8 @@ namespace Org.Apache.REEF.Driver.Bridge
private static ClrSystemHandler<IFailedEvaluator>
_failedEvaluatorSubscriber;
+ private static ClrSystemHandler<IFailedEvaluator>
_driverRestartFailedEvaluatorSubscriber;
+
private static ClrSystemHandler<ICompletedEvaluator>
_completedEvaluatorSubscriber;
private static ClrSystemHandler<IHttpMessage>
_httpServerEventSubscriber;
@@ -99,6 +101,8 @@ namespace Org.Apache.REEF.Driver.Bridge
private readonly ISet<IObserver<IFailedEvaluator>>
_failedEvaluatorHandlers;
+ private readonly ISet<IObserver<IFailedEvaluator>>
_driverRestartFailedEvaluatorHandlers;
+
private readonly ISet<IObserver<ICompletedEvaluator>>
_completedEvaluatorHandlers;
private readonly ISet<IObserver<IClosedContext>>
_closedContextHandlers;
@@ -133,6 +137,7 @@ namespace Org.Apache.REEF.Driver.Bridge
[Parameter(Value =
typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))]
ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers,
[Parameter(Value =
typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))]
ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers,
[Parameter(Value =
typeof(DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers))]
ISet<IObserver<IDriverRestartCompleted>> driverRestartCompletedHandlers,
+ [Parameter(Value =
typeof(DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers))]
ISet<IObserver<IFailedEvaluator>> driverRestartFailedEvaluatorHandlers,
[Parameter(Value =
typeof(DriverBridgeConfigurationOptions.TraceListenersSet))]
ISet<TraceListener> traceListeners,
[Parameter(Value = typeof(EvaluatorConfigurationProviders))]
ISet<IConfigurationProvider> configurationProviders,
[Parameter(Value =
typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel,
@@ -173,6 +178,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartActiveContextHandlers =
driverRestartActiveContextHandlers;
_driverRestartRunningTaskHandlers =
driverRestartRunningTaskHandlers;
_driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+ _driverRestartFailedEvaluatorHandlers =
driverRestartFailedEvaluatorHandlers;
_httpServerHandler = httpServerHandler;
_configurationProviders = configurationProviders;
@@ -193,6 +199,7 @@ namespace Org.Apache.REEF.Driver.Bridge
_driverRestartActiveContextSubscriber = new
ClrSystemHandler<IActiveContext>();
_driverRestartRunningTaskSubscriber = new
ClrSystemHandler<IRunningTask>();
_driverRestartCompletedSubscriber = new
ClrSystemHandler<IDriverRestartCompleted>();
+ _driverRestartFailedEvaluatorSubscriber = new
ClrSystemHandler<IFailedEvaluator>();
}
public ulong[] Subscribe()
@@ -319,6 +326,14 @@ namespace Org.Apache.REEF.Driver.Bridge
}
handlers[Constants.Handlers[Constants.DriverRestartCompletedHandler]] =
ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber);
+ // subscribe to Failed Evaluator received during driver restart
+ foreach (var handler in _driverRestartFailedEvaluatorHandlers)
+ {
+ _driverRestartFailedEvaluatorSubscriber.Subscribe(handler);
+ _logger.Log(Level.Verbose, "subscribed to handler for
IFailedEvaluator received during driver restart: " + handler);
+ }
+
handlers[Constants.Handlers[Constants.DriverRestartFailedEvaluatorHandler]] =
ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber);
+
// subscribe to Http message
_httpServerEventSubscriber.Subscribe(_httpServerHandler);
_logger.Log(Level.Verbose, "subscribed to IHttpMessage handler :"
+ _httpServerHandler);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index 8ba2ff0..4b0d6d1 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.REEF.Driver.Bridge
[SuppressMessage("Microsoft.Security", "CA2104:Do not declare read
only mutable reference types", Justification = "not applicable")]
public static readonly
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new
OptionalImpl<IObserver<IDriverRestartCompleted>>();
+ ///// <summary>
+ ///// Event handler for failed evaluator events received during driver
restart. Defaults to job failure if not bound.
+ ///// </summary>
+ [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read
only mutable reference types", Justification = "not applicable")]
+ public static readonly OptionalImpl<IObserver<IFailedEvaluator>>
OnDriverRestartEvaluatorFailed = new
OptionalImpl<IObserver<IFailedEvaluator>>();
+
/// <summary>
/// Evaluator recovery timeout in seconds for driver restart. If value
is greater than 0, restart is enabled. The default value is -1.
/// </summary>
@@ -238,6 +244,7 @@ namespace Org.Apache.REEF.Driver.Bridge
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class,
OnDriverRestartContextActive)
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
OnDriverRestartTaskRunning)
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
OnDriverRestartCompleted)
+
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers>.Class,
OnDriverRestartEvaluatorFailed)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
DriverRestartEvaluatorRecoverySeconds)
.Build();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index 013522d..dc96538 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -57,6 +57,11 @@ namespace Org.Apache.REEF.Driver.Bridge
{
}
+ [NamedParameter(documentation: "Called when an evaluator has failed in
the Driver Restart process.", defaultClasses: new[] {
typeof(DefaultEvaluatorFailureHandler) })]
+ public class DriverRestartFailedEvaluatorHandlers :
Name<ISet<IObserver<IFailedEvaluator>>>
+ {
+ }
+
[NamedParameter(documentation: "Called when evaluator is requested.")]
public class EvaluatorRequestHandlers :
Name<ISet<IObserver<IEvaluatorRequestor>>>
{
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
index c833827..cb23f78 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
@@ -47,7 +47,7 @@ namespace Org.Apache.REEF.Driver
/// <summary>
/// The number of handlers total. Tightly coupled with Java.
/// </summary>
- public const int HandlersNumber = 17;
+ public const int HandlersNumber = 18;
/// <summary>
/// The name for EvaluatorRequestorHandler. Tightly coupled with Java.
@@ -134,6 +134,11 @@ namespace Org.Apache.REEF.Driver
/// </summary>
public const string DriverRestartCompletedHandler =
"DriverRestartCompleted";
+ /// <summary>
+ /// The name for DriverRestartFailedEvaluatorHandler. Tightly coupled
with Java
+ /// </summary>
+ public const string DriverRestartFailedEvaluatorHandler =
"DriverRestartFailedEvaluator";
+
[Obsolete(message:"Use REEFFileNames instead.")]
public const string DriverBridgeConfiguration =
Common.Constants.ClrBridgeRuntimeConfiguration;
@@ -199,7 +204,8 @@ namespace Org.Apache.REEF.Driver
{ ContextMessageHandler, 13 },
{ DriverRestartActiveContextHandler, 14 },
{ DriverRestartRunningTaskHandler, 15 },
- { DriverRestartCompletedHandler, 16 }
+ { DriverRestartCompletedHandler, 16 },
+ { DriverRestartFailedEvaluatorHandler, 17 }
};
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index 7e6c50c..9047c04 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -146,6 +146,12 @@ namespace Org.Apache.REEF.Driver
public static readonly
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted =
new OptionalImpl<IObserver<IDriverRestartCompleted>>();
+ ///// <summary>
+ ///// Event handler for failed evaluator events received during driver
restart. Defaults to job failure if not bound.
+ ///// </summary>
+ public static readonly OptionalImpl<IObserver<IFailedEvaluator>>
OnDriverRestartEvaluatorFailed =
+ new OptionalImpl<IObserver<IFailedEvaluator>>();
+
/// <summary>
/// Additional set of string arguments that can be pssed to handlers
through client
/// </summary>
@@ -220,6 +226,8 @@ namespace Org.Apache.REEF.Driver
OnDriverRestartContextActive)
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
OnDriverRestartTaskRunning)
+
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers>.Class,
+ OnDriverRestartEvaluatorFailed)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
CustomTraceLevel)
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
MaxApplicationSubmissions)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 1f6b16d..a2cfad0 100644
---
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -126,6 +126,8 @@ public final class YarnJobSubmissionClient {
driverRestartEvaluatorRecoverySeconds)
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
JobDriver.DriverRestartCompletedHandler.class)
+
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+ JobDriver.DriverRestartFailedEvaluatorHandler.class)
.build();
driverConfiguration = Configurations.merge(
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index c564157..babdb10 100644
---
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -39,6 +39,7 @@ public final class NativeInterop {
public static final String DRIVER_RESTART_ACTIVE_CONTEXT_KEY =
"DriverRestartActiveContext";
public static final String DRIVER_RESTART_RUNNING_TASK_KEY =
"DriverRestartRunningTask";
public static final String DRIVER_RESTART_COMPLETED_KEY =
"DriverRestartCompleted";
+ public static final String DRIVER_RESTART_FAILED_EVALUATOR_KEY =
"DriverRestartFailedEvaluator";
public static final HashMap<String, Integer> HANDLERS = new HashMap<String,
Integer>() {
{
put(ALLOCATED_EVALUATOR_KEY, 1);
@@ -57,10 +58,11 @@ public final class NativeInterop {
put(DRIVER_RESTART_ACTIVE_CONTEXT_KEY, 14);
put(DRIVER_RESTART_RUNNING_TASK_KEY, 15);
put(DRIVER_RESTART_COMPLETED_KEY, 16);
+ put(DRIVER_RESTART_FAILED_EVALUATOR_KEY, 17);
}
};
- public static final int N_HANDLERS = 17;
+ public static final int N_HANDLERS = 18;
public static native void loadClrAssembly(final String filePath);
@@ -165,6 +167,12 @@ public final class NativeInterop {
final long handle
);
+ public static native void clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
+ final long handle,
+ final FailedEvaluatorBridge failedEvaluatorBridge,
+ final InteropLogger interopLogger
+ );
+
/**
* Empty private constructor to prohibit instantiation of utility class.
*/
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index aec8aa0..1b37474 100644
---
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -133,10 +133,10 @@ public final class JobDriver {
private long closedContextHandler = 0;
private long failedContextHandler = 0;
private long contextMessageHandler = 0;
- private long driverRestartHandler = 0;
private long driverRestartActiveContextHandler = 0;
private long driverRestartRunningTaskHandler = 0;
private long driverRestartCompletedHandler = 0;
+ private long driverRestartFailedEvaluatorHandler = 0;
private boolean clrBridgeSetup = false;
private boolean isRestarted = false;
@@ -226,6 +226,8 @@ public final class JobDriver {
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)];
this.driverRestartCompletedHandler =
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)];
+ this.driverRestartFailedEvaluatorHandler =
+
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)];
}
try (final LoggingScope lp =
@@ -273,6 +275,78 @@ public final class JobDriver {
}
}
+ private void handleFailedEvaluator(final FailedEvaluator eval, final boolean
isRestartFailed) {
+ try (final LoggingScope ls =
loggingScopeFactory.evaluatorFailed(eval.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.SEVERE, "FailedEvaluator", eval);
+ for (final FailedContext failedContext : eval.getFailedContextList()) {
+ final String failedContextId = failedContext.getId();
+ LOG.log(Level.INFO, "removing context " + failedContextId + " from
job driver contexts.");
+ JobDriver.this.contexts.remove(failedContextId);
+ }
+ String message = "Evaluator " + eval.getId() + " failed with message: "
+ + eval.getEvaluatorException().getMessage();
+
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+
+ if (isRestartFailed) {
+
evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler,
eval, isRestartFailed);
+ } else {
+ evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler,
eval, isRestartFailed);
+ }
+ }
+ }
+ }
+
+ private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
+ final
FailedEvaluator eval,
+ final boolean
isRestartFailed) {
+ if (handle == 0) {
+ if (JobDriver.this.clrBridgeSetup) {
+ final String message = "No CLR FailedEvaluator handler was set,
exiting now";
+ LOG.log(Level.WARNING, message);
+
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ return;
+ } else {
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ handleFailedEvaluatorInCLR(eval, isRestartFailed);
+ } else {
+ LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
+ clock.scheduleAlarm(5000, this);
+ }
+ }
+ });
+ }
+ } else{
+ handleFailedEvaluatorInCLR(eval, isRestartFailed);
+ }
+ }
+
+ private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final
boolean isRestartFailed) {
+ final String message = "CLR FailedEvaluator handler set, handling things
with CLR handler.";
+ LOG.log(Level.INFO, message);
+ final FailedEvaluatorBridge failedEvaluatorBridge =
+ new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
+ JobDriver.this.isRestarted, loggingScopeFactory);
+ if (isRestartFailed) {
+ NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
+ JobDriver.this.driverRestartFailedEvaluatorHandler,
failedEvaluatorBridge, JobDriver.this.interopLogger);
+ } else {
+
NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler,
failedEvaluatorBridge,
+ JobDriver.this.interopLogger);
+ }
+
+ final int additionalRequestedEvaluatorNumber =
failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
+ if (additionalRequestedEvaluatorNumber > 0) {
+ LOG.log(Level.INFO, "number of additional evaluators requested after
evaluator failure: " +
+ additionalRequestedEvaluatorNumber);
+ }
+
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ }
+
/**
* Submit a Task to a single Evaluator.
*/
@@ -359,57 +433,17 @@ public final class JobDriver {
public final class FailedEvaluatorHandler implements
EventHandler<FailedEvaluator> {
@Override
public void onNext(final FailedEvaluator eval) {
- try (final LoggingScope ls =
loggingScopeFactory.evaluatorFailed(eval.getId())) {
- synchronized (JobDriver.this) {
- LOG.log(Level.SEVERE, "FailedEvaluator", eval);
- for (final FailedContext failedContext :
eval.getFailedContextList()) {
- final String failedContextId = failedContext.getId();
- LOG.log(Level.INFO, "removing context " + failedContextId + " from
job driver contexts.");
- JobDriver.this.contexts.remove(failedContextId);
- }
- String message = "Evaluator " + eval.getId() + " failed with
message: "
- + eval.getEvaluatorException().getMessage();
-
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
-
- if (failedEvaluatorHandler == 0) {
- if (JobDriver.this.clrBridgeSetup) {
- message = "No CLR FailedEvaluator handler was set, exiting now";
- LOG.log(Level.WARNING, message);
-
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
- return;
- } else {
- clock.scheduleAlarm(0, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- handleFailedEvaluatorInCLR(eval);
- } else {
- LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
- clock.scheduleAlarm(5000, this);
- }
- }
- });
- }
- } else {
- handleFailedEvaluatorInCLR(eval);
- }
- }
- }
+ JobDriver.this.handleFailedEvaluator(eval, false);
}
+ }
- private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) {
- final String message = "CLR FailedEvaluator handler set, handling things
with CLR handler.";
- LOG.log(Level.INFO, message);
- FailedEvaluatorBridge failedEvaluatorBridge = new
FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
- JobDriver.this.isRestarted, loggingScopeFactory);
-
NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler,
failedEvaluatorBridge,
- JobDriver.this.interopLogger);
- final int additionalRequestedEvaluatorNumber =
failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
- if (additionalRequestedEvaluatorNumber > 0) {
- LOG.log(Level.INFO, "number of additional evaluators requested after
evaluator failure: " +
- additionalRequestedEvaluatorNumber);
- }
-
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ /**
+ * Receive notification that the entire Evaluator had failed on Driver
Restart.
+ */
+ public final class DriverRestartFailedEvaluatorHandler implements
EventHandler<FailedEvaluator> {
+ @Override
+ public void onNext(final FailedEvaluator eval) {
+ JobDriver.this.handleFailedEvaluator(eval, true);
}
}
@@ -613,14 +647,13 @@ public final class JobDriver {
driverRestartCompleted.getCompletedTime());
try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
driverRestartCompleted.getCompletedTime().getTimeStamp())) {
- if (JobDriver.this.driverRestartHandler != 0) {
+ if (JobDriver.this.driverRestartCompletedHandler != 0) {
LOG.log(Level.INFO, "CLR driver restart handler implemented, now
handle it in CLR.");
// TODO[REEF-690]: Pass in DriverRestartCompleted object to .NET.
NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(JobDriver.this.driverRestartCompletedHandler);
} else {
LOG.log(Level.WARNING, "No CLR driver restart handler implemented,
done with DriverRestartCompletedHandler.");
-
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index cda65ab..04cd56c 100644
---
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.*;
import org.apache.reef.driver.restart.DriverRestarted;
import org.apache.reef.driver.task.RunningTask;
@@ -64,6 +65,12 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
new OptionalImpl<>();
/**
+ * Event handler for evaluators that failed on driver restart, falls back to the
default of DriverRestartFailedEvaluatorHandlers if not bound.
+ */
+ public static final OptionalImpl<EventHandler<FailedEvaluator>>
ON_DRIVER_RESTART_EVALUATOR_FAILED =
+ new OptionalImpl<>();
+
+ /**
* The amount of time in seconds the driver waits for evaluators to report
back on restart.
* Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the
driver will wait forever.
*/
@@ -83,6 +90,7 @@ public final class DriverRestartConfiguration extends
ConfigurationModuleBuilder
.bindSetEntry(DriverRestartTaskRunningHandlers.class,
ON_DRIVER_RESTART_TASK_RUNNING)
.bindSetEntry(DriverRestartContextActiveHandlers.class,
ON_DRIVER_RESTART_CONTEXT_ACTIVE)
.bindSetEntry(DriverRestartCompletedHandlers.class,
ON_DRIVER_RESTART_COMPLETED)
+ .bindSetEntry(DriverRestartFailedEvaluatorHandlers.class,
ON_DRIVER_RESTART_EVALUATOR_FAILED)
.build();
private DriverRestartConfiguration(){
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
new file mode 100644
index 0000000..b5af599
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import
org.apache.reef.runtime.common.driver.defaults.DefaultEvaluatorFailureHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.Set;
+
+/**
+ * The {@link org.apache.reef.tang.annotations.NamedParameter} for Evaluators
that have failed on Driver Restart.
+ */
+@NamedParameter(doc = "Handler for event of evaluator failed on driver
restart.",
+ default_classes = DefaultEvaluatorFailureHandler.class)
+public final class DriverRestartFailedEvaluatorHandlers implements
Name<Set<EventHandler<FailedEvaluator>>> {
+ private DriverRestartFailedEvaluatorHandlers(){
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
new file mode 100644
index 0000000..ac861e1
--- /dev/null
+++
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.Set;
+
+/**
+ * Service handler for driver restart evaluator failed event.
+ */
+@NamedParameter(doc = "Handler for failed evaluator on Driver Restart.")
+public final class ServiceDriverRestartFailedEvaluatorHandlers implements Name<Set<EventHandler<FailedEvaluator>>> {
+ private ServiceDriverRestartFailedEvaluatorHandlers() {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index cf14b20..07791df 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -71,6 +71,7 @@ public enum EvaluatorRestartState {
switch(from) {
case EXPECTED:
switch(to) {
+ case EXPIRED:
case REPORTED:
return true;
default:
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index bc8ebbf..3ceff3e 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver.evaluator;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
@@ -298,9 +299,14 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
failedTaskOptional = Optional.empty();
}
+ final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList,
+ failedTaskOptional, this.evaluatorId);
- this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList,
- failedTaskOptional, this.evaluatorId));
+ if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
+ this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
+ } else {
+ this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
+ }
} catch (final Exception e) {
LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 16836f4..e46b06c 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -118,6 +118,8 @@ public final class EvaluatorMessageDispatcher {
final Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
@Parameter(DriverRestartCompletedHandlers.class)
final Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
+ @Parameter(DriverRestartFailedEvaluatorHandlers.class)
+ final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedHandlers,
// Service-provided event handlers specific to a Driver restart
@Parameter(ServiceDriverRestartTaskRunningHandlers.class)
@@ -126,6 +128,8 @@ public final class EvaluatorMessageDispatcher {
final Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
@Parameter(ServiceDriverRestartCompletedHandlers.class)
final Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers,
+ @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
+ final Set<EventHandler<FailedEvaluator>> serviceDriverRestartFailedEvaluatorHandlers,
@Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
@Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String evaluatorIdentifier,
@@ -177,10 +181,20 @@ public final class EvaluatorMessageDispatcher {
this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
+ final Set<EventHandler<FailedEvaluator>> driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
+ for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : driverRestartEvaluatorFailedHandlers) {
+ driverRestartEvaluatorFailedCallbackHandlers.add(
+ idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
+ }
+
+ this.driverRestartApplicationDispatcher.register(
+ FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers);
+
// Service event handlers specific to a Driver restart
this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers);
+ this.driverRestartServiceDispatcher.register(FailedEvaluator.class, serviceDriverRestartFailedEvaluatorHandlers);
final Set<EventHandler<CompletedEvaluator>> evaluatorCompletedCallbackHandlers = new HashSet<>();
for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : evaluatorCompletedHandlers) {
@@ -255,6 +269,10 @@ public final class EvaluatorMessageDispatcher {
this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
}
+ public void onDriverRestartEvaluatorFailed(final FailedEvaluator failedEvaluator) {
+ this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
+ }
+
public void onDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted);
}
@@ -269,7 +287,7 @@ public final class EvaluatorMessageDispatcher {
}
private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
- this.driverRestartApplicationDispatcher.onNext(type, message);
this.driverRestartServiceDispatcher.onNext(type, message);
+ this.driverRestartApplicationDispatcher.onNext(type, message);
}
}