Repository: airavata Updated Branches: refs/heads/master 71de39d9e -> 4917ce2a9
Optimize scp connection for client data movement. AIRAVATA-1456 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4917ce2a Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4917ce2a Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4917ce2a Branch: refs/heads/master Commit: 4917ce2a9d83d62219639ed57a37a9b0c5ab2f3c Parents: 71de39d Author: raminder <[email protected]> Authored: Tue Sep 30 12:49:19 2014 -0400 Committer: raminder <[email protected]> Committed: Tue Sep 30 12:49:19 2014 -0400 ---------------------------------------------------------------------- .../ssh/handler/AdvancedSCPInputHandler.java | 64 +++++++++++++------- .../ssh/handler/AdvancedSCPOutputHandler.java | 34 ++++++++--- 2 files changed, 67 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/4917ce2a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index 850471d..a0abe7f 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -85,6 +85,8 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { private String inputPath; + public static Map<String, Cluster> clusters = new HashMap<String, Cluster>(); + public void initProperties(Properties properties) throws GFacHandlerException { password = (String) properties.get("password"); passPhrase = (String) properties.get("passPhrase"); @@ -174,13 +176,27 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { } catch (MalformedURLException e) { log.error(e.getLocalizedMessage(),e); } - ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) { - pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); - } - lastHost = hostName; - - if (index < oldIndex) { + ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); + String key = this.userName + this.hostName; + boolean recreate = false; + if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected()) { + pbsCluster = (PBSCluster) clusters.get(key); + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + log.info("Reusing existing connection for ---- : " + this.hostName); + } catch (Exception e) { + log.info("Connection found the connection map is expired, so we create from the scratch"); + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } else{ + recreate = true; + } + if(recreate){ + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + log.info("Connection created for ---- : " + this.hostName); + clusters.put(key, pbsCluster); + } + if (index < oldIndex) { log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index @@ -208,11 +224,26 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { log.error(e.getLocalizedMessage(),e); } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - if (pbsCluster == null && (lastHost == null || !lastHost.equals(hostName))) { - pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); - } - lastHost = hostName; - + String key = this.userName + this.hostName; + boolean recreate = false; + if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected()) { + pbsCluster = (PBSCluster) clusters.get(key); + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + log.info("Reusing existing connection for ---- : " + this.hostName); + } catch (Exception e) { + log.info("Connection found the connection map is expired, so we create from the scratch"); + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } else{ + recreate = true; + } + if(recreate){ + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + log.info("Connection created for ---- : " + this.hostName); + clusters.put(key, pbsCluster); + } + if (index < oldIndex) { log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); newFiles.add(oldFiles.get(index)); @@ -236,16 +267,7 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { log.error(e1.getLocalizedMessage()); } throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); - }finally { - if (pbsCluster != null) { - try { - pbsCluster.disconnect(); - } catch (SSHApiException e) { - throw new GFacHandlerException(e.getMessage(), e); - } - } } - jobExecutionContext.setInMessageContext(inputNew); } http://git-wip-us.apache.org/repos/asf/airavata/blob/4917ce2a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java index 03eced4..b1e03b2 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -51,6 +51,7 @@ import java.io.File; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -86,6 +87,10 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { private String hostName; private String outputPath; + + public static Map<String, Cluster> clusters = new HashMap<String, Cluster>(); + + public void initProperties(Properties properties) throws GFacHandlerException { password = (String)properties.get("password"); @@ -99,7 +104,7 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { @Override public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - Cluster pbsCluster = null; + Cluster pbsCluster = null; try { if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { try { @@ -139,8 +144,25 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { } } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - + String key = this.userName + this.hostName; + boolean recreate = false; + if (clusters.containsKey(key) && clusters.get(key).getSession().isConnected()) { + pbsCluster = (PBSCluster) clusters.get(key); + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + log.info("Reusing existing connection for ---- : " + this.hostName); + } catch (Exception e) { + log.info("Connection found the connection map is expired, so we create from the scratch"); + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } else{ + recreate = true; + } + if(recreate){ pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + log.info("Connection created for ---- : " + this.hostName); + clusters.put(key, pbsCluster); + } if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID() + File.separator; @@ -185,14 +207,6 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { log.error(e1.getLocalizedMessage()); } throw new GFacHandlerException(e); - }finally { - if (pbsCluster != null) { - try { - pbsCluster.disconnect(); - } catch (SSHApiException e) { - throw new GFacHandlerException(e.getMessage(), e); - } - } } } }
