Repository: airavata Updated Branches: refs/heads/master 3cab2d63a -> 011f01965
implementing cancel experiment method Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/011f0196 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/011f0196 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/011f0196 Branch: refs/heads/master Commit: 011f01965fb4b17953cf5edf7a3b3663a1501cd4 Parents: 3cab2d6 Author: lahiru <lah...@apache.org> Authored: Tue Aug 19 01:43:47 2014 +0530 Committer: lahiru <lah...@apache.org> Committed: Tue Aug 19 01:43:47 2014 +0530 ---------------------------------------------------------------------- .../AiravataExperimentStatusUpdator.java | 16 +- .../client/samples/CancelExperiments.java | 83 ++++++++ .../client/samples/CreateLaunchExperiment.java | 14 +- .../model/workspace/experiment/Experiment.java | 2 +- .../model/workspace/experiment/TaskStatus.java | 2 +- .../client/tools/DocumentCreatorNew.java | 2 +- .../airavata/common/utils/AiravataZKUtils.java | 8 + .../airavata/gfac/server/GfacServerHandler.java | 2 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 195 ++++++++++++------- .../org/apache/airavata/gfac/core/cpi/GFac.java | 3 +- .../apache/airavata/gfac/core/cpi/GFacImpl.java | 15 +- .../core/monitor/AiravataJobStatusUpdator.java | 7 +- .../core/monitor/AiravataTaskStatusUpdator.java | 13 +- .../airavata/gfac/core/utils/GFacUtils.java | 72 ++++++- .../gsissh/provider/impl/GSISSHProvider.java | 89 ++++++++- .../handlers/GridPullMonitorHandler.java | 18 +- .../server/OrchestratorServerHandler.java | 182 ++++++++++------- .../cpi/impl/SimpleOrchestratorImpl.java | 3 +- 18 files changed, 552 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java index ae3e67d..f033ba9 100644 --- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java +++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java @@ -81,26 +81,32 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getIdentity().getExperimentID())); updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP); } - updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state); - logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString()); + state = updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state); + logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString()); monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state)); } catch (Exception e) { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } } - public void updateExperimentStatus(String experimentId, 
ExperimentState state) throws Exception { - logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString()); + public ExperimentState updateExperimentStatus(String experimentId, ExperimentState state) throws Exception { Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId); if(details == null) { details = new Experiment(); details.setExperimentID(experimentId); } org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus(); - status.setExperimentState(state); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); + if(!ExperimentState.CANCELED.equals(details.getExperimentStatus().getExperimentState())&& + !ExperimentState.CANCELING.equals(details.getExperimentStatus().getExperimentState())) { + status.setExperimentState(state); + }else{ + status.setExperimentState(details.getExperimentStatus().getExperimentState()); + } details.setExperimentStatus(status); + logger.info("Updating the experiment status of experiment: " + experimentId + " to " + status.getExperimentState().toString()); airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId); + return details.getExperimentStatus().getExperimentState(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java new file mode 100644 index 0000000..b1b1503 --- /dev/null +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CancelExperiments.java @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.airavata.client.samples; + +import org.apache.airavata.model.error.*; +import org.apache.airavata.api.Airavata; +import org.apache.airavata.api.client.AiravataClientFactory; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.String; + +public class CancelExperiments { + + //FIXME: Read from a config file + public static final String THRIFT_SERVER_HOST = "localhost"; + public static final int THRIFT_SERVER_PORT = 8930; + private final static Logger logger = LoggerFactory.getLogger(CancelExperiments.class); + private static final String DEFAULT_USER = "default.registry.user"; + private static final String DEFAULT_GATEWAY = "default.registry.gateway"; + private static Airavata.Client client; + + + + public static void main(String[] args) { + try { + AiravataUtils.setExecutionAsClient(); + client = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); + + String expeId = "echoExperiment_06a24a33-6c39-4349-9a47-f16633a2152b"; + terminateExperiment(client,expeId); + +// System.out.println("retrieved exp id : " + experiment.getExperimentID()); + } catch (Exception e) { + logger.error("Error while connecting with server", e.getMessage()); + e.printStackTrace(); + } + } + + public static void terminateExperiment(Airavata.Client client, String expId) + throws TException { + try { + client.terminateExperiment(expId); + } catch (ExperimentNotFoundException e) { + logger.error("Error occurred while terminating the experiment...", e.getMessage()); + throw new ExperimentNotFoundException(e); + } catch (AiravataSystemException e) { + logger.error("Error occurred while terminating the experiment...", e.getMessage()); + throw new AiravataSystemException(e); + } catch (InvalidRequestException e) { + logger.error("Error occurred while terminating the experiment...", e.getMessage()); + throw new InvalidRequestException(e); + } catch (AiravataClientException e) { + logger.error("Error occurred while terminating the experiment...", e.getMessage()); + throw new AiravataClientException(e); + } catch (TException e) { + logger.error("Error occurred while terminating the experiment...", e.getMessage()); + throw new TException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 4317ce0..9c82eac 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -65,7 +65,7 @@ public class CreateLaunchExperiment { private static String localHostAppId = "localhost_3b5962d3-5e7e-4a97-9d1f-25c5ec436ba5,SimpleEcho0_44c34394-ca27-4fa9-bb2d-87f95a02352a"; private static String sshHostAppId; private static String pbsEchoAppId = "trestles.sdsc.edu_6dc7de3d-0d18-4933-bd96-b40c425f44c5,SimpleEcho2_8cc763c9-c57c-4a23-890c-4d3cee413c68"; - private
static String pbsWRFAppId = "trestles.sdsc.edu_6ba292ab-573b-4b48-b72d-8006c8ced713,WRF_73b3356e-0596-4542-80b7-9048d9103cef"; + private static String pbsWRFAppId = "trestles.sdsc.edu_66caf560-497d-4316-b1e9-adadc5817b20,WRF_61490c54-dacf-4bc4-874c-ad5909a65afb"; private static String slurmAppId = "stampede.tacc.xsede.org_b2ef59cb-f626-4767-9ca0-601f94c42ba4,SimpleEcho3_b81c2559-a088-42a3-84ce-40119d874918"; private static String sgeAppId; private static String br2EchoAppId = "bigred2_9c1e6be8-f7d8-4494-98f2-bf508790e8c6,SimpleEchoBR_149fd613-98e2-46e7-ac7c-4d393349469e"; @@ -82,14 +82,14 @@ public class CreateLaunchExperiment { client = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + client.getAPIVersion()); // getExperiment(client, "WRFExperiment_2a2de26c-7f74-47c9-8e14-40e50dedfe0f"); -// addDescriptors(); + addDescriptors(); //// final String expId = createExperimentForSSHHost(airavata); -// final String expId = createExperimentForTrestles(client); + final String expId = createExperimentForTrestles(client); //// final String expId = createExperimentForStampede(client); // final String expId = createExperimentForLocalHost(client); // final String expId = createExperimentForLonestar(airavata); - final String expId = createExperimentWRFTrestles(client); +// final String expId = createExperimentWRFTrestles(client); // final String expId = createExperimentForBR2(client); // final String expId = createExperimentForBR2Amber(client); // final String expId = createExperimentWRFStampede(client); @@ -253,17 +253,17 @@ public class CreateLaunchExperiment { DataObjectType input = new DataObjectType(); input.setKey("WRF_Namelist"); input.setType(DataType.URI); - input.setValue("/Users/chathuri/Downloads/wrf_sample_inputs/namelist.input"); + input.setValue("/Users/lahirugunathilake/Downloads/wrf_sample_inputs/namelist.input"); DataObjectType input1 = new DataObjectType(); input1.setKey("WRF_Input_File"); input1.setType(DataType.URI); - input1.setValue("/Users/chathuri/Downloads/wrf_sample_inputs/wrfinput_d01"); + input1.setValue("/Users/lahirugunathilake/Downloads/wrf_sample_inputs/wrfinput_d01"); DataObjectType input2 = new DataObjectType(); input2.setKey("WRF_Boundary_File"); input2.setType(DataType.URI); - input2.setValue("/Users/chathuri/Downloads/wrf_sample_inputs/wrfbdy_d01"); + input2.setValue("/Users/lahirugunathilake/Downloads/wrf_sample_inputs/wrfbdy_d01"); exInputs.add(input); exInputs.add(input1); http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/Experiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/Experiment.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/Experiment.java index 7b674cd..ff003ec 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/Experiment.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/Experiment.java @@ -757,7 +757,7 @@ import org.slf4j.LoggerFactory; } public void setExperimentStatus(ExperimentStatus experimentStatus) { - this.experimentStatus = experimentStatus; + this.experimentStatus = experimentStatus; } public void unsetExperimentStatus() { 
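For context on the changes that follow: the cancel flow in this commit is coordinated through a per-experiment "operation" znode in ZooKeeper. The GFacUtils code that creates the experiment znode seeds it with "submit", GFacUtils.setExperimentCancel overwrites it with "cancel" (or creates it if the job was never handed to GFac), and BetterGfacImpl registers itself as a ZooKeeper Watcher on that node so a NodeDataChanged event flips a cancelled flag that stops further handler and provider invocation. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, the class name and znode path are simplified assumptions, and only the ZooKeeper calls mirror what the diff does.

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

// Illustrative sketch of the znode-based cancel flag; not part of this commit.
public class CancelFlagSketch implements Watcher {

    private final ZooKeeper zk;
    private volatile boolean cancelled = false;

    public CancelFlagSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    // Orchestrator side: mark the experiment as cancelled.
    public void requestCancel(String experimentNode) throws KeeperException, InterruptedException {
        String operationPath = experimentNode + "/operation";
        Stat stat = zk.exists(operationPath, false);
        if (stat == null) {
            // The experiment was never picked up by a GFac node: create the flag directly.
            zk.create(operationPath, "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            // The experiment is already running: overwrite the "submit" marker with "cancel".
            zk.setData(operationPath, "cancel".getBytes(), stat.getVersion());
        }
    }

    // GFac side: register a watch on the operation node before starting work.
    public void watchCancel(String experimentNode) throws KeeperException, InterruptedException {
        String operationPath = experimentNode + "/operation";
        Stat stat = zk.exists(operationPath, false);
        if (stat != null) {
            zk.getData(operationPath, this, stat); // registers this object as the Watcher
        }
    }

    // Watcher callback: any data change on the operation node is treated as a cancel request.
    public void process(WatchedEvent event) {
        if (Event.EventType.NodeDataChanged.equals(event.getType())) {
            cancelled = true;
        }
    }

    // Handlers and providers check this flag between steps and stop when it is set.
    public boolean isCancelled() {
        return cancelled;
    }
}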
http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/TaskStatus.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/TaskStatus.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/TaskStatus.java index 6a51eca..bbb19e3 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/TaskStatus.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/workspace/experiment/TaskStatus.java @@ -189,7 +189,7 @@ import org.slf4j.LoggerFactory; * @see TaskState */ public void setExecutionState(TaskState executionState) { - this.executionState = executionState; + this.executionState = executionState; } public void unsetExecutionState() { http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreatorNew.java ---------------------------------------------------------------------- diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreatorNew.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreatorNew.java index 5dd487c..f686847 100644 --- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreatorNew.java +++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreatorNew.java @@ -320,7 +320,7 @@ public class DocumentCreatorNew { application.setApplicationInterfaceId(client.registerApplicationInterface(application)); - ApplicationDeploymentDescription deployment = DocumentCreatorUtils.createApplicationDeployment(host.getComputeResourceId(), module1.getAppModuleId(), "/bin/echo", ApplicationParallelismType.SERIAL, "Echo application"); + ApplicationDeploymentDescription deployment = DocumentCreatorUtils.createApplicationDeployment(host.getComputeResourceId(), module1.getAppModuleId(), "/home/ogce/echo.sh", ApplicationParallelismType.SERIAL, "Echo application"); deployment.setAppDeploymentId(client.registerApplicationDeployment(deployment)); client.addGatewayComputeResourcePreference(getGatewayResourceProfile().getGatewayID(), host.getComputeResourceId(), DocumentCreatorUtils.createComputeResourcePreference(host.getComputeResourceId(), "/oasis/scratch/trestles/ogce/temp_project/", "sds128", false, null, null, null)); http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java index 4347bd2..f91fc3c 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java @@ -87,6 +87,14 @@ public class AiravataZKUtils { return null; } + public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException, + KeeperException, InterruptedException { + Stat exists = zk.exists(fullPath, false); + if (exists != null) { + return 
Integer.parseInt(new String(zk.getData(fullPath, false, exists))); + } + return -1; + } public static List<String> getRunningGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException { String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_API_SERVER_NODE, "/gfac-server"); return zk.getChildren(gfacServer, null); http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 01aa665..4d59b32 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -206,7 +206,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); GFac gfac = getGfac(); try { - return gfac.submitJob(experimentId, taskId, ServerSettings.getSetting(Constants.GATEWAY_NAME)); + return gfac.cancel(experimentId, taskId, ServerSettings.getSetting(Constants.GATEWAY_NAME)); } catch (Exception e) { throw new TException("Error launching the experiment : " + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 7a12043..624f9ab 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -81,12 +81,7 @@ import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission; import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; -import org.apache.airavata.model.workspace.experiment.DataObjectType; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.TaskState; +import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.api.AiravataRegistry2; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryModelType; @@ -103,9 +98,8 @@ import org.apache.airavata.schemas.gfac.ProjectAccountType; import org.apache.airavata.schemas.gfac.QueueType; import org.apache.airavata.schemas.gfac.SSHHostType; import org.apache.airavata.schemas.gfac.ServiceDescriptionType; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZKUtil; -import 
org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.*; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xml.sax.SAXException; @@ -114,7 +108,7 @@ import org.xml.sax.SAXException; * This is the GFac CPI class for external usage, this simply have a single method to submit a job to * the resource, required data for the job has to be stored in registry prior to invoke this object. */ -public class BetterGfacImpl implements GFac { +public class BetterGfacImpl implements GFac,Watcher { private static final Logger log = LoggerFactory.getLogger(GFacImpl.class); public static final String ERROR_SENT = "ErrorSent"; @@ -126,6 +120,8 @@ public class BetterGfacImpl implements GFac { private ZooKeeper zk; // we are not storing zk instance in to jobExecution context + private static Integer mutex = new Integer(-1); + private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>(); private static File gfacConfigFile; @@ -134,6 +130,8 @@ public class BetterGfacImpl implements GFac { private static MonitorPublisher monitorPublisher; + private boolean cancelled = false; + /** * Constructor for GFac * @@ -256,6 +254,7 @@ public class BetterGfacImpl implements GFac { //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object. TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID); + String applicationInterfaceId = taskData.getApplicationId(); String applicationDeploymentId = taskData.getApplicationDeploymentId(); if (null == applicationInterfaceId) { @@ -463,6 +462,11 @@ public class BetterGfacImpl implements GFac { jobExecutionContext.setTaskData(taskData); jobExecutionContext.setGatewayID(gatewayID); + + List<JobDetails> jobDetailsList = taskData.getJobDetailsList(); + for(JobDetails jDetails:jobDetailsList){ + jobExecutionContext.setJobDetails(jDetails); + } // setting the registry jobExecutionContext.setRegistry(registry); @@ -488,10 +492,13 @@ public class BetterGfacImpl implements GFac { return jobExecutionContext; } - public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException { + private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException { // We need to check whether this job is submitted as a part of a large workflow. If yes, // we need to setup workflow tracking listerner. 
try { + String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); + Stat exists = zk.exists(experimentEntry + File.separator + "operation", false); + zk.getData(experimentEntry + File.separator + "operation", this,exists); int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status @@ -526,11 +533,24 @@ return true; } - public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { + public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { + JobExecutionContext jobExecutionContext = null; + try { + jobExecutionContext = createJEC(experimentID, taskID, gatewayID); + return cancel(jobExecutionContext); + } catch (Exception e) { + log.error("Error invoking the job with experiment ID: " + experimentID); + throw new GFacException(e); + } + } + + private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { // We need to check whether this job is submitted as a part of a large workflow. If yes, - // we need to setup workflow tracking listerner. + // we need to setup workflow tracking listener. try { - int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment + // we cannot call GFacUtils.getZKExperimentStateValue because experiment might be running in some other node + String expPath = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); + int stateVal = GFacUtils.getZKExperimentStateValue(zk, expPath); // this is the original state; if we query again it might be different, so we preserve this state in the environment monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status String workflowInstanceID = null; @@ -546,12 +566,14 @@ log.info("Job is not yet submitted, so nothing much to do except changing the registry entry " + " and stop the execution chain"); //todo update registry and find a way to stop the execution chain + GFacUtils.setExperimentCancel(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); } else if (stateVal >= 8) { - log.info("There is nothing to recover in this job so we do not re-submit"); + log.error("This experiment is almost finished, so cannot cancel this experiment"); ZKUtil.deleteRecursive(zk, AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID())); } else { // Now we know this is an old Job, so we have to handle things gracefully + GFacUtils.setExperimentCancel(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); log.info("Job is in a position to perform a proper cancellation"); try { Scheduler.schedule(jobExecutionContext); @@ -686,19 +708,29 @@ // here we do not skip
handler if some handler does not have to be // run again during re-run it can implement // that logic in to the handler - invokeInFlowHandlers(jobExecutionContext); // to keep the - // consistency we always - // try to re-run to - // avoid complexity - // if (experimentID != null){ + if (!isCancelled()) { + invokeInFlowHandlers(jobExecutionContext); // to keep the + // consistency we always + // try to re-run to + // avoid complexity + }else{ + log.info("Experiment is cancelled, so launch operation is stopping immediately"); + return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned + } + // if (experimentID != null){ // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE); // } // After executing the in handlers provider instance should be set // to job execution context. // We get the provider instance and execute it. - invokeProviderExecute(jobExecutionContext); - } catch (Exception e) { + if (!isCancelled()) { + invokeProviderExecute(jobExecutionContext); + } else { + log.info("Experiment is cancelled, so launch operation is stopping immediately"); + return; + } + } catch (Exception e) { try { // we make the experiment as failed due to exception scenario monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED)); @@ -869,26 +901,31 @@ public class BetterGfacImpl implements GFac { monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.INHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - Class<? extends GFacHandler> handlerClass; - GFacHandler handler; - try { - GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); - handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); - handler = handlerClass.newInstance(); - handler.initProperties(handlerClassName.getProperties()); - } catch (ClassNotFoundException e) { - throw new GFacException("Cannot load handler class " + handlerClassName, e); - } catch (InstantiationException e) { - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } catch (IllegalAccessException e) { - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } - try { - handler.invoke(jobExecutionContext); - GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); - // if exception thrown before that we do not make it finished - } catch (GFacHandlerException e) { - throw new GFacException("Error Executing a InFlow Handler", e.getCause()); + if(!isCancelled()) { + Class<? 
extends GFacHandler> handlerClass; + GFacHandler handler; + try { + GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); + handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); + handler = handlerClass.newInstance(); + handler.initProperties(handlerClassName.getProperties()); + } catch (ClassNotFoundException e) { + throw new GFacException("Cannot load handler class " + handlerClassName, e); + } catch (InstantiationException e) { + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (IllegalAccessException e) { + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } + try { + handler.invoke(jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); + // if exception thrown before that we do not make it finished + } catch (GFacHandlerException e) { + throw new GFacException("Error Executing a InFlow Handler", e.getCause()); + } + }else{ + log.info("Experiment execution is cancelled, so InHandler invocation is going to stop"); + break; } } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) @@ -911,35 +948,39 @@ public class BetterGfacImpl implements GFac { log.error("Error constructing job execution context during outhandler invocation"); throw new GFacException(e); } - launch(jobExecutionContext); } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING)); for (GFacHandlerConfig handlerClassName : handlers) { - Class<? extends GFacHandler> handlerClass; - GFacHandler handler; - try { - GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); - handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); - handler = handlerClass.newInstance(); - handler.initProperties(handlerClassName.getProperties()); - } catch (ClassNotFoundException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot load handler class " + handlerClassName, e); - } catch (InstantiationException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } catch (IllegalAccessException e) { - log.error(e.getMessage()); - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } catch (Exception e) { - throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); - } - try { - handler.invoke(jobExecutionContext); - GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); - } catch (Exception e) { - // TODO: Better error reporting. - throw new GFacException("Error Executing a OutFlow Handler", e); + if(!isCancelled()) { + Class<? 
extends GFacHandler> handlerClass; + GFacHandler handler; + try { + GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName()); + handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); + handler = handlerClass.newInstance(); + handler.initProperties(handlerClassName.getProperties()); + } catch (ClassNotFoundException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot load handler class " + handlerClassName, e); + } catch (InstantiationException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (IllegalAccessException e) { + log.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (Exception e) { + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } + try { + handler.invoke(jobExecutionContext); + GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); + } catch (Exception e) { + // TODO: Better error reporting. + throw new GFacException("Error Executing a OutFlow Handler", e); + } + }else{ + log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop"); + break; } monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED)); } @@ -1128,4 +1169,22 @@ public class BetterGfacImpl implements GFac { public void setZk(ZooKeeper zk) { this.zk = zk; } + + + public boolean isCancelled() { + return cancelled; + } + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + public void process(WatchedEvent watchedEvent) { + if(Event.EventType.NodeDataChanged.equals(watchedEvent.getType())){ + // node data is changed, this means node is cancelled. 
+ System.out.println(watchedEvent.getPath()); + System.out.println("Experiment is cancelled with this path"); + this.cancelled = true; + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java index 79f5a0b..10656b7 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java @@ -55,10 +55,9 @@ public interface GFac { /** * This operation can be used to cancel an already running experiment - * @param jobExecutionContext * @return Successful cancellation will return true * @throws GFacException */ - public boolean cancel(JobExecutionContext jobExecutionContext)throws GFacException; + public boolean cancel(String experimentID, String taskID, String gatewayID)throws GFacException; } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index bf27deb..6b057ed 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -290,7 +290,7 @@ public class GFacImpl implements GFac { return jobExecutionContext; } - public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException { + private boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException { // We need to check whether this job is submitted as a part of a large workflow. If yes, // we need to setup workflow tracking listerner. String workflowInstanceID = null; @@ -306,7 +306,18 @@ } - public boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { + public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { + JobExecutionContext jobExecutionContext = null; + try { + jobExecutionContext = createJEC(experimentID, taskID, gatewayID); + return cancel(jobExecutionContext); + } catch (Exception e) { + log.error("Error invoking the job with experiment ID: " + experimentID); + throw new GFacException(e); + } + } + + private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException { // We need to check whether this job is submitted as a part of a large workflow. If yes, // we need to setup workflow tracking listerner.
String workflowInstanceID = null; http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index 80fa0d9..9d8cac9 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -73,17 +73,20 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { } public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { - logger.debug("Updating job status for "+jobID+":"+state.toString()); CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); JobDetails details = (JobDetails)airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids); if(details == null) { details = new JobDetails(); } org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus(); - status.setJobState(state); + if(!JobState.CANCELED.equals(details.getJobStatus().getJobState())&& + !JobState.CANCELING.equals(details.getJobStatus().getJobState())) { + status.setJobState(state); + } status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); details.setJobStatus(status); details.setJobID(jobID); + logger.debug("Updating job status for "+jobID+":"+details.getJobStatus().toString()); airavataRegistry.update(RegistryModelType.JOB_DETAIL, details, ids); } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java index 1e68bdc..6457435 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java @@ -88,7 +88,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { break; } try { - updateTaskStatus(jobStatus.getIdentity().getTaskId(), state); + state = updateTaskStatus(jobStatus.getIdentity().getTaskId(), state); logger.debug("Publishing task status for "+jobStatus.getIdentity().getTaskId()+":"+state.toString()); monitorPublisher.publish(new TaskStatusChangedEvent(jobStatus.getIdentity(),state)); } catch (Exception e) { @@ -96,18 +96,23 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener { } } - public void updateTaskStatus(String taskId, TaskState state) throws Exception { - logger.debug("Updating task status for "+taskId+":"+state.toString()); + public TaskState updateTaskStatus(String taskId, TaskState state) throws Exception { TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId); if(details == null) { details = new TaskDetails(); details.setTaskID(taskId); } 
org.apache.airavata.model.workspace.experiment.TaskStatus status = new org.apache.airavata.model.workspace.experiment.TaskStatus(); - status.setExecutionState(state); + if(!TaskState.CANCELED.equals(details.getTaskStatus().getExecutionState()) + && !TaskState.CANCELING.equals(details.getTaskStatus().getExecutionState())){ + status.setExecutionState(state); + } status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); details.setTaskStatus(status); + logger.debug("Updating task status for "+taskId+":"+details.getTaskStatus().toString()); + airavataRegistry.update(RegistryModelType.TASK_DETAIL, details, taskId); + return details.getTaskStatus().getExecutionState(); } public void setup(Object... configurations) { http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index a265043..ed4dd14 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -29,14 +29,13 @@ import java.util.*; import org.apache.airavata.client.api.AiravataAPI; import org.apache.airavata.client.api.exception.AiravataAPIInvocationException; import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.DBUtil; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.StringUtil; +import org.apache.airavata.common.utils.*; import org.apache.airavata.commons.gfac.type.ActualParameter; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; import org.apache.airavata.gfac.*; +import org.apache.airavata.gfac.Constants; +import org.apache.airavata.gfac.ExecutionMode; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.states.GfacExperimentState; @@ -879,6 +878,15 @@ public class GFacUtils { return Integer.parseInt(expState); } + public static int getZKExperimentStateValue(ZooKeeper zk,String fullPath)throws ApplicationSettingsException, + KeeperException, InterruptedException { + Stat exists = zk.exists(fullPath+File.separator+"state", false); + if (exists != null) { + return Integer.parseInt(new String(zk.getData(fullPath+File.separator+"state", false, exists))); + } + return -1; + } + public static boolean createPluginZnode(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) throws ApplicationSettingsException, KeeperException, @@ -1037,6 +1045,8 @@ public class GFacUtils { .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } else { // ohhh this node exists in some other failed gfac folder, we @@ -1099,7 +1109,59 @@ public class GFacUtils { return true; } - public static void savePluginData(JobExecutionContext jobExecutionContext, + public static String 
findExperimentEntry(String experimentID, + String taskID, ZooKeeper zk + ) throws KeeperException, + InterruptedException { + String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, false); + for(String pickedChild:children) { + String experimentPath = experimentNode + File.separator + pickedChild; + String newExpNode = experimentPath + File.separator + experimentID + + "+" + taskID; + Stat exists = zk.exists(newExpNode, false); + if(exists == null){ + continue; + }else{ + return newExpNode; + } + } + return null; + } + + public static void setExperimentCancel(String experimentId,String taskId,ZooKeeper zk)throws KeeperException, + InterruptedException { + String experimentEntry = GFacUtils.findExperimentEntry(experimentId, taskId, zk); + Stat operation = zk.exists(experimentEntry + File.separator + "operation", false); + if (operation == null) { // if there is no entry, this will come when a user immediately cancel a job + zk.create(experimentEntry + File.separator + "operation", "cancel".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } else { // if user submit the job to gfac then cancel during execution + zk.setData(experimentEntry + File.separator + "operation", "cancel".getBytes(), operation.getVersion()); + } + + } + public static boolean isCancelled(String experimentID, + String taskID, ZooKeeper zk + ) throws KeeperException, + InterruptedException { + String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk); + if(experimentEntry == null){ + return false; + }else { + Stat exists = zk.exists(experimentEntry, false); + if (exists != null) { + String operation = new String(zk.getData(experimentEntry, false, exists)); + if ("cancel".equals(operation)) { + return true; + } + } + } + return false; + } + + public static void savePluginData(JobExecutionContext jobExecutionContext, StringBuffer data, String className) throws GFacHandlerException { try { ZooKeeper zk = jobExecutionContext.getZk(); http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index 90efaad..74975df 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -45,6 +45,7 @@ import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.airavata.schemas.gfac.HostDescriptionType; import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,12 +162,98 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } } + public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException { 
+ List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers(); + if (daemonHandlers == null) { + daemonHandlers = BetterGfacImpl.getDaemonHandlers(); + } + ThreadedHandler pullMonitorHandler = null; + ThreadedHandler pushMonitorHandler = null; + String monitorMode = host.getMonitorMode(); + for (ThreadedHandler threadedHandler : daemonHandlers) { + if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { + pullMonitorHandler = threadedHandler; + if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) { + log.info("Job is launched successfully now passing it to monitoring in pull mode, JobID Returned: " + jobID); + jobExecutionContext.setProperty("cancel","true"); + pullMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" + + " to be handled by the GridPullMonitorHandler"); + } + } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { + pushMonitorHandler = threadedHandler; + if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) { + log.info("Job is launched successfully now passing it to monitoring in push mode, JobID Returned: " + jobID); + pushMonitorHandler.invoke(jobExecutionContext); + } else { + log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + + " to be handled by the GridPushMonitorHandler"); + } + } + // have to handle the GridPushMonitorHandler logic + } + if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) { + log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not be invoked" + + ", execution is configured as asynchronous, so Outhandler will not be invoked"); + } + } + public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { //To change body of implemented methods use File | Settings | File Templates. } - public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacException { + public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { //To change body of implemented methods use File | Settings | File Templates. + log.info("Cancelling the job in GSISSHProvider"); + HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType(); + StringBuffer data = new StringBuffer(); + JobDetails jobDetails = jobExecutionContext.getJobDetails(); + try { + Cluster cluster = null; + if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) { + }else { + GFACGSISSHUtils.addSecurityContext(jobExecutionContext); + } + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); + if (cluster == null) { + throw new GFacProviderException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } + // JobDetails is mandatory here, because without it we cannot identify the job to cancel + if(jobDetails == null) { + log.error("There are no JobDetails, so the cancel operation cannot be performed"); + return; + } + if (jobDetails.getJobID() != null) { + cluster.cancelJob(jobDetails.getJobID()); + } else { + log.error("No Job Id is set, so the cancel operation cannot be performed"); + return; + } + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); + // we know this host is type GsiSSHHostType + } catch (SSHApiException e) { + String error = "Error cancelling the job on host " + host.getHostAddress() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } catch (Exception e) { + String error = "Error cancelling the job on host " + host.getHostAddress() + " message: " + e.getMessage(); + log.error(error); + jobDetails.setJobID("none"); + GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); + GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + throw new GFacProviderException(error, e); + } finally { + log.info("Saving data for future recovery: "); + log.info(data.toString()); + GFacUtils.savePluginData(jobExecutionContext, data, this.getClass().getName()); + } } public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index 6468542..3899538 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -80,12 +80,24 @@ public class GridPullMonitorHandler extends ThreadedHandler { hpcPullMonitor.setGfac(jobExecutionContext.getGfac()); MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext); try { - CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); + if ("true".equals(jobExecutionContext.getProperty("cancel"))) { + removeJobFromMonitoring(jobExecutionContext); + } else { +
CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); + } } catch (AiravataMonitorException e) { logger.error("Error adding monitorID object to the queue with experiment ", monitorID.getExperimentID()); } } + public void removeJobFromMonitoring(JobExecutionContext jobExecutionContext)throws GFacHandlerException { + MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(),jobExecutionContext); + try { + CommonUtils.removeMonitorFromQueue(hpcPullMonitor.getQueue(),monitorID); + } catch (AiravataMonitorException e) { + throw new GFacHandlerException(e); + } + } public AuthenticationInfo getAuthenticationInfo() { return authenticationInfo; } @@ -101,4 +113,8 @@ public class GridPullMonitorHandler extends ThreadedHandler { public void setHpcPullMonitor(HPCPullMonitor hpcPullMonitor) { this.hpcPullMonitor = hpcPullMonitor; } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.removeJobFromMonitoring(jobExecutionContext); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index d371d4c..8a9764c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -305,6 +305,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, * @throws TException */ public boolean terminateExperiment(String experimentId) throws TException { + log.info("Experiment: " + experimentId + " is cancelling !!!!!"); return validateStatesAndCancel(experimentId); } @@ -517,94 +518,131 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, throw new OrchestratorException("Error retrieving the Experiment by the given experimentID:\n" + experimentId); } - switch (experiment.getExperimentStatus().getExperimentState()){ - case COMPLETED: - throw new OrchestratorException("Experiment is already finished cannot cancel the experiment"); - case CANCELED: - throw new OrchestratorException("Experiment is already canceled, cannot perform cancel again !!!"); - case CANCELING: - throw new OrchestratorException("Experiment is cancelling, cannot perform cancel again !!!!"); - case SUSPENDED: - throw new OrchestratorException("Experiment is suspended, cannot perform cancel !!!!"); - case FAILED: - throw new OrchestratorException("Experiment is failed,cannot perform cancel !!!!"); - case UNKNOWN: - throw new OrchestratorException("Experiment is inconsistent,cannot perform cancel, !!!!"); - } - - ExperimentStatus status = new ExperimentStatus(); - status.setExperimentState(ExperimentState.CANCELING); - status.setTimeOfStateChange(Calendar.getInstance() - .getTimeInMillis()); - experiment.setExperimentStatus(status); - registry.update(RegistryModelType.EXPERIMENT, experiment, - experimentId); - - List<String> ids = registry.getIds( - RegistryModelType.WORKFLOW_NODE_DETAIL, - 
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index d371d4c..8a9764c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -305,6 +305,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
      * @throws TException
      */
     public boolean terminateExperiment(String experimentId) throws TException {
+        log.info("Experiment: " + experimentId + " is being cancelled");
         return validateStatesAndCancel(experimentId);
     }

@@ -517,94 +518,131 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
                 throw new OrchestratorException("Error retrieving the Experiment by the given experimentID:\n" + experimentId);
             }
-            switch (experiment.getExperimentStatus().getExperimentState()){
-                case COMPLETED:
-                    throw new OrchestratorException("Experiment is already finished cannot cancel the experiment");
-                case CANCELED:
-                    throw new OrchestratorException("Experiment is already canceled, cannot perform cancel again !!!");
-                case CANCELING:
-                    throw new OrchestratorException("Experiment is cancelling, cannot perform cancel again !!!!");
-                case SUSPENDED:
-                    throw new OrchestratorException("Experiment is suspended, cannot perform cancel !!!!");
-                case FAILED:
-                    throw new OrchestratorException("Experiment is failed,cannot perform cancel !!!!");
-                case UNKNOWN:
-                    throw new OrchestratorException("Experiment is inconsistent,cannot perform cancel, !!!!");
-            }
-
-            ExperimentStatus status = new ExperimentStatus();
-            status.setExperimentState(ExperimentState.CANCELING);
-            status.setTimeOfStateChange(Calendar.getInstance()
-                    .getTimeInMillis());
-            experiment.setExperimentStatus(status);
-            registry.update(RegistryModelType.EXPERIMENT, experiment,
-                    experimentId);
-
-            List<String> ids = registry.getIds(
-                    RegistryModelType.WORKFLOW_NODE_DETAIL,
-                    WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
-            for (String workflowNodeId : ids) {
-                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
-                        .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
-                                workflowNodeId);
-                if (workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue() > 1) {
-                    log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " +
-                            "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
-                    continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-                }else {
+            ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState();
+            if (experimentState.getValue() > 4 && experimentState.getValue() < 10) {
+                throw new OrchestratorException("Unable to mark the experiment as cancelled, because the current state is: " +
+                        experiment.getExperimentStatus().getExperimentState().toString());
+            } else if (experimentState.getValue() < 3) {
+                // when the experiment status is < 3, no JobDetails object has been created yet,
+                // so we simply change the status and stop the execution
+                ExperimentStatus status = new ExperimentStatus();
+                status.setExperimentState(ExperimentState.CANCELED);
+                status.setTimeOfStateChange(Calendar.getInstance()
+                        .getTimeInMillis());
+                experiment.setExperimentStatus(status);
+                registry.update(RegistryModelType.EXPERIMENT, experiment,
+                        experimentId);
+                List<String> ids = registry.getIds(
+                        RegistryModelType.WORKFLOW_NODE_DETAIL,
+                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+                for (String workflowNodeId : ids) {
+                    WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+                            .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+                                    workflowNodeId);
                     WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
+                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
                     workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
                             .getTimeInMillis());
                     workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
                     registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
                             workflowNodeId);
-                }
-                List<Object> taskDetailList = registry.get(
-                        RegistryModelType.TASK_DETAIL,
-                        TaskDetailConstants.NODE_ID, workflowNodeId);
-                for (Object o : taskDetailList) {
-                    TaskDetails taskDetails = (TaskDetails) o;
-                    TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
-                    if(taskStatus.getExecutionState().getValue()>7){
-                        log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " +
-                                "current task state is "+((TaskDetails) o).getTaskStatus().getExecutionState().toString());
-                        continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states
-                    }else{
-                        taskStatus.setExecutionState(TaskState.CANCELING);
+                    List<Object> taskDetailList = registry.get(
+                            RegistryModelType.TASK_DETAIL,
+                            TaskDetailConstants.NODE_ID, workflowNodeId);
+                    for (Object o : taskDetailList) {
+                        TaskDetails taskDetails = (TaskDetails) o;
+                        TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+                        taskStatus.setExecutionState(TaskState.CANCELED);
                         taskStatus.setTimeOfStateChange(Calendar.getInstance()
                                 .getTimeInMillis());
                         taskDetails.setTaskStatus(taskStatus);
                         registry.update(RegistryModelType.TASK_DETAIL, o,
                                 taskDetails);
                     }
-                    // iterate through all the generated tasks and performs the
-                    // job submisssion+monitoring
-                    // launching the experiment
-                    orchestrator.cancelExperiment(experiment,
-                            workflowNodeDetail, taskDetails, null);
-                    taskStatus.setExecutionState(TaskState.CANCELED);
-                    taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                }
+            } else {
+
+                ExperimentStatus status = new ExperimentStatus();
+                status.setExperimentState(ExperimentState.CANCELING);
+                status.setTimeOfStateChange(Calendar.getInstance()
+                        .getTimeInMillis());
+                experiment.setExperimentStatus(status);
+                registry.update(RegistryModelType.EXPERIMENT, experiment,
+                        experimentId);
+
+                List<String> ids = registry.getIds(
+                        RegistryModelType.WORKFLOW_NODE_DETAIL,
+                        WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
+                for (String workflowNodeId : ids) {
+                    WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry
+                            .get(RegistryModelType.WORKFLOW_NODE_DETAIL,
+                                    workflowNodeId);
+                    int value = workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue();
+                    if (value > 1 && value < 7) { // we skip the unknown state
+                        log.error(workflowNodeDetail.getNodeName() + " workflow node status cannot be marked as cancelled, because " +
+                                "the current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString());
+                        continue; // skip the deeper loops when the upper layers are in non-cancellable states
+                    } else {
+                        WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+                        workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING);
+                        workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                                .getTimeInMillis());
+                        workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                        registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+                                workflowNodeId);
+                    }
+                    List<Object> taskDetailList = registry.get(
+                            RegistryModelType.TASK_DETAIL,
+                            TaskDetailConstants.NODE_ID, workflowNodeId);
+                    for (Object o : taskDetailList) {
+                        TaskDetails taskDetails = (TaskDetails) o;
+                        TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus();
+                        if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue() < 12) {
+                            log.error(((TaskDetails) o).getTaskID() + " task status cannot be marked as cancelled, because " +
+                                    "the current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString());
+                            continue; // skip the deeper loops when the upper layers are in non-cancellable states
+                        } else {
+                            taskStatus.setExecutionState(TaskState.CANCELING);
+                            taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                                    .getTimeInMillis());
+                            taskDetails.setTaskStatus(taskStatus);
+                            registry.update(RegistryModelType.TASK_DETAIL, o,
+                                    taskDetails.getTaskID());
+                        }
+                        // iterate through all the generated tasks and perform the
+                        // gfac-level cancel operation for each of them
+                        orchestrator.cancelExperiment(experiment,
+                                workflowNodeDetail, taskDetails, null);
+
+                        // after the gfac-level cancel operation completes, mark the task as cancelled
+                        taskStatus.setExecutionState(TaskState.CANCELED);
+                        taskStatus.setTimeOfStateChange(Calendar.getInstance()
+                                .getTimeInMillis());
+                        taskDetails.setTaskStatus(taskStatus);
+                        registry.update(RegistryModelType.TASK_DETAIL, o,
+                                taskDetails.getTaskID());
+                    }
+                    // mark the workflow node as cancelled
+                    WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
+                    workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
+                    workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
                             .getTimeInMillis());
-                    taskDetails.setTaskStatus(taskStatus);
-                    registry.update(RegistryModelType.TASK_DETAIL, o,
-                            taskDetails);
+                    workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
+                    registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
+                            workflowNodeId);
                 }
-                WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus();
-                workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED);
-                workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance()
+                // mark the experiment as cancelled
+                status = new ExperimentStatus();
+                status.setExperimentState(ExperimentState.CANCELED);
+                status.setTimeOfStateChange(Calendar.getInstance()
                         .getTimeInMillis());
-                workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus);
-                registry.update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail,
-                        workflowNodeId);
+                experiment.setExperimentStatus(status);
+                registry.update(RegistryModelType.EXPERIMENT, experiment,
+                        experimentId);
             }
-            status = new ExperimentStatus();
-            status.setExperimentState(ExperimentState.CANCELED);
-            status.setTimeOfStateChange(Calendar.getInstance()
-                    .getTimeInMillis());
-            experiment.setExperimentStatus(status);
-            registry.update(RegistryModelType.EXPERIMENT, experiment,
-                    experimentId);
+            log.info("Experiment: " + experimentId + " is cancelled");
         } catch (Exception e) {
             throw new TException(e);
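The rewritten validateStatesAndCancel() above replaces the old switch on ExperimentState with numeric range checks on getValue(): values between 5 and 9 are treated as too far along (or already terminal) to cancel, values below 3 mean nothing has been submitted yet so only the registry statuses are flipped to CANCELED, and everything else goes through the full gfac-level cancel before the statuses are updated. The sketch below only restates that decision and is not part of this commit; the CancelAction enum and decide() helper are hypothetical, and the sole assumed API is ExperimentState.getValue() as used in the diff.

    // Illustrative decision helper (not part of this commit) mirroring the state checks above.
    enum CancelAction { REJECT, MARK_CANCELLED_ONLY, FULL_CANCEL }

    static CancelAction decide(ExperimentState state) {
        int value = state.getValue();
        if (value > 4 && value < 10) {
            return CancelAction.REJECT;              // already finishing, cancelled, failed, etc.
        } else if (value < 3) {
            return CancelAction.MARK_CANCELLED_ONLY; // nothing submitted yet, just update the registry
        }
        return CancelAction.FULL_CANCEL;             // jobs may be running, go through the gfac-level cancel
    }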
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/011f0196/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 70930f4..0f43190 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -169,9 +169,10 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
             if (jobState.getValue() > 4){
                 logger.error("Cannot cancel the job, because current job state is : " + jobState.toString() + "jobId: " + jobDetails.getJobID() + " Job Name: " + jobDetails.getJobName());
+                return;
             }
-            jobSubmitter.terminate(experiment.getExperimentID(),task.getTaskID());
         }
+        jobSubmitter.terminate(experiment.getExperimentID(), task.getTaskID());
     }
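In SimpleOrchestratorImpl the added return makes the job-state guard effective: previously the error was logged but execution still fell through toward the terminate call, and the terminate call itself has now moved outside the enclosing block. The sketch below is a hedged restatement of that guard-then-terminate shape and is not part of this commit; the helper name terminateIfCancellable and the broad throws Exception clause are illustrative, and only jobSubmitter.terminate and JobState.getValue() from the diff are assumed.

    // Hypothetical guard illustrating the early return added above (not part of this commit).
    private void terminateIfCancellable(JobState jobState, Experiment experiment, TaskDetails task) throws Exception {
        if (jobState.getValue() > 4) {
            // the job has already progressed past the point where cancellation makes sense
            logger.error("Cannot cancel the job, because the current job state is: " + jobState.toString());
            return;
        }
        jobSubmitter.terminate(experiment.getExperimentID(), task.getTaskID());
    }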