Repository: incubator-reef
Updated Branches:
  refs/heads/master ad431b097 -> e374351a9


[REEF-691] Add DriverRestartEvaluatorFailedHandler

This addressed the issue by
  * Adding a separate `FailedEvaluatorHandler` for Driver restart, i.e.
    `DriverRestartFailedEvaluatorHandlers`.
  * Wiring up the new handler through to .NET.
  * Additionally making small bug fixes: using the right handler check
    for `DriverRestartCompleted` in `JobDriver`, and enabling the transition
    of Restart State from `EXPECTED` to `EXPIRED`.

JIRA:
  [REEF-691](https://issues.apache.org/jira/browse/REEF-691)

Pull Request:
  This closes #451


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/e374351a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/e374351a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/e374351a

Branch: refs/heads/master
Commit: e374351a957c437f29fe9b61f3cf9f703e40ee33
Parents: ad431b0
Author: Andrew Chung <[email protected]>
Authored: Mon Aug 31 16:52:10 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 31 18:08:12 2015 -0700

----------------------------------------------------------------------
 .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp |  18 +++
 .../Bridge/ClrSystemHandlerWrapper.cs           |   9 ++
 .../Bridge/DriverBridge.cs                      |  15 ++
 .../Bridge/DriverBridgeConfiguration.cs         |   7 +
 .../Bridge/DriverBridgeConfigurationOptions.cs  |   5 +
 lang/cs/Org.Apache.REEF.Driver/Constants.cs     |  10 +-
 .../DriverConfiguration.cs                      |   8 ++
 .../bridge/client/YarnJobSubmissionClient.java  |   2 +
 .../apache/reef/javabridge/NativeInterop.java   |  10 +-
 .../reef/javabridge/generic/JobDriver.java      | 137 ++++++++++++-------
 .../reef/client/DriverRestartConfiguration.java |   8 ++
 .../DriverRestartFailedEvaluatorHandlers.java   |  37 +++++
 ...iceDriverRestartFailedEvaluatorHandlers.java |  35 +++++
 .../driver/restart/EvaluatorRestartState.java   |   1 +
 .../driver/evaluator/EvaluatorManager.java      |  10 +-
 .../evaluator/EvaluatorMessageDispatcher.java   |  20 ++-
 16 files changed, 274 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index e2f1523..33b6571 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -510,3 +510,21 @@ JNIEXPORT void JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_clrSystemDr
                ManagedLog::LOGGER->LogError(errorMessage, ex);
        }
 }
+
+/*
+* Class:     org_apache_reef_javabridge_NativeInterop
+* Method:    clrSystemDriverRestartFailedEvaluatorHandlerOnNext
+* Signature: 
(JLorg/apache/reef/javabridge/FailedEvaluatorBridge;Lorg/apache/reef/javabridge/InteropLogger;)V
+*/
+JNIEXPORT void JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext
+(JNIEnv * env, jclass cls, jlong handler, jobject jfailedEvaluator, jobject 
jlogger) {
+       
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartFailedEvaluatorHandlerOnNext");
+       FailedEvaluatorClr2Java^ failedEvaluatorBridge = gcnew 
FailedEvaluatorClr2Java(env, jfailedEvaluator);
+       try {
+               
ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartFailedEvaluator_OnNext(handler,
 failedEvaluatorBridge);
+       }
+       catch (System::Exception^ ex) {
+               String^ errorMessage = "Exception in 
Call_ClrSystemDriverRestartFailedEvaluator_OnNext";
+               ManagedLog::LOGGER->LogError(errorMessage, ex);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
index 03e2746..39a8919 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrSystemHandlerWrapper.cs
@@ -220,6 +220,15 @@ namespace Org.Apache.REEF.Driver.Bridge
             }
         }
 
