Repository: airavata Updated Branches: refs/heads/queue-gfac a486b67d9 -> 60788efec
initial version of passive job submission Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/60788efe Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/60788efe Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/60788efe Branch: refs/heads/queue-gfac Commit: 60788efec3a808274a19e9687a669cabad84e89b Parents: a486b67 Author: Lahiru Gunathilake <[email protected]> Authored: Tue Feb 17 22:09:15 2015 -0500 Committer: Lahiru Gunathilake <[email protected]> Committed: Tue Feb 17 22:09:15 2015 -0500 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 2 +- .../airavata/gfac/leader/CuratorClient.java | 79 ------------------ .../gfac/leader/LeaderSelectorExample.java | 80 ------------------ .../airavata/gfac/server/GfacServerHandler.java | 85 ++++++++++---------- .../core/impl/RabbitMQTaskLaunchConsumer.java | 1 - .../core/impl/GFACPassiveJobSubmitter.java | 5 -- 6 files changed, 43 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index b7121b9..6937c25 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -95,7 +95,7 @@ public class CreateLaunchExperiment { public static void createAndLaunchExp() throws TException { // final String expId = createEchoExperimentForFSD(airavataClient); try { - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 10; i++) { // final String expId = createExperimentForSSHHost(airavata); // final String expId = createEchoExperimentForFSD(airavataClient); // final String expId = createMPIExperimentForFSD(airavataClient); http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java deleted file mode 100644 index 2db9a6f..0000000 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.airavata.gfac.leader; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.curator.framework.recipes.leader.LeaderSelector; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which - * has the recommended handling for connection state issues - */ -public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable { - private final String name; - private final LeaderSelector leaderSelector; - private final AtomicInteger leaderCount = new AtomicInteger(); - - public CuratorClient(CuratorFramework client, String path, String name) { - this.name = name; - - // create a leader selector using the given path for management - // all participants in a given leader selection must use the same path - // ExampleClient here is also a LeaderSelectorListener but this isn't required - leaderSelector = new LeaderSelector(client, path, this); - - // for most cases you will want your instance to requeue when it relinquishes leadership - leaderSelector.autoRequeue(); - } - - public void start() throws IOException { - // the selection for this instance doesn't start until the leader selector is started - // leader selection is done in the background so this call to leaderSelector.start() returns immediately - leaderSelector.start(); - } - - @Override - public void close() throws IOException { - leaderSelector.close(); - } - - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - // we are now the leader. This method should not return until we want to relinquish leadership - - final int waitSeconds = (int) (5 * Math.random()) + 1; - - System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); - System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); - } catch (InterruptedException e) { - System.err.println(name + " was interrupted."); - Thread.currentThread().interrupt(); - } finally { - System.out.println(name + " relinquishing leadership.\n"); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java deleted file mode 100644 index ad02641..0000000 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.leader; - -import com.google.common.collect.Lists; -import org.apache.airavata.common.utils.AiravataZKUtils; -import org.apache.airavata.common.utils.Constants; -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.curator.utils.CloseableUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.util.List; - -public class LeaderSelectorExample { - private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class); - private static final int CLIENT_QTY = 10; - - private static final String PATH = "/examples/leader"; - - public static void main(String[] args) throws Exception - { - // all of the useful sample code is in ExampleClient.java - - System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur."); - System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times."); - - try - { - for ( int i = 0; i < CLIENT_QTY; ++i ) - { - CuratorFramework client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3)); - - CuratorClient example = new CuratorClient(client, PATH, "Client #" + i); - - client.start(); - example.start(); - } - - System.out.println("Press enter/return to quit\n"); - new BufferedReader(new InputStreamReader(System.in)).readLine(); - } - finally - { - System.out.println("Shutting down..."); - - /*for ( CuratorClient exampleClient : examples ) - { - CloseableUtils.closeQuietly(exampleClient); - } - for ( CuratorFramework client : clients ) - { - CloseableUtils.closeQuietly(client); - }*/ - } - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index c838703..679a5ee 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -35,7 +35,6 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.core.utils.InputHandlerWorker; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; -import org.apache.airavata.gfac.leader.CuratorClient; import org.apache.airavata.messaging.core.MessageContext; import org.apache.airavata.messaging.core.MessageHandler; import org.apache.airavata.messaging.core.MessagingConstants; @@ -60,7 +59,9 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.*; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -93,11 +94,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private List<Future> inHandlerFutures; - private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private String nodeName = null; - CuratorFramework curatorFramework = null; + private CuratorFramework curatorFramework = null; + private BlockingQueue<TaskSubmitEvent> taskSubmitEvents; + private BlockingQueue<TaskTerminateEvent> taskTerminateEvents; + + private CuratorClient curatorClient; public GfacServerHandler() throws Exception{ // registering with zk try { @@ -107,6 +112,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ zk = new ZooKeeper(zkhostPort, 6000, this); // no watcher is required, this will only use to store some data gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); synchronized (mutex) { mutex.wait(); // waiting for the syncConnected event } @@ -121,10 +127,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ BetterGfacImpl.startStatusUpdators(registry, zk, publisher); inHandlerFutures = new ArrayList<Future>(); - if(ServerSettings.isGFacPassiveMode()) { - rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); - rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); + if (ServerSettings.isGFacPassiveMode()) { + taskSubmitEvents = new LinkedBlockingDeque<TaskSubmitEvent>(); + taskTerminateEvents = new LinkedBlockingDeque<TaskTerminateEvent>(); curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3)); + curatorClient = new CuratorClient(curatorFramework, nodeName); + + curatorFramework.start(); + curatorClient.start(); } @@ -296,51 +306,37 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } private class TaskLaunchMessageHandler implements MessageHandler { - private String experimentId; - - private String nodeName; - + public static final String LAUNCH_TASK = "launch.task"; + public static final String TERMINATE_TASK = "teminate.task"; public TaskLaunchMessageHandler(){ - try { - nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME); - } catch (ApplicationSettingsException e) { - logger.error(e.getMessage(), e); - } + } public Map<String, Object> getProperties() { Map<String, Object> props = new HashMap<String, Object>(); - try { - props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)); - } catch (ApplicationSettingsException e) { - // if we cannot find gfac node name configured we set a random id - logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value."); - logger.error("listening to a random generated routing key"); - props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString()); - } + ArrayList<String> keys = new ArrayList<String>(); + keys.add(LAUNCH_TASK); + keys.add(TERMINATE_TASK); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); return props; } public void onMessage(MessageContext message) { - if (message.getType().equals(MessageType.LAUNCHTASK)){ + if (message.getType().equals(MessageType.LAUNCHTASK)) { try { TaskSubmitEvent event = new TaskSubmitEvent(); TBase messageEvent = message.getEvent(); byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent); ThriftUtils.createThriftFromBytes(bytes, event); - CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName); - try { - curatorClient.start(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } + taskSubmitEvents.add(event); - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); - } catch (TException e) { + + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); + } catch (TException e) { logger.error(e.getMessage(), e); //nobody is listening so nothing to throw } - }else if(message.getType().equals(MessageType.TERMINATETASK)){ + } else if (message.getType().equals(MessageType.TERMINATETASK)) { try { TaskTerminateEvent event = new TaskTerminateEvent(); TBase messageEvent = message.getEvent(); @@ -361,18 +357,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private final LeaderSelector leaderSelector; private final AtomicInteger leaderCount = new AtomicInteger(); private final String path; - private TaskSubmitEvent event; private String experimentNode; - public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) { + public CuratorClient(CuratorFramework client, String name) { this.name = name; - this.event = taskSubmitEvent; - this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId(); // create a leader selector using the given path for management // all participants in a given leader selection must use the same path // ExampleClient here is also a LeaderSelectorListener but this isn't required - leaderSelector = new LeaderSelector(client, path, this); experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + path = experimentNode + File.separator + "leader"; + leaderSelector = new LeaderSelector(client, path, this); // for most cases you will want your instance to requeue when it relinquishes leadership leaderSelector.autoRequeue(); } @@ -393,18 +387,23 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ // we are now the leader. This method should not return until we want to relinquish leadership final int waitSeconds = (int) (5 * Math.random()) + 1; - System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); - System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); + logger.info(name + " is now the leader. Waiting " + waitSeconds + " seconds..."); + logger.info(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before."); + RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + String listenId = rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler()); + TaskSubmitEvent event = taskSubmitEvents.take(); try { GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId()); submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); } catch (InterruptedException e) { - System.err.println(name + " was interrupted."); + logger.error(name + " was interrupted."); Thread.currentThread().interrupt(); } finally { - System.out.println(name + " relinquishing leadership.\n"); + Thread.sleep(5); + logger.info(name + " relinquishing leadership.: "+ new Date().toString()); + rabbitMQTaskLaunchConsumer.stopListen(listenId); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java ---------------------------------------------------------------------- diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java index 056dcac..4bc7468 100644 --- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java +++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java @@ -190,7 +190,6 @@ public class RabbitMQTaskLaunchConsumer { for (String key : details.getRoutingKeys()) { channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key); } - channel.queueDelete(details.getQueueName(), true, true); } catch (IOException e) { String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName; log.debug(msg); http://git-wip-us.apache.org/repos/asf/airavata/blob/60788efe/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java index 78cc6b7..b5e25b1 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java @@ -102,7 +102,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { ZooKeeper zk = orchestratorContext.getZk(); - GfacService.Client gfacClient = null; try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); @@ -151,8 +150,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { } catch (Exception e) { logger.error(e.getMessage(), e); throw new OrchestratorException(e); - }finally { - gfacClient.getOutputProtocol().getTransport().close(); } return true; @@ -167,7 +164,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { */ public boolean terminate(String experimentID, String taskID) throws OrchestratorException { ZooKeeper zk = orchestratorContext.getZk(); - GfacService.Client localhost = null; try { if (zk == null || !zk.getState().isConnected()) { String zkhostPort = AiravataZKUtils.getZKhostPort(); @@ -189,7 +185,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher { String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); logger.info("GFAC instance node data: " + gfacNodeData); String[] split = gfacNodeData.split(":"); - localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
