Fix monitoring issue: the topmost object (UserMonitorData) was not removed from
the queue once everything in it was empty, which caused unwanted iterations


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/86c2a9df
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/86c2a9df
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/86c2a9df

Branch: refs/heads/gfac_appcatalog_int
Commit: 86c2a9df9195aea59a07525f954064cfbfad3ea4
Parents: c383cf3
Author: lginnaliadmin <[email protected]>
Authored: Fri Nov 7 11:06:58 2014 -0500
Committer: lginnaliadmin <[email protected]>
Committed: Fri Nov 7 11:06:58 2014 -0500

----------------------------------------------------------------------
 .../gfac/monitor/impl/pull/qstat/HPCPullMonitor.java | 15 ++++++++++-----
 .../airavata/gfac/monitor/util/CommonUtils.java      | 15 +++------------
 2 files changed, 13 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/86c2a9df/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 5973c0a..9f93dc6 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -186,7 +186,7 @@ public class HPCPullMonitor extends PullMonitor {
                             String cancelMId = iterator1.next();
                             if (cancelMId.equals(iMonitorID.getExperimentID() 
+ "+" + iMonitorID.getTaskID())) {
                                 iMonitorID.setStatus(JobState.CANCELED);
-                                iterator1.remove();
+                                
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
                                 logger.debugId(cancelMId, "Found a match in 
cancel monitor queue, hence moved to the " +
                                                 "completed job queue, 
experiment {}, task {} , job {}",
                                         iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobID());
@@ -211,7 +211,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 if (completeId.equals(iMonitorID.getUserName() 
+ "," + iMonitorID.getJobName())) {
                                     logger.info("This job is finished because 
push notification came with <username,jobName> " + completeId);
                                     iMonitorID.setStatus(JobState.COMPLETE);
-                                    iterator.remove();//we have to make this 
empty everytime we iterate, otherwise this list will accumulate and will lead 
to a memory leak
+                                    
CommonUtils.removeMonitorFromQueue(take,iMonitorID);//we have to make this 
empty everytime we iterate, otherwise this list will accumulate and will lead 
to a memory leak
                                     logger.debugId(completeId, "Push 
notification updated job {} status to {}. " +
                                                     "experiment {} , task 
{}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
                                             iMonitorID.getExperimentID(), 
iMonitorID.getTaskID());
@@ -241,7 +241,7 @@ public class HPCPullMonitor extends PullMonitor {
                         }else 
if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
                             logger.debugId(iMonitorID.getJobID(), "Moved job 
{} to completed jobs map, experiment {}, " +
                                     "task {}", iMonitorID.getJobID(), 
iMonitorID.getExperimentID(), iMonitorID.getTaskID());
-                            iterator.remove();
+                            
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
                             logger.info("PULL Notification is complete: 
marking the Job as ************COMPLETE************ experiment {}, task {}, job 
name {} .",
                                     
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
                             
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
@@ -278,7 +278,7 @@ public class HPCPullMonitor extends PullMonitor {
                                 logger.info("Listing directory came as 
complete: marking the Job as ************COMPLETE************ experiment {}, 
task {}, job name {} .",
                                         
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
                                 sendNotification(iMonitorID);
-                                iterator.remove();
+                                
CommonUtils.removeMonitorFromQueue(take,iMonitorID);
                                 
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                             } else {
                                 iMonitorID.setFailedCount(0);
@@ -299,7 +299,12 @@ public class HPCPullMonitor extends PullMonitor {
             }
             // We have finished all the HostMonitorData object in 
userMonitorData, now we need to put it back
             // now the userMonitorData goes back to the tail of the queue
-            queue.put(take);
+            // during individual monitorID removal we remove the 
HostMonitorData object if it become empty
+            // so if all the jobs are finished for all the hostMOnitorId 
objects in userMonitorData object
+            // we should remove it from the queue so here we do not put it 
back.
+            if(take.getHostMonitorData().size()!=0) {
+                queue.put(take);
+            }
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/86c2a9df/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 3abcf1d..6152505 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -152,13 +152,10 @@ public class CommonUtils {
      * @param monitorID
      * @throws AiravataMonitorException
      */
-    public static void removeMonitorFromQueue(BlockingQueue<UserMonitorData> 
queue, MonitorID monitorID) throws AiravataMonitorException {
-            Iterator<UserMonitorData> iterator = queue.iterator();
-            while (iterator.hasNext()) {
-                UserMonitorData next = iterator.next();
-                if (next.getUserName().equals(monitorID.getUserName())) {
+    public static void removeMonitorFromQueue(UserMonitorData userMonitorData, 
MonitorID monitorID) throws AiravataMonitorException {
+                if 
(userMonitorData.getUserName().equals(monitorID.getUserName())) {
                     // then this is the right place to update
-                    List<HostMonitorData> hostMonitorData = 
next.getHostMonitorData();
+                    List<HostMonitorData> hostMonitorData = 
userMonitorData.getHostMonitorData();
                     Iterator<HostMonitorData> iterator1 = 
hostMonitorData.iterator();
                     while (iterator1.hasNext()) {
                         HostMonitorData iHostMonitorID = iterator1.next();
@@ -177,11 +174,6 @@ public class CommonUtils {
                                         iterator1.remove();
                                         logger.debug("Removed host {} from 
monitoring queue", iHostMonitorID.getHost()
                                                 .getType().getHostAddress());
-                                        if (hostMonitorData.size() == 0) {
-                                            // no useful data so we have to 
remove the element from the queue
-                                            queue.remove(next);
-                                            logger.debug("Removed user {} from 
monitoring.", next.getUserName());
-                                        }
                                     }
                                     return;
                                 }
@@ -189,7 +181,6 @@ public class CommonUtils {
                         }
                     }
                 }
-            }
         logger.info("Cannot find the given MonitorID in the queue with 
userName " +
                 monitorID.getUserName() + "  and jobID " + 
monitorID.getJobID());
         logger.info("This might not be an error because someone else removed 
this job from the queue");

Reply via email to