+        public static void 
Call_ClrSystemDriverRestartFailedEvaluator_OnNext(ulong handle, 
IFailedEvaluatorClr2Java clr2Java)
+        {
+            using 
(LOGGER.LogFunction("ClrSystemHandlerWrapper::Call_ClrSystemDriverRestartFailedEvaluator_OnNext"))
+            {
+                GCHandle gc = GCHandle.FromIntPtr((IntPtr)handle);
+                ClrSystemHandler<IFailedEvaluator> obj = 
(ClrSystemHandler<IFailedEvaluator>)gc.Target;
+                obj.OnNext(new FailedEvaluator(clr2Java));
+            }
+        }
 
         //Deprecate, remove after both Java and C# code gets checked in
         public static ulong[] Call_ClrSystemStartHandler_OnStart(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
index d7e9c9e..449f4cf 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridge.cs
@@ -59,6 +59,8 @@ namespace Org.Apache.REEF.Driver.Bridge
 
         private static ClrSystemHandler<IFailedEvaluator> 
_failedEvaluatorSubscriber;
 
+        private static ClrSystemHandler<IFailedEvaluator> 
_driverRestartFailedEvaluatorSubscriber;
+
         private static ClrSystemHandler<ICompletedEvaluator> 
_completedEvaluatorSubscriber;
 
         private static ClrSystemHandler<IHttpMessage> 
_httpServerEventSubscriber;
@@ -99,6 +101,8 @@ namespace Org.Apache.REEF.Driver.Bridge
 
         private readonly ISet<IObserver<IFailedEvaluator>> 
_failedEvaluatorHandlers;
 
+        private readonly ISet<IObserver<IFailedEvaluator>> 
_driverRestartFailedEvaluatorHandlers;
+
         private readonly ISet<IObserver<ICompletedEvaluator>> 
_completedEvaluatorHandlers;
 
         private readonly ISet<IObserver<IClosedContext>> 
_closedContextHandlers;
@@ -133,6 +137,7 @@ namespace Org.Apache.REEF.Driver.Bridge
             [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers))] 
ISet<IObserver<IActiveContext>> driverRestartActiveContextHandlers,
             [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers))] 
ISet<IObserver<IRunningTask>> driverRestartRunningTaskHandlers,
             [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers))] 
ISet<IObserver<IDriverRestartCompleted>> driverRestartCompletedHandlers,
+            [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers))] 
ISet<IObserver<IFailedEvaluator>> driverRestartFailedEvaluatorHandlers,
             [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.TraceListenersSet))] 
ISet<TraceListener> traceListeners,
             [Parameter(Value = typeof(EvaluatorConfigurationProviders))] 
ISet<IConfigurationProvider> configurationProviders,
             [Parameter(Value = 
typeof(DriverBridgeConfigurationOptions.TraceLevel))] string traceLevel,
@@ -173,6 +178,7 @@ namespace Org.Apache.REEF.Driver.Bridge
             _driverRestartActiveContextHandlers = 
driverRestartActiveContextHandlers;
             _driverRestartRunningTaskHandlers = 
driverRestartRunningTaskHandlers;
             _driverRestartCompletedHandlers = driverRestartCompletedHandlers;
+            _driverRestartFailedEvaluatorHandlers = 
driverRestartFailedEvaluatorHandlers;
             _httpServerHandler = httpServerHandler;
             _configurationProviders = configurationProviders;
             
@@ -193,6 +199,7 @@ namespace Org.Apache.REEF.Driver.Bridge
             _driverRestartActiveContextSubscriber = new 
ClrSystemHandler<IActiveContext>();
             _driverRestartRunningTaskSubscriber = new 
ClrSystemHandler<IRunningTask>();
             _driverRestartCompletedSubscriber = new 
ClrSystemHandler<IDriverRestartCompleted>();
+            _driverRestartFailedEvaluatorSubscriber = new 
ClrSystemHandler<IFailedEvaluator>();
         }
 
         public ulong[] Subscribe()
@@ -319,6 +326,14 @@ namespace Org.Apache.REEF.Driver.Bridge
             }
             
handlers[Constants.Handlers[Constants.DriverRestartCompletedHandler]] = 
ClrHandlerHelper.CreateHandler(_driverRestartCompletedSubscriber);
 
