Repository: airavata
Updated Branches:
  refs/heads/master 3f8986881 -> 21c9a9266
fixing a bug in completed job handling


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/21c9a926
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/21c9a926
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/21c9a926

Branch: refs/heads/master
Commit: 21c9a9266c9a509ba50990077b14d6f21fc5b619
Parents: 3f89868
Author: lahiru <[email protected]>
Authored: Wed Sep 24 15:32:19 2014 -0400
Committer: lahiru <[email protected]>
Committed: Wed Sep 24 15:32:19 2014 -0400

----------------------------------------------------------------------
 .../gfac/monitor/impl/pull/qstat/HPCPullMonitor.java | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/21c9a926/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 34a6065..bac27bf 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -160,7 +160,7 @@ public class HPCPullMonitor extends PullMonitor {
             HostDescription currentHostDescription = null;
             try {
                 take = this.queue.take();
-                List<MonitorID> completedJobs = new ArrayList<MonitorID>();
+                Map<String,MonitorID> completedJobs = new HashMap<String,MonitorID>();
                 List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
                 for (HostMonitorData iHostMonitorData : hostMonitorData) {
                     if (iHostMonitorData.getHost().getType() instanceof GsisshHostType
@@ -191,7 +191,7 @@ public class HPCPullMonitor extends PullMonitor {
                                     if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) {
                                         logger.info("Found a match in monitoring Queue, so marking this job to remove from monitor queue " + cancelMId);
                                         logger.info("ExperimentID: " + cancelMId.split("\\+")[0] + ",TaskID: " + cancelMId.split("\\+")[1] + "JobID" + iMonitorID.getJobID());
-                                        completedJobs.add(iMonitorID);
+                                        completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                         iMonitorID.setStatus(JobState.CANCELED);
                                         iterator1.remove();
                                         break;
@@ -207,7 +207,7 @@ public class HPCPullMonitor extends PullMonitor {
                                     completeId = iterator.next();
                                     if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) {
                                         logger.info("This job is finished because push notification came with <username,jobName> " + completeId);
-                                        completedJobs.add(iMonitorID);
+                                        completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                         iMonitorID.setStatus(JobState.COMPLETE);
                                         break;
                                     }
@@ -229,7 +229,7 @@ public class HPCPullMonitor extends PullMonitor {
                                         !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                                     iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic
                                 }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
-                                    completedJobs.add(iMonitorID);
+                                    completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                 }
                                 jobStatus = new JobStatusChangeRequest(iMonitorID);
                                 // we have this JobStatus class to handle amqp monitoring
@@ -242,7 +242,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed" + iMonitorID.getFailedCount() +
                                         " 3 times, so skip this Job from Monitor");
                                 iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime()));
-                                completedJobs.add(iMonitorID);
+                                completedJobs.put(iMonitorID.getJobName(), iMonitorID);
                                 try {
                                     logger.error("Launching outflow handlers to check output are genereated or not");
                                     gfac.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext());
@@ -269,7 +269,9 @@ public class HPCPullMonitor extends PullMonitor {
             // they become empty
             Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>();
             ZooKeeper zk = null;
-            for (MonitorID completedJob : completedJobs) {
+            Set<String> keys = completedJobs.keySet();
+            for (String jobName: keys) {
+                MonitorID completedJob = completedJobs.get(jobName);
                 GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher));
                 CommonUtils.removeMonitorFromQueue(queue, completedJob);
                 if (zk == null) {
