Repository: airavata
Updated Branches:
  refs/heads/master 9ce24a559 -> ba8538ddf


Fixed issues with experiment cancellation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/44e5ed14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/44e5ed14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/44e5ed14

Branch: refs/heads/master
Commit: 44e5ed14adbe756fcdcd7a541656dd968a61f642
Parents: 1302c30
Author: Shameera Rathanyaka <[email protected]>
Authored: Fri Sep 18 16:21:06 2015 -0400
Committer: Shameera Rathanyaka <[email protected]>
Committed: Fri Sep 18 16:21:06 2015 -0400

----------------------------------------------------------------------
 .../org/apache/airavata/gfac/impl/Factory.java  |  8 +--
 .../impl/watcher/CancelRequestWatcherImpl.java  | 27 +++++++--
 .../watcher/RedeliveryRequestWatcherImpl.java   | 28 +++++++--
 .../airavata/gfac/server/GfacServerHandler.java |  5 +-
 .../server/OrchestratorServerHandler.java       | 63 +++++++++++++++++---
 .../orchestrator/util/OrchestratorUtils.java    |  6 ++
 6 files changed, 111 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index bb53802..fbba17b 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -301,12 +301,12 @@ public abstract class Factory {
                return 
getMonitorService(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR);
        }
 
-       public static RedeliveryRequestWatcher getRedeliveryReqeustWatcher() {
-               return new RedeliveryRequestWatcherImpl();
+       public static RedeliveryRequestWatcher 
getRedeliveryReqeustWatcher(String experimentId, String processId) {
+               return new RedeliveryRequestWatcherImpl(experimentId, 
processId);
        }
 
-       public static CancelRequestWatcher getCancelRequestWatcher() {
-               return new CancelRequestWatcherImpl();
+       public static CancelRequestWatcher getCancelRequestWatcher(String 
experimentId, String processId) {
+               return new CancelRequestWatcherImpl(experimentId, processId);
        }
 
        public static Session getSSHSession(AuthenticationInfo 
authenticationInfo, ServerInfo serverInfo) throws AiravataException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
index 58d2817..ba87587 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java
@@ -33,6 +33,13 @@ import org.slf4j.LoggerFactory;
 
 public class CancelRequestWatcherImpl implements CancelRequestWatcher {
        private static final Logger log = 
LoggerFactory.getLogger(CancelRequestWatcherImpl.class);
+       private final String processId;
+       private final String experimentId;
+
+       public CancelRequestWatcherImpl(String experimentId, String processId) {
+               this.experimentId = experimentId;
+               this.processId = processId;
+       }
 
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
@@ -43,15 +50,15 @@ public class CancelRequestWatcherImpl implements 
CancelRequestWatcher {
                switch (type) {
                        case NodeDataChanged:
                                byte[] bytes = 
curatorClient.getData().forPath(path);
-                               String processId = 
path.substring(path.lastIndexOf("/") + 1);
                                String action = new String(bytes);
                                if 
(action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) {
                                        ProcessContext processContext = 
Factory.getGfacContext().getProcess(processId);
                                        if (processContext != null) {
                                                processContext.setCancel(true);
-                                               log.info("procesId : {}, 
Cancelling process", processId);
+                                               log.info("expId {}, processId : 
{}, Cancelling process", experimentId, processId);
                                        } else {
-                                               log.info("Cancel request came 
for processId {} but couldn't find process context");
+                                               log.info("expId: {}, Cancel 
request came for processId {} but couldn't find process context",
+                                                               experimentId, 
processId);
                                        }
                                } else {
                                        
curatorClient.getData().usingWatcher(this).forPath(path);
@@ -59,14 +66,24 @@ public class CancelRequestWatcherImpl implements 
CancelRequestWatcher {
                                break;
                        case NodeDeleted:
                                //end of experiment execution, ignore this event
+                               log.info("expId: {}, cancel watcher trigger for 
process {} with event type {}", experimentId,
+                                               processId, type.name());
                                break;
                        case NodeCreated:
                        case NodeChildrenChanged:
                        case None:
-                               
curatorClient.getData().usingWatcher(this).forPath(path);
+                               log.info("expId: {}, Cancel watcher trigger for 
process {} with event type {}", experimentId,
+                                               processId, type.name());
+                               if (path != null) {
+                                       
curatorClient.getData().usingWatcher(this).forPath(path);
+                               }
                                break;
                        default:
-                               
curatorClient.getData().usingWatcher(this).forPath(path);
+                               log.info("expId: {}, Cancel watcher trigger for 
process {} with event type {}", experimentId,
+                                               processId, type.name());
+                               if (path != null) {
+                                       
curatorClient.getData().usingWatcher(this).forPath(path);
+                               }
                                break;
                }
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
index dc5317f..8341855 100644
--- 
a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
+++ 
b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java
@@ -32,6 +32,13 @@ import org.slf4j.Logger;
 public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher {
 
        private static final Logger log = 
org.slf4j.LoggerFactory.getLogger(RedeliveryRequestWatcherImpl.class);
+       private final String processId;
+       private final String experimentId;
+
+       public RedeliveryRequestWatcherImpl(String experimentId, String 
procesId) {
+               this.experimentId = experimentId;
+               this.processId = procesId;
+       }
 
        @Override
        public void process(WatchedEvent watchedEvent) throws Exception {
@@ -43,31 +50,40 @@ public class RedeliveryRequestWatcherImpl implements 
RedeliveryRequestWatcher {
                        case NodeDataChanged:
                                byte[] bytes = 
curatorClient.getData().forPath(path);
                                String serverName = new String(bytes);
-                               String processId = 
path.substring(path.lastIndexOf("/") + 1);
                                if 
(ServerSettings.getGFacServerName().trim().equals(serverName)) {
                                        
curatorClient.getData().usingWatcher(this).forPath(path);
-                                       log.info("processId: {}, change data 
with same server name : {}" , processId, serverName);
+                                       log.info("processId: {},event type {}, 
change data with same server name : {}", processId,
+                                                       eventType, serverName);
                                } else {
                                        ProcessContext processContext = 
Factory.getGfacContext().getProcess(processId);
                                        if (processContext != null) {
                                                
processContext.setHandOver(true);
-                                               log.info("procesId : {}, 
handing over to new server instance : {}", processId, serverName);
+                                               log.info("processId : {}, event 
type {}, handing over to new server instance : {}", processId,
+                                                               eventType, 
serverName);
                                        } else {
-                                               log.info("Redelivery request 
came for processId {} but couldn't find process context");
+                                               log.info("Redelivery request 
came for processId {}, with event type {}, but couldn't find " +
+                                                               "process 
context", processId, eventType.name());
                                        }
                                }
                                break;
                        case NodeDeleted:
                                //end of experiment execution, ignore this event
+                               log.info("Redelivery watcher trigger for 
process {} with event type {}", processId, eventType.name());
                                break;
                        case NodeCreated:
                        case NodeChildrenChanged:
                        case None:
-                               
curatorClient.getData().usingWatcher(this).forPath(path);
+                               if (path != null) {
+                                       
curatorClient.getData().usingWatcher(this).forPath(path);
+                                       log.info("Redelivery watcher trigger 
for process {} with event type {}", processId, eventType.name());
+                               }
                                break;
                                // not yet implemented
                        default:
-                               
curatorClient.getData().usingWatcher(this).forPath(path);
+                               if (path != null) {
+                                       
curatorClient.getData().usingWatcher(this).forPath(path);
+                                       log.info("Redelivery watcher trigger 
for process {} with event type {}", processId, eventType.name());
+                               }
                                break;
                }
        }

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 59b7a54..53665fd 100644
--- 
a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -312,7 +312,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
                String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, 
processId);
                
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
zkProcessNodePath);
                
curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, 
gfacServerName.getBytes());
-               
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath);
+               
curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher(experimentId,
 processId)).forPath(zkProcessNodePath);
 
                // create /experiments/{experimentId}/{processId}/deliveryTag 
node and set data - deliveryTag
                String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, 
ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE);
@@ -331,8 +331,7 @@ public class GfacServerHandler implements GfacService.Iface 
{
 
                // create /experiments/{experimentId}/cancel node and set 
watcher for data changes
                String experimentCancelNode = 
ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
-               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentCancelNode);
-               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath 
(experimentCancelNode);
+               
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher(experimentId,
 processId)).forPath (experimentCancelNode);
 
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 109faef..6c04ed7 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -154,7 +154,12 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                 log.info("Couldn't identify the gateway Id using the 
credential token, Use default gateway Id");
 //                throw new AiravataException("Couldn't identify the gateway 
Id using the credential token");
             }
-            ExperimentType executionType = experiment.getExperimentType();
+               String experimentNodePath = GFacUtils.getExperimentNodePath 
(experimentId);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentNodePath);
+               String experimentCancelNode = 
ZKPaths.makePath(experimentNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE);
+               
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), 
experimentCancelNode);
+
+               ExperimentType executionType = experiment.getExperimentType();
             if (executionType == ExperimentType.SINGLE_APPLICATION) {
                 //its an single application execution experiment
                 log.debug(experimentId, "Launching single application 
experiment {}.", experimentId);
@@ -318,8 +323,14 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
            if (stat != null) {
                    
curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, 
ZkConstants.ZOOKEEPER_CANCEL_REQEUST
                                    .getBytes());
+                   ExperimentStatus status = new 
ExperimentStatus(ExperimentState.CANCELING);
+                   status.setReason("Experiment cancel request processed");
+                   
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+                   OrchestratorUtils.updageExperimentStatus(experimentId, 
status);
+                   log.info("expId : " + experimentId + " :- Experiment status 
updated to " + status.getState());
+                   return true;
            }
-           return true;
+           return false;
     }
 
     private void launchWorkflowExperiment(String experimentId, String 
airavataCredStoreToken) throws TException {
@@ -419,8 +430,20 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 //                                             case CREATED:
 //                                             case VALIDATED:
                                                case STARTED:
-                                                       
status.setState(ExperimentState.EXECUTING);
-                                                       
status.setReason("process  started");
+                                                       try {
+                                                               
ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+                                                                               
.getExperimentId());
+                                                               if 
(stat.getState() == ExperimentState.CANCELING) {
+                                                                       
status.setState(ExperimentState.CANCELING);
+                                                                       
status.setReason("Process competed but experiment cancelling is triggered");
+                                                               } else {
+                                                                       
status.setState(ExperimentState.EXECUTING);
+                                                                       
status.setReason("process  started");
+                                                               }
+                                                       } catch 
(RegistryException e) {
+                                                               
status.setState(ExperimentState.EXECUTING);
+                                                               
status.setReason("process  started");
+                                                       }
                                                        break;
 //                                             case PRE_PROCESSING:
 //                                                     break;
@@ -433,12 +456,36 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
 //                                             case CANCELLING:
 //                                                     break;
                                                case COMPLETED:
-                                                       
status.setState(ExperimentState.COMPLETED);
-                                                       
status.setReason("process  completed");
+                                                       try {
+                                                               
ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+                                                                               
.getExperimentId());
+                                                               if 
(stat.getState() == ExperimentState.CANCELING) {
+                                                                       
status.setState(ExperimentState.CANCELED);
+                                                                       
status.setReason("Process competed but experiment cancelling is triggered");
+                                                               } else {
+                                                                       
status.setState(ExperimentState.COMPLETED);
+                                                                       
status.setReason("process  completed");
+                                                               }
+                                                       } catch 
(RegistryException e) {
+                                                               
status.setState(ExperimentState.COMPLETED);
+                                                               
status.setReason("process  completed");
+                                                       }
                                                        break;
                                                case FAILED:
-                                                       
status.setState(ExperimentState.FAILED);
-                                                       
status.setReason("process  failed");
+                                                       try {
+                                                               
ExperimentStatus stat = OrchestratorUtils.getExperimentStatus(processIdentity
+                                                                               
.getExperimentId());
+                                                               if 
(stat.getState() == ExperimentState.CANCELING) {
+                                                                       
status.setState(ExperimentState.CANCELED);
+                                                                       
status.setReason("Process failed but experiment cancelling is triggered");
+                                                               } else {
+                                                                       
status.setState(ExperimentState.FAILED);
+                                                                       
status.setReason("process  failed");
+                                                               }
+                                                       } catch 
(RegistryException e) {
+                                                               
status.setState(ExperimentState.FAILED);
+                                                               
status.setReason("process  failed");
+                                                       }
                                                        break;
                                                case CANCELED:
                                                        
status.setState(ExperimentState.CANCELED);

http://git-wip-us.apache.org/repos/asf/airavata/blob/44e5ed14/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
index 4d4b04e..834d3b6 100644
--- 
a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
+++ 
b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
@@ -40,4 +40,10 @@ public class OrchestratorUtils {
                        log.error("expId : " + experimentId + " Error while 
updating experiment status to " + status.toString(), e);
                }
        }
+
+       public static ExperimentStatus getExperimentStatus(String experimentId) 
throws RegistryException {
+               return ((ExperimentStatus) 
RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType
+                                               .EXPERIMENT_STATUS,
+                               experimentId));
+       }
 }

Reply via email to