Repository: airavata
Updated Branches:
  refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a


fixing input/outhandler - AIRAVATA-1488


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516

Branch: refs/heads/gfac_appcatalog_int
Commit: b3769516eec7d7127e17d2e24e4001718763c1ec
Parents: 3b330c0
Author: lahiru <[email protected]>
Authored: Thu Oct 30 11:27:12 2014 -0400
Committer: lahiru <[email protected]>
Committed: Thu Oct 30 11:27:12 2014 -0400

----------------------------------------------------------------------
 .../impl/password/PasswordCredential.java       |   3 +-
 .../gfac/core/context/JobExecutionContext.java  |   2 +-
 .../airavata/gfac/core/utils/GFacUtils.java     |   3 +-
 .../gfac/gsissh/util/GFACGSISSHUtils.java       |   2 +-
 .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++---------
 .../ssh/handler/AdvancedSCPInputHandler.java    |   9 +-
 .../ssh/handler/AdvancedSCPOutputHandler.java   |  12 +-
 .../airavata/gfac/ssh/util/GFACSSHUtils.java    |  86 +++++++++++---
 8 files changed, 146 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
----------------------------------------------------------------------
diff --git 
a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
 
b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
index ee32ef4..a31c98b 100644
--- 
a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
+++ 
b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
@@ -22,13 +22,14 @@
 package org.apache.airavata.credential.store.credential.impl.password;
 
 import org.apache.airavata.credential.store.credential.Credential;
+import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
 
 import java.util.Date;
 
 /**
  * User name password credentials.
  */
