Repository: airavata Updated Branches: refs/heads/master bf8f3d0c9 -> 5c389b977
Close ssh session. AIRAVATA-1456 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5c389b97 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5c389b97 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5c389b97 Branch: refs/heads/master Commit: 5c389b977650b5230a5d83183dfb0223905f6a8b Parents: bf8f3d0 Author: raminder <[email protected]> Authored: Sun Sep 28 09:44:04 2014 -0400 Committer: raminder <[email protected]> Committed: Sun Sep 28 09:44:04 2014 -0400 ---------------------------------------------------------------------- .../handler/GSISSHDirectorySetupHandler.java | 14 ++++++++-- .../gfac/gsissh/handler/GSISSHInputHandler.java | 8 ++++++ .../gsissh/handler/GSISSHOutputHandler.java | 12 +++++++- .../gsissh/provider/impl/GSISSHProvider.java | 13 +++++++-- .../ssh/handler/AdvancedSCPInputHandler.java | 12 +++++++- .../ssh/handler/AdvancedSCPOutputHandler.java | 11 +++++++- .../ssh/handler/SSHDirectorySetupHandler.java | 14 ++++++++-- .../gfac/ssh/handler/SSHInputHandler.java | 29 +++++++++++++------- .../gfac/ssh/handler/SSHOutputHandler.java | 12 +++++++- 9 files changed, 105 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java index c90074d..b7d9d67 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java @@ -26,6 +26,7 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; +import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; @@ -63,8 +64,9 @@ public class GSISSHDirectorySetupHandler extends AbstractRecoverableHandler { makeDirectory(jobExecutionContext); } private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - try { - Cluster cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); + Cluster cluster = null; + try { + cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); if (cluster == null) { try { GFacUtils.saveErrorDetails(jobExecutionContext, "Security context is not set properly", CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); @@ -102,6 +104,14 @@ public class GSISSHDirectorySetupHandler extends AbstractRecoverableHandler { throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); } throw new GFacHandlerException("Error executing the Handler: " + GSISSHDirectorySetupHandler.class, e); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java index 726f609..039ddab 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java @@ -195,6 +195,14 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler { return targetFile; } catch (Exception e) { throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java index a870e52..c90bc1d 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java @@ -43,6 +43,7 @@ import org.apache.airavata.gfac.core.utils.OutputUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.cpi.ChildDataType; @@ -112,8 +113,9 @@ public class GSISSHOutputHandler extends AbstractRecoverableHandler { ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() .getApplicationDeploymentDescription().getType(); + Cluster cluster = null; + try { - Cluster cluster = null; if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) { cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); } else { @@ -294,6 +296,14 @@ public class GSISSHOutputHandler extends AbstractRecoverableHandler { throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); } throw new GFacHandlerException("Error in retrieving results", e); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index b4de604..9c6a94e 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -81,8 +81,9 @@ public class GSISSHProvider extends AbstractRecoverableProvider { HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext(). getApplicationDeploymentDescription().getType(); JobDetails jobDetails = new JobDetails(); + Cluster cluster = null; + try { - Cluster cluster = null; if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) { cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); } @@ -125,7 +126,15 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.info("Saving data for future recovery: "); log.info(data.toString()); GFacUtils.savePluginData(jobExecutionContext, data, this.getClass().getName()); - } + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacProviderException(e.getMessage(), e); + } + } + } + } public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException { http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index e4c7dfd..850471d 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -102,6 +102,8 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { List<String> oldFiles = new ArrayList<String>(); MessageContext inputNew = new MessageContext(); StringBuffer data = new StringBuffer("|"); + Cluster pbsCluster = null; + try { String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName()); if (pluginData != null) { @@ -153,7 +155,6 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { } DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); - Cluster pbsCluster = null; // here doesn't matter what the job manager is because we are only doing some file handling // not really dealing with monitoring or job submission, so we pa String lastHost = null; @@ -235,7 +236,16 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { log.error(e1.getLocalizedMessage()); } throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + }finally { + if (pbsCluster != null) { + try { + pbsCluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } + jobExecutionContext.setInMessageContext(inputNew); } http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java index 7c839b2..03eced4 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -99,6 +99,7 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { @Override public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + Cluster pbsCluster = null; try { if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { try { @@ -139,7 +140,7 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); + pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/torque-4.2.3.1/bin/")); if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && !jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().isPersistOutputData()){ outputPath = outputPath + File.separator + jobExecutionContext.getExperimentID() + "-" + jobExecutionContext.getTaskData().getTaskID() + File.separator; @@ -184,6 +185,14 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { log.error(e1.getLocalizedMessage()); } throw new GFacHandlerException(e); + }finally { + if (pbsCluster != null) { + try { + pbsCluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java index ac53b11..05458e4 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java @@ -65,8 +65,9 @@ public class SSHDirectorySetupHandler extends AbstractHandler { } private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - try{ - Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + Cluster cluster = null; + try{ + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); if (cluster == null) { throw new GFacHandlerException("Security context is not set properly"); } else { @@ -97,7 +98,16 @@ public class SSHDirectorySetupHandler extends AbstractHandler { throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); } throw new GFacHandlerException("Error executing the Handler: " + SSHDirectorySetupHandler.class, e); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } + } public void initProperties(Properties properties) throws GFacHandlerException { http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java index 994a670..e87bdd4 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java @@ -63,8 +63,15 @@ public class SSHInputHandler extends AbstractHandler { List<String> oldFiles = new ArrayList<String>(); StringBuffer data = new StringBuffer("|"); MessageContext inputNew = new MessageContext(); + Cluster cluster = null; + try { - + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + if (cluster == null) { + throw new GFacException("Security context is not set properly"); + } else { + log.info("Successfully retrieved the Security Context"); + } if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { try { GFACSSHUtils.addSecurityContext(jobExecutionContext); @@ -94,7 +101,7 @@ public class SSHInputHandler extends AbstractHandler { ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index }else{ - String stageInputFile = stageInputFiles(jobExecutionContext, paramValue); + String stageInputFile = stageInputFiles(cluster,jobExecutionContext, paramValue); ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); @@ -113,7 +120,7 @@ public class SSHInputHandler extends AbstractHandler { List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); List<String> newFiles = new ArrayList<String>(); for (String paramValueEach : split) { - String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach); + String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach); status.setTransferState(TransferState.UPLOAD); detail.setTransferStatus(status); detail.setTransferDescription("Input Data Staged: " + stageInputFiles); @@ -138,17 +145,19 @@ public class SSHInputHandler extends AbstractHandler { throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); } throw new GFacHandlerException("Error while input File Staging", e, e.getLocalizedMessage()); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } jobExecutionContext.setInMessageContext(inputNew); } - private static String stageInputFiles(JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { - Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); - if (cluster == null) { - throw new GFacException("Security context is not set properly"); - } else { - log.info("Successfully retrieved the Security Context"); - } + private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); int i = paramValue.lastIndexOf(File.separator); String substring = paramValue.substring(i + 1); http://git-wip-us.apache.org/repos/asf/airavata/blob/5c389b97/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java index ce63ddc..83e8599 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java @@ -42,6 +42,7 @@ import org.apache.airavata.gfac.core.utils.OutputUtils; import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.cpi.ChildDataType; @@ -110,8 +111,9 @@ public class SSHOutputHandler extends AbstractHandler { ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() .getApplicationDeploymentDescription().getType(); + Cluster cluster = null; try { - Cluster cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); if (cluster == null) { throw new GFacProviderException("Security context is not set properly"); } else { @@ -241,6 +243,14 @@ public class SSHOutputHandler extends AbstractHandler { throw new GFacHandlerException("Error persisting status", e1, e1.getLocalizedMessage()); } throw new GFacHandlerException("Error in retrieving results", e); + }finally { + if (cluster != null) { + try { + cluster.disconnect(); + } catch (SSHApiException e) { + throw new GFacHandlerException(e.getMessage(), e); + } + } } }
