more implementation for job cancel
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a4296772 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a4296772 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a4296772 Branch: refs/heads/master Commit: a429677293c7b3677658ef9d82beca9b52d8b84d Parents: 1c55448 Author: lahiru <[email protected]> Authored: Fri Aug 15 02:05:39 2014 +0530 Committer: lahiru <[email protected]> Committed: Fri Aug 15 02:05:39 2014 +0530 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 6 - .../airavata/gfac/server/GfacServerHandler.java | 8 +- .../gfac/bes/provider/impl/BESProvider.java | 5 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 142 +++++++++++++++++-- .../org/apache/airavata/gfac/core/cpi/GFac.java | 8 ++ .../apache/airavata/gfac/core/cpi/GFacImpl.java | 51 +++++++ .../core/monitor/GfacInternalStatusUpdator.java | 11 +- .../gfac/core/provider/GFacProvider.java | 3 +- .../org/apache/airavata/job/TestProvider.java | 2 +- .../apache/airavata/gfac/ec2/EC2Provider.java | 2 +- .../gfac/gram/provider/impl/GramProvider.java | 4 +- .../gsissh/provider/impl/GSISSHProvider.java | 5 +- .../hadoop/provider/impl/HadoopProvider.java | 2 +- .../gfac/local/provider/impl/LocalProvider.java | 2 +- .../gfac/ssh/provider/impl/SSHProvider.java | 3 +- .../core/impl/GFACServiceJobSubmitter.java | 43 +++++- .../cpi/impl/SimpleOrchestratorImpl.java | 19 ++- 17 files changed, 268 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 0729478..967577c 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -795,12 +795,6 @@ public class CreateLaunchExperiment { String sshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d"; String gsisshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d"; client.launchExperiment(expId, sshTokenId); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - client.terminateExperiment(expId); } catch (ExperimentNotFoundException e) { logger.error("Error occured while launching the experiment...", e.getMessage()); throw new ExperimentNotFoundException(e); http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index b098120..01aa665 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -203,7 +203,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } public boolean cancelJob(String experimentId, String taskId) throws TException { - throw new TException("Operation not supported"); + logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); + GFac gfac = getGfac(); + try { + return gfac.submitJob(experimentId, taskId, ServerSettings.getSetting(Constants.GATEWAY_NAME)); + } catch (Exception e) { + throw new TException("Error launching the experiment : " + e.getMessage(), e); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index b325f62..22e823d 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -338,14 +338,13 @@ public class BESProvider extends AbstractProvider { /** * EndpointReference need to be saved to make cancel work. * - * @param activityEpr * @param jobExecutionContext * @throws GFacProviderException */ - public void cancelJob(String activityEpr, JobExecutionContext jobExecutionContext) throws GFacProviderException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException { try { initSecurityProperties(jobExecutionContext); - EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(activityEpr); + EndpointReferenceType eprt = EndpointReferenceType.Factory.parse(jobExecutionContext.getJobDetails().getJobID()); UnicoreHostType host = (UnicoreHostType) jobExecutionContext.getApplicationContext().getHostDescription() .getType(); http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index a68a302..7a12043 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -53,7 +53,6 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.GFacRecoverableHandler; import org.apache.airavata.gfac.core.handler.ThreadedHandler; -import org.apache.airavata.gfac.core.monitor.ExperimentIdentity; import org.apache.airavata.gfac.core.monitor.JobIdentity; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.monitor.TaskIdentity; @@ -85,7 +84,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePrefer import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; import org.apache.airavata.model.workspace.experiment.DataObjectType; import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.TaskState; @@ -528,6 +526,81 @@ public class BetterGfacImpl implements GFac { return true; } + public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { + // We need to check whether this job is submitted as a part of a large workflow. If yes, + // we need to setup workflow tracking listerner. + try { + int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) + , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status + String workflowInstanceID = null; + if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) { + // This mean we need to register workflow tracking listener. + //todo implement WorkflowTrackingListener properly + registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext); + } + // Register log event listener. This is required in all scenarios. + jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); + if (stateVal < 2) { + // In this scenario We do everything from the beginning + log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + + " and stop the execution chain"); + //todo update registry and find a way to stop the execution chain + } else if (stateVal >= 8) { + log.info("There is nothing to recover in this job so we do not re-submit"); + ZKUtil.deleteRecursive(zk, + AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); + } else { + // Now we know this is an old Job, so we have to handle things gracefully + log.info("Job is in a position to perform a proper cancellation"); + try { + Scheduler.schedule(jobExecutionContext); + + invokeProviderCancel(jobExecutionContext); + + } catch (Exception e) { + try { + // we make the experiment as failed due to exception scenario + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); + // monitorPublisher.publish(new + // ExperimentStatusChangedEvent(new + // ExperimentIdentity(jobExecutionContext.getExperimentID()), + // ExperimentState.FAILED)); + // Updating the task status if there's any task associated + // monitorPublisher.publish(new TaskStatusChangeRequest( + // new TaskIdentity(jobExecutionContext.getExperimentID(), + // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + // jobExecutionContext.getTaskData().getTaskID()), + // TaskState.FAILED + // )); + monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), new JobIdentity(jobExecutionContext.getExperimentID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext + .getJobDetails().getJobID()), JobState.FAILED)); + } catch (NullPointerException e1) { + log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + + "NullPointerException occurred because at this point there might not have Job Created", e1, e); + //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); + // Updating the task status if there's any task associated + monitorPublisher.publish(new TaskStatusChangeRequest(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext + .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); + + } + jobExecutionContext.setProperty(ERROR_SENT, "true"); + jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); + throw new GFacException(e.getMessage(), e); + } + } + return true; + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return true; + } + private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException { // Scheduler will decide the execution flow of handlers and provider // which handles @@ -547,12 +620,12 @@ public class BetterGfacImpl implements GFac { // to job execution context. // We get the provider instance and execute it. if (stateVal == 2 || stateVal == 3) { - invokeProvider(jobExecutionContext); // provider never ran in + invokeProviderExecute(jobExecutionContext); // provider never ran in // previous invocation } else if (stateVal == 4) { // whether sync or async job have to // invoke the recovering because it // crashed in the Handler - reInvokeProvider(jobExecutionContext); + reInvokeProviderExecute(jobExecutionContext); } else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)) { // In this case we do nothing because provider ran successfully, // no need to re-run the job @@ -560,7 +633,7 @@ public class BetterGfacImpl implements GFac { } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) { // this is async mode where monitoring of jobs is hapenning, we // have to recover - reInvokeProvider(jobExecutionContext); + reInvokeProviderExecute(jobExecutionContext); } else if (stateVal == 6) { reInvokeOutFlowHandlers(jobExecutionContext); } else { @@ -624,7 +697,7 @@ public class BetterGfacImpl implements GFac { // After executing the in handlers provider instance should be set // to job execution context. // We get the provider instance and execute it. - invokeProvider(jobExecutionContext); + invokeProviderExecute(jobExecutionContext); } catch (Exception e) { try { // we make the experiment as failed due to exception scenario @@ -658,7 +731,7 @@ public class BetterGfacImpl implements GFac { } } - private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException { + private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); @@ -674,7 +747,7 @@ public class BetterGfacImpl implements GFac { } } - private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException { + private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException { GFacProvider provider = jobExecutionContext.getProvider(); if (provider != null) { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); @@ -703,6 +776,51 @@ public class BetterGfacImpl implements GFac { } + private void invokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException { + GFacProvider provider = jobExecutionContext.getProvider(); + if (provider != null) { + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName()); + initProvider(provider, jobExecutionContext); + cancelProvider(provider, jobExecutionContext); + disposeProvider(provider, jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED); + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + } + if (GFacUtils.isSynchronousMode(jobExecutionContext)) { + invokeOutFlowHandlers(jobExecutionContext); + } + } + + private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException { + GFacProvider provider = jobExecutionContext.getProvider(); + if (provider != null) { + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING)); + String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName()); + if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state + if (provider instanceof GFacRecoverableProvider) { + GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName()); + ((GFacRecoverableProvider) provider).recover(jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED); + } + } else { + GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName()); + initProvider(provider, jobExecutionContext); + cancelProvider(provider, jobExecutionContext); + disposeProvider(provider, jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED); + } + monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED)); + } + + if (GFacUtils.isSynchronousMode(jobExecutionContext)) + + { + invokeOutFlowHandlers(jobExecutionContext); + } + + } + private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException { try { @@ -720,6 +838,14 @@ public class BetterGfacImpl implements GFac { } } + private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException { + try { + provider.cancelJob(jobExecutionContext); + } catch (Exception e) { + throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e); + } + } + private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException { try { provider.dispose(jobExecutionContext); http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java index f161a55..79f5a0b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java @@ -53,4 +53,12 @@ public interface GFac { */ public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; + /** + * This operation can be used to cancel an already running experiment + * @param jobExecutionContext + * @return Successful cancellation will return true + * @throws GFacException + */ + public boolean cancel(JobExecutionContext jobExecutionContext)throws GFacException; + } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index d370924..bf27deb 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -305,6 +305,50 @@ public class GFacImpl implements GFac { return true; } + + public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { + // We need to check whether this job is submitted as a part of a large workflow. If yes, + // we need to setup workflow tracking listerner. + String workflowInstanceID = null; + if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) { + // This mean we need to register workflow tracking listener. + //todo implement WorkflowTrackingListener properly + registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext); + } + // Register log event listener. This is required in all scenarios. + jobExecutionContext.getNotificationService().registerListener(new LoggingListener()); + try { + Scheduler.schedule(jobExecutionContext); + GFacProvider provider = jobExecutionContext.getProvider(); + if (provider != null) { + initProvider(provider, jobExecutionContext); + cancelProvider(provider, jobExecutionContext); + disposeProvider(provider, jobExecutionContext); + } + }catch (Exception e) { + try { + monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext), + new JobIdentity(jobExecutionContext.getExperimentID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(), + jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED)); + } catch (NullPointerException e1) { + log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " + + "NullPointerException occurred because at this point there might not have Job Created", e1, e); + // Updating status if job id is not set +// monitorPublisher +// .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED)); + // Updating the task status if there's any task associated + monitorPublisher.publish(new TaskStatusChangedEvent(new TaskIdentity(jobExecutionContext.getExperimentID(), jobExecutionContext + .getWorkflowNodeDetails().getNodeInstanceId(), jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED)); + + } + jobExecutionContext.setProperty(ERROR_SENT, "true"); + jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause())); + throw new GFacException(e.getMessage(), e); + } + return true; + } + private void schedule(JobExecutionContext jobExecutionContext) throws GFacException { // Scheduler will decide the execution flow of handlers and provider which handles // the job. @@ -367,6 +411,13 @@ public class GFacImpl implements GFac { throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e); } } + private void cancelProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException { + try { + provider.cancelJob(jobExecutionContext); + } catch (Exception e) { + throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e); + } + } private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index 896bff8..a1856e6 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -29,12 +29,7 @@ import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,10 +82,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc } switch (statusChangeRequest.getState()) { case COMPLETED: -// ZKUtil.deleteRecursive(zk,experimentPath); + ZKUtil.deleteRecursive(zk, experimentPath); break; case FAILED: -// ZKUtil.deleteRecursive(zk,experimentPath); + ZKUtil.deleteRecursive(zk,experimentPath); break; default: } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java index 7c17cf2..031cf77 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java @@ -53,11 +53,10 @@ public interface GFacProvider{ /** * Cancels all jobs relevant to an experiment. - * @param jobId The experiment id * @param jobExecutionContext The job execution context, contains runtime information. * @throws GFacException If an error occurred while cancelling the job. */ - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException; + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java index dcd7e3c..151ee19 100644 --- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java +++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java @@ -29,7 +29,7 @@ import java.util.Map; public class TestProvider extends AbstractProvider { - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { //To change body of implemented methods use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java index cf8a7d2..a8a9eb1 100644 --- a/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java +++ b/modules/gfac/gfac-ec2/src/main/java/org/apache/airavata/gfac/ec2/EC2Provider.java @@ -243,7 +243,7 @@ public class EC2Provider extends AbstractProvider { // Do nothing } - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java index 71d5a6b..142e492 100644 --- a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java +++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/provider/impl/GramProvider.java @@ -364,8 +364,8 @@ public class GramProvider extends AbstractProvider { public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { } - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { - cancelSingleJob(jobId, jobExecutionContext); + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { + cancelSingleJob(jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index 9ea284b..90efaad 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -26,13 +26,10 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.cpi.GFacImpl; -import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; -import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider; -import org.apache.airavata.gfac.core.provider.GFacProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; @@ -168,7 +165,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { //To change body of implemented methods use File | Settings | File Templates. } - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { //To change body of implemented methods use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java index 6543c01..30a1bf9 100644 --- a/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java +++ b/modules/gfac/gfac-hadoop/src/main/java/org/apache/airavata/gfac/hadoop/provider/impl/HadoopProvider.java @@ -143,7 +143,7 @@ public class HadoopProvider extends AbstractProvider { } @Override - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 9672767..425f782 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -241,7 +241,7 @@ public class LocalProvider extends AbstractProvider { } } - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index ab268df..67e6628 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -37,7 +37,6 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; -import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; @@ -205,7 +204,7 @@ public class SSHProvider extends AbstractProvider { } - public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java index 0ca95ec..89925ee 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java @@ -124,7 +124,48 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher { } public boolean terminate(String experimentID, String taskID) throws OrchestratorException { - throw new OrchestratorException(new OperationNotSupportedException("terminate method is not yet implemented")); + ZooKeeper zk = orchestratorContext.getZk(); + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + return localhost.cancelJob(experimentID, taskID); + } + } + } catch (TException e) { + throw new OrchestratorException(e); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + return false; } synchronized public void process(WatchedEvent event) { http://git-wip-us.apache.org/repos/asf/airavata/blob/a4296772/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index 83dccf0..70930f4 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -30,9 +30,7 @@ import org.apache.airavata.model.error.LaunchValidationException; import org.apache.airavata.model.error.ValidationResults; import org.apache.airavata.model.error.ValidatorResult; import org.apache.airavata.model.util.ExperimentModelUtil; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.job.JobSubmitter; import org.apache.airavata.orchestrator.core.validator.JobMetadataValidator; @@ -163,10 +161,17 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } - public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task,String tokenId) - throws OrchestratorException { - - throw new OrchestratorException(new OperationNotSupportedException()); + public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task, String tokenId) + throws OrchestratorException { + List<JobDetails> jobDetailsList = task.getJobDetailsList(); + for(JobDetails jobDetails:jobDetailsList) { + JobState jobState = jobDetails.getJobStatus().getJobState(); + if (jobState.getValue() > 4){ + logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() + + "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName()); + } + jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID()); + } }
