Repository: airavata
Updated Branches:
  refs/heads/master 71de39d9e -> 4917ce2a9


Optimize scp connection for client data movement. AIRAVATA-1456

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4917ce2a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4917ce2a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4917ce2a

Branch: refs/heads/master
Commit: 4917ce2a9d83d62219639ed57a37a9b0c5ab2f3c
Parents: 71de39d
Author: raminder <[email protected]>
Authored: Tue Sep 30 12:49:19 2014 -0400
Committer: raminder <[email protected]>
Committed: Tue Sep 30 12:49:19 2014 -0400

----------------------------------------------------------------------
 .../ssh/handler/AdvancedSCPInputHandler.java    | 64 +++++++++++++-------
 .../ssh/handler/AdvancedSCPOutputHandler.java   | 34 ++++++++---
 2 files changed, 67 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4917ce2a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 850471d..a0abe7f 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -85,6 +85,8 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
 
     private String inputPath;
 
+    public static Map<String, Cluster> clusters = new HashMap<String, 
Cluster>();
+
     public void initProperties(Properties properties) throws 
GFacHandlerException {
         password = (String) properties.get("password");
         passPhrase = (String) properties.get("passPhrase");
@@ -174,13 +176,27 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
                                        } catch (MalformedURLException e) {
                                                
log.error(e.getLocalizedMessage(),e);
                                        }
