Fix monitoring issue: the topmost UserMonitorData object was not removed from the monitoring queue once all of its host monitor data became empty, which caused unnecessary iterations over empty entries.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/86c2a9df Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/86c2a9df Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/86c2a9df Branch: refs/heads/gfac_appcatalog_int Commit: 86c2a9df9195aea59a07525f954064cfbfad3ea4 Parents: c383cf3 Author: lginnaliadmin <[email protected]> Authored: Fri Nov 7 11:06:58 2014 -0500 Committer: lginnaliadmin <[email protected]> Committed: Fri Nov 7 11:06:58 2014 -0500 ---------------------------------------------------------------------- .../gfac/monitor/impl/pull/qstat/HPCPullMonitor.java | 15 ++++++++++----- .../airavata/gfac/monitor/util/CommonUtils.java | 15 +++------------ 2 files changed, 13 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/86c2a9df/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 5973c0a..9f93dc6 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -186,7 +186,7 @@ public class HPCPullMonitor extends PullMonitor { String cancelMId = iterator1.next(); if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { iMonitorID.setStatus(JobState.CANCELED); - iterator1.remove(); + CommonUtils.removeMonitorFromQueue(take,iMonitorID); logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence 
moved to the " + "completed job queue, experiment {}, task {} , job {}", iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); @@ -211,7 +211,7 @@ public class HPCPullMonitor extends PullMonitor { if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { logger.info("This job is finished because push notification came with <username,jobName> " + completeId); iMonitorID.setStatus(JobState.COMPLETE); - iterator.remove();//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak logger.debugId(completeId, "Push notification updated job {} status to {}. " + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); @@ -241,7 +241,7 @@ public class HPCPullMonitor extends PullMonitor { }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - iterator.remove(); + CommonUtils.removeMonitorFromQueue(take,iMonitorID); logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); @@ -278,7 +278,7 @@ public class HPCPullMonitor extends PullMonitor { logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); 
sendNotification(iMonitorID); - iterator.remove(); + CommonUtils.removeMonitorFromQueue(take,iMonitorID); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); @@ -299,7 +299,12 @@ public class HPCPullMonitor extends PullMonitor { } // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back // now the userMonitorData goes back to the tail of the queue - queue.put(take); + // during individual monitorID removal we remove the HostMonitorData object if it become empty + // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object + // we should remove it from the queue so here we do not put it back. + if(take.getHostMonitorData().size()!=0) { + queue.put(take); + } } catch (InterruptedException e) { if (!this.queue.contains(take)) { try { http://git-wip-us.apache.org/repos/asf/airavata/blob/86c2a9df/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 3abcf1d..6152505 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -152,13 +152,10 @@ public class CommonUtils { * @param monitorID * @throws AiravataMonitorException */ - public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException { - Iterator<UserMonitorData> iterator = queue.iterator(); - while (iterator.hasNext()) { - UserMonitorData next = iterator.next(); - if 
(next.getUserName().equals(monitorID.getUserName())) { + public static void removeMonitorFromQueue(UserMonitorData userMonitorData, MonitorID monitorID) throws AiravataMonitorException { + if (userMonitorData.getUserName().equals(monitorID.getUserName())) { // then this is the right place to update - List<HostMonitorData> hostMonitorData = next.getHostMonitorData(); + List<HostMonitorData> hostMonitorData = userMonitorData.getHostMonitorData(); Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator(); while (iterator1.hasNext()) { HostMonitorData iHostMonitorID = iterator1.next(); @@ -177,11 +174,6 @@ public class CommonUtils { iterator1.remove(); logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() .getType().getHostAddress()); - if (hostMonitorData.size() == 0) { - // no useful data so we have to remove the element from the queue - queue.remove(next); - logger.debug("Removed user {} from monitoring.", next.getUserName()); - } } return; } @@ -189,7 +181,6 @@ public class CommonUtils { } } } - } logger.info("Cannot find the given MonitorID in the queue with userName " + monitorID.getUserName() + " and jobID " + monitorID.getJobID()); logger.info("This might not be an error because someone else removed this job from the queue");
