Repository: airavata
Updated Branches:
  refs/heads/master 9e5356049 -> bea823b43
Experiment cancel request, Orchestrator side implementation and refactored zookeeper node paths. AIRAVATA-1798 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e1356b73 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e1356b73 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e1356b73 Branch: refs/heads/master Commit: e1356b73aaab299552e5992fddce932e9505304b Parents: 4ba16d6 Author: Shameera Rathanyaka <[email protected]> Authored: Thu Sep 3 19:57:17 2015 -0400 Committer: Supun Nakandala <[email protected]> Committed: Sat Sep 5 12:24:21 2015 +0530 ---------------------------------------------------------------------- .../airavata/common/utils/zkConstants.java | 32 ++++ .../airavata/gfac/core/GFacConstants.java | 8 +- .../apache/airavata/gfac/core/GFacUtils.java | 15 +- .../impl/watcher/CancelRequestWatcherImpl.java | 8 +- .../watcher/RedeliveryRequestWatcherImpl.java | 2 + .../airavata/gfac/server/GfacServerHandler.java | 67 +++++--- .../server/OrchestratorServerHandler.java | 151 ++++--------------- 7 files changed, 122 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java ---------------------------------------------------------------------- diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java new file mode 100644 index 0000000..9255e02 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/zkConstants.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.airavata.common.utils; + +public interface ZkConstants { + + public static final String ZOOKEEPER_SERVERS_NODE = "/servers"; + public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac"; + public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments"; + public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag"; + public static final String ZOOKEEPER_TOKEN_NODE = "/token"; + public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener"; + public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST"; +} http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java index b662fff..444956b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacConstants.java @@ -50,13 +50,7 @@ public class GFacConstants { public static final String _127_0_0_1 = 
"127.0.0.1"; public static final String LOCALHOST = "localhost"; - public static final String ZOOKEEPER_SERVERS_NODE = "/servers"; - public static final String ZOOKEEPER_GFAC_SERVER_NODE = "/gfac"; - public static final String ZOOKEEPER_EXPERIMENT_NODE = "/experiments"; - public static final String ZOOKEEPER_DELIVERYTAG_NODE = "/deliveryTag"; - public static final String ZOOKEEPER_TOKEN_NODE = "/token"; - public static final String ZOOKEEPER_CANCEL_LISTENER_NODE = "/cancelListener"; - public static final String ZOOKEEPER_CANCEL_REQEUST = "CANCEL_REQUEST"; + public static final String PROP_WORKFLOW_INSTANCE_ID = "workflow.instance.id"; public static final String PROP_WORKFLOW_NODE_ID = "workflow.node.id"; http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java index 3ee0461..d3d4c7e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/GFacUtils.java @@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.DBUtil; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.credential.store.store.impl.CredentialReaderImpl; import org.apache.airavata.gfac.core.context.ProcessContext; @@ -551,7 +552,7 @@ public class GFacUtils { * @throws InterruptedException */ public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception { 
- String experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE; + String experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE; List<String> children = curatorClient.getChildren().forPath(experimentNode); for (String pickedChild : children) { String experimentPath = experimentNode + File.separator + pickedChild; @@ -568,9 +569,9 @@ public class GFacUtils { public static boolean setExperimentCancelRequest(String processId, CuratorFramework curatorClient, long deliveryTag) throws Exception { - String experimentNode = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); - String cancelListenerNodePath = ZKPaths.makePath(experimentNode, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); - curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, GFacConstants.ZOOKEEPER_CANCEL_REQEUST + String experimentNode = ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); + String cancelListenerNodePath = ZKPaths.makePath(experimentNode, ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + curatorClient.setData().withVersion(-1).forPath(cancelListenerNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST .getBytes()); return true; } @@ -768,7 +769,7 @@ public class GFacUtils { // } public static String getZKGfacServersParentPath() { - return ZKPaths.makePath(GFacConstants.ZOOKEEPER_SERVERS_NODE, GFacConstants.ZOOKEEPER_GFAC_SERVER_NODE); + return ZKPaths.makePath(ZkConstants.ZOOKEEPER_SERVERS_NODE, ZkConstants.ZOOKEEPER_GFAC_SERVER_NODE); } public static JobDescriptor createJobDescriptor(ProcessContext processContext) throws GFacException, AppCatalogException, ApplicationSettingsException { @@ -1113,11 +1114,11 @@ public class GFacUtils { } public static String getExperimentNodePath(String experimentId) { - return GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; + return ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + File.separator + experimentId; } public static long getProcessDeliveryTag(CuratorFramework curatorClient, String 
processId) throws Exception { - String deliveryTagPath = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + GFacConstants + String deliveryTagPath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + processId + ZkConstants .ZOOKEEPER_DELIVERYTAG_NODE; byte[] bytes = curatorClient.getData().forPath(deliveryTagPath); return GFacUtils.bytesToLong(bytes); http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java index bfeac89..58d2817 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/CancelRequestWatcherImpl.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.gfac.impl.watcher; +import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.gfac.core.context.ProcessContext; import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher; import org.apache.airavata.gfac.impl.Factory; @@ -44,7 +45,7 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher { byte[] bytes = curatorClient.getData().forPath(path); String processId = path.substring(path.lastIndexOf("/") + 1); String action = new String(bytes); - if (action.equalsIgnoreCase("CANCEL")) { + if (action.equalsIgnoreCase(ZkConstants.ZOOKEEPER_CANCEL_REQEUST)) { ProcessContext processContext = Factory.getGfacContext().getProcess(processId); if (processContext != null) { processContext.setCancel(true); @@ -56,9 +57,12 @@ public class CancelRequestWatcherImpl implements CancelRequestWatcher { 
curatorClient.getData().usingWatcher(this).forPath(path); } break; + case NodeDeleted: + //end of experiment execution, ignore this event + break; case NodeCreated: case NodeChildrenChanged: - case NodeDeleted: + case None: curatorClient.getData().usingWatcher(this).forPath(path); break; default: http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java index 4738edb..dc5317f 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/watcher/RedeliveryRequestWatcherImpl.java @@ -58,6 +58,8 @@ public class RedeliveryRequestWatcherImpl implements RedeliveryRequestWatcher { } break; case NodeDeleted: + //end of experiment execution, ignore this event + break; case NodeCreated: case NodeChildrenChanged: case None: http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 9ebfa05..1040b05 100644 --- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -26,6 +26,7 @@ import 
org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.common.utils.listener.AbstractActivityListener; import org.apache.airavata.gfac.core.GFacConstants; import org.apache.airavata.gfac.core.GFacException; @@ -54,6 +55,7 @@ import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +118,7 @@ public class GfacServerHandler implements GfacService.Iface { airavataServerHostPort = ServerSettings.getGfacServerHost() + ":" + ServerSettings.getGFacServerPort(); // create PERSISTENT nodes ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacUtils.getZKGfacServersParentPath()); - ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), GFacConstants.ZOOKEEPER_EXPERIMENT_NODE); + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), ZkConstants.ZOOKEEPER_EXPERIMENT_NODE); // create EPHEMERAL server name node String gfacName = ServerSettings.getGFacServerName(); if (curatorClient.checkExists().forPath(ZKPaths.makePath(GFacUtils.getZKGfacServersParentPath() ,gfacName)) == null) { @@ -196,7 +198,7 @@ public class GfacServerHandler implements GfacService.Iface { private String gfacServerName; public ProcessLaunchMessageHandler() throws ApplicationSettingsException { - experimentNode = GFacConstants.ZOOKEEPER_EXPERIMENT_NODE; + experimentNode = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE; gfacServerName = ServerSettings.getGFacServerName(); } @@ -226,8 +228,7 @@ public class GfacServerHandler implements GfacService.Iface { if (Factory.getGfacContext().getProcess(event.getProcessId()) != null) { // update 
deliver tag try { - updateDeliveryTag(curatorClient, gfacServerName, event.getProcessId(), message - .getDeliveryTag()); + updateDeliveryTag(curatorClient, gfacServerName, event, message ); return; } catch (Exception e) { log.error("Error while updating delivery tag for redelivery message , messageId : " + @@ -254,8 +255,7 @@ public class GfacServerHandler implements GfacService.Iface { .getProcessId()); publishProcessStatus(event, status); try { - createProcessZKNode(curatorClient, gfacServerName, event.getProcessId(), message - .getDeliveryTag(), event.getTokenId()); + createProcessZKNode(curatorClient, gfacServerName, event, message); submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId()); } catch (Exception e) { log.error(e.getMessage(), e); @@ -306,38 +306,55 @@ public class GfacServerHandler implements GfacService.Iface { statusPublisher.publish(msgCtx); } - private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName, String - processId, long deliveryTag, String token) throws Exception { - // TODO - To handle multiple processes per experiment, need to create a /experiments/{expId}/{processId} node - // create /experiments/{processId} node and set data - serverName, add redelivery listener - String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); + private void createProcessZKNode(CuratorFramework curatorClient, String gfacServerName,ProcessSubmitEvent event + ,MessageContext messageContext) throws Exception { + String processId = event.getProcessId(); + String token = event.getTokenId(); + String experimentId = event.getExperimentId(); + long deliveryTag = messageContext.getDeliveryTag(); + + // create /experiments//{experimentId}{processId} node and set data - serverName, add redelivery listener + String experimentNodePath = ZkConstants.ZOOKEEPER_EXPERIMENT_NODE + "/" + experimentId; + String zkProcessNodePath = ZKPaths.makePath(experimentNodePath, processId); 
ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), zkProcessNodePath); curatorClient.setData().withVersion(-1).forPath(zkProcessNodePath, gfacServerName.getBytes()); curatorClient.getData().usingWatcher(Factory.getRedeliveryReqeustWatcher()).forPath(zkProcessNodePath); - // create /experiments/{processId}/deliveryTag node and set data - deliveryTag - String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); + // create /experiments/{experimentId}/{processId}/deliveryTag node and set data - deliveryTag + String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), deliveryTagPath); curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); - // create /experiments/{processId}/token node and set data - token - String tokenNodePath = ZKPaths.makePath(processId, GFacConstants.ZOOKEEPER_TOKEN_NODE); + // create /experiments/{experimentId}/{processId}/token node and set data - token + String tokenNodePath = ZKPaths.makePath(zkProcessNodePath, ZkConstants.ZOOKEEPER_TOKEN_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), tokenNodePath); curatorClient.setData().withVersion(-1).forPath(tokenNodePath, token.getBytes()); - // create /experiments/{processId}/cancelListener node and set watcher for data changes - String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + // create /experiments/{experimentId}/{processId}/cancelListener node and set watcher for data changes +/* String cancelListenerNode = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), cancelListenerNode); - curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode); + 
curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath(cancelListenerNode);*/ + + // create /experiments/{experimentId}/cancel node and set watcher for data changes + String experimentCancelNode = experimentNodePath + "/" + ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE; + ZKPaths.mkdirs(curatorClient.getZookeeperClient().getZooKeeper(), experimentCancelNode); + curatorClient.getData().usingWatcher(Factory.getCancelRequestWatcher()).forPath (experimentCancelNode); + } - private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, String processId, long - deliveryTag) throws Exception { - // create /experiments/{processId} node and set data - serverName, add redelivery listener - String zkProcessNodePath = ZKPaths.makePath(GFacConstants.ZOOKEEPER_EXPERIMENT_NODE, processId); - // create /experiments/{processId}/deliveryTag node and set data - deliveryTag - String deliveryTagPath = ZKPaths.makePath(zkProcessNodePath, GFacConstants.ZOOKEEPER_DELIVERYTAG_NODE); - curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); + private void updateDeliveryTag(CuratorFramework curatorClient, String gfacServerName, ProcessSubmitEvent event, + MessageContext messageContext) throws Exception { + String experimentId = event.getExperimentId(); + String processId = event.getProcessId(); + long deliveryTag = messageContext.getDeliveryTag(); + String processNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, + experimentId), processId); + Stat stat = curatorClient.checkExists().forPath(processNodePath); + if (stat != null) { + // create /experiments/{processId}/deliveryTag node and set data - deliveryTag + String deliveryTagPath = ZKPaths.makePath(processNodePath, ZkConstants.ZOOKEEPER_DELIVERYTAG_NODE); + curatorClient.setData().withVersion(-1).forPath(deliveryTagPath, GFacUtils.longToBytes(deliveryTag)); + } } } 
http://git-wip-us.apache.org/repos/asf/airavata/blob/e1356b73/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 8427d0c..acb5530 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.common.utils.ZkConstants; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.gfac.core.GFacUtils; import org.apache.airavata.gfac.core.scheduler.HostScheduler; @@ -59,8 +60,14 @@ import org.apache.airavata.registry.core.app.catalog.resources.AppCatAbstractRes import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory; import org.apache.airavata.registry.core.experiment.catalog.resources.AbstractExpCatResource; import org.apache.airavata.registry.cpi.*; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.ZKPaths; import org.apache.thrift.TBase; import org.apache.thrift.TException; +import org.apache.zookeeper.data.Stat; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +87,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { private String gatewayName; private Publisher publisher; private RabbitMQStatusConsumer statusConsumer; + private CuratorFramework curatorClient; /** * Query orchestrator server to fetch the CPI version @@ -109,6 +117,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { String exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); statusConsumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); statusConsumer.listen(new ProcessStatusHandler()); + startCurator(); } catch (OrchestratorException | RegistryException | AppCatalogException | AiravataException e) { log.error(e.getMessage(), e); throw new OrchestratorException("Error while initializing orchestrator service", e); @@ -209,7 +218,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { */ public boolean terminateExperiment(String experimentId, String tokenId) throws TException { log.info(experimentId, "Experiment: {} is cancelling !!!!!", experimentId); - return validateStatesAndCancel(experimentId, tokenId); + try { + return validateStatesAndCancel(experimentId, tokenId); + } catch (Exception e) { + log.error("expId : " + experimentId + " :- Error while cancelling experiment", e); + return false; + } } private String getAiravataUserName() { @@ -277,7 +291,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { List<ComputeResourceDescription> computeHostList = Arrays.asList(deploymentMap.keySet().toArray(new ComputeResourceDescription[]{})); Class<? 
extends HostScheduler> aClass = Class.forName( ServerSettings.getHostScheduler()).asSubclass( - HostScheduler.class); + HostScheduler.class); HostScheduler hostScheduler = aClass.newInstance(); ComputeResourceDescription ComputeResourceDescription = hostScheduler.schedule(computeHostList); return deploymentMap.get(ComputeResourceDescription); @@ -297,124 +311,15 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { return selectedModuleId; } - private boolean validateStatesAndCancel(String experimentId, String tokenId)throws TException{ - // FIXME -// try { -// Experiment experiment = (Experiment) experimentCatalog.get( -// ExperimentCatalogModelType.EXPERIMENT, experimentId); -// log.info("Waiting for zookeeper to connect to the server"); -// synchronized (mutex){ -// mutex.wait(5000); -// } -// if (experiment == null) { -// log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId); -// throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId); -// } -// ExperimentState experimentState = experiment.getExperimentStatus().getExperimentState(); -// if (isCancelValid(experimentState)){ -// ExperimentStatus status = new ExperimentStatus(); -// status.setExperimentState(ExperimentState.CANCELING); -// status.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// experiment.setExperimentStatus(status); -// experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, -// experimentId); -// -// List<String> ids = experimentCatalog.getIds( -// ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, -// WorkflowNodeConstants.EXPERIMENT_ID, experimentId); -// for (String workflowNodeId : ids) { -// WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog -// .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, -// workflowNodeId); -// int value = 
workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().getValue(); -// if ( value> 1 && value < 7) { // we skip the unknown state -// log.error(workflowNodeDetail.getNodeName() + " Workflow Node status cannot mark as cancelled, because " + -// "current status is " + workflowNodeDetail.getWorkflowNodeStatus().getWorkflowNodeState().toString()); -// continue; // this continue is very useful not to process deeper loops if the upper layers have non-cancel states -// } else { -// WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); -// workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELING); -// workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); -// experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, -// workflowNodeId); -// } -// List<Object> taskDetailList = experimentCatalog.get( -// ExperimentCatalogModelType.TASK_DETAIL, -// TaskDetailConstants.NODE_ID, workflowNodeId); -// for (Object o : taskDetailList) { -// TaskDetails taskDetails = (TaskDetails) o; -// TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); -// if (taskStatus.getExecutionState().getValue() > 7 && taskStatus.getExecutionState().getValue()<12) { -// log.error(((TaskDetails) o).getTaskID() + " Task status cannot mark as cancelled, because " + -// "current task state is " + ((TaskDetails) o).getTaskStatus().getExecutionState().toString()); -// continue;// this continue is very useful not to process deeper loops if the upper layers have non-cancel states -// } else { -// taskStatus.setExecutionState(TaskState.CANCELING); -// taskStatus.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// taskDetails.setTaskStatus(taskStatus); -// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o, -// taskDetails.getTaskID()); -// } -// orchestrator.cancelExperiment(experiment, -// 
workflowNodeDetail, taskDetails, tokenId); -// // Status update should be done at the monitor -// } -// } -// }else { -// if (isCancelAllowed(experimentState)){ -// // when experiment status is < 3 no jobDetails object is created, -// // so we don't have to worry, we simply have to change the status and stop the execution -// ExperimentStatus status = new ExperimentStatus(); -// status.setExperimentState(ExperimentState.CANCELED); -// status.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// experiment.setExperimentStatus(status); -// experimentCatalog.update(ExperimentCatalogModelType.EXPERIMENT, experiment, -// experimentId); -// List<String> ids = experimentCatalog.getIds( -// ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, -// WorkflowNodeConstants.EXPERIMENT_ID, experimentId); -// for (String workflowNodeId : ids) { -// WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) experimentCatalog -// .get(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, -// workflowNodeId); -// WorkflowNodeStatus workflowNodeStatus = new WorkflowNodeStatus(); -// workflowNodeStatus.setWorkflowNodeState(WorkflowNodeState.CANCELED); -// workflowNodeStatus.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// workflowNodeDetail.setWorkflowNodeStatus(workflowNodeStatus); -// experimentCatalog.update(ExperimentCatalogModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetail, -// workflowNodeId); -// List<Object> taskDetailList = experimentCatalog.get( -// ExperimentCatalogModelType.TASK_DETAIL, -// TaskDetailConstants.NODE_ID, workflowNodeId); -// for (Object o : taskDetailList) { -// TaskDetails taskDetails = (TaskDetails) o; -// TaskStatus taskStatus = ((TaskDetails) o).getTaskStatus(); -// taskStatus.setExecutionState(TaskState.CANCELED); -// taskStatus.setTimeOfStateChange(Calendar.getInstance() -// .getTimeInMillis()); -// taskDetails.setTaskStatus(taskStatus); -// experimentCatalog.update(ExperimentCatalogModelType.TASK_DETAIL, o, -// 
taskDetails); -// } -// } -// }else { -// log.errorId(experimentId, "Unable to mark experiment as Cancelled, current state {} doesn't allow to cancel the experiment {}.", -// experiment.getExperimentStatus().getExperimentState().toString(), experimentId); -// throw new OrchestratorException("Unable to mark experiment as Cancelled, because current state is: " -// + experiment.getExperimentStatus().getExperimentState().toString()); -// } -// } -// log.info("Experiment: " + experimentId + " is cancelled !!!!!"); -// } catch (Exception e) { -// throw new TException(e); -// } - return true; + private boolean validateStatesAndCancel(String experimentId, String tokenId) throws Exception { + String expCancelNodePath = ZKPaths.makePath(ZKPaths.makePath(ZkConstants.ZOOKEEPER_EXPERIMENT_NODE, + experimentId), ZkConstants.ZOOKEEPER_CANCEL_LISTENER_NODE); + Stat stat = curatorClient.checkExists().forPath(expCancelNodePath); + if (stat != null) { + curatorClient.setData().withVersion(-1).forPath(expCancelNodePath, ZkConstants.ZOOKEEPER_CANCEL_REQEUST + .getBytes()); + } + return true; } private void launchWorkflowExperiment(String experimentId, String airavataCredStoreToken) throws TException { @@ -427,6 +332,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { // } } + private void startCurator() throws ApplicationSettingsException { + String connectionSting = ServerSettings.getZookeeperConnection(); + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5); + curatorClient = CuratorFrameworkFactory.newClient(connectionSting, retryPolicy); + curatorClient.start(); + } private class SingleAppExperimentRunner implements Runnable { String experimentId;
