Repository: airavata
Updated Branches:
  refs/heads/master 5ea8f2c0b -> 75fd7e40c


fixing threadPool implementation to used during input handler execution


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/bdd025ee
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/bdd025ee
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/bdd025ee

Branch: refs/heads/master
Commit: bdd025ee293edeabf2e2e635a901e0ec74ad79a2
Parents: cc0f8ec
Author: lahiru <[email protected]>
Authored: Wed Sep 24 13:03:05 2014 -0400
Committer: lahiru <[email protected]>
Committed: Wed Sep 24 13:03:05 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 18 ++++---
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 16 ++----
 .../gfac/core/utils/GFacThreadPoolExecutor.java | 35 ++++++++++++++
 .../gfac/core/utils/InputHandlerWorker.java     | 51 ++++++++++++++++++++
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 41 ++++++++++------
 .../impl/push/amqp/SimpleJobFinishConsumer.java |  4 +-
 pom.xml                                         |  2 +-
 7 files changed, 132 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1b9eb75..f7c3cb8 100644
--- 
a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ 
b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -29,6 +29,8 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -41,6 +43,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
 
 
 public class GfacServerHandler implements GfacService.Iface, Watcher{
@@ -68,6 +73,8 @@ public class GfacServerHandler implements GfacService.Iface, 
Watcher{
 
     private String airavataServerHostPort;
 
+    private List<Future> inHandlerFutures;
+
     public GfacServerHandler() {
         // registering with zk
         try {
@@ -100,6 +107,7 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher{
             setGatewayProperties();
             BetterGfacImpl.startDaemonHandlers();
             BetterGfacImpl.startStatusUpdators(registry,zk,publisher);
+            inHandlerFutures = new ArrayList<Future>();
         }catch (Exception e){
            logger.error("Error initialising GFAC",e);
         }
@@ -187,11 +195,9 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher{
     public boolean submitJob(String experimentId, String taskId, String 
gatewayId) throws TException {
         logger.info("GFac Recieved the Experiment: " + experimentId + " 
TaskId: " + taskId);
         GFac gfac = getGfac();
-        try {
-            return gfac.submitJob(experimentId, taskId, gatewayId);
-        } catch (GFacException e) {
-            throw new TException("Error launching the experiment : " + 
e.getMessage(), e);
-        }
+        InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, 
experimentId, taskId, gatewayId);
+        
inHandlerFutures.add(GFacThreadPoolExecutor.getCachedThreadPool().submit(inputHandlerWorker));
+        return true;
     }
 
     public boolean cancelJob(String experimentId, String taskId) throws 
TException {
@@ -204,7 +210,6 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher{
         }
     }
 
-
     public Registry getRegistry() {
         return registry;
     }
@@ -249,4 +254,5 @@ public class GfacServerHandler implements 
GfacService.Iface, Watcher{
             throw new TException("Error initializing gfac instance",e);
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index beaa124..60bbc58 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -132,8 +132,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancelled = false;
 
-    private static ExecutorService cachedThreadPool;
-
     /**
      * Constructor for GFac
      *
@@ -147,7 +145,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 //        this.airavataRegistry2 = airavataRegistry2;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
         this.zk = zooKeeper;
-       this.cachedThreadPool = Executors.newCachedThreadPool();
     }
 
     public static void startStatusUpdators(Registry registry, ZooKeeper zk, 
MonitorPublisher publisher) {
@@ -499,7 +496,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         try {
             String experimentEntry = 
GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), 
jobExecutionContext.getTaskData().getTaskID(), zk);
             Stat exists = zk.exists(experimentEntry + File.separator + 
"operation", false);
-            zk.getData(experimentEntry + File.separator + "operation", 
this,exists);
+            zk.getData(experimentEntry + File.separator + "operation", this, 
exists);
             int stateVal = GFacUtils.getZKExperimentStateValue(zk, 
jobExecutionContext);   // this is the original state came, if we query again 
it might be different,so we preserve this state in the environment
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new 
MonitorID(jobExecutionContext)
                     , GfacExperimentState.ACCEPTED));                  // 
immediately we get the request we update the status
@@ -525,13 +522,12 @@ public class BetterGfacImpl implements GFac,Watcher {
             }
             return true;
         } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
+            throw new GFacException("Error launching the Job",e);
         } catch (KeeperException e) {
-            e.printStackTrace();
+            throw new GFacException("Error launching the Job",e);
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            throw new GFacException("Error launching the Job",e);
         }
-        return true;
     }
 
     public boolean cancel(String experimentID, String taskID, String 
gatewayID) throws GFacException {
@@ -1181,8 +1177,4 @@ public class BetterGfacImpl implements GFac,Watcher {
             this.cancelled = true;
         }
     }
-
-    public static ExecutorService getCachedThreadPool(){
-        return cachedThreadPool;
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
new file mode 100644
index 0000000..56a97a5
--- /dev/null
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacThreadPoolExecutor.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class GFacThreadPoolExecutor {
+    private static ExecutorService cachedThreadPool;
+
+    public static ExecutorService getCachedThreadPool() {
+        if(cachedThreadPool==null){
+            cachedThreadPool = Executors.newCachedThreadPool();
+        }
+        return cachedThreadPool;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
new file mode 100644
index 0000000..efa7c0c
--- /dev/null
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.utils;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class InputHandlerWorker implements Callable {
+    private static Logger log = 
LoggerFactory.getLogger(InputHandlerWorker.class);
+
+    String experimentId;
+
+    String taskId;
+
+    String gatewayId;
+
+    GFac gfac;
+    public InputHandlerWorker(GFac gfac, String experimentId,String 
taskId,String gatewayId) {
+        this.gfac = gfac;
+        this.experimentId = experimentId;
+        this.taskId = taskId;
+        this.gatewayId = gatewayId;
+    }
+
+    @Override
+    public Object call() throws Exception {
+        return gfac.submitJob(experimentId,taskId,gatewayId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index 58b48ef..59ed90c 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -31,6 +31,7 @@ import org.apache.airavata.gfac.core.monitor.MonitorID;
 import org.apache.airavata.gfac.core.monitor.TaskIdentity;
 import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
 import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
 import org.apache.airavata.gfac.core.utils.OutHandlerWorker;
 import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
@@ -195,21 +196,24 @@ public class HPCPullMonitor extends PullMonitor {
                                 iterator1.remove();
                             }
                         }
+                        iterator1 = cancelJobList.iterator();
                     }
-                    Iterator<String> iterator = 
completedJobsFromPush.iterator();
-                    for(MonitorID iMonitorID:monitorID){
-                        while(iterator.hasNext()) {
-                            String cancelMId = iterator.next();
-                            if (cancelMId.equals(iMonitorID.getUserName() + 
"," + iMonitorID.getJobName())) {
-                                logger.info("This job is finished because push 
notification came with <username,jobName> " + cancelMId);
-                                completedJobs.add(iMonitorID);
-                                iMonitorID.setStatus(JobState.COMPLETE);
+                    synchronized (completedJobsFromPush) {
+                        Iterator<String> iterator = 
completedJobsFromPush.iterator();
+                        for (MonitorID iMonitorID : monitorID) {
+                            while (iterator.hasNext()) {
+                                String cancelMId = iterator.next();
+                                if (cancelMId.equals(iMonitorID.getUserName() 
+ "," + iMonitorID.getJobName())) {
+                                    logger.info("This job is finished because 
push notification came with <username,jobName> " + cancelMId);
+                                    completedJobs.add(iMonitorID);
+                                    iMonitorID.setStatus(JobState.COMPLETE);
+                                }
+                                //we have to make this empty everytime we 
iterate, otherwise this list will accumilate and will
+                                // lead to a memory leak
+                                iterator.remove();
                             }
-                            //we have to make this empty everytime we iterate, 
otherwise this list will accumilate and will
-                            // lead to a memory leak
-                            iterator.remove();
+                            iterator = completedJobsFromPush.listIterator();
                         }
-                        iterator = completedJobsFromPush.listIterator();
                     }
                     Map<String, JobState> jobStatuses = 
connection.getJobStatuses(monitorID);
                     for (MonitorID iMonitorID : monitorID) {
@@ -218,7 +222,12 @@ public class HPCPullMonitor extends PullMonitor {
                                 
!JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + 
iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a 
logic
                         }
-                            jobStatus = new JobStatusChangeRequest(iMonitorID);
+
+                        String id = iMonitorID.getUserName() + "," + 
iMonitorID.getJobName();
+                        if(completedJobsFromPush.contains(id)){
+                            iMonitorID.setStatus(JobState.COMPLETE);
+                        }
+                        jobStatus = new JobStatusChangeRequest(iMonitorID);
                             // we have this JobStatus class to handle amqp 
monitoring
 
                             publisher.publish(jobStatus);
@@ -227,10 +236,12 @@ public class HPCPullMonitor extends PullMonitor {
 
                             // After successful monitoring perform follow   
ing actions to cleanup the queue, if necessary
                             if 
(jobStatus.getState().equals(JobState.COMPLETE)) {
-                                completedJobs.add(iMonitorID);
+                                if(completedJobs.contains(iMonitorID)) {
+                                    completedJobs.add(iMonitorID);
+                                }
                                 // we run all the finished jobs in separate 
threads, because each job doesn't have to wait until
                                 // each one finish transfering files
-                                
BetterGfacImpl.getCachedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
+                                
GFacThreadPoolExecutor.getCachedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                             } else if (iMonitorID.getFailedCount() > 
FAILED_COUNT) {
                                 logger.error("Tried to monitor the job with ID 
" + iMonitorID.getJobID() + " But failed" +iMonitorID.getFailedCount()+
                                         " 3 times, so skip this Job from 
Monitor");

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
index 64d241e..3a6aae7 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/SimpleJobFinishConsumer.java
@@ -64,7 +64,9 @@ public class SimpleJobFinishConsumer {
                         while (true) {
                             QueueingConsumer.Delivery delivery = 
consumer.nextDelivery();
                             logger.info("---------------- Job Finish message 
received:"+new String(delivery.getBody())+" --------------");
-                            completedJobsFromPush.add(new 
String(delivery.getBody()));
+                            synchronized (completedJobsFromPush) {
+                                completedJobsFromPush.add(new 
String(delivery.getBody()));
+                            }
                             
ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                         }
                     } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/bdd025ee/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 69cd5c4..cfe571b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -507,7 +507,7 @@
                 <module>modules/test-suite</module>
                 <module>modules/distribution</module>
                 <module>modules/ws-messenger</module>
-                <module>modules/integration-tests</module>
+                <!--module>modules/integration-tests</module-->
                 <module>modules/xbaya-gui</module>
             </modules>
         </profile>

Reply via email to