Repository: airavata Updated Branches: refs/heads/queue-gfac b6bf782db -> a486b67d9
adding curator leader election logic Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a486b67d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a486b67d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a486b67d Branch: refs/heads/queue-gfac Commit: a486b67d9187294bae14dd15d4d5b90a84484c73 Parents: b6bf782 Author: Lahiru Gunathilake <[email protected]> Authored: Mon Feb 16 22:38:41 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Mon Feb 16 22:38:41 2015 -0500 ---------------------------------------------------------------------- .../lib/airavata/messagingEvents_types.cpp | 20 +++- .../lib/airavata/messagingEvents_types.h | 13 ++- .../Airavata/Model/Messaging/Event/Types.php | 20 ++++ .../client/samples/CreateLaunchExperiment.java | 2 +- .../model/messaging/event/TaskSubmitEvent.java | 100 +++++++++++++++- airavata-api/generate-thrift-files.sh | 2 +- .../messagingEvents.thrift | 3 +- modules/gfac/airavata-gfac-service/pom.xml | 10 ++ .../airavata/gfac/leader/CuratorClient.java | 79 +++++++++++++ .../gfac/leader/LeaderSelectorExample.java | 80 +++++++++++++ .../airavata/gfac/server/GfacServerHandler.java | 112 +++++++++++++++--- modules/gfac/gfac-core/pom.xml | 1 + .../airavata/gfac/core/utils/GFacUtils.java | 116 ++++++++++++++++++- modules/orchestrator/orchestrator-core/pom.xml | 10 -- .../core/impl/GFACPassiveJobSubmitter.java | 46 +++----- .../core/impl/GFACRPCJobSubmitter.java | 4 +- 16 files changed, 547 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp index a2e72f5..71f45be 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp @@ -839,8 +839,8 @@ void swap(JobIdentifier &a, JobIdentifier &b) { swap(a.gatewayId, b.gatewayId); } -const char* TaskSubmitEvent::ascii_fingerprint = "AB879940BD15B6B25691265F7384B271"; -const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71}; +const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2"; +const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2}; uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -856,6 +856,7 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { bool isset_experimentId = false; bool isset_taskId = false; bool isset_gatewayId = false; + bool isset_tokenId = false; while (true) { @@ -889,6 +890,14 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->tokenId); + isset_tokenId = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -904,6 +913,8 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) { throw TProtocolException(TProtocolException::INVALID_DATA); if (!isset_gatewayId) throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_tokenId) + throw TProtocolException(TProtocolException::INVALID_DATA); return xfer; } @@ -923,6 +934,10 @@ uint32_t TaskSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) co xfer += oprot->writeString(this->gatewayId); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("tokenId", ::apache::thrift::protocol::T_STRING, 4); + xfer += oprot->writeString(this->tokenId); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -933,6 +948,7 @@ void swap(TaskSubmitEvent &a, TaskSubmitEvent &b) { swap(a.experimentId, b.experimentId); swap(a.taskId, b.taskId); swap(a.gatewayId, b.gatewayId); + swap(a.tokenId, b.tokenId); } const char* TaskTerminateEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972"; http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h index f063fc2..c7e2bb5 100644 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h +++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h @@ -465,10 +465,10 @@ void swap(JobIdentifier &a, JobIdentifier &b); class TaskSubmitEvent { public: - static const char* ascii_fingerprint; // = "AB879940BD15B6B25691265F7384B271"; - static const uint8_t binary_fingerprint[16]; // = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71}; + static const char* ascii_fingerprint; // = "C93D890311F28844166CF6E571EB3AC2"; + static const uint8_t binary_fingerprint[16]; // = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2}; - TaskSubmitEvent() : experimentId(), taskId(), gatewayId() { + TaskSubmitEvent() : experimentId(), taskId(), gatewayId(), tokenId() { } virtual ~TaskSubmitEvent() throw() {} @@ -476,6 +476,7 @@ class TaskSubmitEvent { std::string experimentId; std::string taskId; std::string gatewayId; + std::string tokenId; void __set_experimentId(const std::string& val) { experimentId = val; @@ -489,6 +490,10 @@ class TaskSubmitEvent { gatewayId = val; } + void __set_tokenId(const std::string& val) { + tokenId = val; + } + bool operator == (const TaskSubmitEvent & rhs) const { if (!(experimentId == rhs.experimentId)) @@ -497,6 +502,8 @@ class TaskSubmitEvent { return false; if (!(gatewayId == rhs.gatewayId)) return false; + if (!(tokenId == rhs.tokenId)) + return false; return true; } bool operator != (const TaskSubmitEvent &rhs) const { http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php index 40810d3..b0d7676 100644 --- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php +++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php @@ -977,6 +977,7 @@ class TaskSubmitEvent { public $experimentId = null; public $taskId = null; public $gatewayId = null; + public $tokenId = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -993,6 +994,10 @@ class TaskSubmitEvent { 'var' => 'gatewayId', 'type' => TType::STRING, ), + 4 => array( + 'var' => 'tokenId', + 'type' => TType::STRING, + ), ); } if (is_array($vals)) { @@ -1005,6 +1010,9 @@ class TaskSubmitEvent { if (isset($vals['gatewayId'])) { $this->gatewayId = $vals['gatewayId']; } + if (isset($vals['tokenId'])) { + $this->tokenId = $vals['tokenId']; + } } } @@ -1048,6 +1056,13 @@ class TaskSubmitEvent { $xfer += $input->skip($ftype); } break; + case 4: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->tokenId); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -1076,6 +1091,11 @@ class TaskSubmitEvent { $xfer += $output->writeString($this->gatewayId); $xfer += $output->writeFieldEnd(); } + if ($this->tokenId !== null) { + $xfer += $output->writeFieldBegin('tokenId', TType::STRING, 4); + $xfer += $output->writeString($this->tokenId); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 8483da7..b7121b9 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -60,7 +60,7 @@ public class CreateLaunchExperiment { private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104"; + private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36"; http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java index c813c76..71d497e 100644 --- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java +++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1); private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2); private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3); + private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -65,12 +66,14 @@ import org.slf4j.LoggerFactory; private String experimentId; // required private String taskId; // required private String gatewayId; // required + private String tokenId; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum { EXPERIMENT_ID((short)1, "experimentId"), TASK_ID((short)2, "taskId"), - GATEWAY_ID((short)3, "gatewayId"); + GATEWAY_ID((short)3, "gatewayId"), + TOKEN_ID((short)4, "tokenId"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -91,6 +94,8 @@ import org.slf4j.LoggerFactory; return TASK_ID; case 3: // GATEWAY_ID return GATEWAY_ID; + case 4: // TOKEN_ID + return TOKEN_ID; default: return null; } @@ -140,6 +145,8 @@ import org.slf4j.LoggerFactory; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskSubmitEvent.class, metaDataMap); } @@ -150,12 +157,14 @@ import org.slf4j.LoggerFactory; public TaskSubmitEvent( String experimentId, String taskId, - String gatewayId) + String gatewayId, + String tokenId) { this(); this.experimentId = experimentId; this.taskId = taskId; this.gatewayId = gatewayId; + this.tokenId = tokenId; } /** @@ -171,6 +180,9 @@ import org.slf4j.LoggerFactory; if (other.isSetGatewayId()) { this.gatewayId = other.gatewayId; } + if (other.isSetTokenId()) { + this.tokenId = other.tokenId; + } } public TaskSubmitEvent deepCopy() { @@ -182,6 +194,7 @@ import org.slf4j.LoggerFactory; this.experimentId = null; this.taskId = null; this.gatewayId = null; + this.tokenId = null; } public String getExperimentId() { @@ -253,6 +266,29 @@ import org.slf4j.LoggerFactory; } } + public String getTokenId() { + return this.tokenId; + } + + public void setTokenId(String tokenId) { + this.tokenId = tokenId; + } + + public void unsetTokenId() { + this.tokenId = null; + } + + /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */ + public boolean isSetTokenId() { + return this.tokenId != null; + } + + public void setTokenIdIsSet(boolean value) { + if (!value) { + this.tokenId = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case EXPERIMENT_ID: @@ -279,6 +315,14 @@ import org.slf4j.LoggerFactory; } break; + case TOKEN_ID: + if (value == null) { + unsetTokenId(); + } else { + setTokenId((String)value); + } + break; + } } @@ -293,6 +337,9 @@ import org.slf4j.LoggerFactory; case GATEWAY_ID: return getGatewayId(); + case TOKEN_ID: + return getTokenId(); + } throw new IllegalStateException(); } @@ -310,6 +357,8 @@ import org.slf4j.LoggerFactory; return isSetTaskId(); case GATEWAY_ID: return isSetGatewayId(); + case TOKEN_ID: + return isSetTokenId(); } throw new IllegalStateException(); } @@ -354,6 +403,15 @@ import org.slf4j.LoggerFactory; return false; } + boolean this_present_tokenId = true && this.isSetTokenId(); + boolean that_present_tokenId = true && that.isSetTokenId(); + if (this_present_tokenId || that_present_tokenId) { + if (!(this_present_tokenId && that_present_tokenId)) + return false; + if (!this.tokenId.equals(that.tokenId)) + return false; + } + return true; } @@ -400,6 +458,16 @@ import org.slf4j.LoggerFactory; return lastComparison; } } + lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTokenId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -443,6 +511,14 @@ import org.slf4j.LoggerFactory; sb.append(this.gatewayId); } first = false; + if (!first) sb.append(", "); + sb.append("tokenId:"); + if (this.tokenId == null) { + sb.append("null"); + } else { + sb.append(this.tokenId); + } + first = false; sb.append(")"); return sb.toString(); } @@ -461,6 +537,10 @@ import org.slf4j.LoggerFactory; throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString()); } + if (!isSetTokenId()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' is unset! Struct:" + toString()); + } + // check for sub-struct validity } @@ -522,6 +602,14 @@ import org.slf4j.LoggerFactory; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // TOKEN_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -550,6 +638,11 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.gatewayId); oprot.writeFieldEnd(); } + if (struct.tokenId != null) { + oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC); + oprot.writeString(struct.tokenId); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -570,6 +663,7 @@ import org.slf4j.LoggerFactory; oprot.writeString(struct.experimentId); oprot.writeString(struct.taskId); oprot.writeString(struct.gatewayId); + oprot.writeString(struct.tokenId); } @Override @@ -581,6 +675,8 @@ import org.slf4j.LoggerFactory; struct.setTaskIdIsSet(true); struct.gatewayId = iprot.readString(); struct.setGatewayIdIsSet(true); + struct.tokenId = iprot.readString(); + struct.setTokenIdIsSet(true); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/generate-thrift-files.sh ---------------------------------------------------------------------- diff --git a/airavata-api/generate-thrift-files.sh b/airavata-api/generate-thrift-files.sh index bd823e4..c8a000d 100755 --- a/airavata-api/generate-thrift-files.sh +++ b/airavata-api/generate-thrift-files.sh @@ -27,7 +27,7 @@ DATAMODEL_SRC_DIR='airavata-data-models/src/main/java' JAVA_API_SDK_DIR='airavata-api-stubs/src/main/java' CPP_SDK_DIR='airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/' PHP_SDK_DIR='airavata-client-sdks/airavata-php-sdk/src/main/resources/lib' -THRIFT_EXEC=thrift +THRIFT_EXEC=/usr/local/Cellar/thrift/0.9.1/bin/thrift # The Function fail prints error messages on failure and quits the script. fail() { echo $@ http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/thrift-interface-descriptions/messagingEvents.thrift ---------------------------------------------------------------------- diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift index d736701..d9e85d4 100644 --- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift +++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift @@ -105,7 +105,8 @@ struct JobIdentifier { struct TaskSubmitEvent{ 1: required string experimentId, 2: required string taskId, - 3: required string gatewayId + 3: required string gatewayId, + 4: required string tokenId } struct TaskTerminateEvent{ http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml index 0884942..5a178bd 100644 --- a/modules/gfac/airavata-gfac-service/pom.xml +++ b/modules/gfac/airavata-gfac-service/pom.xml @@ -80,6 +80,16 @@ <artifactId>airavata-server-configuration</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java new file mode 100644 index 0000000..2db9a6f --- /dev/null +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.airavata.gfac.leader; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.curator.framework.recipes.leader.LeaderSelector; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which + * has the recommended handling for connection state issues + */ +public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable { + private final String name; + private final LeaderSelector leaderSelector; + private final AtomicInteger leaderCount = new AtomicInteger(); + + public CuratorClient(CuratorFramework client, String path, String name) { + this.name = name; + + // create a leader selector using the given path for management + // all participants in a given leader selection must use the same path + // ExampleClient here is also a LeaderSelectorListener but this isn't required + leaderSelector = new LeaderSelector(client, path, this); + + // for most cases you will want your instance to requeue when it relinquishes leadership + leaderSelector.autoRequeue(); + } + + public void start() throws IOException { + // the selection for this instance doesn't start until the leader selector is started + // leader selection is done in the background so this call to leaderSelector.start() returns immediately + leaderSelector.start(); + } + + @Override + public void close() throws IOException { + leaderSelector.close(); + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + // we are now the leader. This method should not return until we want to relinquish leadership + + final int waitSeconds = (int) (5 * Math.random()) + 1; + + System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); + System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); + } catch (InterruptedException e) { + System.err.println(name + " was interrupted."); + Thread.currentThread().interrupt(); + } finally { + System.out.println(name + " relinquishing leadership.\n"); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java new file mode 100644 index 0000000..ad02641 --- /dev/null +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.leader; + +import com.google.common.collect.Lists; +import org.apache.airavata.common.utils.AiravataZKUtils; +import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.List; + +public class LeaderSelectorExample { + private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class); + private static final int CLIENT_QTY = 10; + + private static final String PATH = "/examples/leader"; + + public static void main(String[] args) throws Exception + { + // all of the useful sample code is in ExampleClient.java + + System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur."); + System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times."); + + try + { + for ( int i = 0; i < CLIENT_QTY; ++i ) + { + CuratorFramework client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3)); + + CuratorClient example = new CuratorClient(client, PATH, "Client #" + i); + + client.start(); + example.start(); + } + + System.out.println("Press enter/return to quit\n"); + new BufferedReader(new InputStreamReader(System.in)).readLine(); + } + finally + { + System.out.println("Shutting down..."); + + /*for ( CuratorClient exampleClient : examples ) + { + CloseableUtils.closeQuietly(exampleClient); + } + for ( CuratorFramework client : clients ) + { + CloseableUtils.closeQuietly(client); + }*/ + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index c8f1100..c838703 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -31,31 +31,38 @@ import org.apache.airavata.common.utils.*; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.core.utils.InputHandlerWorker; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; +import org.apache.airavata.gfac.leader.CuratorClient; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer; -import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.messaging.event.TaskSubmitEvent; import org.apache.airavata.model.messaging.event.TaskTerminateEvent; -import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.leader.LeaderSelector; +import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class GfacServerHandler implements GfacService.Iface, Watcher{ @@ -88,6 +95,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + CuratorFramework curatorFramework = null; + + public GfacServerHandler() throws Exception{ // registering with zk try { @@ -114,6 +124,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ if(ServerSettings.isGFacPassiveMode()) { rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); + curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3)); } @@ -229,14 +240,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ inHandlerFutures.add(GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker)); // we immediately return when we have a threadpool return true; -// }else{ -// logger.error(experimentId, "Failed to submit job to the GFac implementation, experiment {}, task {}, " + -// "gateway {}", experimentId, taskId, gatewayId); -// return false; -// } -// } catch (GFacException e) { -// throw new TException("Error launching the experiment : " + e.getMessage(), e); -// } } public boolean cancelJob(String experimentId, String taskId) throws TException { @@ -295,10 +298,26 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private class TaskLaunchMessageHandler implements MessageHandler { private String experimentId; - + private String nodeName; + + public TaskLaunchMessageHandler(){ + try { + nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); + } catch (ApplicationSettingsException e) { + logger.error(e.getMessage(), e); + } + } + public Map<String, Object> getProperties() { Map<String, Object> props = new HashMap<String, Object>(); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString()); + try { + props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)); + } catch (ApplicationSettingsException e) { + // if we cannot find gfac node name configured we set a random id + logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value."); + logger.error("listening to a random generated routing key"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString()); + } return props; } @@ -309,10 +328,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); - } catch (TException e) { + CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName); + try { + curatorClient.start(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + } catch (TException e) { logger.error(e.getMessage(), e); //nobody is listening so nothing to throw } }else if(message.getType().equals(MessageType.TERMINATETASK)){ @@ -331,4 +356,57 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } } + public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable { + private final String name; + private final LeaderSelector leaderSelector; + private final AtomicInteger leaderCount = new AtomicInteger(); + private final String path; + private TaskSubmitEvent event; + private String experimentNode; + + public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) { + this.name = name; + this.event = taskSubmitEvent; + this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId(); + // create a leader selector using the given path for management + // all participants in a given leader selection must use the same path + // ExampleClient here is also a LeaderSelectorListener but this isn't required + leaderSelector = new LeaderSelector(client, path, this); + experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + // for most cases you will want your instance to requeue when it relinquishes leadership + leaderSelector.autoRequeue(); + } + + public void start() throws IOException { + // the selection for this instance doesn't start until the leader selector is started + // leader selection is done in the background so this call to leaderSelector.start() returns immediately + leaderSelector.start(); + } + + @Override + public void close() throws IOException { + leaderSelector.close(); + } + + @Override + public void takeLeadership(CuratorFramework client) throws Exception { + // we are now the leader. This method should not return until we want to relinquish leadership + final int waitSeconds = (int) (5 * Math.random()) + 1; + + System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); + System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); + + try { + GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId()); + submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); + Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); + } catch (InterruptedException e) { + System.err.println(name + " was interrupted."); + Thread.currentThread().interrupt(); + } finally { + System.out.println(name + " relinquishing leadership.\n"); + } + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml index 4fc2a15..2a42503 100644 --- a/modules/gfac/gfac-core/pom.xml +++ b/modules/gfac/gfac-core/pom.xml @@ -131,6 +131,7 @@ <artifactId>zookeeper</artifactId> <version>3.4.0</version> </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index cbbce48..9f104fa 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1044,9 +1044,9 @@ public class GFacUtils { } // This method is dangerous because of moving the experiment data - public static boolean createExperimentEntry(String experimentID, - String taskID, ZooKeeper zk, String experimentNode, - String pickedChild, String tokenId) throws KeeperException, + public static boolean createExperimentEntryForRPC(String experimentID, + String taskID, ZooKeeper zk, String experimentNode, + String pickedChild, String tokenId) throws KeeperException, InterruptedException { String experimentPath = experimentNode + File.separator + pickedChild; String newExpNode = experimentPath + File.separator + experimentID @@ -1153,6 +1153,116 @@ public class GFacUtils { return true; } + // This method is dangerous because of moving the experiment data + public static boolean createExperimentEntryForPassive(String experimentID, + String taskID, ZooKeeper zk, String experimentNode, + String pickedChild, String tokenId) throws KeeperException, + InterruptedException { + String experimentPath = experimentNode + File.separator + pickedChild; + String newExpNode = experimentPath + File.separator + experimentID + + "+" + taskID; + Stat exists1 = zk.exists(newExpNode, false); + String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk); + String foundExperimentPath = null; + if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment + List<String> runningGfacNodeNames = AiravataZKUtils + .getAllGfacNodeNames(zk); // here we take old gfac servers + // too + for (String gfacServerNode : runningGfacNodeNames) { + if (!gfacServerNode.equals(pickedChild)) { + foundExperimentPath = experimentNode + File.separator + + gfacServerNode + File.separator + experimentID + + "+" + taskID; + exists1 = zk.exists(foundExperimentPath, false); + if (exists1 != null) { // when the experiment is found we + // break the loop + break; + } + } + } + if (exists1 == null) { // OK this is a pretty new experiment so we + // are going to create a new node + log.info("This is a new Job, so creating all the experiment docs from the scratch"); + zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + Stat expParent = zk.exists(newExpNode, false); + if (tokenId != null && expParent != null) { + zk.setData(newExpNode, tokenId.getBytes(), + expParent.getVersion()); + } + zk.create(newExpNode + File.separator + "state", String + .valueOf(GfacExperimentState.LAUNCHED.getValue()) + .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + } else { + // ohhh this node exists in some other failed gfac folder, we + // have to move it to this gfac experiment list,safely + log.info("This is an old Job, so copying data from old experiment location"); + zk.create(newExpNode, + zk.getData(foundExperimentPath, false, exists1), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + List<String> children = zk.getChildren(foundExperimentPath, + false); + for (String childNode1 : children) { + String level1 = foundExperimentPath + File.separator + + childNode1; + Stat exists2 = zk.exists(level1, false); // no need to check + // exists + String newLeve1 = newExpNode + File.separator + childNode1; + log.info("Creating new znode: " + newLeve1); // these has to + // be info + // logs + zk.create(newLeve1, zk.getData(level1, false, exists2), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + for (String childNode2 : zk.getChildren(level1, false)) { + String level2 = level1 + File.separator + childNode2; + Stat exists3 = zk.exists(level2, false); // no need to + // check + // exists + String newLeve2 = newLeve1 + File.separator + + childNode2; + log.info("Creating new znode: " + newLeve2); + zk.create(newLeve2, zk.getData(level2, false, exists3), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } + // After all the files are successfully transfered we delete the + // old experiment,otherwise we do + // not delete a single file + log.info("After a successful copying of experiment data for an old experiment we delete the old data"); + log.info("Deleting experiment data: " + foundExperimentPath); + ZKUtil.deleteRecursive(zk, foundExperimentPath); + } + }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){ + // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment + // node to gfac node specific location, because original request execution will fail with errors + log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !"); + return false; + } else { + log.error("ExperimentID: " + experimentID + " taskID: " + taskID + + " is already running by this Gfac instance"); + List<String> runningGfacNodeNames = AiravataZKUtils + .getAllGfacNodeNames(zk); // here we take old gfac servers + // too + for (String gfacServerNode : runningGfacNodeNames) { + if (!gfacServerNode.equals(pickedChild)) { + foundExperimentPath = experimentNode + File.separator + + gfacServerNode + File.separator + experimentID + + "+" + taskID; + break; + } + } + ZKUtil.deleteRecursive(zk, foundExperimentPath); + } + return true; + } + public static String findExperimentEntry(String experimentID, String taskID, ZooKeeper zk ) throws KeeperException, http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/pom.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml index 23863fb..99c0abb 100644 --- a/modules/orchestrator/orchestrator-core/pom.xml +++ b/modules/orchestrator/orchestrator-core/pom.xml @@ -127,16 +127,6 @@ the License. --> <artifactId>zookeeper</artifactId> <version>${zk.version}</version> </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <version>${curator.version}</version> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <version>${curator.version}</version> - </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index bfe2b16..78cc6b7 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -39,7 +39,6 @@ import org.apache.airavata.model.messaging.event.TaskSubmitEvent; import org.apache.airavata.orchestrator.core.context.OrchestratorContext; import org.apache.airavata.orchestrator.core.exception.OrchestratorException; import org.apache.airavata.orchestrator.core.job.JobSubmitter; -import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -113,40 +112,29 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { } } String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); List<String> children = zk.getChildren(gfacServer, this); if (children.size() == 0) { // Zookeeper data need cleaning throw new OrchestratorException("There is no active GFac instance to route the request"); } else { - String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); - // here we are not using an index because the getChildren does not return the same order everytime - String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); - logger.info("GFAC instance node data: " + gfacNodeData); - String[] split = gfacNodeData.split(":"); - gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); - if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { - // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { - String gatewayId = null; - CredentialReader credentialReader = GFacUtils.getCredentialReader(); - if (credentialReader != null) { - try { - gatewayId = credentialReader.getGatewayID(tokenId); - } catch (Exception e) { - logger.error(e.getLocalizedMessage()); - } - } - if(gatewayId == null || gatewayId.isEmpty()){ - gatewayId = ServerSettings.getDefaultUserGateway(); - } - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); - publisher.publish(messageContext); + String gatewayId = null; + CredentialReader credentialReader = GFacUtils.getCredentialReader(); + if (credentialReader != null) { + try { + gatewayId = credentialReader.getGatewayID(tokenId); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); } } + if(gatewayId == null || gatewayId.isEmpty()){ + gatewayId = ServerSettings.getDefaultUserGateway(); + } + + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId,tokenId); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); + publisher.publish(messageContext); } } catch (InterruptedException e) { logger.error(e.getMessage(), e); @@ -204,8 +192,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) { - TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null); + if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) { + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null,null); MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null); publisher.publish(messageContext); } http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java index 54339a2..b855de2 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java @@ -99,7 +99,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { + if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { String gatewayId = null; CredentialReader credentialReader = GFacUtils.getCredentialReader(); if (credentialReader != null) { @@ -167,7 +167,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher { localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) { + if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) { return localhost.cancelJob(experimentID, taskID); } }