-                                       ServerInfo serverInfo = new 
ServerInfo(this.userName, this.hostName);
-                                       if (pbsCluster == null && (lastHost == 
null || !lastHost.equals(hostName))) {
-                                               pbsCluster = new 
PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
-                                       }
-                                       lastHost = hostName;
-
-                                       if (index < oldIndex) {
+                                        ServerInfo serverInfo = new 
ServerInfo(this.userName, this.hostName);
+                                        String key = this.userName + 
this.hostName;
+                                   boolean recreate = false;
+                                   if (clusters.containsKey(key) && 
clusters.get(key).getSession().isConnected()) {
+                                       pbsCluster = (PBSCluster) 
clusters.get(key);
+                                       try {
+                                           pbsCluster.listDirectory("~/"); // 
its hard to trust isConnected method, so we try to connect if it works we are 
good,else we recreate
+                                               log.info("Reusing existing 
connection for ---- : " + this.hostName);
+                                       } catch (Exception e) {
+                                           log.info("Connection found the 
connection map is expired, so we create from the scratch");
+                                           recreate = true; // we make the 
pbsCluster to create again if there is any exception druing connection
+                                       }
+                                   } else{
+                                       recreate = true;
+                                   }
+                                   if(recreate){
+                                   pbsCluster = new PBSCluster(serverInfo, 
authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+                                   log.info("Connection created for ---- : " + 
this.hostName);
+                                   clusters.put(key, pbsCluster);
+                                   }
+                                   if (index < oldIndex) {
                                                log.info("Input File: " + 
paramValue + " is already transfered, so we skip this operation !!!");
                                                ((URIParameterType) 
actualParameter.getType()).setValue(oldFiles.get(index));
                                                
data.append(oldFiles.get(index++)).append(","); // we get already transfered 
file and increment the index
@@ -208,11 +224,26 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
                                                        
log.error(e.getLocalizedMessage(),e);
                                                }
                                                ServerInfo serverInfo = new 
ServerInfo(this.userName, this.hostName);
-                                               if (pbsCluster == null && 
(lastHost == null || !lastHost.equals(hostName))) {
-                                                       pbsCluster = new 
PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
-                                               }
-                                               lastHost = hostName;
-
+                                               String key = this.userName + 
this.hostName;
+                                   boolean recreate = false;
+                                   if (clusters.containsKey(key) && 
clusters.get(key).getSession().isConnected()) {
+                                       pbsCluster = (PBSCluster) 
clusters.get(key);
+                                       try {
+                                           pbsCluster.listDirectory("~/"); // 
its hard to trust isConnected method, so we try to connect if it works we are 
good,else we recreate
+                                               log.info("Reusing existing 
connection for ---- : " + this.hostName);
+                                       } catch (Exception e) {
+                                           log.info("Connection found the 
connection map is expired, so we create from the scratch");
+                                           recreate = true; // we make the 
pbsCluster to create again if there is any exception druing connection
+                                       }
+                                   } else{
+                                       recreate = true;
+                                   }
+                                   if(recreate){
+                                   pbsCluster = new PBSCluster(serverInfo, 
authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+                                   log.info("Connection created for ---- : " + 
this.hostName);
+                                   clusters.put(key, pbsCluster);
+                                   }
+                                
                                                if (index < oldIndex) {
                                                        log.info("Input File: " 
+ paramValue + " is already transfered, so we skip this operation !!!");
                                                        
newFiles.add(oldFiles.get(index));
@@ -236,16 +267,7 @@ public class AdvancedSCPInputHandler extends 
AbstractRecoverableHandler {
                                 log.error(e1.getLocalizedMessage());
                        }
             throw new GFacHandlerException("Error while input File Staging", 
e, e.getLocalizedMessage());
-        }finally {
-            if (pbsCluster != null) {
-                try {
-                       pbsCluster.disconnect();
-                } catch (SSHApiException e) {
-                    throw new GFacHandlerException(e.getMessage(), e);
-                }
-            }
         }
-        
         jobExecutionContext.setInMessageContext(inputNew);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4917ce2a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index 03eced4..b1e03b2 100644
--- 
a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -51,6 +51,7 @@ import java.io.File;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -86,6 +87,10 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
     private String hostName;
 
     private String outputPath;
+    
+    public static Map<String, Cluster> clusters = new HashMap<String, 
Cluster>();
+    
+    
 
     public void initProperties(Properties properties) throws 
GFacHandlerException {
         password = (String)properties.get("password");
@@ -99,7 +104,7 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
 
     @Override
     public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
-       Cluster pbsCluster = null;
+         Cluster pbsCluster = null;
         try {
             if 
(jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
 == null) {
                 try {
@@ -139,8 +144,25 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
                                }
             }
             ServerInfo serverInfo = new ServerInfo(this.userName, 
this.hostName);
-
+            String key = this.userName + this.hostName;
+            boolean recreate = false;
+            if (clusters.containsKey(key) && 
clusters.get(key).getSession().isConnected()) {
+                pbsCluster = (PBSCluster) clusters.get(key);
+                try {
+                    pbsCluster.listDirectory("~/"); // its hard to trust 
isConnected method, so we try to connect if it works we are good,else we 
recreate
+                       log.info("Reusing existing connection for ---- : " + 
this.hostName);
+                } catch (Exception e) {
+                    log.info("Connection found the connection map is expired, 
so we create from the scratch");
+                    recreate = true; // we make the pbsCluster to create again 
if there is any exception druing connection
+                }
+            } else{
+               recreate = true;
+            }
+            if(recreate){
             pbsCluster = new PBSCluster(serverInfo, authenticationInfo, 
CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/"));
+            log.info("Connection created for ---- : " + this.hostName);
+            clusters.put(key, pbsCluster);
+            }
             
if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && 
!jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){
             outputPath = outputPath + File.separator + 
jobExecutionContext.getExperimentID() + "-" + 
jobExecutionContext.getTaskData().getTaskID()
                     + File.separator;
@@ -185,14 +207,6 @@ public class AdvancedSCPOutputHandler extends 
AbstractHandler {
                                 log.error(e1.getLocalizedMessage());
                        }
                throw new GFacHandlerException(e);
-        }finally {
-            if (pbsCluster != null) {
-                try {
-                       pbsCluster.disconnect();
-                } catch (SSHApiException e) {
-                    throw new GFacHandlerException(e.getMessage(), e);
-                }
-            }
         }
     }
 }

Reply via email to