+            // subscribe to Failed Evaluator received during driver restart
+            foreach (var handler in _driverRestartFailedEvaluatorHandlers)
+            {
+                _driverRestartFailedEvaluatorSubscriber.Subscribe(handler);
+                _logger.Log(Level.Verbose, "subscribed to handler for 
IFailedEvaluator received during driver restart: " + handler);
+            }
+            
handlers[Constants.Handlers[Constants.DriverRestartFailedEvaluatorHandler]] = 
ClrHandlerHelper.CreateHandler(_driverRestartFailedEvaluatorSubscriber);
+
             // subscribe to Http message
             _httpServerEventSubscriber.Subscribe(_httpServerHandler);
             _logger.Log(Level.Verbose, "subscribed to IHttpMessage handler  :" 
+ _httpServerHandler);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
index 8ba2ff0..4b0d6d1 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfiguration.cs
@@ -180,6 +180,12 @@ namespace Org.Apache.REEF.Driver.Bridge
         [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read 
only mutable reference types", Justification = "not applicable")]
         public static readonly 
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted = new 
OptionalImpl<IObserver<IDriverRestartCompleted>>();
 
+        ///// <summary>
+        ///// Event handler for driver restart failed evaluator event received 
during driver restart. Defaults to job failure if not bound.
+        ///// </summary>
+        [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read 
only mutable reference types", Justification = "not applicable")]
+        public static readonly OptionalImpl<IObserver<IFailedEvaluator>> 
OnDriverRestartEvaluatorFailed = new 
OptionalImpl<IObserver<IFailedEvaluator>>();
+
         /// <summary>
         /// Evaluator recovery timeout in seconds for driver restart. If value 
is greater than 0, restart is enabled. The default value is -1.
         /// </summary>
@@ -238,6 +244,7 @@ namespace Org.Apache.REEF.Driver.Bridge
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartActiveContextHandlers>.Class,
 OnDriverRestartContextActive)
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
 OnDriverRestartTaskRunning)
                 
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartCompletedHandlers>.Class,
 OnDriverRestartCompleted)
+                
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers>.Class,
 OnDriverRestartEvaluatorFailed)
                 
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
                 
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.DriverRestartEvaluatorRecoverySeconds>.Class,
 DriverRestartEvaluatorRecoverySeconds)
                 .Build();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
index 013522d..dc96538 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/DriverBridgeConfigurationOptions.cs
@@ -57,6 +57,11 @@ namespace Org.Apache.REEF.Driver.Bridge
         {
         }
 
+        [NamedParameter(documentation: "Called when an evaluator has failed in 
the Driver Restart process.", defaultClasses: new[] { 
typeof(DefaultEvaluatorFailureHandler) })]
+        public class DriverRestartFailedEvaluatorHandlers : 
Name<ISet<IObserver<IFailedEvaluator>>>
+        {
+        }
+
         [NamedParameter(documentation: "Called when evaluator is requested.")] 
         public class EvaluatorRequestHandlers : 
Name<ISet<IObserver<IEvaluatorRequestor>>>
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Constants.cs 
b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
index c833827..cb23f78 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Constants.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Constants.cs
@@ -47,7 +47,7 @@ namespace Org.Apache.REEF.Driver
         /// <summary>
         /// The number of handlers total. Tightly coupled with Java.
         /// </summary>
-        public const int HandlersNumber = 17;
+        public const int HandlersNumber = 18;
 
         /// <summary>
         /// The name for EvaluatorRequestorHandler. Tightly coupled with Java.
@@ -134,6 +134,11 @@ namespace Org.Apache.REEF.Driver
         /// </summary>
         public const string DriverRestartCompletedHandler = 
"DriverRestartCompleted";
 
+        /// <summary>
+        /// The name for DriverRestartFailedEvaluatorHandler. Tightly coupled 
with Java
+        /// </summary>
+        public const string DriverRestartFailedEvaluatorHandler = 
"DriverRestartFailedEvaluator";
+
         [Obsolete(message:"Use REEFFileNames instead.")]
         public const string DriverBridgeConfiguration = 
Common.Constants.ClrBridgeRuntimeConfiguration;
 
