Repository: airavata Updated Branches: refs/heads/master a773a2858 -> 7f89109fe
fixing issue with threadpool Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7f89109f Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7f89109f Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7f89109f Branch: refs/heads/master Commit: 7f89109fe22680f14901139ce4056458b2665ff2 Parents: a773a28 Author: Lahiru Gunathilake <[email protected]> Authored: Tue Apr 14 00:20:33 2015 -0400 Committer: Lahiru Gunathilake <[email protected]> Committed: Tue Apr 14 00:20:33 2015 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 38 +++++++++----- .../airavata/gfac/server/GfacServerHandler.java | 55 ++++++++++++++++---- .../gfac/core/utils/InputHandlerWorker.java | 13 +++-- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 10 ++-- 4 files changed, 82 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 9ad71f4..be07975 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -58,10 +58,10 @@ public class CreateLaunchExperiment { private static final String DEFAULT_GATEWAY = "php_reference_gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_fcac7076-e350-4dfb-a6eb-73e2d648fc60"; + private static String echoAppId = "Echo_4efc544e-b72b-45d4-9f2d-ea1fae602d02"; private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518"; private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762"; - private static String amberAppId = "Amber_717cba99-1085-45de-861c-952001c5243c"; + private static String amberAppId = "Amber_1e7c89b0-4e03-4f36-8bdd-86c8bb9df8a1"; private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b"; private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1"; private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6"; @@ -168,12 +168,12 @@ public class CreateLaunchExperiment { // final String expId = createMPIExperimentForFSD(airavataClient); // final String expId = createEchoExperimentForStampede(airavataClient); // final String expId = createEchoExperimentForTrestles(airavataClient); -// final String expId = createExperimentEchoForLocalHost(airavataClient); + final String expId = createExperimentEchoForLocalHost(airavataClient); // final String expId = createExperimentWRFTrestles(airavataClient); // final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); // final String expId = createExperimentWRFStampede(airavataClient); - final String expId = createExperimentForStampedeAmber(airavataClient); +// final String expId = createExperimentForStampedeAmber(airavataClient); // String expId = createExperimentForTrestlesAmber(airavataClient); // final String expId = createExperimentGROMACSStampede(airavataClient); // final String expId = createExperimentESPRESSOStampede(airavataClient); @@ -191,10 +191,20 @@ public class CreateLaunchExperiment { launchExperiment(airavataClient, expId); } - Thread.sleep(10000); - for (String exId : experimentIds) { - Experiment experiment = airavataClient.getExperiment(exId); - System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name()); + boolean allNotFinished = true; + while(allNotFinished) { + allNotFinished = false; + for (String exId : experimentIds) { + Experiment experiment = airavataClient.getExperiment(exId); + if(!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.COMPLETED)&& + !experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.FAILED) + &&!experiment.getExperimentStatus().getExperimentState().equals(ExperimentState.CANCELED)){ + allNotFinished = true; + } + System.out.println(experiment.getExperimentID() + " " + experiment.getExperimentStatus().getExperimentState().name()); + } + System.out.println("----------------------------------------------------"); + Thread.sleep(10000); } @@ -1313,11 +1323,11 @@ public class CreateLaunchExperiment { // } for (InputDataObjectType inputDataObjectType : exInputs) { if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop"); } } @@ -1378,11 +1388,11 @@ public class CreateLaunchExperiment { // } for (InputDataObjectType inputDataObjectType : exInputs) { if (inputDataObjectType.getName().equalsIgnoreCase("Heat_Restart_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/02_Heat.rst"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/02_Heat.rst"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Production_Control_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/03_Prod.in"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/03_Prod.in"); } else if (inputDataObjectType.getName().equalsIgnoreCase("Parameter_Topology_File")) { - inputDataObjectType.setValue("/Users/shameera/Projects/scigap/apps/AMBER_FILES/prmtop"); + inputDataObjectType.setValue("/Users/lginnali/Downloads/prmtop"); } } List<OutputDataObjectType> exOut = client.getApplicationOutputs(amberAppId); http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 7d5e223..f3e1d91 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -25,6 +25,7 @@ import edu.uiuc.ncsa.security.delegation.services.Server; import org.airavata.appcatalog.cpi.AppCatalog; import org.airavata.appcatalog.cpi.AppCatalogException; import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; +import org.apache.airavata.common.exception.AiravataException; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; @@ -68,7 +69,7 @@ import java.util.concurrent.locks.Lock; public class GfacServerHandler implements GfacService.Iface, Watcher { private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); - private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; + private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer; private static int requestCount=0; @@ -143,6 +144,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { } } + public static void main(String[] args) { + RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer = null; + try { + rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer(); + rabbitMQTaskLaunchConsumer.listen(new TestHandler()); + } catch (AiravataException e) { + logger.error(e.getMessage(), e); + } + } private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException { Stat zkStat = zk.exists(gfacServer, false); if (zkStat == null) { @@ -244,13 +254,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { // if( gfac.submitJob(experimentId, taskId, gatewayId)){ logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " + "{}", experimentId, taskId, gatewayId); - try { - GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker).get(); - } catch (InterruptedException e) { - logger.error(e.getMessage(), e); - } catch (ExecutionException e) { - logger.error(e.getMessage(), e); - } + + GFacThreadPoolExecutor.getFixedThreadPool().execute(inputHandlerWorker); + // we immediately return when we have a threadpool return true; } @@ -309,6 +315,33 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { } } + private static class TestHandler implements MessageHandler{ + @Override + public Map<String, Object> getProperties() { + Map<String, Object> props = new HashMap<String, Object>(); + ArrayList<String> keys = new ArrayList<String>(); + keys.add(ServerSettings.getLaunchQueueName()); + keys.add(ServerSettings.getCancelQueueName()); + props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys); + props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName()); + return props; + } + + @Override + public void onMessage(MessageContext message) { + TaskSubmitEvent event = new TaskSubmitEvent(); + TBase messageEvent = message.getEvent(); + byte[] bytes = new byte[0]; + try { + bytes = ThriftUtils.serializeThriftObject(messageEvent); + ThriftUtils.createThriftFromBytes(bytes, event); + System.out.println(event.getExperimentId()); + } catch (TException e) { + logger.error(e.getMessage(), e); + } + } + } + private class TaskLaunchMessageHandler implements MessageHandler { private String experimentNode; private String nodeName; @@ -329,6 +362,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { } public void onMessage(MessageContext message) { + System.out.println(" Message Received with message id '" + message.getMessageId() + + "' and with message type '" + message.getType()); if (message.getType().equals(MessageType.LAUNCHTASK)) { try { TaskSubmitEvent event = new TaskSubmitEvent(); @@ -339,7 +374,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { try { GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag()); - AiravataZKUtils.getExpStatePath(event.getExperimentId(),event.getTaskId()); + AiravataZKUtils.getExpStatePath(event.getExperimentId(), event.getTaskId()); submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId()); } catch (KeeperException e) { logger.error(nodeName + " was interrupted."); @@ -351,8 +386,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher { logger.error(e.getMessage(), e); rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag()); } - System.out.println(" Message Received with message id '" + message.getMessageId() - + "' and with message type '" + message.getType()); } catch (TException e) { logger.error(e.getMessage(), e); //nobody is listening so nothing to throw } http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java index 963db7c..ec7991a 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java @@ -20,6 +20,7 @@ */ package org.apache.airavata.gfac.core.utils; +import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.GFac; import org.slf4j.Logger; @@ -27,7 +28,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -public class InputHandlerWorker implements Callable { +public class InputHandlerWorker implements Runnable { private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class); String experimentId; @@ -45,9 +46,11 @@ public class InputHandlerWorker implements Callable { } @Override - public Object call() throws Exception { - boolean b = gfac.submitJob(experimentId, taskId, gatewayId); - log.info("InHandler and provider Gfac invocation returned: " + b); - return b; + public void run() { + try { + gfac.submitJob(experimentId, taskId, gatewayId); + } catch (GFacException e) { + log.error(e.getMessage(), e); + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/7f89109f/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index ab934ca..26d3385 100644 --- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -197,7 +197,7 @@ public class HPCPullMonitor extends PullMonitor { sendNotification(iMonitorID); logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); Thread.sleep(10000); - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } @@ -223,7 +223,9 @@ public class HPCPullMonitor extends PullMonitor { iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); sendNotification(iMonitorID); - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); + logger.info("To avoid timing issues we sleep sometime and try to retrieve output files"); + Thread.sleep(10000); + GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } @@ -248,7 +250,7 @@ public class HPCPullMonitor extends PullMonitor { removeList.add(iMonitorID); logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); } iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@ -286,7 +288,7 @@ public class HPCPullMonitor extends PullMonitor { sendNotification(iMonitorID); // CommonUtils.removeMonitorFromQueue(take, iMonitorID); removeList.add(iMonitorID); - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getFixedThreadPool().execute(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); }
