Repository: airavata Updated Branches: refs/heads/queue-gfac 0149c1afd -> b6bf782db
wrapping up working version of queue based communication between orchestrator and gfac Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b6bf782d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b6bf782d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b6bf782d Branch: refs/heads/queue-gfac Commit: b6bf782db30b0e7555f78852da3efcfdbfd530e4 Parents: 0149c1a Author: Lahiru Gunathilake <[email protected]> Authored: Thu Feb 12 11:36:02 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Thu Feb 12 11:36:02 2015 -0500 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 8 ++++---- .../apache/airavata/gfac/server/GfacServerHandler.java | 13 ++++--------- .../airavata/messaging/core/impl/RabbitMQProducer.java | 11 ++++++++++- .../core/impl/RabbitMQTaskLaunchPublisher.java | 4 ++-- .../core/impl/GFACPassiveJobSubmitter.java | 4 +++- 5 files changed, 23 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index b90e0ff..8483da7 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -60,7 +60,7 @@ public class CreateLaunchExperiment { private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_2e539083-665d-40fd-aaa2-4a751028326b"; + private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104"; private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36"; @@ -85,7 +85,7 @@ public class CreateLaunchExperiment { public static void main(String[] args) throws Exception { airavataClient = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavataClient.getAPIVersion()); -// registerApplications(); // run this only the first time +// registerApplications(); // run this only the first time createAndLaunchExp(); } @@ -101,13 +101,13 @@ public class CreateLaunchExperiment { // final String expId = createMPIExperimentForFSD(airavataClient); // final String expId = createEchoExperimentForStampede(airavataClient); // final String expId = createEchoExperimentForTrestles(airavataClient); -// final String expId = createExperimentEchoForLocalHost(airavataClient); + final String expId = createExperimentEchoForLocalHost(airavataClient); // final String expId = createExperimentWRFTrestles(airavataClient); // final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); // final String expId = createExperimentWRFStampede(airavataClient); // final String expId = createExperimentForStampedeAmber(airavataClient); - final String expId = createExperimentForTrestlesAmber(airavataClient); +// final String expId = createExperimentForTrestlesAmber(airavataClient); // final String expId = createExperimentGROMACSStampede(airavataClient); // final String expId = createExperimentESPRESSOStampede(airavataClient); // final String expId = createExperimentLAMMPSStampede(airavataClient); http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index d428d9c..c8f1100 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -113,6 +113,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ if(ServerSettings.isGFacPassiveMode()) { rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); } @@ -291,19 +292,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } } - private class NotificationMessageHandler implements MessageHandler { + private class TaskLaunchMessageHandler implements MessageHandler { private String experimentId; - private NotificationMessageHandler(String experimentId) { - this.experimentId = experimentId; - } - + public Map<String, Object> getProperties() { Map<String, Object> props = new HashMap<String, Object>(); - List<String> routingKeys = new ArrayList<String>(); - routingKeys.add(experimentId); - routingKeys.add(experimentId + ".*"); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString()); return props; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java index b4a6d46..570b17f 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java @@ -48,6 +48,15 @@ public class RabbitMQProducer { private String url; + private String getExchangeType = "topic"; + + + public RabbitMQProducer(String url, String exchangeName,String getExchangeType) { + this.exchangeName = exchangeName; + this.url = url; + this.getExchangeType = getExchangeType; + } + public RabbitMQProducer(String url, String exchangeName) { this.exchangeName = exchangeName; this.url = url; @@ -104,7 +113,7 @@ public class RabbitMQProducer { log.info("setting basic.qos / prefetch count to " + prefetchCount + " for " + exchangeName); channel.basicQos(prefetchCount); } - channel.exchangeDeclare(exchangeName, "topic", false); + channel.exchangeDeclare(exchangeName, getExchangeType, false); } catch (Exception e) { reset(); String msg = "could not open channel for exchange " + exchangeName; http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java index fe58042..23b2379 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java @@ -44,13 +44,13 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ String exchangeName; try { brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME); } catch (ApplicationSettingsException e) { String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; log.error(message, e); throw new AiravataException(message, e); } - rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName); + rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName,"fanout"); rabbitMQProducer.open(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/b6bf782d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 58ac982..bfe2b16 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -22,6 +22,7 @@ package org.apache.airavata.orchestrator.core.impl; import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; @@ -141,7 +142,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { gatewayId = ServerSettings.getDefaultUserGateway(); } TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId); - MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TASK-"+ UUID.randomUUID().toString(),gatewayId); + MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp()); publisher.publish(messageContext); } }
