Repository: airavata Updated Branches: refs/heads/master 5413e6718 -> f7de359dc
fixing concurrent modification exception in monitoring Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/af73c444 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/af73c444 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/af73c444 Branch: refs/heads/master Commit: af73c44468149b23053da8dc3247e78894fd751d Parents: 97b384b Author: Ginnaliya Gamathige <[email protected]> Authored: Tue Nov 11 10:50:02 2014 -0500 Committer: Ginnaliya Gamathige <[email protected]> Committed: Tue Nov 11 10:50:02 2014 -0500 ---------------------------------------------------------------------- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 66 +++++++++++--------- .../airavata/gfac/monitor/util/CommonUtils.java | 6 +- 2 files changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 8e5f758..66cc5f7 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -158,21 +158,22 @@ public class HPCPullMonitor extends PullMonitor { try { take = this.queue.take(); List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); - for (HostMonitorData iHostMonitorData : hostMonitorData) { + for (ListIterator<HostMonitorData> hostIterator = hostMonitorData.listIterator(); hostIterator.hasNext();) { +
HostMonitorData iHostMonitorData = hostIterator.next(); if (iHostMonitorData.getHost().getType() instanceof GsisshHostType || iHostMonitorData.getHost().getType() instanceof SSHHostType) { - String hostName = iHostMonitorData.getHost().getType().getHostAddress(); + String hostName = iHostMonitorData.getHost().getType().getHostAddress(); ResourceConnection connection = null; if (connections.containsKey(hostName)) { - if(!connections.get(hostName).isConnected()){ - connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + if (!connections.get(hostName).isConnected()) { + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); - }else{ + } else { logger.debug("We already have this connection so not going to create one"); connection = connections.get(hostName); } } else { - connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); } @@ -180,18 +181,18 @@ public class HPCPullMonitor extends PullMonitor { List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); Iterator<String> iterator1 = cancelJobList.iterator(); ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator(); - while (monitorIDListIterator.hasNext()){ + while (monitorIDListIterator.hasNext()) { MonitorID iMonitorID = monitorIDListIterator.next(); - while(iterator1.hasNext()) { + while (iterator1.hasNext()) { String cancelMId = iterator1.next(); if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { iMonitorID.setStatus(JobState.CANCELED); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + "completed job queue, experiment {}, task {} , job {}", iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobID()); logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); sendNotification(iMonitorID); monitorIDListIterator.remove(); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); @@ -201,22 +202,19 @@ public class HPCPullMonitor extends PullMonitor { iterator1 = cancelJobList.iterator(); } synchronized (completedJobsFromPush) { - ListIterator<String> iterator = completedJobsFromPush.listIterator(); - monitorIDListIterator = monitorID.listIterator(); - while (monitorIDListIterator.hasNext()) { - MonitorID iMonitorID = monitorIDListIterator.next(); - String completeId = null; - while (iterator.hasNext()) { - completeId = iterator.next(); + for (ListIterator<String> iterator = completedJobsFromPush.listIterator(); iterator.hasNext(); ) { + String completeId = iterator.next(); + for (monitorIDListIterator = monitorID.listIterator(); monitorIDListIterator.hasNext(); ) { + MonitorID iMonitorID = monitorIDListIterator.next(); if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { logger.info("This job is finished because push notification came with <username,jobName> " + completeId); iMonitorID.setStatus(JobState.COMPLETE); - CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak + CommonUtils.removeMonitorFromQueue(take, iMonitorID);//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak logger.debugId(completeId, "Push notification updated job {} status to {}. 
" + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); iterator.remove(); sendNotification(iMonitorID); @@ -224,36 +222,34 @@ public class HPCPullMonitor extends PullMonitor { break; } } - iterator = completedJobsFromPush.listIterator(); } } // we have to get this again because we removed the already completed jobs with amqp messages monitorID = iHostMonitorData.getMonitorIDs(); Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); - Iterator<MonitorID> iterator = monitorID.listIterator(); - while (iterator.hasNext()) { + for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { MonitorID iMonitorID = iterator.next(); currentMonitorID = iMonitorID; - if (!JobState.CANCELED.equals(iMonitorID.getStatus())&& + if (!JobState.CANCELED.equals(iMonitorID.getStatus()) && !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic - }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ + } else if (JobState.COMPLETE.equals(iMonitorID.getStatus())) { logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - 
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } - iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic + iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); sendNotification(iMonitorID); // if the job is completed we do not have to put the job to the queue again iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); } - iterator = monitorID.listIterator(); - while(iterator.hasNext()){ + + for (Iterator<MonitorID> iterator = monitorID.listIterator(); iterator.hasNext(); ) { MonitorID iMonitorID = iterator.next(); if (iMonitorID.getFailedCount() > FAILED_COUNT) { iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); @@ -276,9 +272,9 @@ public class HPCPullMonitor extends PullMonitor { " Experiment {} , task {}", iMonitorID.getFailedCount(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", - iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobName()); sendNotification(iMonitorID); - CommonUtils.removeMonitorFromQueue(take,iMonitorID); + CommonUtils.removeMonitorFromQueue(take, iMonitorID); GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); @@ -302,6 +298,14 @@ public class HPCPullMonitor extends PullMonitor { // during individual monitorID 
removal we remove the HostMonitorData object if it become empty // so if all the jobs are finished for all the hostMOnitorId objects in userMonitorData object // we should remove it from the queue so here we do not put it back. + for (ListIterator<HostMonitorData> iterator1 = take.getHostMonitorData().listIterator(); iterator1.hasNext(); ) { + HostMonitorData iHostMonitorID = iterator1.next(); + if (iHostMonitorID.getMonitorIDs().size() == 0) { + iterator1.remove(); + logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() + .getType().getHostAddress()); + } + } if(take.getHostMonitorData().size()!=0) { queue.put(take); } http://git-wip-us.apache.org/repos/asf/airavata/blob/af73c444/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 6152505..531b8ff 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -170,11 +170,7 @@ public class CommonUtils { iterator2.remove(); logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " + "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString()); - if (iHostMonitorID.getMonitorIDs().size() == 0) { - iterator1.remove(); - logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() - .getType().getHostAddress()); - } + return; } }