@@ -199,7 +204,8 @@ namespace Org.Apache.REEF.Driver
                         { ContextMessageHandler, 13 },
                         { DriverRestartActiveContextHandler, 14 },
                         { DriverRestartRunningTaskHandler, 15 },
-                        { DriverRestartCompletedHandler, 16 }
+                        { DriverRestartCompletedHandler, 16 },
+                        { DriverRestartFailedEvaluatorHandler, 17 }
                     };
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs 
b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
index 7e6c50c..9047c04 100644
--- a/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/DriverConfiguration.cs
@@ -146,6 +146,12 @@ namespace Org.Apache.REEF.Driver
         public static readonly 
OptionalImpl<IObserver<IDriverRestartCompleted>> OnDriverRestartCompleted =
             new OptionalImpl<IObserver<IDriverRestartCompleted>>();
 
+        ///// <summary>
+        ///// Event handler for driver restart failed evaluator event received 
during driver restart. Defaults to job failure if not bound.
+        ///// </summary>
+        public static readonly OptionalImpl<IObserver<IFailedEvaluator>> 
OnDriverRestartEvaluatorFailed =
+            new OptionalImpl<IObserver<IFailedEvaluator>>();
+
         /// <summary>
         /// Additional set of string arguments that can be pssed to handlers 
through client
         /// </summary>
@@ -220,6 +226,8 @@ namespace Org.Apache.REEF.Driver
                         OnDriverRestartContextActive)
                     
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartRunningTaskHandlers>.Class,
                         OnDriverRestartTaskRunning)
+                    
.BindSetEntry(GenericType<DriverBridgeConfigurationOptions.DriverRestartFailedEvaluatorHandlers>.Class,
+                        OnDriverRestartEvaluatorFailed)
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.TraceLevel>.Class,
 CustomTraceLevel)
                     
.BindNamedParameter(GenericType<DriverBridgeConfigurationOptions.MaxApplicationSubmissions>.Class,
                         MaxApplicationSubmissions)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
index 1f6b16d..a2cfad0 100644
--- 
a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
+++ 
b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/YarnJobSubmissionClient.java
@@ -126,6 +126,8 @@ public final class YarnJobSubmissionClient {
                   driverRestartEvaluatorRecoverySeconds)
               .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED,
                   JobDriver.DriverRestartCompletedHandler.class)
+              
.set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED,
+                  JobDriver.DriverRestartFailedEvaluatorHandler.class)
               .build();
 
       driverConfiguration = Configurations.merge(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index c564157..babdb10 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -39,6 +39,7 @@ public final class NativeInterop {
   public static final String DRIVER_RESTART_ACTIVE_CONTEXT_KEY = 
"DriverRestartActiveContext";
   public static final String DRIVER_RESTART_RUNNING_TASK_KEY = 
"DriverRestartRunningTask";
   public static final String DRIVER_RESTART_COMPLETED_KEY = 
"DriverRestartCompleted";
+  public static final String DRIVER_RESTART_FAILED_EVALUATOR_KEY = 
"DriverRestartFailedEvaluator";
   public static final HashMap<String, Integer> HANDLERS = new HashMap<String, 
Integer>() {
     {
       put(ALLOCATED_EVALUATOR_KEY, 1);
@@ -57,10 +58,11 @@ public final class NativeInterop {
       put(DRIVER_RESTART_ACTIVE_CONTEXT_KEY, 14);
       put(DRIVER_RESTART_RUNNING_TASK_KEY, 15);
       put(DRIVER_RESTART_COMPLETED_KEY, 16);
+      put(DRIVER_RESTART_FAILED_EVALUATOR_KEY, 17);
     }
   };
 
-  public static final int N_HANDLERS = 17;
+  public static final int N_HANDLERS = 18;
 
   public static native void loadClrAssembly(final String filePath);
 
@@ -165,6 +167,12 @@ public final class NativeInterop {
       final long handle
   );
 
+  public static native void clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
+      final long handle,
+      final FailedEvaluatorBridge failedEvaluatorBridge,
+      final InteropLogger interopLogger
+  );
+
   /**
    * Empty private constructor to prohibit instantiation of utility class.
    */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index aec8aa0..1b37474 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -133,10 +133,10 @@ public final class JobDriver {
   private long closedContextHandler = 0;
   private long failedContextHandler = 0;
   private long contextMessageHandler = 0;
-  private long driverRestartHandler = 0;
   private long driverRestartActiveContextHandler = 0;
   private long driverRestartRunningTaskHandler = 0;
   private long driverRestartCompletedHandler = 0;
+  private long driverRestartFailedEvaluatorHandler = 0;
   private boolean clrBridgeSetup = false;
   private boolean isRestarted = false;
 
@@ -226,6 +226,8 @@ public final class JobDriver {
             
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_RUNNING_TASK_KEY)];
         this.driverRestartCompletedHandler =
             
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_COMPLETED_KEY)];
+        this.driverRestartFailedEvaluatorHandler =
+            
handlers[NativeInterop.HANDLERS.get(NativeInterop.DRIVER_RESTART_FAILED_EVALUATOR_KEY)];
       }
 
       try (final LoggingScope lp =
@@ -273,6 +275,78 @@ public final class JobDriver {
     }
   }
 
