Repository: airavata Updated Branches: refs/heads/master 9ce24a559 -> ba8538ddf
Fixed issues with experiment cancel Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/44e5ed14 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/44e5ed14 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/44e5ed14 Branch: refs/heads/master Commit: 44e5ed14adbe756fcdcd7a541656dd968a61f642 Parents: 1302c30 Author: Shameera Rathanyaka <[email protected]> Authored: Fri Sep 18 16:21:06 2015 -0400 Committer: Shameera Rathanyaka <[email protected]> Committed: Fri Sep 18 16:21:06 2015 -0400 ---------------------------------------------------------------------- .../org/apache/airavata/gfac/impl/Factory.java | 8 +-- .../impl/watcher/CancelRequestWatcherImpl.java | 27 +++++++-- .../watcher/RedeliveryRequestWatcherImpl.java | 28 +++++++-- .../airavata/gfac/server/GfacServerHandler.java | 5 +- .../server/OrchestratorServerHandler.java | 63 +++++++++++++++++--- .../orchestrator/util/OrchestratorUtils.java | 6 ++ 6 files changed, 111 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java index bb53802..fbba17b 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java @@ -301,12 +301,12 @@ public abstract class Factory { return getMonitorService(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); } - public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher() { - return new RedeliveryRequestWatcherImpl(); + public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher(String experimentId, String processId) { + return new RedeliveryRequestWatcherImpl(experimentId, processId); } - public static CancelRequestWatcher getCancelRequestWatcher() { - return new CancelRequestWatcherImpl(); + public static CancelRequestWatcher getCancelRequestWatcher(String experimentId, String processId) { + return new CancelRequestWatcherImpl(experimentId, processId); } public static Session getSSHSession(AuthenticationInfo authenticationInfo, ServerInfo serverInfo) throws AiravataException { http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java index 58d2817..ba87587 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java @@ -33,6 +33,13 @@ import org.slf4j.LoggerFactory; public class CancelRequestWatcherImpl implements CancelRequestWatcher { private static final Logger log = LoggerFactory.getLogger(CancelRequestWatcherImpl.class); + private final String processId; + private final String experimentId; + + public CancelRequestWatcherImpl(String experimentId, String processId) { + this.experimentId = experimentId; + this.processId = processId; + } @Override public void process(WatchedEvent watchedEvent) throws Exception { @@ -43,15 +50,15 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher { switch (type) { case NodeDataChanged: byte[] bytes = curatorClient.getData().forPath(path); - String processId = path.substring(path.lastIndexOf("/") + 1); String action = new String(bytes); if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) { ProcessContext processContext = Factory.getGfacContext().getProcess(processId); if (processContext != null) { processContext.setCancel(true); - log.info("procesId : {}, Cancelling process", processId); + log.info("expId {}, processId : {}, Cancelling process", experimentId, processId); } else { - log.info("Cancel request came for processId {} but couldn't find process context"); + log.info("expId: {}, Cancel request came for processId {} but couldn't find process context", + experimentId, processId); } } else { curatorClient.getData().usingWatcher(this).forPath(path); @@ -59,14 +66,24 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher { break; case NodeDeleted: //end of experiment execution, ignore this event + log.info("expId: {}, cancel watcher trigger for process {} with event type {}", experimentId, + processId, type.name()); break; case NodeCreated: case NodeChildrenChanged: case None: - curatorClient.getData().usingWatcher(this).forPath(path); + log.info("expId: {}, Cancel watcher trigger for process {} with event type {}", experimentId, + processId, type.name()); + if (path != null) { + curatorClient.getData().usingWatcher(this).forPath(path); + } break; default: - curatorClient.getData().usingWatcher(this).forPath(path); + log.info("expId: {}, Cancel watcher trigger for process {} with event type {}", experimentId, + processId, type.name()); + if (path != null) { + curatorClient.getData().usingWatcher(this).forPath(path); + } break; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java index dc5317f..8341855 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java @@ -32,6 +32,13 @@ import org.slf4j.Logger; public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher { private static final Logger log = org.slf4j.LoggerFactory.getLogger(RedeliveryRequestWatcherImpl.class); + private final String processId; + private final String experimentId; + + public RedeliveryRequestWatcherImpl(String experimentId, String procesId) { + this.experimentId = experimentId; + this.processId = procesId; + } @Override public void process(WatchedEvent watchedEvent) throws Exception { @@ -43,31 +50,40 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher { case NodeDataChanged: byte[] bytes = curatorClient.getData().forPath(path); String serverName = new String(bytes); - String processId = path.substring(path.lastIndexOf("/") + 1); if (ServerSettings.getGFacServerName().trim().equals(serverName)) { curatorClient.getData().usingWatcher(this).forPath(path); - log.info("processId: {}, change data with same server name : {}" , processId, serverName); + log.info("processId: {},event type {}, change data with same server name : {}", processId, + eventType, serverName); } else { ProcessContext processContext = Factory.getGfacContext().getProcess(processId); if (processContext != null) { processContext.setHandOver(true); - log.info("procesId : {}, handing over to new server instance : {}", processId, serverName); + log.info("processId : {}, event type {}, handing over to new server instance : {}", processId, + eventType, serverName); } else { - log.info("Redelivery request came for processId {} but couldn't find process context"); + log.info("Redelivery request came for processId {}, with event type {}, but couldn't find " + + "process context", processId, eventType.name()); } } break; case NodeDeleted: //end of experiment execution, ignore this event + log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name()); break; case NodeCreated: case NodeChildrenChanged: case None: - curatorClient.getData().usingWatcher(this).forPath(path); + if (path != null) { + curatorClient.getData().usingWatcher(this).forPath(path); + log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name()); + } break; // not yet implemented default: - curatorClient.getData().usingWatcher(this).forPath(path); + if (path != null) { + curatorClient.getData().usingWatcher(this).forPath(path); + log.info("Redelivery watcher trigger for process {} with event type {}", processId, eventType.name()); + } break; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 59b7a54..53665fd 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -312,7 +312,7 @@ public class GfacServerHandler implements GfacService.Iface { String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); - curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath); + curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId, processId)).forPath(zkProcessNodePath); // create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE); @@ -331,8 +331,7 @@ public class GfacServerHandler implements GfacService.Iface { // create /experiments/{experimentId}/cancel node and set watcher for data changes String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); - curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath (experimentCancelNode); + curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId, processId)).forPath (experimentCancelNode); } http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 109faef..6c04ed7 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -154,7 +154,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { log.info("Couldn't identify the gateway Id using the credential token, Use default gateway Id"); // throw new AiravataException("Couldn't identify the gateway Id using the credential token"); } - ExperimentType executionType = experiment.getExperimentType(); + String experimentNodePath = GFacUtils.getExperimentNodePath (experimentId); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentNodePath); + String experimentCancelNode = ZKPaths.makePath(experimentNodePath, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); + + ExperimentType executionType = experiment.getExperimentType(); if (executionType == ExperimentType.SINGLE_APPLICATION) { //its an single application execution experiment log.debug(experimentId, "Launching single application experiment {}.", experimentId); @@ -318,8 +323,14 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { if (stat != null) { curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST .getBytes()); + ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING); + status.setReason("Experiment cancel request processed"); + status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + OrchestratorUtils.updageExperimentStatus(experimentId, status); + log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState()); + return true; } - return true; + return false; } private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { @@ -419,8 +430,20 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { // case CREATED: // case VALIDATED: case STARTED: - status.setState(ExperimentState.EXECUTING); - status.setReason("process started"); + try { + ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity + .getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELING); + status.setReason("Process competed but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.EXECUTING); + status.setReason("process started"); + } + } catch (RegistryException e) { + status.setState(ExperimentState.EXECUTING); + status.setReason("process started"); + } break; // case PRE_PROCESSING: // break; @@ -433,12 +456,36 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { // case CANCELLING: // break; case COMPLETED: - status.setState(ExperimentState.COMPLETED); - status.setReason("process completed"); + try { + ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity + .getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELED); + status.setReason("Process competed but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.COMPLETED); + status.setReason("process completed"); + } + } catch (RegistryException e) { + status.setState(ExperimentState.COMPLETED); + status.setReason("process completed"); + } break; case FAILED: - status.setState(ExperimentState.FAILED); - status.setReason("process failed"); + try { + ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity + .getExperimentId()); + if (stat.getState() == ExperimentState.CANCELING) { + status.setState(ExperimentState.CANCELED); + status.setReason("Process failed but experiment cancelling is triggered"); + } else { + status.setState(ExperimentState.FAILED); + status.setReason("process failed"); + } + } catch (RegistryException e) { + status.setState(ExperimentState.FAILED); + status.setReason("process failed"); + } break; case CANCELED: status.setState(ExperimentState.CANCELED); http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java index 4d4b04e..834d3b6 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java @@ -40,4 +40,10 @@ public class OrchestratorUtils { log.error("expId : " + experimentId + " Error while updating experiment status to " + status.toString(), e); } } + + public static ExperimentStatus getExperimentStatus(String experimentId) throws RegistryException { + return ((ExperimentStatus) RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType + .EXPERIMENT_STATUS, + experimentId)); + } }
