Repository: incubator-reef Updated Branches: refs/heads/master ec91b53a9 -> 0e3b7dbd8
[REEF-489] Add concept of application attempt to driver startup This addresses the issue by: * Adding a getter for resubmissionAttempts to DriverRestarted and IDriverRestarted. * Modifying the DriverRestart example to check that the driver has restarted only once. * Fixing interop calls that broke Driver Restart. JIRA: [REEF-489](https://issues.apache.org/jira/browse/REEF-489) This closes #501 Author: Andrew Chung <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0e3b7dbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0e3b7dbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0e3b7dbd Branch: refs/heads/master Commit: 0e3b7dbd8d6cac8e3b02cca83b8e869c362ef149 Parents: ec91b53 Author: Andrew Chung <[email protected]> Authored: Thu Sep 17 11:38:35 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Sep 24 14:57:38 2015 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 2 + .../DriverRestartedClr2Java.cpp | 6 +++ .../cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp | 14 +++--- .../Bridge/Clr2java/IDriverRestartedClr2Java.cs | 7 ++- .../Bridge/Events/DriverRestarted.cs | 17 ++++--- .../Org.Apache.REEF.Driver/IDriverRestarted.cs | 9 +++- .../DriverRestart/HelloRestartDriver.cs | 7 ++- .../reef/javabridge/DriverRestartedBridge.java | 14 ++++-- .../apache/reef/javabridge/NativeInterop.java | 4 +- .../DriverRestartClrHandlersInitializer.java | 5 +- .../DefaultDriverRuntimeRestartMangerImpl.java | 6 +-- .../driver/restart/DriverRestartManager.java | 13 ++++-- .../reef/driver/restart/DriverRestarted.java | 7 ++- .../driver/restart/DriverRestartedImpl.java | 15 ++++-- .../restart/DriverRuntimeRestartManager.java | 9 ++-- .../driver/YarnDriverRuntimeRestartManager.java | 49 +++++++++++++------- 16 files changed, 125 insertions(+), 59 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 139cf9d..45f2457 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -221,11 +221,13 @@ namespace Org { JavaVM* _jvm; array<String^>^ _expectedEvaluatorIds; DateTime _startTime; + int _resubmissionAttempts; public: DriverRestartedClr2Java(JNIEnv *env, jobject jobjectDriverRestarted); virtual void OnError(String^ message); virtual array<String^>^ GetExpectedEvaluatorIds(); virtual DateTime GetStartTime(); + virtual int GetResubmissionAttempts(); }; public ref class DriverRestartCompletedClr2Java : public IDriverRestartCompletedClr2Java { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp index ea3cf8b..05b6af2 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/DriverRestartedClr2Java.cpp @@ -39,7 +39,9 @@ namespace Org { jclass jclassDriverRestarted = env->GetObjectClass(_jobjectDriverRestarted); jfieldID jidExpectedEvaluatorIds = env->GetFieldID(jclassDriverRestarted, "expectedEvaluatorIds", "[Ljava/lang/String;"); + jfieldID jidResubmissionAttempts = env->GetFieldID(jclassDriverRestarted, "resubmissionAttempts", "I"); + _resubmissionAttempts = env->GetIntField(_jobjectDriverRestarted, jidResubmissionAttempts); jobjectArray jevaluatorIds = 
reinterpret_cast<jobjectArray>(env->NewGlobalRef(env->GetObjectField(_jobjectDriverRestarted, jidExpectedEvaluatorIds))); _startTime = System::DateTime::Now; int count = env->GetArrayLength(jevaluatorIds); @@ -61,6 +63,10 @@ namespace Org { return _startTime; } + int DriverRestartedClr2Java::GetResubmissionAttempts() { + return _resubmissionAttempts; + } + void DriverRestartedClr2Java::OnError(String^ message) { ManagedLog::LOGGER->Log("DriverRestartedClr2Java::OnError"); JNIEnv *env = RetrieveEnv(_jvm); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp index 45b3fcb..88ce9b8 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/JavaClrBridge.cpp @@ -433,13 +433,13 @@ JNIEXPORT void JNICALL Java_org_apache_reef_javabridge_NativeInterop_clrSystemCo /* * Class: org_apache_reef_javabridge_NativeInterop -* Method: callClrSystemOnRestartHandlerOnNext +* Method: callClrSystemOnRestartHandler * Signature: (Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J */ -JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandlerOnNext +JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler (JNIEnv * env, jclass jclassx, jstring httpServerPort, jobject jevaluatorRequestorBridge, jobject jdriverRestartedBridge) { try { - ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler"); + ManagedLog::LOGGER->Log("+Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler"); String^ strPort = ManagedStringFromJavaString(env, httpServerPort); EvaluatorRequestorClr2Java^ 
evaluatorRequestorBridge = gcnew EvaluatorRequestorClr2Java(env, jevaluatorRequestorBridge); @@ -449,7 +449,7 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_reef_javabridge_NativeInterop_callC } catch (System::Exception^ ex) { // we cannot get error back to java here since we don't have an object to call back (although we ideally should...) - ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnStartHandler", ex); + ManagedLog::LOGGER->LogError("Exceptions in Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler", ex); return NULL; } } @@ -575,8 +575,8 @@ static JNINativeMethod methods[] = { { "clrSystemContextMessageHandlerOnNext", "(JLorg/apache/reef/javabridge/ContextMessageBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemContextMessageHandlerOnNext }, - { "callClrSystemOnRestartHandlerOnNext", "(Ljava/lang/String;Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;)[J", - (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandlerOnNext }, + { "callClrSystemOnRestartHandler", "(Ljava/lang/String;Lorg/apache/reef/javabridge/EvaluatorRequestorBridge;Lorg/apache/reef/javabridge/DriverRestartedBridge;)[J", + (void*)&Java_org_apache_reef_javabridge_NativeInterop_callClrSystemOnRestartHandler }, { "clrSystemDriverRestartActiveContextHandlerOnNext", "(JLorg/apache/reef/javabridge/ActiveContextBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartActiveContextHandlerOnNext }, @@ -584,7 +584,7 @@ static JNINativeMethod methods[] = { { "clrSystemDriverRestartRunningTaskHandlerOnNext", "(JLorg/apache/reef/javabridge/RunningTaskBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartRunningTaskHandlerOnNext }, - { "clrSystemDriverRestartCompletedHandlerOnNext", "(J)V", + { "clrSystemDriverRestartCompletedHandlerOnNext", 
"(JLorg/apache/reef/javabridge/generic/DriverRestartCompletedBridge;)V", (void*)&Java_org_apache_reef_javabridge_NativeInterop_clrSystemDriverRestartCompletedHandlerOnNext }, }; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs index c1ec72c..036b903 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IDriverRestartedClr2Java.cs @@ -32,5 +32,10 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java /// StartTime of the restart. /// </summary> DateTime GetStartTime(); + + /// <summary> + /// The number of times the Driver has been resubmitted. Does not include the initial submission. 
+ /// </summary> + int GetResubmissionAttempts(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs index 9ff5847..6e5edf2 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/DriverRestarted.cs @@ -27,23 +27,22 @@ namespace Org.Apache.REEF.Driver.Bridge.Events /// </summary> internal sealed class DriverRestarted : IDriverRestarted { - private readonly DateTime _startTime; - private readonly ISet<string> _expectedEvaluatorIds; + private readonly ISet<string> _expectedEvaluatorIds; internal DriverRestarted(IDriverRestartedClr2Java driverRestartedClr2Java) { - _startTime = driverRestartedClr2Java.GetStartTime(); _expectedEvaluatorIds = new HashSet<string>(driverRestartedClr2Java.GetExpectedEvaluatorIds()); - } - - public DateTime StartTime - { - get { return _startTime; } + StartTime = driverRestartedClr2Java.GetStartTime(); + ResubmissionAttempts = driverRestartedClr2Java.GetResubmissionAttempts(); } public ISet<string> ExpectedEvaluatorIds { get { return new HashSet<string>(_expectedEvaluatorIds); } - } + } + + public DateTime StartTime { get; private set; } + + public int ResubmissionAttempts { get; private set; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs b/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs index 6ccfa60..2cdf03e 100644 --- a/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs +++ 
b/lang/cs/Org.Apache.REEF.Driver/IDriverRestarted.cs @@ -31,6 +31,13 @@ namespace Org.Apache.REEF.Driver /// The set of expected Evaluator IDs that are returned to the Driver by the /// RM on Driver Restart. /// </summary> - ISet<string> ExpectedEvaluatorIds { get; } + ISet<string> ExpectedEvaluatorIds { get; } + + /// <summary> + /// The number of times the Driver has been resubmitted. Does not include the initial submission. + /// i.e. ResubmissionAttempts is 0 on first launch and should always be at least 1 when called on + /// the DriverRestartedHandler. + /// </summary> + int ResubmissionAttempts { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs index 36d6178..a5208ff 100644 --- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs +++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs @@ -65,7 +65,7 @@ namespace Org.Apache.REEF.Examples.DriverRestart { _exceptionTimer = new Timer(obj => { - throw new Exception("Expected driver to be finished by now."); + throw new ApplicationException("Expected driver to be finished by now."); }, new object(), TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10)); _evaluatorRequestor = evaluatorRequestor; @@ -105,6 +105,11 @@ namespace Org.Apache.REEF.Examples.DriverRestart /// </summary> public void OnNext(IDriverRestarted value) { + if (value.ResubmissionAttempts != 1) + { + throw new ApplicationException("Only expected the driver to restart once."); + } + _isRestart = true; Logger.Log(Level.Info, "Hello! HelloRestartDriver has restarted! 
Expecting these Evaluator IDs [{0}]", string.Join(", ", value.ExpectedEvaluatorIds)); foreach (var expectedEvaluatorId in value.ExpectedEvaluatorIds) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java index 3c827ee..5739a3e 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/DriverRestartedBridge.java @@ -21,6 +21,7 @@ package org.apache.reef.javabridge; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.driver.restart.DriverRestarted; import java.util.Set; @@ -33,16 +34,23 @@ import java.util.Set; public final class DriverRestartedBridge extends NativeBridge { // Used by bridge to extract field. Please take this into consideration when changing the name of the field. 
private final String[] expectedEvaluatorIds; + private final int resubmissionAttempts; - public DriverRestartedBridge(final Set<String> expectedEvaluatorIds) { - this.expectedEvaluatorIds = expectedEvaluatorIds.toArray(new String[expectedEvaluatorIds.size()]); + public DriverRestartedBridge(final DriverRestarted driverRestarted) { + final Set<String> evaluatorIds = driverRestarted.getExpectedEvaluatorIds(); + this.expectedEvaluatorIds = evaluatorIds.toArray(new String[evaluatorIds.size()]); + this.resubmissionAttempts = driverRestarted.getResubmissionAttempts(); } public String[] getExpectedEvaluatorIds() { return expectedEvaluatorIds; } + public int getResubmissionAttempts() { + return resubmissionAttempts; + } + @Override public void close() throws Exception { } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java index 3b08103..e87fbb5 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/NativeInterop.java @@ -89,7 +89,7 @@ public final class NativeInterop { public static native void clrSystemTaskMessageHandlerOnNext( final long handle, - final byte[] mesage, + final byte[] message, final TaskMessageBridge javaTaskMessageBridge, final InteropLogger interopLogger ); @@ -149,7 +149,7 @@ public final class NativeInterop { final ContextMessageBridge contextMessageBridge ); - public static native long[] callClrSystemOnRestartHandlerOnNext( + public static native long[] callClrSystemOnRestartHandler( final String httpServerPortNumber, final 
EvaluatorRequestorBridge javaEvaluatorRequestorBridge, final DriverRestartedBridge driverRestartedBridge http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java index 6885743..d97a1cd 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/DriverRestartClrHandlersInitializer.java @@ -42,9 +42,8 @@ final class DriverRestartClrHandlersInitializer implements ClrHandlersInitialize @Override public long[] getClrHandlers(final String portNumber, final EvaluatorRequestorBridge evaluatorRequestorBridge) { - // TODO[REEF-689]: Make callClrSystemOnRestartedHandlerOnNext take DriverRestarted object. 
- return NativeInterop.callClrSystemOnRestartHandlerOnNext( + return NativeInterop.callClrSystemOnRestartHandler( portNumber, - evaluatorRequestorBridge, new DriverRestartedBridge(driverRestarted.getExpectedEvaluatorIds())); + evaluatorRequestorBridge, new DriverRestartedBridge(driverRestarted)); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java index 277b872..0f3f6c0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DefaultDriverRuntimeRestartMangerImpl.java @@ -29,7 +29,7 @@ import java.util.Set; /** * The default driver runtime restart manager that is not able to perform any restart actions. * Thus, when performing actions pertaining to restart, it is recommended to call - * {@link DriverRuntimeRestartManager#hasRestarted()} first. + * {@link DriverRuntimeRestartManager#getResubmissionAttempts()} first and check for > 0. 
*/ @Private @DriverSide @@ -40,8 +40,8 @@ final class DefaultDriverRuntimeRestartMangerImpl implements DriverRuntimeRestar } @Override - public boolean hasRestarted() { - return false; + public int getResubmissionAttempts() { + return 0; } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java index 9de0004..58c809e 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartManager.java @@ -56,6 +56,7 @@ public final class DriverRestartManager implements DriverIdlenessSource { private RestartEvaluators restartEvaluators; private DriverRestartState state = DriverRestartState.NOT_RESTARTED; + private int resubmissionAttempts = 0; @Inject private DriverRestartManager(final DriverRuntimeRestartManager driverRuntimeRestartManager, @@ -81,9 +82,13 @@ public final class DriverRestartManager implements DriverIdlenessSource { * Can be already done with restart or in the process of restart. */ public synchronized boolean detectRestart() { - if (this.state.hasNotRestarted() && driverRuntimeRestartManager.hasRestarted()) { - // set the state machine in motion. - this.state = DriverRestartState.BEGAN; + if (this.state.hasNotRestarted()) { + resubmissionAttempts = driverRuntimeRestartManager.getResubmissionAttempts(); + + if (resubmissionAttempts > 0) { + // set the state machine in motion. 
+ this.state = DriverRestartState.BEGAN; + } } return this.state.hasRestarted(); @@ -105,7 +110,7 @@ public final class DriverRestartManager implements DriverIdlenessSource { final List<EventHandler<DriverRestarted>> orderedHandlers) { if (this.state == DriverRestartState.BEGAN) { restartEvaluators = driverRuntimeRestartManager.getPreviousEvaluators(); - final DriverRestarted restartedInfo = new DriverRestartedImpl(startTime, restartEvaluators); + final DriverRestarted restartedInfo = new DriverRestartedImpl(resubmissionAttempts, startTime, restartEvaluators); for (final EventHandler<DriverRestarted> handler : orderedHandlers) { handler.onNext(restartedInfo); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java index 53f9149..0b3d4eb 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestarted.java @@ -35,6 +35,11 @@ import java.util.Set; @Unstable public interface DriverRestarted { /** + * @return The number of times the Driver has been resubmitted. Not including the initial attempt. + */ + int getResubmissionAttempts(); + + /** * @return The time of restart. */ StartTime getStartTime(); @@ -44,4 +49,4 @@ public interface DriverRestarted { * to report back to the Driver after restart. 
*/ Set<String> getExpectedEvaluatorIds(); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java index db8aaa8..589177b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRestartedImpl.java @@ -34,12 +34,16 @@ import java.util.Set; @Private @Unstable public final class DriverRestartedImpl implements DriverRestarted { + private final int resubmissionAttempts; private final StartTime startTime; private final Set<String> expectedEvaluatorIds; - DriverRestartedImpl(final StartTime startTime, final RestartEvaluators restartEvaluators) { + DriverRestartedImpl(final int resubmissionAttempts, + final StartTime startTime, + final RestartEvaluators restartEvaluators) { + this.resubmissionAttempts = resubmissionAttempts; this.startTime = startTime; - Set<String> expected = new HashSet<>(); + final Set<String> expected = new HashSet<>(); for (final String evaluatorId : restartEvaluators.getEvaluatorIds()) { if (restartEvaluators.get(evaluatorId).getEvaluatorRestartState() == EvaluatorRestartState.EXPECTED) { @@ -51,6 +55,11 @@ public final class DriverRestartedImpl implements DriverRestarted { } @Override + public int getResubmissionAttempts() { + return resubmissionAttempts; + } + + @Override public StartTime getStartTime() { return startTime; } @@ -59,4 +68,4 @@ public final class DriverRestartedImpl implements DriverRestarted { public Set<String> getExpectedEvaluatorIds() { return expectedEvaluatorIds; } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java index 5e1acec..9d041db 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/DriverRuntimeRestartManager.java @@ -38,11 +38,12 @@ import java.util.Set; @DefaultImplementation(DefaultDriverRuntimeRestartMangerImpl.class) public interface DriverRuntimeRestartManager { /** - * @return true if the driver has been restarted. Note that this is different from whether - * the driver is in the process of restarting. This returns true both on when the driver is in the - * restart process or has already finished restarting. The default implementation always returns false. + * @return > 0 if the driver has been restarted as reported by the resource manager. 0 otherwise. + * Note that this is different from whether the driver is in the process of restarting. + * This returns > 0 both on when the driver is in the restart process or has already finished restarting. + * The default implementation always returns 0. */ - boolean hasRestarted(); + int getResubmissionAttempts(); /** * Records the evaluators when it is allocated. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0e3b7dbd/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index 495a777..8a23a82 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -54,13 +54,20 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta private static final Logger LOG = Logger.getLogger(YarnDriverRuntimeRestartManager.class.getName()); + /** + * The default resubmission attempts number returned if: + * 1) we are not able to determine the number of application attempts based on the environment provided by YARN. + * 2) we are able to receive a list of previous containers from the Resource Manager. 
+ */ + private static final int DEFAULT_RESTART_RESUBMISSION_ATTEMPTS = 1; + private final EvaluatorPreserver evaluatorPreserver; private final ApplicationMasterRegistration registration; private final REEFEventHandlers reefEventHandlers; private final YarnContainerManager yarnContainerManager; private final RackNameFormatter rackNameFormatter; - private Set<Container> previousContainers; + private Set<Container> previousContainers = null; @Inject private YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class) @@ -74,35 +81,39 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta this.reefEventHandlers = reefEventHandlers; this.yarnContainerManager = yarnContainerManager; this.rackNameFormatter = rackNameFormatter; - this.previousContainers = null; } /** - * Determines whether the application master has been restarted based on the container ID environment + * Determines the number of times the Driver has been submitted based on the container ID environment * variable provided by YARN. If that fails, determine whether the application master is a restart - * based on the number of previous containers reported by YARN. - * @return true if the application master is a restarted instance, false otherwise. + * based on the number of previous containers reported by YARN. In the failure scenario, returns 1 if restart, 0 + * otherwise. + * @return > 0 if the application master is a restarted instance, 0 otherwise. 
*/ @Override - public boolean hasRestarted() { + public int getResubmissionAttempts() { final String containerIdString = getContainerIdString(); + final ApplicationAttemptId appAttemptID = getAppAttemptId(containerIdString); - if (containerIdString == null) { - // container id should always be set in the env by the framework - LOG.log(Level.WARNING, "Container ID is null, determining restart based on previous containers."); - return this.isRestartByPreviousContainers(); - } + if (containerIdString == null || appAttemptID == null) { + LOG.log(Level.WARNING, "Was not able to fetch application attempt, container ID is [" + containerIdString + + "] and application attempt is [" + appAttemptID + "]. Determining restart based on previous containers."); - final ApplicationAttemptId appAttemptID = getAppAttemptId(containerIdString); + if (this.isRestartByPreviousContainers()) { + LOG.log(Level.WARNING, "Driver is a restarted instance based on the number of previous containers. " + + "As returned by the Resource Manager. 
Returning default resubmission attempts " + + DEFAULT_RESTART_RESUBMISSION_ATTEMPTS + "."); + return DEFAULT_RESTART_RESUBMISSION_ATTEMPTS; + } - if (appAttemptID == null) { - LOG.log(Level.WARNING, "applicationAttempt ID is null, determining restart based on previous containers."); - return this.isRestartByPreviousContainers(); + return 0; } - LOG.log(Level.FINE, "Application attempt: " + appAttemptID.getAttemptId()); + int appAttempt = appAttemptID.getAttemptId(); - return appAttemptID.getAttemptId() > 1; + LOG.log(Level.FINE, "Application attempt: " + appAttempt); + assert appAttempt > 0; + return appAttempt - 1; } private static String getContainerIdString() { @@ -116,6 +127,10 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta } private static ApplicationAttemptId getAppAttemptId(final String containerIdString) { + if (containerIdString == null) { + return null; + } + try { final ContainerId containerId = ConverterUtils.toContainerId(containerIdString); return containerId.getApplicationAttemptId();