+  private void handleFailedEvaluator(final FailedEvaluator eval, final boolean 
isRestartFailed) {
+    try (final LoggingScope ls = 
loggingScopeFactory.evaluatorFailed(eval.getId())) {
+      synchronized (JobDriver.this) {
+        LOG.log(Level.SEVERE, "FailedEvaluator", eval);
+        for (final FailedContext failedContext : eval.getFailedContextList()) {
+          final String failedContextId = failedContext.getId();
+          LOG.log(Level.INFO, "removing context " + failedContextId + " from 
job driver contexts.");
+          JobDriver.this.contexts.remove(failedContextId);
+        }
+        String message = "Evaluator " + eval.getId() + " failed with message: "
+            + eval.getEvaluatorException().getMessage();
+        
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+
+        if (isRestartFailed) {
+          
evaluatorFailedHandlerWaitForCLRBridgeSetup(driverRestartFailedEvaluatorHandler,
 eval, isRestartFailed);
+        } else {
+          evaluatorFailedHandlerWaitForCLRBridgeSetup(failedEvaluatorHandler, 
eval, isRestartFailed);
+        }
+      }
+    }
+  }
+
+  private void evaluatorFailedHandlerWaitForCLRBridgeSetup(final long handle,
+                                                           final 
FailedEvaluator eval,
+                                                           final boolean 
isRestartFailed) {
+    if (handle == 0) {
+      if (JobDriver.this.clrBridgeSetup) {
+        final String message = "No CLR FailedEvaluator handler was set, 
exiting now";
+        LOG.log(Level.WARNING, message);
+        
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+        return;
+      } else {
+        clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+          @Override
+          public void onNext(final Alarm time) {
+            if (JobDriver.this.clrBridgeSetup) {
+              handleFailedEvaluatorInCLR(eval, isRestartFailed);
+            } else {
+              LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
+              clock.scheduleAlarm(5000, this);
+            }
+          }
+        });
+      }
+    } else{
+      handleFailedEvaluatorInCLR(eval, isRestartFailed);
+    }
+  }
+
+  private void handleFailedEvaluatorInCLR(final FailedEvaluator eval, final 
boolean isRestartFailed) {
+    final String message = "CLR FailedEvaluator handler set, handling things 
with CLR handler.";
+    LOG.log(Level.INFO, message);
+    final FailedEvaluatorBridge failedEvaluatorBridge =
+        new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
+        JobDriver.this.isRestarted, loggingScopeFactory);
+    if (isRestartFailed) {
+      NativeInterop.clrSystemDriverRestartFailedEvaluatorHandlerOnNext(
+          JobDriver.this.driverRestartFailedEvaluatorHandler, 
failedEvaluatorBridge, JobDriver.this.interopLogger);
+    } else {
+      
NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler,
 failedEvaluatorBridge,
+          JobDriver.this.interopLogger);
+    }
+
+    final int additionalRequestedEvaluatorNumber = 
failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
+    if (additionalRequestedEvaluatorNumber > 0) {
+      LOG.log(Level.INFO, "number of additional evaluators requested after 
evaluator failure: " +
+          additionalRequestedEvaluatorNumber);
+    }
+
+    JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+  }
+
   /**
    * Submit a Task to a single Evaluator.
    */
