Repository: airavata Updated Branches: refs/heads/master 449eb7576 -> 420a51ad6
Implemented JobSubmitted recover steps. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c29aa94c Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c29aa94c Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c29aa94c Branch: refs/heads/master Commit: c29aa94cd498c5f384b12f3dc655df80f0bc77e6 Parents: 4b048e1 Author: shamrath <[email protected]> Authored: Wed May 13 15:57:33 2015 -0400 Committer: shamrath <[email protected]> Committed: Wed May 13 15:57:33 2015 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 32 ++++++++++++-------- .../gfac/ssh/provider/impl/SSHProvider.java | 7 +++++ 2 files changed, 27 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index da55a2d..455c0d4 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -616,8 +616,10 @@ public class BetterGfacImpl implements GFac,Watcher { invokeProviderExecute(jobExecutionContext); break; case PROVIDERINVOKING: - reInvokeProviderExecute(jobExecutionContext); + reInvokeProviderExecute(jobExecutionContext, true); break; + case JOBSUBMITTED: + reInvokeProviderExecute(jobExecutionContext, false); case PROVIDERINVOKED: // no need to re-run the job log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID); @@ -772,21 +774,27 @@ public class BetterGfacImpl implements GFac,Watcher { } } - private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException { + private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); - GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName()); - GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName()); - if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state - initProvider(provider, jobExecutionContext); - executeProvider(provider, jobExecutionContext); - disposeProvider(provider, jobExecutionContext); + if (submit) { + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName()); + GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName()); + if (plState != null && plState == GfacHandlerState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state + initProvider(provider, jobExecutionContext); + executeProvider(provider, jobExecutionContext); + disposeProvider(provider, jobExecutionContext); + } else { + provider.recover(jobExecutionContext); + } + GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } else { - provider.recover(jobExecutionContext); + disposeProvider(provider, jobExecutionContext); + GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } - GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED); - monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); } if (GFacUtils.isSynchronousMode(jobExecutionContext)) http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 1807339..9bc68bd 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -32,9 +32,12 @@ import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; +import org.apache.airavata.gfac.core.states.GfacExperimentState; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor; import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory; @@ -161,6 +164,8 @@ public class SSHProvider extends AbstractProvider { if (jobID != null) { jobDetails.setJobID(jobID); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); } jobExecutionContext.setJobDetails(jobDetails); String verifyJobId = verifyJobSubmission(cluster, jobDetails); @@ -169,6 +174,8 @@ public class SSHProvider extends AbstractProvider { if (jobID == null) { jobID = verifyJobId; jobDetails.setJobID(jobID); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.JOBSUBMITTED)); } GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher); }
