Repository: airavata Updated Branches: refs/heads/queue-gfac 88d27d957 -> 0149c1afd
adding consumers to gfac without leader election Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0149c1af Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0149c1af Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0149c1af Branch: refs/heads/queue-gfac Commit: 0149c1afd477ab1d86d19374bcb9824520d3a3bc Parents: 88d27d9 Author: Lahiru Gunathilake <[email protected]> Authored: Wed Feb 11 15:48:40 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Wed Feb 11 15:48:40 2015 -0500 ---------------------------------------------------------------------- .../airavata/common/utils/ServerSettings.java | 21 +- .../main/resources/airavata-server.properties | 7 +- .../main/resources/airavata-server.properties | 5 +- .../airavata/gfac/server/GfacServerHandler.java | 75 ++++- .../messaging/core/MessagingConstants.java | 3 +- .../messaging/core/PublisherFactory.java | 2 +- .../airavata/messaging/core/TestClient.java | 5 +- .../messaging/core/impl/RabbitMQConsumer.java | 258 ----------------- .../core/impl/RabbitMQStatusConsumer.java | 274 +++++++++++++++++++ .../core/impl/RabbitMQStatusPublisher.java | 2 +- .../core/impl/RabbitMQTaskLaunchConsumer.java | 239 ++++++++++++++++ .../core/impl/RabbitMQTaskLaunchPublisher.java | 12 +- .../core/utils/OrchestratorConstants.java | 1 - .../airavata/xbaya/messaging/Monitor.java | 5 +- 14 files changed, 614 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 4ea0b44..b076e6a 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -28,7 +28,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; public class ServerSettings extends ApplicationSettings { - private static final String DEFAULT_USER = "default.registry.user"; + private static final String DEFAULT_USER = "default.registry.user"; private static final String DEFAULT_USER_PASSWORD = "default.registry.password"; private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway"; @@ -51,13 +51,15 @@ public class ServerSettings extends ApplicationSettings { private static final String MY_PROXY_USER = "myproxy.user"; private static final String MY_PROXY_PASSWORD = "myproxy.password"; private static final String MY_PROXY_LIFETIME = "myproxy.life"; - private static final String ACTIVITY_PUBLISHER = "activity.publisher"; + private static final String STATUS_PUBLISHER = "status.publisher"; private static final String TASK_LAUNCH_PUBLISHER = "task.launch.publisher"; private static final String ACTIVITY_LISTENERS = "activity.listeners"; public static final String PUBLISH_RABBITMQ = "publish.rabbitmq"; public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable"; public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids"; public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags"; + public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled + private static boolean stopAllThreads = false; @@ -73,7 +75,7 @@ public class ServerSettings extends ApplicationSettings { return getSetting(DEFAULT_USER_GATEWAY); } - public static String getServerContextRoot() { + public static String getServerContextRoot() { return getSetting(SERVER_CONTEXT_ROOT, "axis2"); } @@ -151,19 +153,24 @@ public class ServerSettings extends ApplicationSettings { return getSetting(ACTIVITY_LISTENERS).split(","); } - public static String getActivityPublisher() throws ApplicationSettingsException{ - return getSetting(ACTIVITY_PUBLISHER); + public static String getStatusPublisher() throws ApplicationSettingsException { + return getSetting(STATUS_PUBLISHER); } - public static String getTaskLaunchPublisher() throws ApplicationSettingsException{ + public static String getTaskLaunchPublisher() throws ApplicationSettingsException { return getSetting(TASK_LAUNCH_PUBLISHER); } - public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{ + public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException { String setting = getSetting(PUBLISH_RABBITMQ); return Boolean.parseBoolean(setting); } + public static boolean isGFacPassiveMode()throws ApplicationSettingsException { + String setting = getSetting(GFAC_PASSIVE); + return Boolean.parseBoolean(setting); + } + public static boolean isEmbeddedZK() { return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true")); } http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/configuration/server/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties index c90fab1..e309901 100644 --- a/modules/configuration/server/src/main/resources/airavata-server.properties +++ b/modules/configuration/server/src/main/resources/airavata-server.properties @@ -137,6 +137,7 @@ myproxy.password= myproxy.life=3600 # XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates +gfac.passive=false # SSH PKI key pair or ssh password can be used SSH based authentication is used. # if user specify both password authentication gets the higher preference @@ -215,10 +216,12 @@ connection.name=xsede #publisher activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false -activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher +status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher rabbitmq.broker.url=amqp://localhost:5672 -rabbitmq.exchange.name=airavata_rabbitmq_exchange +rabbitmq.status.exchange.name=airavata_rabbitmq_exchange +rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange + ########################################################################### # Orchestrator module Configuration http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties ---------------------------------------------------------------------- diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties index d6be51a..2ecdeb6 100644 --- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties +++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties @@ -200,10 +200,11 @@ connection.name=xsede #publisher activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher publish.rabbitmq=false -activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher +status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher rabbitmq.broker.url=amqp://localhost:5672 -rabbitmq.exchange.name=airavata_rabbitmq_exchange +rabbitmq.status.exchange.name=airavata_rabbitmq_exchange +rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange ########################################################################### # Orchestrator module Configuration http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 583ec07..d428d9c 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -27,28 +27,34 @@ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.Constants; -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.common.utils.*; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.cpi.GFac; import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; import org.apache.airavata.gfac.core.utils.InputHandlerWorker; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer; +import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; +import org.apache.airavata.model.messaging.event.MessageType; +import org.apache.airavata.model.messaging.event.TaskSubmitEvent; +import org.apache.airavata.model.messaging.event.TaskTerminateEvent; +import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; +import org.apache.thrift.TBase; import org.apache.thrift.TException; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.Future; @@ -80,6 +86,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private List<Future> inHandlerFutures; + private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + public GfacServerHandler() throws Exception{ // registering with zk try { @@ -102,7 +110,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ BetterGfacImpl.startDaemonHandlers(); BetterGfacImpl.startStatusUpdators(registry, zk, publisher); inHandlerFutures = new ArrayList<Future>(); - } catch (ApplicationSettingsException e) { + + if(ServerSettings.isGFacPassiveMode()) { + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + } + + + } catch (ApplicationSettingsException e) { logger.error("Error initialising GFAC", e); throw new Exception("Error initialising GFAC", e); } catch (InterruptedException e) { @@ -277,4 +291,49 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } } + private class NotificationMessageHandler implements MessageHandler { + private String experimentId; + + private NotificationMessageHandler(String experimentId) { + this.experimentId = experimentId; + } + + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + List<String> routingKeys = new ArrayList<String>(); + routingKeys.add(experimentId); + routingKeys.add(experimentId + ".*"); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys); + return props; + } + + public void onMessage(MessageContext message) { + if (message.getType().equals(MessageType.LAUNCHTASK)){ + try { + TaskSubmitEvent event = new TaskSubmitEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + } catch (TException e) { + logger.error(e.getMessage(), e); //nobody is listening so nothing to throw + } + }else if(message.getType().equals(MessageType.TERMINATETASK)){ + try { + TaskTerminateEvent event = new TaskTerminateEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + cancelJob(event.getExperimentId(), event.getTaskId()); + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + } catch (TException e) { + logger.error(e.getMessage(), e); //nobody is listening so nothing to throw + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java index 7458d81..07b39e7 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java @@ -23,7 +23,8 @@ package org.apache.airavata.messaging.core; public abstract class MessagingConstants { public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url"; - public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name"; + public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name"; + public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name"; public static final String RABBIT_ROUTING_KEY = "routingKey"; public static final String RABBIT_QUEUE= "queue"; http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java index 59cdbdf..2e560a3 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java @@ -30,7 +30,7 @@ public class PublisherFactory { private static Logger log = LoggerFactory.getLogger(PublisherFactory.class); public static Publisher createActivityPublisher() throws AiravataException { - String activityPublisher = ServerSettings.getActivityPublisher(); + String activityPublisher = ServerSettings.getStatusPublisher(); if (activityPublisher == null) { String s = "Activity publisher is not specified"; http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java index 362f3f2..aea561f 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java @@ -25,9 +25,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent; -import org.apache.airavata.model.messaging.event.Message; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.thrift.TBase; import org.apache.thrift.TException; @@ -51,7 +50,7 @@ public class TestClient { AiravataUtils.setExecutionAsServer(); String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL); final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME); - RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName); + RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName); consumer.listen(new MessageHandler() { @Override public Map<String, Object> getProperties() { http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java deleted file mode 100644 index 1f13496..0000000 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.airavata.messaging.core.impl; - - -import com.rabbitmq.client.*; -import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.exception.ApplicationSettingsException; -import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.common.utils.ThriftUtils; -import org.apache.airavata.messaging.core.Consumer; -import org.apache.airavata.messaging.core.MessageContext; -import org.apache.airavata.messaging.core.MessageHandler; -import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.model.messaging.event.*; -import org.apache.thrift.TBase; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class RabbitMQConsumer implements Consumer { - private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class); - - private String exchangeName; - private String url; - private Connection connection; - private Channel channel; - private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); - - public RabbitMQConsumer() throws AiravataException { - try { - url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); - - createConnection(); - } catch (ApplicationSettingsException e) { - String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; - log.error(message, e); - throw new AiravataException(message, e); - } - } - - public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException { - this.exchangeName = exchangeName; - this.url = brokerUrl; - - createConnection(); - } - - private void createConnection() throws AiravataException { - try { - ConnectionFactory connectionFactory = new ConnectionFactory(); - connectionFactory.setUri(url); - connection = connectionFactory.newConnection(); - connection.addShutdownListener(new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { - } - }); - log.info("connected to rabbitmq: " + connection + " for " + exchangeName); - - channel = connection.createChannel(); - channel.exchangeDeclare(exchangeName, "topic", false); - - } catch (Exception e) { - String msg = "could not open channel for exchange " + exchangeName; - log.error(msg); - throw new AiravataException(msg, e); - } - } - - public String listen(final MessageHandler handler) throws AiravataException { - try { - Map<String, Object> props = handler.getProperties(); - final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); - if (routing == null) { - throw new IllegalArgumentException("The routing key must be present"); - } - - List<String> keys = new ArrayList<String>(); - if (routing instanceof List) { - for (Object o : (List)routing) { - keys.add(o.toString()); - } - } else if (routing instanceof String) { - keys.add((String) routing); - } - - String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); - String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); - if (queueName == null) { - if (!channel.isOpen()) { - channel = connection.createChannel(); - channel.exchangeDeclare(exchangeName, "topic", false); - } - queueName = channel.queueDeclare().getQueue(); - } else { - channel.queueDeclare(queueName, true, false, false, null); - } - - final String id = getId(keys, queueName); - if (queueDetailsMap.containsKey(id)) { - throw new IllegalStateException("This subscriber is already defined for this Consumer, " + - "cannot define the same subscriber twice"); - } - - if (consumerTag == null) { - consumerTag = "default"; - } - - // bind all the routing keys - for (String routingKey : keys) { - channel.queueBind(queueName, exchangeName, routingKey); - } - - channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, - Envelope envelope, - AMQP.BasicProperties properties, - byte[] body) { - Message message = new Message(); - - try { - ThriftUtils.createThriftFromBytes(body, message); - TBase event = null; - String gatewayId = null; - if (message.getMessageType().equals(MessageType.EXPERIMENT)) { - ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - experimentStatusChangeEvent.getState()); - event = experimentStatusChangeEvent; - gatewayId = experimentStatusChangeEvent.getGatewayId(); - } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) { - WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - wfnStatusChangeEvent.getState()); - event = wfnStatusChangeEvent; - gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId(); - } else if (message.getMessageType().equals(MessageType.TASK)) { - TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - taskStatusChangeEvent.getState()); - event = taskStatusChangeEvent; - gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId(); - } else if (message.getMessageType().equals(MessageType.JOB)) { - JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); - ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); - log.debug(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getMessageType() + "' with status " + - jobStatusChangeEvent.getState()); - event = jobStatusChangeEvent; - gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId(); - } - MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); - messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); - handler.onMessage(messageContext); - } catch (TException e) { - String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; - log.warn(msg, e); - } - } - }); - // save the name for deleting the queue - queueDetailsMap.put(id, new QueueDetails(queueName, keys)); - return id; - } catch (Exception e) { - String msg = "could not open channel for exchange " + exchangeName; - log.error(msg); - throw new AiravataException(msg, e); - } - } - - public void stopListen(final String id) throws AiravataException { - QueueDetails details = queueDetailsMap.get(id); - if (details != null) { - try { - for (String key : details.getRoutingKeys()) { - channel.queueUnbind(details.getQueueName(), exchangeName, key); - } - channel.queueDelete(details.getQueueName(), true, true); - } catch (IOException e) { - String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName; - log.debug(msg); - } - } - } - - /** - * Private class for holding some information about the consumers registered - */ - private class QueueDetails { - String queueName; - - List<String> routingKeys; - - private QueueDetails(String queueName, List<String> routingKeys) { - this.queueName = queueName; - this.routingKeys = routingKeys; - } - - public String getQueueName() { - return queueName; - } - - public List<String> getRoutingKeys() { - return routingKeys; - } - } - - private String getId(List<String> routingKeys, String queueName) { - String id = ""; - for (String key : routingKeys) { - id = id + "_" + key; - } - return id + "_" + queueName; - } - - public void close() { - if (connection != null) { - try { - connection.close(); - } catch (IOException ignore) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java new file mode 100644 index 0000000..d5e8c72 --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java @@ -0,0 +1,274 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.airavata.messaging.core.impl; + + +import com.rabbitmq.client.*; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.Consumer; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RabbitMQStatusConsumer implements Consumer { + private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class); + + private String exchangeName; + private String url; + private Connection connection; + private Channel channel; + private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); + + public RabbitMQStatusConsumer() throws AiravataException { + try { + url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); + + createConnection(); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + } + + public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException { + this.exchangeName = exchangeName; + this.url = brokerUrl; + + createConnection(); + } + + private void createConnection() throws AiravataException { + try { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(url); + connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for " + exchangeName); + + channel = connection.createChannel(); + channel.exchangeDeclare(exchangeName, "topic", false); + + } catch (Exception e) { + String msg = "could not open channel for exchange " + exchangeName; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + public String listen(final MessageHandler handler) throws AiravataException { + try { + Map<String, Object> props = handler.getProperties(); + final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); + if (routing == null) { + throw new IllegalArgumentException("The routing key must be present"); + } + + List<String> keys = new ArrayList<String>(); + if (routing instanceof List) { + for (Object o : (List)routing) { + keys.add(o.toString()); + } + } else if (routing instanceof String) { + keys.add((String) routing); + } + + String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); + String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); + if (queueName == null) { + if (!channel.isOpen()) { + channel = connection.createChannel(); + channel.exchangeDeclare(exchangeName, "topic", false); + } + queueName = channel.queueDeclare().getQueue(); + } else { + channel.queueDeclare(queueName, true, false, false, null); + } + + final String id = getId(keys, queueName); + if (queueDetailsMap.containsKey(id)) { + throw new IllegalStateException("This subscriber is already defined for this Consumer, " + + "cannot define the same subscriber twice"); + } + + if (consumerTag == null) { + consumerTag = "default"; + } + + // bind all the routing keys + for (String routingKey : keys) { + channel.queueBind(queueName, exchangeName, routingKey); + } + + channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + Message message = new Message(); + + try { + ThriftUtils.createThriftFromBytes(body, message); + TBase event = null; + String gatewayId = null; + if (message.getMessageType().equals(MessageType.EXPERIMENT)) { + ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' with status " + + experimentStatusChangeEvent.getState()); + event = experimentStatusChangeEvent; + gatewayId = experimentStatusChangeEvent.getGatewayId(); + } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) { + WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' with status " + + wfnStatusChangeEvent.getState()); + event = wfnStatusChangeEvent; + gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId(); + } else if (message.getMessageType().equals(MessageType.TASK)) { + TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' with status " + + taskStatusChangeEvent.getState()); + event = taskStatusChangeEvent; + gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId(); + } else if (message.getMessageType().equals(MessageType.JOB)) { + JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' with status " + + jobStatusChangeEvent.getState()); + event = jobStatusChangeEvent; + gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId(); + }else if(message.getMessageType().equals(MessageType.LAUNCHTASK)) { + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' for experimentId: " + + taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId()); + event = taskSubmitEvent; + gatewayId = taskSubmitEvent.getGatewayId(); + }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) { + TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' for experimentId: " + + taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId()); + event = taskTerminateEvent; + gatewayId = null; + } + MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); + handler.onMessage(messageContext); + } catch (TException e) { + String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; + log.warn(msg, e); + } + } + }); + // save the name for deleting the queue + queueDetailsMap.put(id, new QueueDetails(queueName, keys)); + return id; + } catch (Exception e) { + String msg = "could not open channel for exchange " + exchangeName; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + public void stopListen(final String id) throws AiravataException { + QueueDetails details = queueDetailsMap.get(id); + if (details != null) { + try { + for (String key : details.getRoutingKeys()) { + channel.queueUnbind(details.getQueueName(), exchangeName, key); + } + channel.queueDelete(details.getQueueName(), true, true); + } catch (IOException e) { + String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName; + log.debug(msg); + } + } + } + + /** + * Private class for holding some information about the consumers registered + */ + private class QueueDetails { + String queueName; + + List<String> routingKeys; + + private QueueDetails(String queueName, List<String> routingKeys) { + this.queueName = queueName; + this.routingKeys = routingKeys; + } + + public String getQueueName() { + return queueName; + } + + public List<String> getRoutingKeys() { + return routingKeys; + } + } + + private String getId(List<String> routingKeys, String queueName) { + String id = ""; + for (String key : routingKeys) { + id = id + "_" + key; + } + return id + "_" + queueName; + } + + public void close() { + if (connection != null) { + try { + connection.close(); + } catch (IOException ignore) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java index a4b4d1a..fe06ed7 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java @@ -45,7 +45,7 @@ public class RabbitMQStatusPublisher implements Publisher { String exchangeName; try { brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); } catch (ApplicationSettingsException e) { String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; log.error(message, e); http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java new file mode 100644 index 0000000..056dcac --- /dev/null +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.messaging.core.impl; + +import com.rabbitmq.client.*; +import org.apache.airavata.common.exception.AiravataException; +import org.apache.airavata.common.exception.ApplicationSettingsException; +import org.apache.airavata.common.utils.AiravataUtils; +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.common.utils.ThriftUtils; +import org.apache.airavata.messaging.core.MessageContext; +import org.apache.airavata.messaging.core.MessageHandler; +import org.apache.airavata.messaging.core.MessagingConstants; +import org.apache.airavata.model.messaging.event.*; +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RabbitMQTaskLaunchConsumer { + private final static Logger logger = LoggerFactory.getLogger(RabbitMQTaskLaunchConsumer.class); + private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class); + + private String taskLaunchExchangeName; + private String url; + private Connection connection; + private Channel channel; + private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>(); + + public RabbitMQTaskLaunchConsumer() throws AiravataException { + try { + url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); + taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME); + createConnection(); + } catch (ApplicationSettingsException e) { + String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; + log.error(message, e); + throw new AiravataException(message, e); + } + } + + public RabbitMQTaskLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException { + this.taskLaunchExchangeName = exchangeName; + this.url = brokerUrl; + + createConnection(); + } + + private void createConnection() throws AiravataException { + try { + ConnectionFactory connectionFactory = new ConnectionFactory(); + connectionFactory.setUri(url); + connection = connectionFactory.newConnection(); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + } + }); + log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName); + + channel = connection.createChannel(); + channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); + + } catch (Exception e) { + String msg = "could not open channel for exchange " + taskLaunchExchangeName; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + public String listen(final MessageHandler handler) throws AiravataException { + try { + Map<String, Object> props = handler.getProperties(); + final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY); + if (routing == null) { + throw new IllegalArgumentException("The routing key must be present"); + } + + List<String> keys = new ArrayList<String>(); + if (routing instanceof List) { + for (Object o : (List)routing) { + keys.add(o.toString()); + } + } else if (routing instanceof String) { + keys.add((String) routing); + } + + String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE); + String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG); + if (queueName == null) { + if (!channel.isOpen()) { + channel = connection.createChannel(); + channel.exchangeDeclare(taskLaunchExchangeName, "fanout"); + } + queueName = channel.queueDeclare().getQueue(); + } else { + channel.queueDeclare(queueName, true, false, false, null); + } + + final String id = getId(keys, queueName); + if (queueDetailsMap.containsKey(id)) { + throw new IllegalStateException("This subscriber is already defined for this Consumer, " + + "cannot define the same subscriber twice"); + } + + if (consumerTag == null) { + consumerTag = "default"; + } + + // bind all the routing keys + for (String routingKey : keys) { + channel.queueBind(queueName, taskLaunchExchangeName, routingKey); + } + + channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) { + Message message = new Message(); + + try { + ThriftUtils.createThriftFromBytes(body, message); + TBase event = null; + String gatewayId = null; + if(message.getMessageType().equals(MessageType.LAUNCHTASK)) { + TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' for experimentId: " + + taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId()); + event = taskSubmitEvent; + gatewayId = taskSubmitEvent.getGatewayId(); + }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) { + TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent(); + ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent); + log.debug(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getMessageType() + "' for experimentId: " + + taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId()); + event = taskTerminateEvent; + gatewayId = null; + } + MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId); + messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime())); + handler.onMessage(messageContext); + } catch (TException e) { + String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id; + log.warn(msg, e); + } + } + }); + // save the name for deleting the queue + queueDetailsMap.put(id, new QueueDetails(queueName, keys)); + return id; + } catch (Exception e) { + String msg = "could not open channel for exchange " + taskLaunchExchangeName; + log.error(msg); + throw new AiravataException(msg, e); + } + } + + public void stopListen(final String id) throws AiravataException { + QueueDetails details = queueDetailsMap.get(id); + if (details != null) { + try { + for (String key : details.getRoutingKeys()) { + channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key); + } + channel.queueDelete(details.getQueueName(), true, true); + } catch (IOException e) { + String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName; + log.debug(msg); + } + } + } + + /** + * Private class for holding some information about the consumers registered + */ + private class QueueDetails { + String queueName; + + List<String> routingKeys; + + private QueueDetails(String queueName, List<String> routingKeys) { + this.queueName = queueName; + this.routingKeys = routingKeys; + } + + public String getQueueName() { + return queueName; + } + + public List<String> getRoutingKeys() { + return routingKeys; + } + } + + private String getId(List<String> routingKeys, String queueName) { + String id = ""; + for (String key : routingKeys) { + id = id + "_" + key; + } + return id + "_" + queueName; + } + + public void close() { + if (connection != null) { + try { + connection.close(); + } catch (IOException ignore) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java index 8029a0c..fe58042 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java @@ -44,7 +44,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ String exchangeName; try { brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL); - exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME); + exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME); } catch (ApplicationSettingsException e) { String message = "Failed to get read the required properties from airavata to initialize rabbitmq"; log.error(message, e); @@ -56,7 +56,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ public void publish(MessageContext msgCtx) throws AiravataException { try { - log.info("Publishing to lauch queue ..."); + log.info("Publishing to launch queue ..."); byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent()); Message message = new Message(); message.setEvent(body); @@ -65,13 +65,9 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{ message.setUpdatedTime(msgCtx.getUpdatedTime().getTime()); String routingKey = null; if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){ - TaskSubmitEvent event = (TaskSubmitEvent) msgCtx.getEvent(); - routingKey = LAUNCH_TASK + "."+event.getExperimentId() + "." + - event.getTaskId() + "." + event.getGatewayId(); + routingKey = LAUNCH_TASK; }else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){ - TaskTerminateEvent event = (TaskTerminateEvent) msgCtx.getEvent(); - routingKey = TERMINATE_TASK + "."+event.getExperimentId() + "." + - event.getTaskId(); + routingKey = TERMINATE_TASK; } byte[] messageBody = ThriftUtils.serializeThriftObject(message); rabbitMQProducer.send(messageBody, routingKey); http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java index 0e0e425..97b85bc 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java @@ -25,7 +25,6 @@ package org.apache.airavata.orchestrator.core.utils; * */ public class OrchestratorConstants { - private static final String SUBMITTER_PROPERTY = "job.submitter"; public static final String AIRAVATA_PROPERTIES = "airavata-server.properties"; public static final int hotUpdateInterval=1000; public static final String JOB_SUBMITTER = "job.submitter"; http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java ---------------------------------------------------------------------- diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java index 896b248..6ee1111 100644 --- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java +++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java @@ -22,12 +22,11 @@ package org.apache.airavata.xbaya.messaging; import org.apache.airavata.common.exception.AiravataException; -import org.apache.airavata.common.utils.AiravataUtils; import org.apache.airavata.messaging.core.Consumer; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; -import org.apache.airavata.messaging.core.impl.RabbitMQConsumer; +import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer; import org.apache.airavata.model.messaging.event.MessageType; import org.apache.airavata.model.workspace.experiment.ExperimentState; import org.apache.airavata.workflow.model.exceptions.WorkflowException; @@ -101,7 +100,7 @@ public class Monitor extends EventProducer { getEventDataRepository().triggerListenerForPreMonitorStart(); try { // AiravataUtils.setExecutionAsServer(); - this.messageClient = new RabbitMQConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange"); + this.messageClient = new RabbitMQStatusConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange"); } catch (AiravataException e) { String msg = "Failed to start the consumer"; logger.error(msg, e);
