Repository: airavata Updated Branches: refs/heads/master 5ea8f2c0b -> 75fd7e40c
fixing threadPool implementation to used during input handler execution Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/bdd025ee Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/bdd025ee Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/bdd025ee Branch: refs/heads/master Commit: bdd025ee293edeabf2e2e635a901e0ec74ad79a2 Parents: cc0f8ec Author: lahiru <[email protected]> Authored: Wed Sep 24 13:03:05 2014 -0400 Committer: lahiru <[email protected]> Committed: Wed Sep 24 13:03:05 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 18 ++++--- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 16 ++---- .../gfac/core/utils/GFacThreadPoolExecutor.java | 35 ++++++++++++++ .../gfac/core/utils/InputHandlerWorker.java | 51 ++++++++++++++++++++ .../monitor/impl/pull/qstat/HPCPullMonitor.java | 41 ++++++++++------ .../impl/push/amqp/SimpleJobFinishConsumer.java | 4 +- pom.xml | 2 +- 7 files changed, 132 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 1b9eb75..f7c3cb8 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -29,6 +29,8 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; import org.apache.airavata.gfac.core.cpi.GFac; +import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; +import org.apache.airavata.gfac.core.utils.InputHandlerWorker; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; @@ -41,6 +43,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; public class GfacServerHandler implements GfacService.Iface, Watcher{ @@ -68,6 +73,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private String airavataServerHostPort; + private List<Future> inHandlerFutures; + public GfacServerHandler() { // registering with zk try { @@ -100,6 +107,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ setGatewayProperties(); BetterGfacImpl.startDaemonHandlers(); BetterGfacImpl.startStatusUpdators(registry,zk,publisher); + inHandlerFutures = new ArrayList<Future>(); }catch (Exception e){ logger.error("Error initialising GFAC",e); } @@ -187,11 +195,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException { logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); GFac gfac = getGfac(); - try { - return gfac.submitJob(experimentId, taskId, gatewayId); - } catch (GFacException e) { - throw new TException("Error launching the experiment : " + e.getMessage(), e); - } + InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId); + inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker)); + return true; } public boolean cancelJob(String experimentId, String taskId) throws TException { @@ -204,7 +210,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } } - public Registry getRegistry() { return registry; } @@ -249,4 +254,5 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ throw new TException("Error initializing gfac instance",e); } } + } http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index beaa124..60bbc58 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -132,8 +132,6 @@ public class BetterGfacImpl implements GFac,Watcher { private boolean cancelled = false; - private static ExecutorService cachedThreadPool; - /** * Constructor for GFac * @@ -147,7 +145,6 @@ public class BetterGfacImpl implements GFac,Watcher { // this.airavataRegistry2 = airavataRegistry2; monitorPublisher = publisher; // This is a EventBus common for gfac this.zk = zooKeeper; - this.cachedThreadPool = Executors.newCachedThreadPool(); } public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) { @@ -499,7 +496,7 @@ public class BetterGfacImpl implements GFac,Watcher { try { String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), zk); Stat exists = zk.exists(experimentEntry + File.separator + "operation", false); - zk.getData(experimentEntry + File.separator + "operation", this,exists); + zk.getData(experimentEntry + File.separator + "operation", this, exists); int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext) , GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status @@ -525,13 +522,12 @@ public class BetterGfacImpl implements GFac,Watcher { } return true; } catch (ApplicationSettingsException e) { - e.printStackTrace(); + throw new GFacException("Error launching the Job",e); } catch (KeeperException e) { - e.printStackTrace(); + throw new GFacException("Error launching the Job",e); } catch (InterruptedException e) { - e.printStackTrace(); + throw new GFacException("Error launching the Job",e); } - return true; } public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException { @@ -1181,8 +1177,4 @@ public class BetterGfacImpl implements GFac,Watcher { this.cancelled = true; } } - - public static ExecutorService getCachedThreadPool(){ - return cachedThreadPool; - } } http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java new file mode 100644 index 0000000..56a97a5 --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.utils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class GFacThreadPoolExecutor { + private static ExecutorService cachedThreadPool; + + public static ExecutorService getCachedThreadPool() { + if(cachedThreadPool==null){ + cachedThreadPool = Executors.newCachedThreadPool(); + } + return cachedThreadPool; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java new file mode 100644 index 0000000..efa7c0c --- /dev/null +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.gfac.core.utils; + +import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.cpi.GFac; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public class InputHandlerWorker implements Callable { + private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class); + + String experimentId; + + String taskId; + + String gatewayId; + + GFac gfac; + public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId) { + this.gfac = gfac; + this.experimentId = experimentId; + this.taskId = taskId; + this.gatewayId = gatewayId; + } + + @Override + public Object call() throws Exception { + return gfac.submitJob(experimentId,taskId,gatewayId); + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 58b48ef..59ed90c 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.core.monitor.TaskIdentity; import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest; +import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor; import org.apache.airavata.gfac.core.utils.OutHandlerWorker; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; @@ -195,21 +196,24 @@ public class HPCPullMonitor extends PullMonitor { iterator1.remove(); } } + iterator1 = cancelJobList.iterator(); } - Iterator<String> iterator = completedJobsFromPush.iterator(); - for(MonitorID iMonitorID:monitorID){ - while(iterator.hasNext()) { - String cancelMId = iterator.next(); - if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { - logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId); - completedJobs.add(iMonitorID); - iMonitorID.setStatus(JobState.COMPLETE); + synchronized (completedJobsFromPush) { + Iterator<String> iterator = completedJobsFromPush.iterator(); + for (MonitorID iMonitorID : monitorID) { + while (iterator.hasNext()) { + String cancelMId = iterator.next(); + if (cancelMId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { + logger.info("This job is finished because push notification came with <username,jobName> " + cancelMId); + completedJobs.add(iMonitorID); + iMonitorID.setStatus(JobState.COMPLETE); + } + //we have to make this empty everytime we iterate, otherwise this list will accumilate and will + // lead to a memory leak + iterator.remove(); } - //we have to make this empty everytime we iterate, otherwise this list will accumilate and will - // lead to a memory leak - iterator.remove(); + iterator = completedJobsFromPush.listIterator(); } - iterator = completedJobsFromPush.listIterator(); } Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); for (MonitorID iMonitorID : monitorID) { @@ -218,7 +222,12 @@ public class HPCPullMonitor extends PullMonitor { !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic } - jobStatus = new JobStatusChangeRequest(iMonitorID); + + String id = iMonitorID.getUserName() + "," + iMonitorID.getJobName(); + if(completedJobsFromPush.contains(id)){ + iMonitorID.setStatus(JobState.COMPLETE); + } + jobStatus = new JobStatusChangeRequest(iMonitorID); // we have this JobStatus class to handle amqp monitoring publisher.publish(jobStatus); @@ -227,10 +236,12 @@ public class HPCPullMonitor extends PullMonitor { // After successful monitoring perform follow ing actions to cleanup the queue, if necessary if (jobStatus.getState().equals(JobState.COMPLETE)) { - completedJobs.add(iMonitorID); + if(completedJobs.contains(iMonitorID)) { + completedJobs.add(iMonitorID); + } // we run all the finished jobs in separate threads, because each job doesn't have to wait until // each one finish transfering files - BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); + GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else if (iMonitorID.getFailedCount() > FAILED_COUNT) { logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+ " 3 times, so skip this Job from Monitor"); http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java index 64d241e..3a6aae7 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java @@ -64,7 +64,9 @@ public class SimpleJobFinishConsumer { while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); logger.info("---------------- Job Finish message received:"+new String(delivery.getBody())+" --------------"); - completedJobsFromPush.add(new String(delivery.getBody())); + synchronized (completedJobsFromPush) { + completedJobsFromPush.add(new String(delivery.getBody())); + } ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 69cd5c4..cfe571b 100644 --- a/pom.xml +++ b/pom.xml @@ -507,7 +507,7 @@ <module>modules/test-suite</module> <module>modules/distribution</module> <module>modules/ws-messenger</module> - <module>modules/integration-tests</module> + <!--module>modules/integration-tests</module--> <module>modules/xbaya-gui</module> </modules> </profile>
