Repository: incubator-reef
Updated Branches:
  refs/heads/master ec91b53a9 -> 0e3b7dbd8


[REEF-489] Add concept of application attempt to driver startup

This addresses the issue by:
  * Adding a getter for resubmissionAttempts to DriverRestarted and
IDriverRestarted.
  * Modifying the DriverRestart example to check that the driver has restarted only once.
  * Fixing the interop calls that broke Driver Restart.

JIRA:
  [REEF-489](https://issues.apache.org/jira/browse/REEF-489)

This closes #501
Author:    Andrew Chung <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0e3b7dbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0e3b7dbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0e3b7dbd

Branch: refs/heads/master
Commit: 0e3b7dbd8d6cac8e3b02cca83b8e869c362ef149
Parents: ec91b53
Author: Andrew Chung <[email protected]>
Authored: Thu Sep 17 11:38:35 2015 -0700
Committer: Julia Wang <[email protected]>
Committed: Thu Sep 24 14:57:38 2015 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h   |  2 +
 .../DriverRestartedClr2Java.cpp                 |  6 +++
 .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 14 +++---
 .../Bridge/Clr2java/IDriverRestartedClr2Java.cs |  7 ++-
 .../Bridge/Events/DriverRestarted.cs            | 17 ++++---
 .../Org.Apache.REEF.Driver/IDriverRestarted.cs  |  9 +++-
 .../DriverRestart/HelloRestartDriver.cs         |  7 ++-
 .../reef/javabridge/DriverRestartedBridge.java  | 14 ++++--
 .../apache/reef/javabridge/NativeInterop.java   |  4 +-
 .../DriverRestartClrHandlersInitializer.java    |  5 +-
 .../DefaultDriverRuntimeRestartMangerImpl.java  |  6 +--
 .../driver/restart/DriverRestartManager.java    | 13 ++++--
 .../reef/driver/restart/DriverRestarted.java    |  7 ++-
 .../driver/restart/DriverRestartedImpl.java     | 15 ++++--
 .../restart/DriverRuntimeRestartManager.java    |  9 ++--
 .../driver/YarnDriverRuntimeRestartManager.java | 49 +++++++++++++-------
 16 files changed, 125 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h 
b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
index 139cf9d..45f2457 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
@@ -221,11 +221,13 @@ namespace Org {
                                                  JavaVM* _jvm;
                                                  array<String^>^ 
_expectedEvaluatorIds;
                                                  DateTime _startTime;
+                                                 int _resubmissionAttempts;
                                          public:
                                                  
DriverRestartedClr2Java(JNIEnv *env, jobject jobjectDriverRestarted);
                                                  virtual void OnError(String^ 
message);
                                                  virtual array<String^>^ 
GetExpectedEvaluatorIds();
                                                  virtual DateTime 
GetStartTime();
+                                                 virtual int 
GetResubmissionAttempts();
                                          };
 
                                          public ref class 
DriverRestartCompletedClr2Java : public IDriverRestartCompletedClr2Java {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp
index ea3cf8b..05b6af2 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp
@@ -39,7 +39,9 @@ namespace Org {
 
                                                        jclass 
jclassDriverRestarted = env->GetObjectClass(_jobjectDriverRestarted);
                                                        jfieldID 
jidExpectedEvaluatorIds = env->GetFieldID(jclassDriverRestarted, 
"expectedEvaluatorIds", "[Ljava/lang/String;");
+                                                       jfieldID 
jidResubmissionAttempts = env->GetFieldID(jclassDriverRestarted, 
"resubmissionAttempts", "I");
 
+                                                       _resubmissionAttempts = 
env->GetIntField(_jobjectDriverRestarted, jidResubmissionAttempts);
                                                        jobjectArray 
jevaluatorIds = 
reinterpret_cast<jobjectArray>(env->NewGlobalRef(env->GetObjectField(_jobjectDriverRestarted,
 jidExpectedEvaluatorIds)));
                                                        _startTime = 
System::DateTime::Now;
                                                        int count = 
env->GetArrayLength(jevaluatorIds);
@@ -61,6 +63,10 @@ namespace Org {
                                                        return _startTime;
                                                }
 
+                                               int 
DriverRestartedClr2Java::GetResubmissionAttempts() {
+                                                       return 
_resubmissionAttempts;
+                                               }
+
                                                void 
DriverRestartedClr2Java::OnError(String^ message) {
                                                        
ManagedLog::LOGGER->Log("DriverRestartedClr2Java::OnError");
                                                        JNIEnv *env = 
RetrieveEnv(_jvm);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
index 45b3fcb..88ce9b8 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp
@@ -433,13 +433,13 @@ JNIEXPORT void JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_clrSystemCo
 
 /*
 * Class:     org_apache_reef_javabridge_NativeInterop
-* Method:    callClrSystemOnRestartHandlerOnNext
+* Method:    callClrSystemOnRestartHandler
 * Signature: 
(Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J
 */
-JNIEXPORT jlongArray JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandlerOnNext
+JNIEXPORT jlongArray JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler
 (JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject 
jevaluatorRequestorBridge, jobject jdriverRestartedBridge) {
        try {
-               
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler");
+               
ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler");
                String^ strPort = ManagedStringFromJavaString(env, 
httpServerPort);
 
                EvaluatorRequestorClr2Java^ evaluatorRequestorBridge = gcnew 
EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge);
@@ -449,7 +449,7 @@ JNIEXPORT jlongArray JNICALL 
Java_org_apache_reef_javabridge_NativeInterop_callC
        }
        catch (System::Exception^ ex) {
                // we cannot get error back to java here since we don't have an 
object to call back (although we ideally should...)
-               ManagedLog::LOGGER->LogError("Exceptions in 
Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler", ex);
+               ManagedLog::LOGGER->LogError("Exceptions in 
Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler", 
ex);
                return NULL;
        }
 }
@@ -575,8 +575,8 @@ static JNINativeMethod methods[] = {
        { "clrSystemContextMessageHandlerOnNext", 
"(JLorg/apache/reef/javabridge/ContextMessageBridge;)V",
        
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext
 },
 
-       { "callClrSystemOnRestartHandlerOnNext", 
"(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J",
-       
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandlerOnNext
 },
+       { "callClrSystemOnRestartHandler", 
"(Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J",
+       
(void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler
 },
 
        { "clrSystemDriverRestartActiveContextHandlerOnNext", 
"(JLorg/apache/reef/javabridge/ActiveContextBridge;)V",
        
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartActiveContextHandlerOnNext
 },
@@ -584,7 +584,7 @@ static JNINativeMethod methods[] = {
        { "clrSystemDriverRestartRunningTaskHandlerOnNext", 
"(JLorg/apache/reef/javabridge/RunningTaskBridge;)V",
        
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartRunningTaskHandlerOnNext
 },
 
-       { "clrSystemDriverRestartCompletedHandlerOnNext", "(J)V",
+       { "clrSystemDriverRestartCompletedHandlerOnNext", 
"(JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V",
        
(void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext
 },
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs
index c1ec72c..036b903 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs
@@ -32,5 +32,10 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java
         /// StartTime of the restart.
         /// </summary>
         DateTime GetStartTime();
+
+        /// <summary>
+        /// The number of times the Driver has been resubmitted. Does not 
include the initial submission.
+        /// </summary>
+        int GetResubmissionAttempts();
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs
index 9ff5847..6e5edf2 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs
@@ -27,23 +27,22 @@ namespace Org.Apache.REEF.Driver.Bridge.Events
     /// </summary>
     internal sealed class DriverRestarted : IDriverRestarted
     {
-        private readonly DateTime _startTime;
-        private readonly ISet<string> _expectedEvaluatorIds; 
+        private readonly ISet<string> _expectedEvaluatorIds;
 
         internal DriverRestarted(IDriverRestartedClr2Java 
driverRestartedClr2Java)
         {
-            _startTime = driverRestartedClr2Java.GetStartTime();
             _expectedEvaluatorIds = new 
HashSet<string>(driverRestartedClr2Java.GetExpectedEvaluatorIds());
-        }
-
-        public DateTime StartTime
-        {
-            get { return _startTime; }
+            StartTime = driverRestartedClr2Java.GetStartTime();
+            ResubmissionAttempts = 
driverRestartedClr2Java.GetResubmissionAttempts();
         }
 
         public ISet<string> ExpectedEvaluatorIds
         {
             get { return new HashSet<string>(_expectedEvaluatorIds); }
-        } 
+        }
+
+        public DateTime StartTime { get; private set; }
+
+        public int ResubmissionAttempts { get; private set; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs 
b/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs
index 6ccfa60..2cdf03e 100644
--- a/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs
@@ -31,6 +31,13 @@ namespace Org.Apache.REEF.Driver
         /// The set of expected Evaluator IDs that are returned to the Driver 
by the
         /// RM on Driver Restart.
         /// </summary>
-        ISet<string> ExpectedEvaluatorIds { get; } 
+        ISet<string> ExpectedEvaluatorIds { get; }
+
+        /// <summary>
+        /// The number of times the Driver has been resubmitted. Does not 
include the initial submission.
+        /// i.e. ResubmissionAttempts is 0 on first launch and should always 
be at least 1 when called on
+        /// the DriverRestartedHandler.
+        /// </summary>
+        int ResubmissionAttempts { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 36d6178..a5208ff 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -65,7 +65,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         {
             _exceptionTimer = new Timer(obj =>
             {
-                throw new Exception("Expected driver to be finished by now.");
+                throw new ApplicationException("Expected driver to be finished 
by now.");
             }, new object(), TimeSpan.FromMinutes(10), 
TimeSpan.FromMinutes(10));
 
             _evaluatorRequestor = evaluatorRequestor;
@@ -105,6 +105,11 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         /// </summary>
         public void OnNext(IDriverRestarted value)
         {
+            if (value.ResubmissionAttempts != 1)
+            {
+                throw new ApplicationException("Only expected the driver to 
restart once.");
+            }
+
             _isRestart = true;
             Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted! 
Expecting these Evaluator IDs [{0}]", string.Join(", ", 
value.ExpectedEvaluatorIds));
             foreach (var expectedEvaluatorId in value.ExpectedEvaluatorIds)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java
index 3c827ee..5739a3e 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java
@@ -21,6 +21,7 @@ package org.apache.reef.javabridge;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.restart.DriverRestarted;
 
 import java.util.Set;
 
@@ -33,16 +34,23 @@ import java.util.Set;
 public final class DriverRestartedBridge extends NativeBridge {
   // Used by bridge to extract field. Please take this into consideration when 
changing the name of the field.
   private final String[] expectedEvaluatorIds;
+  private final int resubmissionAttempts;
 
-  public DriverRestartedBridge(final Set<String> expectedEvaluatorIds) {
-    this.expectedEvaluatorIds = expectedEvaluatorIds.toArray(new 
String[expectedEvaluatorIds.size()]);
+  public DriverRestartedBridge(final DriverRestarted driverRestarted) {
+    final Set<String> evaluatorIds = driverRestarted.getExpectedEvaluatorIds();
+    this.expectedEvaluatorIds = evaluatorIds.toArray(new 
String[evaluatorIds.size()]);
+    this.resubmissionAttempts = driverRestarted.getResubmissionAttempts();
   }
 
   public String[] getExpectedEvaluatorIds() {
     return expectedEvaluatorIds;
   }
 
+  public int getResubmissionAttempts() {
+    return resubmissionAttempts;
+  }
+
   @Override
   public void close() throws Exception {
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
index 3b08103..e87fbb5 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java
@@ -89,7 +89,7 @@ public final class NativeInterop {
 
   public static native void clrSystemTaskMessageHandlerOnNext(
       final long handle,
-      final byte[] mesage,
+      final byte[] message,
       final TaskMessageBridge javaTaskMessageBridge,
       final InteropLogger interopLogger
   );
@@ -149,7 +149,7 @@ public final class NativeInterop {
       final ContextMessageBridge contextMessageBridge
   );
 
-  public static native long[] callClrSystemOnRestartHandlerOnNext(
+  public static native long[] callClrSystemOnRestartHandler(
       final String httpServerPortNumber,
       final EvaluatorRequestorBridge javaEvaluatorRequestorBridge,
       final DriverRestartedBridge driverRestartedBridge

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
index 6885743..d97a1cd 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java
@@ -42,9 +42,8 @@ final class DriverRestartClrHandlersInitializer implements 
ClrHandlersInitialize
 
   @Override
   public long[] getClrHandlers(final String portNumber, final 
EvaluatorRequestorBridge evaluatorRequestorBridge) {
-    // TODO[REEF-689]: Make callClrSystemOnRestartedHandlerOnNext take 
DriverRestarted object.
-    return NativeInterop.callClrSystemOnRestartHandlerOnNext(
+    return NativeInterop.callClrSystemOnRestartHandler(
         portNumber,
-        evaluatorRequestorBridge, new 
DriverRestartedBridge(driverRestarted.getExpectedEvaluatorIds()));
+        evaluatorRequestorBridge, new DriverRestartedBridge(driverRestarted));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
index 277b872..0f3f6c0 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java
@@ -29,7 +29,7 @@ import java.util.Set;
 /**
  * The default driver runtime restart manager that is not able to perform any 
restart actions.
  * Thus, when performing actions pertaining to restart, it is recommended to 
call
- * {@link DriverRuntimeRestartManager#hasRestarted()} first.
+ * {@link DriverRuntimeRestartManager#getResubmissionAttempts()} first and 
check for > 0.
  */
 @Private
 @DriverSide
@@ -40,8 +40,8 @@ final class DefaultDriverRuntimeRestartMangerImpl implements 
DriverRuntimeRestar
   }
 
   @Override
-  public boolean hasRestarted() {
-    return false;
+  public int getResubmissionAttempts() {
+    return 0;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
index 9de0004..58c809e 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java
@@ -56,6 +56,7 @@ public final class DriverRestartManager implements 
DriverIdlenessSource {
 
   private RestartEvaluators restartEvaluators;
   private DriverRestartState state = DriverRestartState.NOT_RESTARTED;
+  private int resubmissionAttempts = 0;
 
   @Inject
   private DriverRestartManager(final DriverRuntimeRestartManager 
driverRuntimeRestartManager,
@@ -81,9 +82,13 @@ public final class DriverRestartManager implements 
DriverIdlenessSource {
    * Can be already done with restart or in the process of restart.
    */
   public synchronized boolean detectRestart() {
-    if (this.state.hasNotRestarted() && 
driverRuntimeRestartManager.hasRestarted()) {
-      // set the state machine in motion.
-      this.state = DriverRestartState.BEGAN;
+    if (this.state.hasNotRestarted()) {
+      resubmissionAttempts = 
driverRuntimeRestartManager.getResubmissionAttempts();
+
+      if (resubmissionAttempts > 0) {
+        // set the state machine in motion.
+        this.state = DriverRestartState.BEGAN;
+      }
     }
 
     return this.state.hasRestarted();
@@ -105,7 +110,7 @@ public final class DriverRestartManager implements 
DriverIdlenessSource {
                                      final List<EventHandler<DriverRestarted>> 
orderedHandlers) {
     if (this.state == DriverRestartState.BEGAN) {
       restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators();
-      final DriverRestarted restartedInfo = new DriverRestartedImpl(startTime, 
restartEvaluators);
+      final DriverRestarted restartedInfo = new 
DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators);
 
       for (final EventHandler<DriverRestarted> handler : orderedHandlers) {
         handler.onNext(restartedInfo);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
index 53f9149..0b3d4eb 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java
@@ -35,6 +35,11 @@ import java.util.Set;
 @Unstable
 public interface DriverRestarted {
   /**
+   * @return The number of times the Driver has been resubmitted. Not 
including the initial attempt.
+   */
+  int getResubmissionAttempts();
+
+  /**
    * @return The time of restart.
    */
   StartTime getStartTime();
@@ -44,4 +49,4 @@ public interface DriverRestarted {
    * to report back to the Driver after restart.
    */
   Set<String> getExpectedEvaluatorIds();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
index db8aaa8..589177b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java
@@ -34,12 +34,16 @@ import java.util.Set;
 @Private
 @Unstable
 public final class DriverRestartedImpl implements DriverRestarted {
+  private final int resubmissionAttempts;
   private final StartTime startTime;
   private final Set<String> expectedEvaluatorIds;
 
-  DriverRestartedImpl(final StartTime startTime, final RestartEvaluators 
restartEvaluators) {
+  DriverRestartedImpl(final int resubmissionAttempts,
+                      final StartTime startTime,
+                      final RestartEvaluators restartEvaluators) {
+    this.resubmissionAttempts = resubmissionAttempts;
     this.startTime = startTime;
-    Set<String> expected = new HashSet<>();
+    final Set<String> expected = new HashSet<>();
 
     for (final String evaluatorId : restartEvaluators.getEvaluatorIds()) {
       if (restartEvaluators.get(evaluatorId).getEvaluatorRestartState() == 
EvaluatorRestartState.EXPECTED) {
@@ -51,6 +55,11 @@ public final class DriverRestartedImpl implements 
DriverRestarted {
   }
 
   @Override
+  public int getResubmissionAttempts() {
+    return resubmissionAttempts;
+  }
+
+  @Override
   public StartTime getStartTime() {
     return startTime;
   }
@@ -59,4 +68,4 @@ public final class DriverRestartedImpl implements 
DriverRestarted {
   public Set<String> getExpectedEvaluatorIds() {
     return expectedEvaluatorIds;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
index 5e1acec..9d041db 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java
@@ -38,11 +38,12 @@ import java.util.Set;
 @DefaultImplementation(DefaultDriverRuntimeRestartMangerImpl.class)
 public interface DriverRuntimeRestartManager {
   /**
-   * @return true if the driver has been restarted. Note that this is 
different from whether
-   * the driver is in the process of restarting. This returns true both on 
when the driver is in the
-   * restart process or has already finished restarting. The default 
implementation always returns false.
+   * @return > 0 if the driver has been restarted as reported by the resource 
manager. 0 otherwise.
+   * Note that this is different from whether the driver is in the process of 
restarting.
+   * This returns > 0 both on when the driver is in the restart process or has 
already finished restarting.
+   * The default implementation always returns 0.
    */
-  boolean hasRestarted();
+  int getResubmissionAttempts();
 
   /**
    * Records the evaluators when it is allocated.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index 495a777..8a23a82 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -54,13 +54,20 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
 
   private static final Logger LOG = 
Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName());
 
+  /**
+   * The default resubmission attempts number returned if:
+   * 1) we are not able to determine the number of application attempts based 
on the environment provided by YARN.
+   * 2) we are able to receive a list of previous containers from the Resource 
Manager.
+   */
+  private static final int DEFAULT_RESTART_RESUBMISSION_ATTEMPTS = 1;
+
   private final EvaluatorPreserver evaluatorPreserver;
   private final ApplicationMasterRegistration registration;
   private final REEFEventHandlers reefEventHandlers;
   private final YarnContainerManager yarnContainerManager;
   private final RackNameFormatter rackNameFormatter;
 
-  private Set<Container> previousContainers;
+  private Set<Container> previousContainers = null;
 
   @Inject
   private 
YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
@@ -74,35 +81,39 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
     this.reefEventHandlers = reefEventHandlers;
     this.yarnContainerManager = yarnContainerManager;
     this.rackNameFormatter = rackNameFormatter;
-    this.previousContainers = null;
   }
 
   /**
-   * Determines whether the application master has been restarted based on the 
container ID environment
+   * Determines the number of times the Driver has been submitted based on the 
container ID environment
    * variable provided by YARN. If that fails, determine whether the 
application master is a restart
-   * based on the number of previous containers reported by YARN.
-   * @return true if the application master is a restarted instance, false 
otherwise.
+   * based on the number of previous containers reported by YARN. In the 
failure scenario, returns 1 if restart, 0
+   * otherwise.
+   * @return > 0 if the application master is a restarted instance, 0 
otherwise.
    */
   @Override
-  public boolean hasRestarted() {
+  public int getResubmissionAttempts() {
     final String containerIdString = getContainerIdString();
+    final ApplicationAttemptId appAttemptID = 
getAppAttemptId(containerIdString);
 
-    if (containerIdString == null) {
-      // container id should always be set in the env by the framework
-      LOG.log(Level.WARNING, "Container ID is null, determining restart based 
on previous containers.");
-      return this.isRestartByPreviousContainers();
-    }
+    if (containerIdString == null || appAttemptID == null) {
+      LOG.log(Level.WARNING, "Was not able to fetch application attempt, 
container ID is [" + containerIdString +
+          "] and application attempt is [" + appAttemptID + "]. Determining 
restart based on previous containers.");
 
-    final ApplicationAttemptId appAttemptID = 
getAppAttemptId(containerIdString);
+      if (this.isRestartByPreviousContainers()) {
+        LOG.log(Level.WARNING, "Driver is a restarted instance based on the 
number of previous containers. " +
+            "As returned by the Resource Manager. Returning default 
resubmission attempts " +
+            DEFAULT_RESTART_RESUBMISSION_ATTEMPTS + ".");
+        return DEFAULT_RESTART_RESUBMISSION_ATTEMPTS;
+      }
 
-    if (appAttemptID == null) {
-      LOG.log(Level.WARNING, "applicationAttempt ID is null, determining 
restart based on previous containers.");
-      return this.isRestartByPreviousContainers();
+      return 0;
     }
 
-    LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId());
+    int appAttempt = appAttemptID.getAttemptId();
 
-    return appAttemptID.getAttemptId() > 1;
+    LOG.log(Level.FINE, "Application attempt: " + appAttempt);
+    assert appAttempt > 0;
+    return appAttempt - 1;
   }
 
   private static String getContainerIdString() {
@@ -116,6 +127,10 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
   }
 
   private static ApplicationAttemptId getAppAttemptId(final String 
containerIdString) {
+    if (containerIdString == null) {
+      return null;
+    }
+
     try {
       final ContainerId containerId = 
ConverterUtils.toContainerId(containerIdString);
       return containerId.getApplicationAttemptId();

Reply via email to