Repository: airavata
Updated Branches:
  refs/heads/master 449eb7576 -> 420a51ad6


Implemented JobSubmitted recovery steps.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c29aa94c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c29aa94c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c29aa94c

Branch: refs/heads/master
Commit: c29aa94cd498c5f384b12f3dc655df80f0bc77e6
Parents: 4b048e1
Author: shamrath <[email protected]>
Authored: Wed May 13 15:57:33 2015 -0400
Committer: shamrath <[email protected]>
Committed: Wed May 13 15:57:33 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 32 ++++++++++++--------
 .../gfac/ssh/provider/impl/SSHProvider.java     |  7 +++++
 2 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index da55a2d..455c0d4 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -616,8 +616,10 @@ public class BetterGfacImpl implements GFac,Watcher {
                     invokeProviderExecute(jobExecutionContext);
                     break;
                 case PROVIDERINVOKING:
-                    reInvokeProviderExecute(jobExecutionContext);
+                    reInvokeProviderExecute(jobExecutionContext, true);
                     break;
+                case JOBSUBMITTED:
+                    reInvokeProviderExecute(jobExecutionContext, false);
                 case PROVIDERINVOKED:
                     // no need to re-run the job
                     log.info("Provider does not have to be recovered because 
it ran successfully for experiment: " + experimentID);
@@ -772,21 +774,27 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void reInvokeProviderExecute(JobExecutionContext 
jobExecutionContext) throws GFacException, GFacProviderException, 
ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderExecute(JobExecutionContext 
jobExecutionContext, boolean submit) throws GFacException, 
GFacProviderException, ApplicationSettingsException, InterruptedException, 
KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GfacHandlerState plState = GFacUtils.getHandlerState(zk, 
jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
provider.getClass().getName());
-            if (plState != null && plState == GfacHandlerState.INVOKING) {    
// this will make sure if a plugin crashes it will not launch from the scratch, 
but plugins have to save their invoked state
-                initProvider(provider, jobExecutionContext);
-                executeProvider(provider, jobExecutionContext);
-                disposeProvider(provider, jobExecutionContext);
+            if (submit) {
+                monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.PROVIDERINVOKING));
+                GfacHandlerState plState = GFacUtils.getHandlerState(zk, 
jobExecutionContext, provider.getClass().getName());
+                GFacUtils.createHandlerZnode(zk, jobExecutionContext, 
provider.getClass().getName());
+                if (plState != null && plState == GfacHandlerState.INVOKING) { 
   // this will make sure if a plugin crashes it will not launch from the 
scratch, but plugins have to save their invoked state
+                    initProvider(provider, jobExecutionContext);
+                    executeProvider(provider, jobExecutionContext);
+                    disposeProvider(provider, jobExecutionContext);
+                } else {
+                    provider.recover(jobExecutionContext);
+                }
+                GFacUtils.updateHandlerState(zk, jobExecutionContext, 
provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.PROVIDERINVOKED));
             } else {
-                provider.recover(jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                GFacUtils.updateHandlerState(zk, jobExecutionContext, 
provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), 
GfacExperimentState.PROVIDERINVOKED));
             }
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, 
provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
 
         if (GFacUtils.isSynchronousMode(jobExecutionContext))

http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 1807339..9bc68bd 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -32,9 +32,12 @@ import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import 
org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
@@ -161,6 +164,8 @@ public class SSHProvider extends AbstractProvider {
                     if (jobID != null) {
                         jobDetails.setJobID(jobID);
                         GFacUtils.saveJobStatus(jobExecutionContext, 
jobDetails, JobState.SUBMITTED, monitorPublisher);
+                        monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                , GfacExperimentState.JOBSUBMITTED));
                     }
                     jobExecutionContext.setJobDetails(jobDetails);
                     String verifyJobId = verifyJobSubmission(cluster, 
jobDetails);
@@ -169,6 +174,8 @@ public class SSHProvider extends AbstractProvider {
                         if (jobID == null) {
                             jobID = verifyJobId;
                             jobDetails.setJobID(jobID);
+                            monitorPublisher.publish(new 
GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                    , GfacExperimentState.JOBSUBMITTED));
                         }
                         GFacUtils.saveJobStatus(jobExecutionContext, 
jobDetails, JobState.QUEUED, monitorPublisher);
                     }

Reply via email to