-public class PasswordCredential extends Credential {
+public class PasswordCredential extends SSHCredential {
 
     private String userName;
     private String password;

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 9abab8d..2f94ec5 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -234,7 +234,7 @@ public class JobExecutionContext extends AbstractContext 
implements Serializable
 
        
        public SecurityContext getSecurityContext(String name) throws 
GFacException{
-               SecurityContext secContext = securityContext.get(name);
+               SecurityContext secContext = 
securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress());
                return secContext;
        }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 729c1ee..eef44a4 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1092,7 +1092,8 @@ public class GFacUtils {
                }else if(experimentEntry != null && 
GFacUtils.isCancelled(experimentID,taskID,zk) ){
             // this happens when a cancel request comes to a different gfac 
node, in this case we do not move gfac experiment
             // node to gfac node specific location, because original request 
execution will fail with errors
-            return true;
+            log.error("This experiment is already cancelled and its already 
executing the cancel operation so cannot submit again !");
+            return false;
         } else {
             log.error("ExperimentID: " + experimentID + " taskID: " + taskID
                     + " is already running by this Gfac instance");

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
index 4d338e3..2f9dbc3 100644
--- 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
+++ 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
@@ -163,7 +163,7 @@ public class GFACGSISSHUtils {
             } catch (Exception e) {
                 throw new GFacException("An error occurred while creating GSI 
security context", e);
             }
-            
jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context);
+            
jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
 context);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index d3c3df8..952b30e 100644
--- 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ 
b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor {
         HostDescription currentHostDescription = null;
         try {
             take = this.queue.take();
-            Map<String,MonitorID> completedJobs = new 
HashMap<String,MonitorID>();
             List<HostMonitorData> hostMonitorData = take.getHostMonitorData();
             for (HostMonitorData iHostMonitorData : hostMonitorData) {
                 if (iHostMonitorData.getHost().getType() instanceof 
GsisshHostType
                         || iHostMonitorData.getHost().getType() instanceof 
SSHHostType) {
-                    currentHostDescription = iHostMonitorData.getHost();
                     String hostName =  
iHostMonitorData.getHost().getType().getHostAddress();
                     ResourceConnection connection = null;
                     if (connections.containsKey(hostName)) {
@@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor {
                     // before we get the statuses, we check the cancel job 
list and remove them permanently
                     List<MonitorID> monitorID = 
iHostMonitorData.getMonitorIDs();
                     Iterator<String> iterator1 = cancelJobList.iterator();
-
-                    for(MonitorID iMonitorID:monitorID){
+                    ListIterator<MonitorID> monitorIDListIterator = 
monitorID.listIterator();
+                    while (monitorIDListIterator.hasNext()){
+                        MonitorID iMonitorID = monitorIDListIterator.next();
                         while(iterator1.hasNext()) {
                             String cancelMId = iterator1.next();
                             if (cancelMId.equals(iMonitorID.getExperimentID() 
+ "+" + iMonitorID.getTaskID())) {
                                 iMonitorID.setStatus(JobState.CANCELED);
-                                completedJobs.put(iMonitorID.getJobName(), 
iMonitorID);
                                 iterator1.remove();
                                 logger.debugId(cancelMId, "Found a match in 
cancel monitor queue, hence moved to the " +
                                                 "completed job queue, 
experiment {}, task {} , job {}",
                                         iMonitorID.getExperimentID(), 
iMonitorID.getTaskID(), iMonitorID.getJobID());
+                                logger.info("Job cancelled: marking the Job as 
************CANCELLED************ experiment {}, task {}, job name {} .",
+                                        
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                sendNotification(iMonitorID);
+                                monitorIDListIterator.remove();
+                                
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                                 break;
                             }
                         }
@@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor {
                     }
                     synchronized (completedJobsFromPush) {
                         ListIterator<String> iterator = 
completedJobsFromPush.listIterator();
-                        for (MonitorID iMonitorID : monitorID) {
+                        monitorIDListIterator = monitorID.listIterator();
+                        while (monitorIDListIterator.hasNext()) {
+                            MonitorID iMonitorID = 
monitorIDListIterator.next();
                             String completeId = null;
                             while (iterator.hasNext()) {
                                  completeId = iterator.next();
                                 if (completeId.equals(iMonitorID.getUserName() 
+ "," + iMonitorID.getJobName())) {
                                     logger.info("This job is finished because 
push notification came with <username,jobName> " + completeId);
-                                    completedJobs.put(iMonitorID.getJobName(), 
iMonitorID);
                                     iMonitorID.setStatus(JobState.COMPLETE);
                                     iterator.remove();//we have to make this 
empty every time we iterate, otherwise this list will accumulate and will lead 
to a memory leak
                                     logger.debugId(completeId, "Push 
notification updated job {} status to {}. " +
                                                     "experiment {} , task 
{}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
                                             iMonitorID.getExperimentID(), 
iMonitorID.getTaskID());
+                                    logger.info("AMQP message recieved: 
marking the Job as ************COMPLETE************ experiment {}, task {}, job 
name {} .",
+                                            
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+
+                                    monitorIDListIterator.remove();
+                                    sendNotification(iMonitorID);
+                                    
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                                     break;
                                 }
                             }
                             iterator = completedJobsFromPush.listIterator();
                         }
                     }
+
+                    // we have to get this again because we removed the 
already completed jobs with amqp messages
+                    monitorID = iHostMonitorData.getMonitorIDs();
                     Map<String, JobState> jobStatuses = 
connection.getJobStatuses(monitorID);
-                    Iterator<MonitorID> iterator = monitorID.iterator();
+                    Iterator<MonitorID> iterator = monitorID.listIterator();
                     while (iterator.hasNext()) {
                         MonitorID iMonitorID = iterator.next();
                         currentMonitorID = iMonitorID;
@@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor {
                                 
!JobState.COMPLETE.equals(iMonitorID.getStatus())) {
                             
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + 
iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we have a 
logic
                         }else 
if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
-                            completedJobs.put(iMonitorID.getJobName(), 
iMonitorID);
                             logger.debugId(iMonitorID.getJobID(), "Moved job 
{} to completed jobs map, experiment {}, " +
                                     "task {}", iMonitorID.getJobID(), 
iMonitorID.getExperimentID(), iMonitorID.getTaskID());
+                            iterator.remove();
+                            logger.info("PULL Notification is complete: 
marking the Job as ************COMPLETE************ experiment {}, task {}, job 
name {} .",
+                                    
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                            
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                         }
-                        jobStatus = new JobStatusChangeRequestEvent();
                         
iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
    //IMPORTANT this is not a simple setter we have a logic
-
+                        iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
+                        sendNotification(iMonitorID);
+                        logger.debugId(jobStatus.getJobIdentity().getJobId(), 
"Published job status change request, " +
+                                        "experiment {} , task {}", 
jobStatus.getJobIdentity().getExperimentId(),
+                                jobStatus.getJobIdentity().getTaskId());
+                        // if the job is completed we do not have to put the 
job to the queue again
+                        iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
+                    }
+                    iterator = monitorID.listIterator();
+                    while(iterator.hasNext()){
+                        MonitorID iMonitorID = iterator.next();
                         if (iMonitorID.getFailedCount() > FAILED_COUNT) {
                             iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
                             String outputDir = 
iMonitorID.getJobExecutionContext().getApplicationContext()
@@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor {
                                     // this is because while we run output 
handler something failed and during exception
                                     // we store all the jobs in the monitor 
queue again
                                     logger.error("We know this  job is already 
attempted to run out-handlers");
-                                    CommonUtils.removeMonitorFromQueue(queue, 
iMonitorID);
+//                                    
CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
                                 }
                             }
                             if (stdOut != null && stdOut.size() > 0 && 
!stdOut.get(0).isEmpty()) { // have to be careful with this
                                 iMonitorID.setStatus(JobState.COMPLETE);
-                                completedJobs.put(iMonitorID.getJobName(), 
iMonitorID);
-                                logger.errorId(iMonitorID.getJobID(), "Job 
monitoring failed {} times, removed job {} from " +
-                                                "monitor queue. Experiment {} 
, task {}", iMonitorID.getFailedCount(),
+                                logger.errorId(iMonitorID.getJobID(), "Job 
monitoring failed {} times, " +
+                                                " Experiment {} , task {}", 
iMonitorID.getFailedCount(),
                                         iMonitorID.getExperimentID(), 
iMonitorID.getTaskID());
+                                logger.info("Listing directory came as 
complete: marking the Job as ************COMPLETE************ experiment {}, 
task {}, job name {} .",
+                                        
iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
+                                sendNotification(iMonitorID);
+                                iterator.remove();
+                                
GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, 
iMonitorID, publisher));
                             } else {
                                 iMonitorID.setFailedCount(0);
                             }
@@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor {
                             // if the job is complete we remove it from the 
Map, if any of these maps
                             // get empty this userMonitorData will get delete 
from the queue
                         }
-                        JobIdentifier jobIdentity = new 
JobIdentifier(iMonitorID.getJobID(),
-                                iMonitorID.getTaskID(),
-                                iMonitorID.getWorkflowNodeID(),
-                                iMonitorID.getExperimentID(),
-                                
iMonitorID.getJobExecutionContext().getGatewayID());
-                        jobStatus.setJobIdentity(jobIdentity);
-                        jobStatus.setState(iMonitorID.getStatus());
-                        // we have this JobStatus class to handle amqp 
monitoring
-
-                        publisher.publish(jobStatus);
-                        logger.debugId(jobStatus.getJobIdentity().getJobId(), 
"Published job status change request, " +
-                                        "experiment {} , task {}", 
jobStatus.getJobIdentity().getExperimentId(),
-                                jobStatus.getJobIdentity().getTaskId());
-                        // if the job is completed we do not have to put the 
job to the queue again
-                        iMonitorID.setLastMonitored(new Timestamp((new 
Date()).getTime()));
                     }
+
+
                 } else {
                     logger.debug("Qstat Monitor doesn't handle non-gsissh 
hosts , host {}", iHostMonitorData.getHost()
                             .getType().getHostAddress());
@@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor {
             // We have finished all the HostMonitorData object in 
userMonitorData, now we need to put it back
             // now the userMonitorData goes back to the tail of the queue
             queue.put(take);
-            // cleaning up the completed jobs, this method will remove some of 
the userMonitorData from the queue if
-            // they become empty
-            Map<String, Integer> jobRemoveCountMap = new HashMap<String, 
Integer>();
-            ZooKeeper zk = null;
-            Set<String> keys = completedJobs.keySet();
-            for (String jobName: keys) {
-                MonitorID completedJob = completedJobs.get(jobName);
-                CommonUtils.removeMonitorFromQueue(queue, completedJob);
-//                    
gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
-                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new 
OutHandlerWorker(gfac, completedJob, publisher));
-                if (zk == null) {
-                    zk = completedJob.getJobExecutionContext().getZk();
-                }
-                String key = CommonUtils.getJobCountUpdatePath(completedJob);
-                int i = 0;
-                if (jobRemoveCountMap.containsKey(key)) {
-                    i = Integer.valueOf(jobRemoveCountMap.get(key));
-                }
-                jobRemoveCountMap.put(key, ++i);
-            }
-            if (completedJobs.size() > 0) {
-                // reduce completed job count from zookeeper
-                CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, false);
-            }
         } catch (InterruptedException e) {
             if (!this.queue.contains(take)) {
                 try {
@@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor {
         return true;
     }
 
+    private void sendNotification(MonitorID iMonitorID) {
+        JobStatusChangeRequestEvent jobStatus = new 
JobStatusChangeRequestEvent();
+        JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(),
+                iMonitorID.getTaskID(),
+                iMonitorID.getWorkflowNodeID(),
+                iMonitorID.getExperimentID(),
+                iMonitorID.getJobExecutionContext().getGatewayID());
+        jobStatus.setJobIdentity(jobIdentity);
+        jobStatus.setState(iMonitorID.getStatus());
+        // we have this JobStatus class to handle amqp monitoring
+
+        publisher.publish(jobStatus);
+    }
+
     /**
      * This is the method to stop the polling process
      *

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index ce296da..de4dd41 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -72,6 +72,7 @@ import java.util.*;
 public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
     private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
     public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
+    public static final int DEFAULT_SSH_PORT = 22;
 
     private String password = null;
 
@@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
                         this.passPhrase);
             }
             ServerInfo serverInfo = new ServerInfo(this.userName, 
this.hostName);
-            String key = this.userName + this.hostName;
-            jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new 
SSHAuthWrapper(serverInfo,authenticationInfo,key));
-            if 
(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
 == null) {
+            String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
+            SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, 
authenticationInfo, key);
+            if 
(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
 == null) {
                 try {
-                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                    
GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
                 } catch (ApplicationSettingsException e) {
                     log.error(e.getMessage());
                     try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index ad2131e..aed6e9f 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -73,6 +73,8 @@ import java.util.Set;
 public class AdvancedSCPOutputHandler extends AbstractHandler {
     private static final Logger log = 
LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
 
+    public static final int DEFAULT_SSH_PORT = 22;
+
     private String password = null;
 
     private String publicKeyPath;
@@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends AbstractHandler 
{
 
     private String outputPath;
 
-    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
-
 
     public void initProperties(Properties properties) throws 
GFacHandlerException {
         password = (String)properties.get("password");
@@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
                     this.passPhrase);
         }
         ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName);
-        String key = this.userName + this.hostName;
-        jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new 
SSHAuthWrapper(serverInfo,authenticationInfo,key));
+        String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
+        SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, 
authenticationInfo, key);
         try {
-            if 
(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
 == null) {
+            if 
(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
 == null) {
                 try {
-                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
+                    
GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
                 } catch (ApplicationSettingsException e) {
                     log.error(e.getMessage());
                     try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
index 7ee5d6a..94f07b1 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
@@ -62,9 +62,12 @@ public class GFACSSHUtils {
 
     public static int maxClusterCount = 5;
 
-    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
-
-
+    /**
+     * This method is to add computing resource specific authentication; if 
it is a third-party machine, use the other addSecurityContext
+     * @param jobExecutionContext
+     * @throws GFacException
+     * @throws ApplicationSettingsException
+     */
     public static void addSecurityContext(JobExecutionContext 
jobExecutionContext) throws GFacException, ApplicationSettingsException {
         HostDescription registeredHost = 
jobExecutionContext.getApplicationContext().getHostDescription();
         if (registeredHost.getType() instanceof GlobusHostType || 
registeredHost.getType() instanceof UnicoreHostType) {
@@ -77,8 +80,6 @@ public class GFACSSHUtils {
             requestData.setTokenId(credentialStoreToken);
 
             ServerInfo serverInfo = new ServerInfo(null, 
registeredHost.getType().getHostAddress());
-            SSHAuthWrapper sshAuth = (SSHAuthWrapper) 
jobExecutionContext.getProperty(ADVANCED_SSH_AUTH);
-
             Cluster pbsCluster = null;
             try {
                 TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new 
TokenizedSSHAuthInfo(requestData);
@@ -95,9 +96,6 @@ public class GFACSSHUtils {
 
                 String key = credentials.getPortalUserName() + 
registeredHost.getType().getHostAddress() +
                         serverInfo.getPort();
-                if(sshAuth!=null){
-                    key=sshAuth.getKey();
-                }
                 boolean recreate = false;
                 synchronized (clusters) {
                     if (clusters.containsKey(key) && clusters.get(key).size() 
< maxClusterCount) {
@@ -125,15 +123,8 @@ public class GFACSSHUtils {
                         recreate = true;
                     }
                     if (recreate) {
-                        if (sshAuth != null) {
-                            pbsCluster = new 
PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),
+                        pbsCluster = new PBSCluster(serverInfo, 
tokenizedSSHAuthInfo,
                                     
CommonUtils.getPBSJobManager(installedParentPath));
-                            
jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other provider 
might fail
-                            key = sshAuth.getKey();
-                        } else {
-                            pbsCluster = new PBSCluster(serverInfo, 
tokenizedSSHAuthInfo,
-                                    
CommonUtils.getPBSJobManager(installedParentPath));
-                        }
                         List<Cluster> pbsClusters = null;
                         if (!(clusters.containsKey(key))) {
                             pbsClusters = new ArrayList<Cluster>();
@@ -148,10 +139,71 @@ public class GFACSSHUtils {
                 e.printStackTrace();  //To change body of catch statement use 
File | Settings | File Templates.
             }
             sshSecurityContext.setPbsCluster(pbsCluster);
-            
jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, 
sshSecurityContext);
+            
jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
 sshSecurityContext);
         }
     }
 
+    /**
+     * This method can be used to add third party resource security contexts
+     * @param jobExecutionContext
+     * @param sshAuth
+     * @throws GFacException
+     * @throws ApplicationSettingsException
+     */
+    public static void addSecurityContext(JobExecutionContext 
jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, 
ApplicationSettingsException {
+            try {
+                if(sshAuth== null) {
+                    throw new GFacException("Error adding security Context, 
because sshAuthWrapper is null");
+                }
+                SSHSecurityContext sshSecurityContext = new 
SSHSecurityContext();
+                Cluster pbsCluster = null;
+                String key=sshAuth.getKey();
+                boolean recreate = false;
+                synchronized (clusters) {
+                    if (clusters.containsKey(key) && clusters.get(key).size() 
< maxClusterCount) {
+                        recreate = true;
+                    } else if (clusters.containsKey(key)) {
+                        int i = new Random().nextInt(Integer.MAX_VALUE) % 
maxClusterCount;
+                        if 
(clusters.get(key).get(i).getSession().isConnected()) {
+                            pbsCluster = clusters.get(key).get(i);
+                        } else {
+                            clusters.get(key).remove(i);
+                            recreate = true;
+                        }
+                        if (!recreate) {
+                            try {
+                                pbsCluster.listDirectory("~/"); // it is hard to 
trust the isConnected method, so we try to connect; if it works we are good, else we 
recreate
+                            } catch (Exception e) {
+                                clusters.get(key).remove(i);
+                                logger.info("Connection found the connection 
map is expired, so we create from the scratch");
+                                maxClusterCount++;
+                        recreate = true; // we recreate the pbsCluster 
if there is any exception during connection
+                            }
+                        }
+                        logger.info("Re-using the same connection used with 
the connection string:" + key);
+                    } else {
+                        recreate = true;
+                    }
+                    if (recreate) {
+                        pbsCluster = new PBSCluster(sshAuth.getServerInfo(), 
sshAuth.getAuthenticationInfo(),null);
+                        key = sshAuth.getKey();
+                        List<Cluster> pbsClusters = null;
+                        if (!(clusters.containsKey(key))) {
+                            pbsClusters = new ArrayList<Cluster>();
+                        } else {
+                            pbsClusters = clusters.get(key);
+                        }
+                        pbsClusters.add(pbsCluster);
+                        clusters.put(key, pbsClusters);
+                    }
+                }
+                sshSecurityContext.setPbsCluster(pbsCluster);
+                
jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key, 
sshSecurityContext);
+            } catch (Exception e) {
+                e.printStackTrace();  //To change body of catch statement use 
File | Settings | File Templates.
+            }
+    }
+
     public static JobDescriptor createJobDescriptor(JobExecutionContext 
jobExecutionContext,
                                                     
ApplicationDeploymentDescriptionType app, Cluster cluster) {
         JobDescriptor jobDescriptor = new JobDescriptor();

Reply via email to