Repository: airavata Updated Branches: refs/heads/master 404913acd -> d62a957db
Adding initial version of terminate experiment Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1c554482 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1c554482 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1c554482 Branch: refs/heads/master Commit: 1c554482790423daa242325d1167d5df8b326409 Parents: 538e37f Author: lahiru <[email protected]> Authored: Thu Aug 14 22:05:53 2014 +0530 Committer: lahiru <[email protected]> Committed: Thu Aug 14 22:05:53 2014 +0530 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 6 + .../server/OrchestratorServerHandler.java | 153 +++++++++++++++---- .../core/impl/GFACEmbeddedJobSubmitter.java | 4 + .../core/impl/GFACServiceJobSubmitter.java | 15 +- .../orchestrator/core/job/JobSubmitter.java | 9 ++ .../airavata/orchestrator/cpi/Orchestrator.java | 7 +- .../cpi/impl/SimpleOrchestratorImpl.java | 3 +- 7 files changed, 160 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 967577c..0729478 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -795,6 +795,12 @@ 
public class CreateLaunchExperiment { String sshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d"; String gsisshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d"; client.launchExperiment(expId, sshTokenId); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + client.terminateExperiment(expId); } catch (ExperimentNotFoundException e) { logger.error("Error occured while launching the experiment...", e.getMessage()); throw new ExperimentNotFoundException(e); http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 3bffe23..d371d4c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -47,11 +47,7 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.error.LaunchValidationException; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.ExperimentState; -import org.apache.airavata.model.workspace.experiment.ExperimentStatus; -import org.apache.airavata.model.workspace.experiment.TaskDetails; 
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.cpi.OrchestratorService; import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants; @@ -188,29 +184,29 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, * @param experimentId */ public boolean launchExperiment(String experimentId) throws TException { - Experiment experiment = null; - try { - List<String> ids = registry.getIds( + Experiment experiment = null; // this will inside the bottom catch statement + try { + experiment = (Experiment) registry.get( + RegistryModelType.EXPERIMENT, experimentId); + if (experiment == null) { + log.error("Error retrieving the Experiment by the given experimentID: " + + experimentId); + return false; + } + List<String> ids = registry.getIds( RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId); - for (String workflowNodeId : ids) { - WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry + for (String workflowNodeId : ids) { + WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry .get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId); - List<Object> taskDetailList = registry.get( + List<Object> taskDetailList = registry.get( RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId); - for (Object o : taskDetailList) { - TaskDetails taskID = (TaskDetails) o; - // iterate through all the generated tasks and performs the - // job submisssion+monitoring - experiment = (Experiment) registry.get( - RegistryModelType.EXPERIMENT, experimentId); - if (experiment == null) { - log.error("Error retrieving the Experiment by the given experimentID: " - + experimentId); - return false; - } + for (Object o : taskDetailList) { + TaskDetails taskID = 
(TaskDetails) o; + // iterate through all the generated tasks and performs the + // job submisssion+monitoring ExperimentStatus status = new ExperimentStatus(); status.setExperimentState(ExperimentState.LAUNCHED); status.setTimeOfStateChange(Calendar.getInstance() @@ -309,13 +305,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, * @throws TException */ public boolean terminateExperiment(String experimentId) throws TException { - try { - orchestrator.cancelExperiment(experimentId); - } catch (OrchestratorException e) { - log.error("Error canceling experiment " + experimentId, e); - return false; - } - return true; + return validateStatesAndCancel(experimentId); } /** @@ -516,4 +506,109 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, String selectedModuleId=applicationModules.get(0); return selectedModuleId; } + + private boolean validateStatesAndCancel(String experimentId)throws TException{ + try { + Experiment experiment = (Experiment) registry.get( + RegistryModelType.EXPERIMENT, experimentId); + if (experiment == null) { + log.error("Error retrieving the Experiment by the given experimentID: " + + experimentId); + throw new OrchestratorException("Error retrieving the Experiment by the given experimentID:\n" + + experimentId); + } + switch (experiment.getExperimentStatus().getExperimentState()){ + case COMPLETED: + throw new OrchestratorException("Experiment is already finished cannot cancel the experiment"); + case CANCELED: + throw new OrchestratorException("Experiment is already canceled, cannot perform cancel again !!!"); + case CANCELING: + throw new OrchestratorException("Experiment is cancelling, cannot perform cancel again !!!!"); + case SUSPENDED: + throw new OrchestratorException("Experiment is suspended, cannot perform cancel !!!!"); + case FAILED: + throw new OrchestratorException("Experiment is failed,cannot perform cancel !!!!"); + case UNKNOWN: + throw new 
OrchestratorException("Experiment is inconsistent,cannot perform cancel, !!!!"); + } + + ExperimentStatus status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.CANCELING); + status.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + experiment.setExperimentStatus(status); + registry.update(RegistryModelType.EXPERIMENT, experiment, + experimentId); + + List<String> ids = registry.getIds( + RegistryModelType.WORKFLOW_NODE_DETAIL, + WorkflowNodeConstants.EXPERIMENT_ID, experimentId); + for (String workflowNodeId : ids) { + WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry + .get(RegistryModelType.WORKFLOW_NODE_DETAIL, + workflowNodeId); + if (workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue() > 1) { + log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " + + "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString()); + continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states + }else { + WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); + workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING); + workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); + registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, + workflowNodeId); + } + List<Object> taskDetailList = registry.get( + RegistryModelType.TASK_DETAIL, + TaskDetailConstants.NODE_ID, workflowNodeId); + for (Object o : taskDetailList) { + TaskDetails taskDetails = (TaskDetails) o; + TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); + if(taskStatus.getExecutionState().getValue()>7){ + log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " + + "current task state is "+((TaskDetails) 
o).getTaskStatus().getExecutionState().toString()); + continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states + }else{ + taskStatus.setExecutionState(TaskState.CANCELING); + taskStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + taskDetails.setTaskStatus(taskStatus); + registry.update(RegistryModelType.TASK_DETAIL, o, + taskDetails); + } + // iterate through all the generated tasks and performs the + // job submisssion+monitoring + // launching the experiment + orchestrator.cancelExperiment(experiment, + workflowNodeDetail, taskDetails, null); + taskStatus.setExecutionState(TaskState.CANCELED); + taskStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + taskDetails.setTaskStatus(taskStatus); + registry.update(RegistryModelType.TASK_DETAIL, o, + taskDetails); + } + WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); + workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); + workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); + registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, + workflowNodeId); + } + status = new ExperimentStatus(); + status.setExperimentState(ExperimentState.CANCELED); + status.setTimeOfStateChange(Calendar.getInstance() + .getTimeInMillis()); + experiment.setExperimentStatus(status); + registry.update(RegistryModelType.EXPERIMENT, experiment, + experimentId); + + } catch (Exception e) { + throw new TException(e); + } + return true; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java ---------------------------------------------------------------------- diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java index ff6b5e8..cde0135 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java @@ -76,6 +76,10 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter { } } + public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + return false; + } + public GFac getGfac() { return gfac; } http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java index a2c153e..0ca95ec 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java @@ -46,6 +46,8 @@ import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.naming.OperationNotSupportedException; + /* * this class is responsible for submitting a job to gfac in service mode, * it will select a gfac instance based on the incoming request and submit to that @@ -88,10 +90,9 @@ public class GFACServiceJobSubmitter 
implements JobSubmitter, Watcher { List<String> children = zk.getChildren(gfacServer, this); if (children.size() == 0) { - // Zookeeper data need cleaning - GfacService.Client localhost = GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST), Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST))); - return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME)); - } else { + // Zookeeper data need cleaning + throw new OrchestratorException("There is no active GFac instance to route the request"); + } else { String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); // here we are not using an index because the getChildren does not return the same order everytime String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); @@ -122,7 +123,11 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher { return false; } - synchronized public void process(WatchedEvent event) { + public boolean terminate(String experimentID, String taskID) throws OrchestratorException { + throw new OrchestratorException(new OperationNotSupportedException("terminate method is not yet implemented")); + } + + synchronized public void process(WatchedEvent event) { synchronized (mutex) { switch (event.getState()) { case SyncConnected: http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java index ff81ac7..6682fa1 100644 --- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java @@ -63,4 +63,13 @@ public interface JobSubmitter { * @throws OrchestratorException */ boolean submit(String experimentID,String taskID,String tokenId) throws OrchestratorException; + + /** + * This can be used to terminate the experiment + * @param experimentID + * @param taskID + * @return + * @throws OrchestratorException + */ + boolean terminate(String experimentID,String taskID)throws OrchestratorException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java index ddda0f6..89393a8 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java @@ -74,10 +74,13 @@ public interface Orchestrator { * experimentID as the handler to the experiment, during the launchExperiment * We just have to give the experimentID * - * @param experimentID + * @param experiment + * @param workflowNode + * @param task + * @param tokenId * @throws OrchestratorException */ - void cancelExperiment(String experimentID) throws OrchestratorException; + void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task,String tokenId) throws OrchestratorException; //todo have to add another method to handle failed or jobs to be recovered by 
orchestrator //todo if you don't add these this is not an orchestrator, its just an intemediate component which invoke gfac http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index bf7af51..83dccf0 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -163,8 +163,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ } } - public void cancelExperiment(String experimentID) + public void cancelExperiment(Experiment experiment, WorkflowNodeDetails workflowNode, TaskDetails task,String tokenId) throws OrchestratorException { + throw new OrchestratorException(new OperationNotSupportedException()); }