@@ -359,57 +433,17 @@ public final class JobDriver {
   public final class FailedEvaluatorHandler implements 
EventHandler<FailedEvaluator> {
     @Override
     public void onNext(final FailedEvaluator eval) {
-      try (final LoggingScope ls = 
loggingScopeFactory.evaluatorFailed(eval.getId())) {
-        synchronized (JobDriver.this) {
-          LOG.log(Level.SEVERE, "FailedEvaluator", eval);
-          for (final FailedContext failedContext : 
eval.getFailedContextList()) {
-            final String failedContextId = failedContext.getId();
-            LOG.log(Level.INFO, "removing context " + failedContextId + " from 
job driver contexts.");
-            JobDriver.this.contexts.remove(failedContextId);
-          }
-          String message = "Evaluator " + eval.getId() + " failed with 
message: "
-              + eval.getEvaluatorException().getMessage();
-          
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
-
-          if (failedEvaluatorHandler == 0) {
-            if (JobDriver.this.clrBridgeSetup) {
-              message = "No CLR FailedEvaluator handler was set, exiting now";
-              LOG.log(Level.WARNING, message);
-              
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
-              return;
-            } else {
-              clock.scheduleAlarm(0, new EventHandler<Alarm>() {
-                @Override
-                public void onNext(final Alarm time) {
-                  if (JobDriver.this.clrBridgeSetup) {
-                    handleFailedEvaluatorInCLR(eval);
-                  } else {
-                    LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
-                    clock.scheduleAlarm(5000, this);
-                  }
-                }
-              });
-            }
-          } else {
-            handleFailedEvaluatorInCLR(eval);
-          }
-        }
-      }
+      JobDriver.this.handleFailedEvaluator(eval, false);
     }
+  }
 
-    private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) {
-      final String message = "CLR FailedEvaluator handler set, handling things 
with CLR handler.";
-      LOG.log(Level.INFO, message);
-      FailedEvaluatorBridge failedEvaluatorBridge = new 
FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor,
-          JobDriver.this.isRestarted, loggingScopeFactory);
-      
NativeInterop.clrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler,
 failedEvaluatorBridge,
-          JobDriver.this.interopLogger);
-      final int additionalRequestedEvaluatorNumber = 
failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
-      if (additionalRequestedEvaluatorNumber > 0) {
-        LOG.log(Level.INFO, "number of additional evaluators requested after 
evaluator failure: " +
-            additionalRequestedEvaluatorNumber);
-      }
-      
JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+  /**
+   * Receive notification that the entire Evaluator had failed on Driver 
Restart.
+   */
+  public final class DriverRestartFailedEvaluatorHandler implements 
EventHandler<FailedEvaluator> {
+    @Override
+    public void onNext(final FailedEvaluator eval) {
+      JobDriver.this.handleFailedEvaluator(eval, true);
     }
   }
 
