Repository: airavata
Updated Branches:
  refs/heads/master 404913acd -> d62a957db


adding initial version of terminate experiment


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1c554482
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1c554482
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1c554482

Branch: refs/heads/master
Commit: 1c554482790423daa242325d1167d5df8b326409
Parents: 538e37f
Author: lahiru <[email protected]>
Authored: Thu Aug 14 22:05:53 2014 +0530
Committer: lahiru <[email protected]>
Committed: Thu Aug 14 22:05:53 2014 +0530

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |   6 +
 .../server/OrchestratorServerHandler.java       | 153 +++++++++++++++----
 .../core/impl/GFACEmbeddedJobSubmitter.java     |   4 +
 .../core/impl/GFACServiceJobSubmitter.java      |  15 +-
 .../orchestrator/core/job/JobSubmitter.java     |   9 ++
 .../airavata/orchestrator/cpi/Orchestrator.java |   7 +-
 .../cpi/impl/SimpleOrchestratorImpl.java        |   3 +-
 7 files changed, 160 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 967577c..0729478 100644
--- 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -795,6 +795,12 @@ public class CreateLaunchExperiment {
             String sshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             String gsisshTokenId = "61abd2ff-f92b-4901-a077-07b51abe2c5d";
             client.launchExperiment(expId, sshTokenId);
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            client.terminateExperiment(expId);
         } catch (ExperimentNotFoundException e) {
             logger.error("Error occured while launching the experiment...", 
e.getMessage());
             throw new ExperimentNotFoundException(e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 3bffe23..d371d4c 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -47,11 +47,7 @@ import 
org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
 import 
org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
 import 
org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
 import org.apache.airavata.model.error.LaunchValidationException;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
@@ -188,29 +184,29 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
         * @param experimentId
         */
        public boolean launchExperiment(String experimentId) throws TException {
-               Experiment experiment = null;
-               try {
-                       List<String> ids = registry.getIds(
+        Experiment experiment = null; // this will inside the bottom catch 
statement
+        try {
+            experiment = (Experiment) registry.get(
+                    RegistryModelType.EXPERIMENT, experimentId);
+            if (experiment == null) {
+                log.error("Error retrieving the Experiment by the given 
experimentID: "
+                        + experimentId);
+                return false;
+            }
+            List<String> ids = registry.getIds(
                                        RegistryModelType.WORKFLOW_NODE_DETAIL,
                                        WorkflowNodeConstants.EXPERIMENT_ID, 
experimentId);
-                       for (String workflowNodeId : ids) {
-                               WorkflowNodeDetails workflowNodeDetail = 
(WorkflowNodeDetails) registry
+            for (String workflowNodeId : ids) {
+                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) 
registry
                                                
.get(RegistryModelType.WORKFLOW_NODE_DETAIL,
                                                                workflowNodeId);
-                               List<Object> taskDetailList = registry.get(
+                List<Object> taskDetailList = registry.get(
                                                RegistryModelType.TASK_DETAIL,
                                                TaskDetailConstants.NODE_ID, 
workflowNodeId);
-                               for (Object o : taskDetailList) {
-                                       TaskDetails taskID = (TaskDetails) o;
-                                       // iterate through all the generated 
tasks and performs the
-                                       // job submisssion+monitoring
-                                       experiment = (Experiment) registry.get(
-                                                       
RegistryModelType.EXPERIMENT, experimentId);
-                                       if (experiment == null) {
-                                               log.error("Error retrieving the 
Experiment by the given experimentID: "
-                                                               + experimentId);
-                                               return false;
-                                       }
+                for (Object o : taskDetailList) {
+                    TaskDetails taskID = (TaskDetails) o;
+                    // iterate through all the generated tasks and performs the
+                    // job submisssion+monitoring
                                        ExperimentStatus status = new 
ExperimentStatus();
                                        
status.setExperimentState(ExperimentState.LAUNCHED);
                                        
status.setTimeOfStateChange(Calendar.getInstance()
@@ -309,13 +305,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
         * @throws TException
         */
        public boolean terminateExperiment(String experimentId) throws 
TException {
-               try {
-                       orchestrator.cancelExperiment(experimentId);
-               } catch (OrchestratorException e) {
-                       log.error("Error canceling experiment " + experimentId, 
e);
-                       return false;
-               }
-               return true;
+        return validateStatesAndCancel(experimentId);
        }
 
        /**
@@ -516,4 +506,109 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface,
                String selectedModuleId=applicationModules.get(0);
                return selectedModuleId;
        }
+
+    private boolean validateStatesAndCancel(String experimentId)throws 
TException{
+        try {
+            Experiment experiment = (Experiment) registry.get(
+                    RegistryModelType.EXPERIMENT, experimentId);
+            if (experiment == null) {
+                log.error("Error retrieving the Experiment by the given 
experimentID: "
+                        + experimentId);
+                throw new OrchestratorException("Error retrieving the 
Experiment by the given experimentID:\n" +
+                        experimentId);
+            }
+            switch (experiment.getExperimentStatus().getExperimentState()){
+                case COMPLETED:
+                    throw new OrchestratorException("Experiment is already 
finished cannot cancel the experiment");
+                case CANCELED:
+                    throw new OrchestratorException("Experiment is already 
canceled, cannot perform cancel again !!!");
+                case CANCELING:
+                    throw new OrchestratorException("Experiment is  
cancelling, cannot perform cancel again !!!!");
+                case SUSPENDED:
+                    throw new OrchestratorException("Experiment is  suspended, 
cannot perform cancel !!!!");
+                case FAILED:
+                    throw new OrchestratorException("Experiment is  
failed,cannot perform cancel !!!!");
+                case UNKNOWN:
+                    throw new OrchestratorException("Experiment is 
inconsistent,cannot perform cancel, !!!!");
+            }
+
+            ExperimentStatus status = new ExperimentStatus();
+            status.setExperimentState(ExperimentState.CANCELING);
+            status.setTimeOfStateChange(Calendar.getInstance()
+                    .getTimeInMillis());
+            experiment.setExperimentStatus(status);
+            registry.update(RegistryModelType.EXPERIMENT, experiment,
+                    experimentId);
+
+            List<String> ids = registry.getIds(
+                    RegistryModelType.WORKFLOW_NODE_DETAIL,
+                    WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+            for (String workflowNodeId : ids) {
+                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) 
registry
+                        .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+                                workflowNodeId);
+                if 
(workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue() > 
1) {
+                    log.error(workflowNodeDetail.getNodeName() + " Workflow 
Node status cannot mark as cancelled, because " +
+                            "current status is " + 
workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
+                    continue; // this continue is very useful not to process 
deeper loops if the upper layers have non-cancel states
+                }else {
+                    WorkflowNodeStatus workflowNodeStatus = new 
WorkflowNodeStatus();
+                    
workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
+                    
workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                            .getTimeInMillis());
+                    
workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                    registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, 
workflowNodeDetail,
+                            workflowNodeId);
+                }
+                List<Object> taskDetailList = registry.get(
+                        RegistryModelType.TASK_DETAIL,
+                        TaskDetailConstants.NODE_ID, workflowNodeId);
+                for (Object o : taskDetailList) {
+                    TaskDetails taskDetails = (TaskDetails) o;
+                    TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+                    if(taskStatus.getExecutionState().getValue()>7){
+                        log.error(((TaskDetails) o).getTaskID() + " Task 
status cannot mark as cancelled, because " +
+                                "current task state is "+((TaskDetails) 
o).getTaskStatus().getExecutionState().toString());
+                        continue;// this continue is very useful not to 
process deeper loops if the upper layers have non-cancel states
+                    }else{
+                        taskStatus.setExecutionState(TaskState.CANCELING);
+                        taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                                .getTimeInMillis());
+                        taskDetails.setTaskStatus(taskStatus);
+                        registry.update(RegistryModelType.TASK_DETAIL, o,
+                                taskDetails);
+                    }
+                    // iterate through all the generated tasks and performs the
+                    // job submisssion+monitoring
+                    // launching the experiment
+                    orchestrator.cancelExperiment(experiment,
+                            workflowNodeDetail, taskDetails, null);
+                    taskStatus.setExecutionState(TaskState.CANCELED);
+                    taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                            .getTimeInMillis());
+                    taskDetails.setTaskStatus(taskStatus);
+                    registry.update(RegistryModelType.TASK_DETAIL, o,
+                            taskDetails);
+                }
+                WorkflowNodeStatus workflowNodeStatus = new 
WorkflowNodeStatus();
+                
workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+                workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                        .getTimeInMillis());
+                workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, 
workflowNodeDetail,
+                        workflowNodeId);
+            }
+            status = new ExperimentStatus();
+            status.setExperimentState(ExperimentState.CANCELED);
+            status.setTimeOfStateChange(Calendar.getInstance()
+                    .getTimeInMillis());
+            experiment.setExperimentStatus(status);
+            registry.update(RegistryModelType.EXPERIMENT, experiment,
+                    experimentId);
+
+        } catch (Exception e) {
+            throw new TException(e);
+        }
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index ff6b5e8..cde0135 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -76,6 +76,10 @@ public class GFACEmbeddedJobSubmitter implements 
JobSubmitter {
         }
     }
 
+    public boolean terminate(String experimentID, String taskID) throws 
OrchestratorException {
+        return false;
+    }
+
     public GFac getGfac() {
         return gfac;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index a2c153e..0ca95ec 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -46,6 +46,8 @@ import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.naming.OperationNotSupportedException;
+
 /*
  * this class is responsible for submitting a job to gfac in service mode,
  * it will select a gfac instance based on the incoming request and submit to 
that
@@ -88,10 +90,9 @@ public class GFACServiceJobSubmitter implements 
JobSubmitter, Watcher {
                        List<String> children = zk.getChildren(gfacServer, 
this);
                        
                        if (children.size() == 0) {
-                               // Zookeeper data need cleaning
-                               GfacService.Client localhost = 
GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST),
 Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)));
-                               return localhost.submitJob(experimentID, 
taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
-                       } else {
+                // Zookeeper data need cleaning
+                throw new OrchestratorException("There is no active GFac 
instance to route the request");
+            } else {
                                String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
                                // here we are not using an index because the 
getChildren does not return the same order everytime
                                String gfacNodeData = new 
String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
@@ -122,7 +123,11 @@ public class GFACServiceJobSubmitter implements 
JobSubmitter, Watcher {
                return false;
        }
 
-       synchronized public void process(WatchedEvent event) {
+    public boolean terminate(String experimentID, String taskID) throws 
OrchestratorException {
+        throw new OrchestratorException(new 
OperationNotSupportedException("terminate method is not yet implemented"));
+    }
+
+    synchronized public void process(WatchedEvent event) {
                synchronized (mutex) {
                        switch (event.getState()) {
                        case SyncConnected:

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
index ff81ac7..6682fa1 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java
@@ -63,4 +63,13 @@ public interface JobSubmitter {
      * @throws OrchestratorException
      */
     boolean submit(String experimentID,String taskID,String tokenId) throws 
OrchestratorException;
+
+    /**
+     * This can be used to terminate the experiment
+     * @param experimentID
+     * @param taskID
+     * @return
+     * @throws OrchestratorException
+     */
+    boolean terminate(String experimentID,String taskID)throws 
OrchestratorException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
index ddda0f6..89393a8 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/Orchestrator.java
@@ -74,10 +74,13 @@ public interface Orchestrator {
      * experimentID as the handler to the experiment, during the 
launchExperiment
      * We just have to give the experimentID
      *
-     * @param experimentID
+     * @param experiment
+     * @param workflowNode
+     * @param task
+     * @param tokenId
      * @throws OrchestratorException
      */
-    void cancelExperiment(String experimentID) throws OrchestratorException;
+    void cancelExperiment(Experiment experiment, WorkflowNodeDetails 
workflowNode, TaskDetails task,String tokenId) throws OrchestratorException;
     //todo have to add another method to handle failed or jobs to be recovered 
by orchestrator
     //todo if you don't add these this is not an orchestrator, its just an 
intemediate component which invoke gfac
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/1c554482/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index bf7af51..83dccf0 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -163,8 +163,9 @@ public class SimpleOrchestratorImpl extends 
AbstractOrchestrator{
         }
     }
 
-    public void cancelExperiment(String experimentID)
+    public void cancelExperiment(Experiment experiment, WorkflowNodeDetails 
workflowNode, TaskDetails task,String tokenId)
                        throws OrchestratorException {
+
         throw new OrchestratorException(new OperationNotSupportedException());
     }
 

Reply via email to