@@ -613,14 +647,13 @@ public final class JobDriver {
           driverRestartCompleted.getCompletedTime());
       try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(
           driverRestartCompleted.getCompletedTime().getTimeStamp())) {
-        if (JobDriver.this.driverRestartHandler != 0) {
+        if (JobDriver.this.driverRestartCompletedHandler != 0) {
           LOG.log(Level.INFO, "CLR driver restart handler implemented, now 
handle it in CLR.");
 
           // TODO[REEF-690]: Pass in DriverRestartCompleted object to .NET.
           
NativeInterop.clrSystemDriverRestartCompletedHandlerOnNext(JobDriver.this.driverRestartCompletedHandler);
         } else {
           LOG.log(Level.WARNING, "No CLR driver restart handler implemented, 
done with DriverRestartCompletedHandler.");
-
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
index cda65ab..04cd56c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/client/DriverRestartConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.*;
 import org.apache.reef.driver.restart.DriverRestarted;
 import org.apache.reef.driver.task.RunningTask;
@@ -64,6 +65,12 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
       new OptionalImpl<>();
 
   /**
+   * Event handler for evaluators that fail during driver restart, default to 
logging if not bound.
+   */
+  public static final OptionalImpl<EventHandler<FailedEvaluator>> 
ON_DRIVER_RESTART_EVALUATOR_FAILED =
+      new OptionalImpl<>();
+
+  /**
    * The amount of time in seconds the driver waits for evaluators to report 
back on restart.
    * Defaults to 3 minutes. If the value is set to Integer.MAX_VALUE, the 
driver will wait forever.
    */
@@ -83,6 +90,7 @@ public final class DriverRestartConfiguration extends 
ConfigurationModuleBuilder
       .bindSetEntry(DriverRestartTaskRunningHandlers.class, 
ON_DRIVER_RESTART_TASK_RUNNING)
       .bindSetEntry(DriverRestartContextActiveHandlers.class, 
ON_DRIVER_RESTART_CONTEXT_ACTIVE)
       .bindSetEntry(DriverRestartCompletedHandlers.class, 
ON_DRIVER_RESTART_COMPLETED)
+      .bindSetEntry(DriverRestartFailedEvaluatorHandlers.class, 
ON_DRIVER_RESTART_EVALUATOR_FAILED)
       .build();
 
   private DriverRestartConfiguration(){

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
new file mode 100644
index 0000000..b5af599
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/DriverRestartFailedEvaluatorHandlers.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import 
org.apache.reef.runtime.common.driver.defaults.DefaultEvaluatorFailureHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.Set;
+
+/**
+ * The {@link org.apache.reef.tang.annotations.NamedParameter} for Evaluators 
that have failed on Driver Restart.
+ */
+@NamedParameter(doc = "Handler for event of evaluator failed on driver 
restart.",
+    default_classes = DefaultEvaluatorFailureHandler.class)
+public final class DriverRestartFailedEvaluatorHandlers implements 
Name<Set<EventHandler<FailedEvaluator>>> {
+  private DriverRestartFailedEvaluatorHandlers(){
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
new file mode 100644
index 0000000..ac861e1
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/parameters/ServiceDriverRestartFailedEvaluatorHandlers.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.driver.parameters;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.Set;
+
+/**
+ * Service handler for driver restart evaluator failed event.
+ */
+@NamedParameter(doc = "Handler for failed evaluator on Driver Restart.")
+public final class ServiceDriverRestartFailedEvaluatorHandlers implements 
Name<Set<EventHandler<FailedEvaluator>>> {
+  private ServiceDriverRestartFailedEvaluatorHandlers() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index cf14b20..07791df 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -71,6 +71,7 @@ public enum EvaluatorRestartState {
     switch(from) {
     case EXPECTED:
       switch(to) {
+      case EXPIRED:
       case REPORTED:
         return true;
       default:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index bc8ebbf..3ceff3e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -20,6 +20,7 @@ package org.apache.reef.runtime.common.driver.evaluator;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
 import org.apache.reef.driver.restart.DriverRestartManager;
 import org.apache.reef.driver.restart.EvaluatorRestartState;
@@ -298,9 +299,14 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
           failedTaskOptional = Optional.empty();
         }
 
+        final FailedEvaluator failedEvaluator = new 
FailedEvaluatorImpl(exception, failedContextList,
+            failedTaskOptional, this.evaluatorId);
 
-        this.messageDispatcher.onEvaluatorFailed(new 
FailedEvaluatorImpl(exception, failedContextList,
-            failedTaskOptional, this.evaluatorId));
+        if 
(driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired())
 {
+          
this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
+        } else {
+          this.messageDispatcher.onEvaluatorFailed(failedEvaluator);
+        }
 
       } catch (final Exception e) {
         LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/e374351a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 16836f4..e46b06c 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -118,6 +118,8 @@ public final class EvaluatorMessageDispatcher {
       final Set<EventHandler<ActiveContext>> 
driverRestartActiveContextHandlers,
       @Parameter(DriverRestartCompletedHandlers.class)
       final Set<EventHandler<DriverRestartCompleted>> 
driverRestartCompletedHandlers,
+      @Parameter(DriverRestartFailedEvaluatorHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> 
driverRestartEvaluatorFailedHandlers,
 
       // Service-provided event handlers specific to a Driver restart
       @Parameter(ServiceDriverRestartTaskRunningHandlers.class)
@@ -126,6 +128,8 @@ public final class EvaluatorMessageDispatcher {
       final Set<EventHandler<ActiveContext>> 
serviceDriverRestartActiveContextHandlers,
       @Parameter(ServiceDriverRestartCompletedHandlers.class)
       final Set<EventHandler<DriverRestartCompleted>> 
serviceDriverRestartCompletedHandlers,
+      @Parameter(ServiceDriverRestartFailedEvaluatorHandlers.class)
+      final Set<EventHandler<FailedEvaluator>> 
serviceDriverRestartFailedEvaluatorHandlers,
 
       @Parameter(EvaluatorDispatcherThreads.class) final int numberOfThreads,
       @Parameter(EvaluatorManager.EvaluatorIdentifier.class) final String 
evaluatorIdentifier,
@@ -177,10 +181,20 @@ public final class EvaluatorMessageDispatcher {
     this.driverRestartApplicationDispatcher.register(ActiveContext.class, 
driverRestartActiveContextHandlers);
     
this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, 
driverRestartCompletedHandlers);
 
+    final Set<EventHandler<FailedEvaluator>> 
driverRestartEvaluatorFailedCallbackHandlers = new HashSet<>();
+    for (final EventHandler<FailedEvaluator> evaluatorFailedHandler : 
driverRestartEvaluatorFailedHandlers) {
+      driverRestartEvaluatorFailedCallbackHandlers.add(
+          
idlenessCallbackEventHandlerFactory.createIdlenessCallbackWrapperHandler(evaluatorFailedHandler));
+    }
+
+    this.driverRestartApplicationDispatcher.register(
+        FailedEvaluator.class, driverRestartEvaluatorFailedCallbackHandlers);
+
     // Service event handlers specific to a Driver restart
     this.driverRestartServiceDispatcher.register(RunningTask.class, 
serviceDriverRestartTaskRunningHandlers);
     this.driverRestartServiceDispatcher.register(ActiveContext.class, 
serviceDriverRestartActiveContextHandlers);
     this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, 
serviceDriverRestartCompletedHandlers);
+    this.driverRestartServiceDispatcher.register(FailedEvaluator.class, 
serviceDriverRestartFailedEvaluatorHandlers);
 
     final Set<EventHandler<CompletedEvaluator>> 
evaluatorCompletedCallbackHandlers = new HashSet<>();
     for (final EventHandler<CompletedEvaluator> evaluatorCompletedHandler : 
evaluatorCompletedHandlers) {
@@ -255,6 +269,10 @@ public final class EvaluatorMessageDispatcher {
     this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
   }
 
+  public void onDriverRestartEvaluatorFailed(final FailedEvaluator 
failedEvaluator) {
+    this.dispatchForRestartedDriver(FailedEvaluator.class, failedEvaluator);
+  }
+
   public void onDriverRestartCompleted(final DriverRestartCompleted 
restartCompleted) {
     this.dispatchForRestartedDriver(DriverRestartCompleted.class, 
restartCompleted);
   }
@@ -269,7 +287,7 @@ public final class EvaluatorMessageDispatcher {
   }
 
   private <T, U extends T> void dispatchForRestartedDriver(final Class<T> 
type, final U message) {
-    this.driverRestartApplicationDispatcher.onNext(type, message);
     this.driverRestartServiceDispatcher.onNext(type, message);
+    this.driverRestartApplicationDispatcher.onNext(type, message);
   }
 }


Reply via email to