Integrated appCatalog for ssh and gsi modules; commented out the old test classes, which still need to be fixed
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d94e8c95 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d94e8c95 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d94e8c95 Branch: refs/heads/gfac_appcatalog_int Commit: d94e8c955763243f7b36b5151fd0a27aff90e0f6 Parents: 5a28f74 Author: shamrath <[email protected]> Authored: Tue Nov 4 12:32:09 2014 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 5 11:53:12 2014 -0500 ---------------------------------------------------------------------- .../org/apache/airavata/gfac/Scheduler.java | 6 +- .../gfac/core/context/JobExecutionContext.java | 31 +- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 3 +- .../gfac/gram/handler/GridFTPOutputHandler.java | 2 +- .../gfac/gsissh/util/GFACGSISSHUtils.java | 58 +-- .../impl/GSISSHProviderTestWithMyProxyAuth.java | 465 +++++++++-------- .../ssh/handler/AdvancedSCPOutputHandler.java | 16 + .../ssh/handler/SSHDirectorySetupHandler.java | 7 +- .../gfac/ssh/handler/SSHInputHandler.java | 3 +- .../gfac/ssh/handler/SSHOutputHandler.java | 142 +++--- .../gfac/ssh/provider/impl/SSHProvider.java | 69 +-- .../airavata/gfac/ssh/util/GFACSSHUtils.java | 300 +++++------ .../services/impl/BigRed2TestWithSSHAuth.java | 504 +++++++++---------- .../impl/SSHProviderTestWithSSHAuth.java | 342 ++++++------- 14 files changed, 909 insertions(+), 1039 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java index 9e642fe..2bd612c 100644 --- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java @@ -60,9 +60,9 @@ public class Scheduler { jobExecutionContext.setProvider(getProvider(jobExecutionContext)); // TODO: Selecting the provider based on application description. jobExecutionContext.getGFacConfiguration().setInHandlers(jobExecutionContext.getProvider().getClass().getName(), - jobExecutionContext.getServiceName()); + jobExecutionContext.getApplicationName()); jobExecutionContext.getGFacConfiguration().setOutHandlers(jobExecutionContext.getProvider().getClass().getName(), - jobExecutionContext.getServiceName()); + jobExecutionContext.getApplicationName()); jobExecutionContext.getGFacConfiguration().setExecutionMode(getExecutionMode(jobExecutionContext)); } @@ -72,7 +72,7 @@ public class Scheduler { * @return GFacProvider instance. */ private static GFacProvider getProvider(JobExecutionContext jobExecutionContext) throws GFacException { - String applicationName = jobExecutionContext.getServiceName(); + String applicationName = jobExecutionContext.getApplicationName(); URL resource = Scheduler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index dcae96a..2d1a975 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -139,7 +139,7 @@ public class JobExecutionContext extends AbstractContext implements Serializable // a scientific application(or algorithm) as a service. Service name is there to identify to // which service description we should refer during the execution of the current job represented // by this context instance. - private String serviceName; + private String applicationName; private String experimentID; @@ -166,10 +166,10 @@ public class JobExecutionContext extends AbstractContext implements Serializable */ private Map<String, SecurityContext> securityContext = new HashMap<String, SecurityContext>(); - public JobExecutionContext(GFacConfiguration gFacConfiguration,String serviceName){ + public JobExecutionContext(GFacConfiguration gFacConfiguration,String applicationName){ this.gfacConfiguration = gFacConfiguration; notifier = new GFacNotifier(); - setServiceName(serviceName); + setApplicationName(applicationName); outputFileList = new ArrayList<String>(); } @@ -238,12 +238,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable this.outHandlers = outHandlers; } - public String getServiceName() { - return serviceName; + public String getApplicationName() { + return applicationName; } - public void setServiceName(String serviceName) { - this.serviceName = serviceName; + public void setApplicationName(String applicationName) { + this.applicationName = applicationName; } public GFacNotifier getNotifier() { @@ -274,15 +274,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable this.inPath = false; } -// public ContextHeaderDocument.ContextHeader getContextHeader() { -// return contextHeader; -// } -// -// public void setContextHeader(ContextHeaderDocument.ContextHeader contextHeader) { -// this.contextHeader = contextHeader; -// } - - public SecurityContext getSecurityContext(String name) throws 
GFacException{ SecurityContext secContext = securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress()); return secContext; @@ -459,4 +450,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable public void setPreferredDataMovementInterface(DataMovementInterface preferredDataMovementInterface) { this.preferredDataMovementInterface = preferredDataMovementInterface; } + + public String getExecutablePath() { + if (applicationContext == null || applicationContext.getApplicationDeploymentDescription() == null) { + return null; + } else { + return applicationContext.getApplicationDeploymentDescription().getExecutablePath(); + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 656a291..0455f7e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -269,7 +269,7 @@ public class BetterGfacImpl implements GFac,Watcher { GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), configurationProperties); // start constructing jobexecutioncontext - jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationInterfaceId); + jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationInterface.getApplicationName()); // setting experiment/task/workflownode related information Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID); @@ -281,6 +281,7 @@ public class BetterGfacImpl 
implements GFac,Watcher { List<JobDetails> jobDetailsList = taskData.getJobDetailsList(); + //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext for(JobDetails jDetails:jobDetailsList){ jobExecutionContext.setJobDetails(jDetails); } http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/handler/GridFTPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/handler/GridFTPOutputHandler.java b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/handler/GridFTPOutputHandler.java index a424da0..7e226ea 100644 --- a/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/handler/GridFTPOutputHandler.java +++ b/modules/gfac/gfac-gram/src/main/java/org/apache/airavata/gfac/gram/handler/GridFTPOutputHandler.java @@ -133,7 +133,7 @@ public class GridFTPOutputHandler extends AbstractHandler { } String timeStampedServiceName = GFacUtils.createUniqueNameWithDate(jobExecutionContext - .getServiceName()); + .getApplicationName()); File localStdOutFile = File.createTempFile(timeStampedServiceName, "stdout"); localStdErrFile = File.createTempFile(timeStampedServiceName, "stderr"); http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java index baca65c..0a521b5 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java +++ 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java @@ -42,7 +42,6 @@ import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.ServerInfo; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration; -import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster; import org.apache.airavata.gsi.ssh.impl.PBSCluster; import org.apache.airavata.gsi.ssh.util.CommonUtils; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; @@ -50,21 +49,13 @@ import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterfa import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol; -import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; -import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.schemas.gfac.FileArrayType; -import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; import org.apache.airavata.schemas.gfac.StringArrayType; import org.apache.airavata.schemas.gfac.URIArrayType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; +import java.util.*; public class GFACGSISSHUtils { @@ -181,7 +172,7 @@ public class GFACGSISSHUtils { jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir()); jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir()); - jobDescriptor.setExecutablePath(app.getExecutablePath()); + 
jobDescriptor.setExecutablePath(jobExecutionContext.getExecutablePath()); jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput()); jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError()); Random random = new Random(); @@ -214,51 +205,6 @@ public class GFACGSISSHUtils { } jobDescriptor.setInputValues(inputValues); - // this part will fill out the hpcApplicationDescriptor - if (app instanceof HpcApplicationDeploymentType) { - HpcApplicationDeploymentType applicationDeploymentType - = (HpcApplicationDeploymentType) app; - jobDescriptor.setUserName(((GSISSHAbstractCluster)cluster).getServerInfo().getUserName()); - jobDescriptor.setShellName("/bin/bash"); - jobDescriptor.setAllEnvExport(true); - jobDescriptor.setMailOptions("n"); - jobDescriptor.setNodes(applicationDeploymentType.getNodeCount()); - jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode()); - jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime())); - jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand()); - jobDescriptor.setCPUCount(applicationDeploymentType.getCpuCount()); - if (applicationDeploymentType.getProjectAccount() != null) { - if (applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != null) { - jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber()); - } - } - if (applicationDeploymentType.getQueue() != null) { - if (applicationDeploymentType.getQueue().getQueueName() != null) { - jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName()); - } - } - jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); - TaskDetails taskData = jobExecutionContext.getTaskData(); - if (taskData != null && taskData.isSetTaskScheduling()) { - ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling(); - if (computionnalResource.getNodeCount() > 0) { - 
jobDescriptor.setNodes(computionnalResource.getNodeCount()); - } - if (computionnalResource.getComputationalProjectAccount() != null) { - jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount()); - } - if (computionnalResource.getQueueName() != null) { - jobDescriptor.setQueueName(computionnalResource.getQueueName()); - } - if (computionnalResource.getTotalCPUCount() > 0) { - jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount()); - } - if (computionnalResource.getWallTimeLimit() > 0) { - jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit())); - } - } - - } return jobDescriptor; } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java index 0774022..630cd5c 100644 --- a/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-gsissh/src/test/java/org/apache/airavata/core/gfac/services/impl/GSISSHProviderTestWithMyProxyAuth.java @@ -1,236 +1,229 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.core.gfac.services.impl; - -import java.io.File; -import java.net.URL; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.UUID; - -import org.apache.airavata.commons.gfac.type.ActualParameter; -import org.apache.airavata.commons.gfac.type.ApplicationDescription; -import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.commons.gfac.type.ServiceDescription; -import org.apache.airavata.gfac.GFacConfiguration; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.SecurityContext; -import org.apache.airavata.gfac.core.context.ApplicationContext; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; -import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.ServerInfo; -import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo; -import org.apache.airavata.gsi.ssh.impl.PBSCluster; -import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.apache.airavata.gsi.ssh.util.CommonUtils; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; -import 
org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; -import org.apache.airavata.schemas.gfac.GsisshHostType; -import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; -import org.apache.airavata.schemas.gfac.InputParameterType; -import org.apache.airavata.schemas.gfac.JobTypeType; -import org.apache.airavata.schemas.gfac.OutputParameterType; -import org.apache.airavata.schemas.gfac.ProjectAccountType; -import org.apache.airavata.schemas.gfac.QueueType; -import org.apache.airavata.schemas.gfac.StringParameterType; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class GSISSHProviderTestWithMyProxyAuth { - private JobExecutionContext jobExecutionContext; - - //FIXME: move job properties to configuration file - private static final String hostAddress = "trestles.sdsc.edu"; - private static final String hostName = "trestles"; - private String myProxyUserName; - private String myProxyPassword; - private String workingDirectory; - private String certificateLocation = "/Users/lahirugunathilake/Downloads/certificates"; - - @BeforeClass - public void setUp() throws Exception { -// System.setProperty("myproxy.user", "ogce"); -// System.setProperty("myproxy.password", ""); -// System.setProperty("basedir", "/Users/lahirugunathilake/Downloads"); -// System.setProperty("gsi.working.directory", "/home/ogce"); -// System.setProperty("gsi.certificate.path", "/Users/lahirugunathilake/Downloads/certificates"); - certificateLocation = System.getProperty("trusted.cert.location"); - myProxyUserName = System.getProperty("myproxy.username"); - myProxyPassword = System.getProperty("myproxy.password"); - workingDirectory = System.getProperty("gsi.working.directory"); - - if (myProxyUserName == null || myProxyPassword == null || certificateLocation == null) { - System.out.println(">>>>>> Please run tests with my proxy user name and password. 
" + - "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<"); - throw new Exception("Need my proxy user name password to run tests."); - } - URL resource = GSISSHProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); - assert resource != null; - System.out.println(resource.getFile()); - GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null); - -// gFacConfiguration.setMyProxyLifeCycle(3600); -// gFacConfiguration.setMyProxyServer("myproxy.teragrid.org"); -// gFacConfiguration.setMyProxyUser("*****"); -// gFacConfiguration.setMyProxyPassphrase("*****"); -// gFacConfiguration.setTrustedCertLocation("./certificates"); -// //have to set InFlwo Handlers and outFlowHandlers -// gFacConfiguration.setInHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GramDirectorySetupHandler","org.apache.airavata.gfac.handler.GridFTPInputHandler"})); -// gFacConfiguration.setOutHandlers(Arrays.asList(new String[] {"org.apache.airavata.gfac.handler.GridFTPOutputHandler"})); - - /* - * Host - */ - HostDescription host = new HostDescription(GsisshHostType.type); - host.getType().setHostAddress(hostAddress); - host.getType().setHostName(hostName); - - /* - * App - */ - ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type); - HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType(); - ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance(); - name.setStringValue("EchoLocal"); - app.setApplicationName(name); - ProjectAccountType projectAccountType = app.addNewProjectAccount(); - projectAccountType.setProjectAccountNumber("sds128"); - - QueueType queueType = app.addNewQueue(); - queueType.setQueueName("normal"); - - app.setCpuCount(1); - 
app.setJobType(JobTypeType.SERIAL); - app.setNodeCount(1); - app.setProcessorsPerNode(1); - - /* - * Use bat file if it is compiled on Windows - */ - app.setExecutableLocation("/bin/echo"); - - /* - * Default tmp location - */ - String tempDir = "/home/ogce/scratch/"; - String date = (new Date()).toString(); - date = date.replaceAll(" ", "_"); - date = date.replaceAll(":", "_"); - - tempDir = workingDirectory + File.separator - + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID(); - - System.out.println(tempDir); - app.setScratchWorkingDirectory(tempDir); - app.setStaticWorkingDirectory(tempDir); - app.setInputDataDirectory(tempDir + File.separator + "inputData"); - app.setOutputDataDirectory(tempDir + File.separator + "outputData"); - app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout"); - app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr"); - app.setMaxWallTime(5); - app.setInstalledParentPath("/opt/torque/bin/"); - - /* - * Service - */ - ServiceDescription serv = new ServiceDescription(); - serv.getType().setName("SimpleEcho"); - - List<InputParameterType> inputList = new ArrayList<InputParameterType>(); - - InputParameterType input = InputParameterType.Factory.newInstance(); - input.setParameterName("echo_input"); - input.setParameterType(StringParameterType.Factory.newInstance()); - inputList.add(input); - - InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList - - .size()]); - List<OutputParameterType> outputList = new ArrayList<OutputParameterType>(); - OutputParameterType output = OutputParameterType.Factory.newInstance(); - output.setParameterName("echo_output"); - output.setParameterType(StringParameterType.Factory.newInstance()); - outputList.add(output); - - OutputParameterType[] outputParamList = outputList - .toArray(new OutputParameterType[outputList.size()]); - - 
serv.getType().setInputParametersArray(inputParamList); - serv.getType().setOutputParametersArray(outputParamList); - - jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName()); - // Adding security context - jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT, getSecurityContext(app)); - ApplicationContext applicationContext = new ApplicationContext(); - jobExecutionContext.setApplicationContext(applicationContext); - applicationContext.setServiceDescription(serv); - applicationContext.setApplicationDeploymentDescription(appDesc); - applicationContext.setHostDescription(host); - - MessageContext inMessage = new MessageContext(); - ActualParameter echo_input = new ActualParameter(); - ((StringParameterType) echo_input.getType()).setValue("echo_output=hello"); - inMessage.addParameter("echo_input", echo_input); - - - jobExecutionContext.setInMessageContext(inMessage); - - MessageContext outMessage = new MessageContext(); - ActualParameter echo_out = new ActualParameter(); -// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello"); - outMessage.addParameter("echo_output", echo_out); - jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry()); - jobExecutionContext.setTaskData(new TaskDetails("11323")); - jobExecutionContext.setOutMessageContext(outMessage); - - } - - private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) { - GSIAuthenticationInfo authenticationInfo - = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org", - 7512, 17280000, certificateLocation); - - // Server info - ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu"); - Cluster pbsCluster = null; - try { - pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager(app.getInstalledParentPath())); - } catch (SSHApiException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File 
Templates. - } - GSISecurityContext sshSecurityContext = new GSISecurityContext(pbsCluster); - return sshSecurityContext; - } - @Test - public void testGSISSHProvider() throws GFacException { - BetterGfacImpl gFacAPI = new BetterGfacImpl(); - gFacAPI.submitJob(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID()); - System.out.println(jobExecutionContext.getJobDetails().getJobDescription()); - System.out.println(jobExecutionContext.getJobDetails().getJobID()); - } - -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. 
+// * +//*/ +//package org.apache.airavata.core.gfac.services.impl; +// +//import java.io.File; +//import java.net.URL; +//import java.util.ArrayList; +//import java.util.Date; +//import java.util.List; +//import java.util.UUID; +// +//import org.apache.aiaravata.application.catalog.data.model.ApplicationInterface; +//import org.apache.airavata.commons.gfac.type.ActualParameter; +//import org.apache.airavata.commons.gfac.type.ApplicationDescription; +//import org.apache.airavata.commons.gfac.type.HostDescription; +//import org.apache.airavata.commons.gfac.type.ServiceDescription; +//import org.apache.airavata.gfac.GFacConfiguration; +//import org.apache.airavata.gfac.GFacException; +//import org.apache.airavata.gfac.SecurityContext; +//import org.apache.airavata.gfac.core.context.ApplicationContext; +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.context.MessageContext; +//import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; +//import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; +//import org.apache.airavata.gsi.ssh.api.Cluster; +//import org.apache.airavata.gsi.ssh.api.SSHApiException; +//import org.apache.airavata.gsi.ssh.api.ServerInfo; +//import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo; +//import org.apache.airavata.gsi.ssh.impl.PBSCluster; +//import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +//import org.apache.airavata.gsi.ssh.util.CommonUtils; +//import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +//import org.apache.airavata.model.workspace.experiment.TaskDetails; +//import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; +//import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +//import org.apache.airavata.schemas.gfac.GsisshHostType; +//import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; +//import 
org.apache.airavata.schemas.gfac.InputParameterType; +//import org.apache.airavata.schemas.gfac.JobTypeType; +//import org.apache.airavata.schemas.gfac.OutputParameterType; +//import org.apache.airavata.schemas.gfac.ProjectAccountType; +//import org.apache.airavata.schemas.gfac.QueueType; +//import org.apache.airavata.schemas.gfac.StringParameterType; +//import org.testng.annotations.BeforeClass; +//import org.testng.annotations.Test; +// +//public class GSISSHProviderTestWithMyProxyAuth { +// private JobExecutionContext jobExecutionContext; +// +// //FIXME: move job properties to configuration file +// private static final String hostAddress = "trestles.sdsc.edu"; +// private static final String hostName = "trestles"; +// private String myProxyUserName; +// private String myProxyPassword; +// private String workingDirectory; +// private String certificateLocation = "/Users/lahirugunathilake/Downloads/certificates"; +// +// @BeforeClass +// public void setUp() throws Exception { +//// System.setProperty("myproxy.user", "ogce"); +//// System.setProperty("myproxy.password", ""); +//// System.setProperty("basedir", "/Users/lahirugunathilake/Downloads"); +//// System.setProperty("gsi.working.directory", "/home/ogce"); +//// System.setProperty("gsi.certificate.path", "/Users/lahirugunathilake/Downloads/certificates"); +// certificateLocation = System.getProperty("trusted.cert.location"); +// myProxyUserName = System.getProperty("myproxy.username"); +// myProxyPassword = System.getProperty("myproxy.password"); +// workingDirectory = System.getProperty("gsi.working.directory"); +// +// if (myProxyUserName == null || myProxyPassword == null || certificateLocation == null) { +// System.out.println(">>>>>> Please run tests with my proxy user name and password. 
" + +// "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<"); +// throw new Exception("Need my proxy user name password to run tests."); +// } +// URL resource = GSISSHProviderTestWithMyProxyAuth.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// assert resource != null; +// System.out.println(resource.getFile()); +// GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), null); +// +// /* +// * Host +// */ +// HostDescription host = new HostDescription(GsisshHostType.type); +// host.getType().setHostAddress(hostAddress); +// host.getType().setHostName(hostName); +// +// /* +// * App +// */ +// ApplicationDescription appDesc = new ApplicationDescription(HpcApplicationDeploymentType.type); +// HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) appDesc.getType(); +// ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance(); +// name.setStringValue("EchoLocal"); +// app.setApplicationName(name); +// ProjectAccountType projectAccountType = app.addNewProjectAccount(); +// projectAccountType.setProjectAccountNumber("sds128"); +// +// QueueType queueType = app.addNewQueue(); +// queueType.setQueueName("normal"); +// +// app.setCpuCount(1); +// app.setJobType(JobTypeType.SERIAL); +// app.setNodeCount(1); +// app.setProcessorsPerNode(1); +// +// /* +// * Use bat file if it is compiled on Windows +// */ +// app.setExecutableLocation("/bin/echo"); +// +// /* +// * Default tmp location +// */ +// String tempDir = "/home/ogce/scratch/"; +// String date = (new Date()).toString(); +// date = date.replaceAll(" ", "_"); +// date = date.replaceAll(":", "_"); +// +// tempDir = workingDirectory + File.separator +// + "SimpleEcho" + "_" + date + "_" + UUID.randomUUID(); +// +// System.out.println(tempDir); +// app.setScratchWorkingDirectory(tempDir); 
+// app.setStaticWorkingDirectory(tempDir); +// app.setInputDataDirectory(tempDir + File.separator + "inputData"); +// app.setOutputDataDirectory(tempDir + File.separator + "outputData"); +// app.setStandardOutput(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stdout"); +// app.setStandardError(tempDir + File.separator + app.getApplicationName().getStringValue() + ".stderr"); +// app.setMaxWallTime(5); +// app.setInstalledParentPath("/opt/torque/bin/"); +// +// /* +// * Service +// */ +// ServiceDescription serv = new ServiceDescription(); +// serv.getType().setName("SimpleEcho"); +// +// List<InputParameterType> inputList = new ArrayList<InputParameterType>(); +// +// InputParameterType input = InputParameterType.Factory.newInstance(); +// input.setParameterName("echo_input"); +// input.setParameterType(StringParameterType.Factory.newInstance()); +// inputList.add(input); +// +// InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList +// +// .size()]); +// List<OutputParameterType> outputList = new ArrayList<OutputParameterType>(); +// OutputParameterType output = OutputParameterType.Factory.newInstance(); +// output.setParameterName("echo_output"); +// output.setParameterType(StringParameterType.Factory.newInstance()); +// outputList.add(output); +// +// OutputParameterType[] outputParamList = outputList +// .toArray(new OutputParameterType[outputList.size()]); +// +// serv.getType().setInputParametersArray(inputParamList); +// serv.getType().setOutputParametersArray(outputParamList); +// +// jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName()); +// // Adding security context +// jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT, getSecurityContext(app)); +// ApplicationContext applicationContext = new ApplicationContext(); +// jobExecutionContext.setApplicationContext(applicationContext); +// 
applicationContext.setServiceDescription(serv); +// applicationContext.setApplicationDeploymentDescription(appDesc); +// applicationContext.setHostDescription(host); +// +// MessageContext inMessage = new MessageContext(); +// ActualParameter echo_input = new ActualParameter(); +// ((StringParameterType) echo_input.getType()).setValue("echo_output=hello"); +// inMessage.addParameter("echo_input", echo_input); +// +// +// jobExecutionContext.setInMessageContext(inMessage); +// +// MessageContext outMessage = new MessageContext(); +// ActualParameter echo_out = new ActualParameter(); +//// ((StringParameterType)echo_input.getType()).setValue("echo_output=hello"); +// outMessage.addParameter("echo_output", echo_out); +// jobExecutionContext.setRegistry(RegistryFactory.getLoggingRegistry()); +// jobExecutionContext.setTaskData(new TaskDetails("11323")); +// jobExecutionContext.setOutMessageContext(outMessage); +// +// } +// +// private SecurityContext getSecurityContext(HpcApplicationDeploymentType app) { +// GSIAuthenticationInfo authenticationInfo +// = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org", +// 7512, 17280000, certificateLocation); +// +// // Server info +// ServerInfo serverInfo = new ServerInfo("ogce", "trestles.sdsc.edu"); +// Cluster pbsCluster = null; +// try { +// pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager(app.getInstalledParentPath())); +// } catch (SSHApiException e) { +// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
+// } +// GSISecurityContext sshSecurityContext = new GSISecurityContext(pbsCluster); +// return sshSecurityContext; +// } +// @Test +// public void testGSISSHProvider() throws GFacException { +// BetterGfacImpl gFacAPI = new BetterGfacImpl(); +// gFacAPI.submitJob(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID()); +// System.out.println(jobExecutionContext.getJobDetails().getJobDescription()); +// System.out.println(jobExecutionContext.getJobDetails().getJobID()); +// } +// +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java index dfd84de..f508e23 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -114,6 +114,22 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { .getApplicationDeploymentDescription().getType(); String standardError = app.getStandardError(); String standardOutput = app.getStandardOutput(); + if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { + try { + GFACSSHUtils.addSecurityContext(jobExecutionContext); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + try { + GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } + throw new GFacHandlerException("Error 
while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } + } + pbsCluster = ((SSHSecurityContext)jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + String standardError = jobExecutionContext.getStandardError(); + String standardOutput = jobExecutionContext.getStandardOutput(); super.invoke(jobExecutionContext); // Server info if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() != null && jobExecutionContext.getTaskData().getAdvancedOutputDataHandling().getOutputDataDir() != null){ http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java index 0be6820..f7cbcc0 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java @@ -73,11 +73,10 @@ public class SSHDirectorySetupHandler extends AbstractHandler { } else { log.info("Successfully retrieved the Security Context"); } - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); - String workingDirectory = app.getScratchWorkingDirectory(); + String workingDirectory = jobExecutionContext.getWorkingDir(); cluster.makeDirectory(workingDirectory); - cluster.makeDirectory(app.getInputDataDirectory()); - cluster.makeDirectory(app.getOutputDataDirectory()); + cluster.makeDirectory(jobExecutionContext.getInputDir()); + cluster.makeDirectory(jobExecutionContext.getOutputDir()); DataTransferDetails detail = new 
DataTransferDetails(); TransferStatus status = new TransferStatus(); status.setTransferState(TransferState.DIRECTORY_SETUP); http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java index b26e035..b0367f3 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java @@ -150,11 +150,10 @@ public class SSHInputHandler extends AbstractHandler { } private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException { - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); int i = paramValue.lastIndexOf(File.separator); String substring = paramValue.substring(i + 1); try { - String targetFile = app.getInputDataDirectory() + File.separator + substring; + String targetFile = jobExecutionContext.getInputDir() + File.separator + substring; if(paramValue.startsWith("scp:")){ paramValue = paramValue.substring(paramValue.indexOf(":") + 1, paramValue.length()); cluster.scpThirdParty(paramValue, targetFile); http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java index 328ad32..d80e92b 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java @@ -27,6 +27,8 @@ import java.util.*; import net.schmizz.sshj.connection.ConnectionException; import net.schmizz.sshj.transport.TransportException; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.commons.gfac.type.ActualParameter; @@ -44,6 +46,9 @@ import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol; import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.registry.cpi.ChildDataType; import org.apache.airavata.registry.cpi.RegistryModelType; @@ -58,38 +63,6 @@ public class SSHOutputHandler extends AbstractHandler { private static final Logger log = LoggerFactory.getLogger(SSHOutputHandler.class); public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context - // so attempting to get it from the registry - if (Constants.PUSH.equals(((GsisshHostType) 
jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) { // this is because we don't have the right jobexecution context - // so attempting to get it from the registry - log.warn("During the out handler chain jobExecution context came null, so trying to handler"); - ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); - TaskDetails taskData = null; - try { - taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); - } catch (RegistryException e) { - log.error("Error retrieving job details from Registry"); - throw new GFacHandlerException("Error retrieving job details from Registry", e); - } - JobDetails jobDetails = taskData.getJobDetailsList().get(0); - String jobDescription = jobDetails.getJobDescription(); - if (jobDescription != null) { - JobDescriptor jobDescriptor = null; - try { - jobDescriptor = JobDescriptor.fromXML(jobDescription); - } catch (XmlException e1) { - e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
- } - applicationDeploymentDescription.getType().setScratchWorkingDirectory( - jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory()); - applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory()); - applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory()); - applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile()); - applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile()); - } - } - } - try { if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { @@ -98,10 +71,10 @@ public class SSHOutputHandler extends AbstractHandler { } catch (Exception e) { log.error(e.getMessage()); try { - GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); - } catch (GFacException e1) { - log.error(e1.getLocalizedMessage()); - } + GFacUtils.saveErrorDetails(jobExecutionContext, e.getLocalizedMessage(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); + } catch (GFacException e1) { + log.error(e1.getLocalizedMessage()); + } throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } @@ -109,11 +82,9 @@ public class SSHOutputHandler extends AbstractHandler { DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext() - .getApplicationDeploymentDescription().getType(); Cluster cluster = null; try { - cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); + cluster = ((SSHSecurityContext) 
jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); if (cluster == null) { throw new GFacProviderException("Security context is not set properly"); } else { @@ -143,19 +114,19 @@ public class SSHOutputHandler extends AbstractHandler { // cluster.makeDirectory(outputDataDir); int i = 0; String stdOutStr = ""; - while(stdOutStr.isEmpty()){ - try { - cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath()); - stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); - } catch (Exception e) { - log.error(e.getLocalizedMessage()); - Thread.sleep(2000); - } - i++; - if(i == 3) break; + while (stdOutStr.isEmpty()) { + try { + cluster.scpFrom(jobExecutionContext.getStandardOutput(), localStdOutFile.getAbsolutePath()); + stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); + } catch (Exception e) { + log.error(e.getLocalizedMessage()); + Thread.sleep(2000); + } + i++; + if (i == 3) break; } Thread.sleep(1000); - cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath()); + cluster.scpFrom(jobExecutionContext.getStandardError(), localStdErrFile.getAbsolutePath()); Thread.sleep(1000); String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); @@ -177,72 +148,73 @@ public class SSHOutputHandler extends AbstractHandler { ActualParameter actualParameter = (ActualParameter) output.get(paramName); if ("URI".equals(actualParameter.getType().getType().toString())) { List<String> outputList = null; - int retry=3; - while(retry>0){ - outputList = cluster.listDirectory(app.getOutputDataDirectory()); - if(outputList.size() > 0){ - break; - } - retry--; - Thread.sleep(2000); + int retry = 3; + while (retry > 0) { + outputList = cluster.listDirectory(jobExecutionContext.getOutputDir()); + if (outputList.size() > 0) { + break; + } + retry--; + Thread.sleep(2000); } - + if (outputList.size() == 0 || outputList.get(0).isEmpty() || outputList.size() > 
0) { - OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr,outputArray); + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); Set<String> strings = output.keySet(); outputArray.clear(); for (String key : strings) { ActualParameter actualParameter1 = (ActualParameter) output.get(key); if ("URI".equals(actualParameter1.getType().getType().toString())) { - String downloadFile = MappingFactory.toString(actualParameter1); - cluster.scpFrom(downloadFile, outputDataDir); - String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length()); - String localFile = outputDataDir + File.separator +fileName; - jobExecutionContext.addOutputFile(localFile); - MappingFactory.fromString(actualParameter1, localFile); - DataObjectType dataObjectType = new DataObjectType(); + String downloadFile = MappingFactory.toString(actualParameter1); + cluster.scpFrom(downloadFile, outputDataDir); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + String localFile = outputDataDir + File.separator + fileName; + jobExecutionContext.addOutputFile(localFile); + MappingFactory.fromString(actualParameter1, localFile); + DataObjectType dataObjectType = new DataObjectType(); dataObjectType.setValue(localFile); dataObjectType.setKey(key); dataObjectType.setType(DataType.URI); outputArray.add(dataObjectType); } } - + break; - } else if( outputList.size() == 0) {//FIXME: Ultrascan case + } else if (outputList.size() == 0) {//FIXME: Ultrascan case String valueList = outputList.get(0); - cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir); + cluster.scpFrom(jobExecutionContext.getOutputDir() + File.separator + valueList, outputDataDir); String outputPath = outputDataDir + File.separator + valueList; - jobExecutionContext.addOutputFile(outputPath); - MappingFactory.fromString(actualParameter, outputPath); - DataObjectType 
dataObjectType = new DataObjectType(); + jobExecutionContext.addOutputFile(outputPath); + MappingFactory.fromString(actualParameter, outputPath); + DataObjectType dataObjectType = new DataObjectType(); dataObjectType.setValue(outputPath); dataObjectType.setKey(paramName); dataObjectType.setType(DataType.URI); outputArray.add(dataObjectType); } } else { - OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr,outputArray); + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); } } if (outputArray == null || outputArray.isEmpty()) { - log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names"); - if(jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null){ - throw new GFacHandlerException( - "Empty Output returned from the Application, Double check the application" - + "and ApplicationDescriptor output Parameter Names"); - } + log.error("Empty Output returned from the Application, Double check the application and ApplicationDescriptor output Parameter Names"); + if (jobExecutionContext.getTaskData().getAdvancedOutputDataHandling() == null) { + throw new GFacHandlerException( + "Empty Output returned from the Application, Double check the application" + + "and ApplicationDescriptor output Parameter Names"); + } } - app.setStandardError(localStdErrFile.getAbsolutePath()); - app.setStandardOutput(localStdOutFile.getAbsolutePath()); - app.setOutputDataDirectory(outputDataDir); + // FIXME: why we set standard error ouput and outputDirectory again ? 
+// app.setStandardError(localStdErrFile.getAbsolutePath()); +// app.setStandardOutput(localStdOutFile.getAbsolutePath()); +// app.setOutputDataDirectory(outputDataDir); status.setTransferState(TransferState.DOWNLOAD); detail.setTransferStatus(status); detail.setTransferDescription(outputDataDir); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); registry.add(ChildDataType.EXPERIMENT_OUTPUT, outputArray, jobExecutionContext.getExperimentID()); - - }catch (Exception e) { + + } catch (Exception e) { try { status.setTransferState(TransferState.FAILED); detail.setTransferStatus(status); http://git-wip-us.apache.org/repos/asf/airavata/blob/d94e8c95/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index 0527c78..573ddf0 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -51,6 +51,8 @@ import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.gsi.ssh.impl.RawCommandInfo; import org.apache.airavata.gsi.ssh.impl.StandardOutReader; +import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; import org.apache.airavata.model.workspace.experiment.JobDetails; @@ -86,16 +88,16 @@ public class SSHProvider extends AbstractProvider { } 
taskID = jobExecutionContext.getTaskData().getTaskID(); - if (!((SSHHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getHpcResource()) { - jobID = "SSH_" + jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis(); + JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) { + jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster(); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); - String remoteFile = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME; + String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; details.setJobID(taskID); details.setJobDescription(remoteFile); jobExecutionContext.setJobDetails(details); - JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null); + JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, null); details.setJobDescription(jobDescriptor.toXML()); GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP); @@ -114,16 +116,15 @@ public class SSHProvider extends AbstractProvider { public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { if (!hpcType) { - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); try { /* * Execute */ - String execuable = app.getStaticWorkingDirectory() + File.separatorChar + Constants.EXECUTABLE_NAME; - 
details.setJobDescription(execuable); + String executable = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobDescription(executable); // GFacUtils.updateJobStatus(details, JobState.SUBMITTED); - RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + execuable + "; " + execuable); + RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable); StandardOutReader jobIDReaderCommandOutput = new StandardOutReader(); @@ -139,10 +140,6 @@ public class SSHProvider extends AbstractProvider { } else { try { jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); - HostDescriptionType host = jobExecutionContext.getApplicationContext(). - getHostDescription().getType(); - HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext(). - getApplicationDeploymentDescription().getType(); JobDetails jobDetails = new JobDetails(); try { Cluster cluster = null; @@ -155,7 +152,7 @@ public class SSHProvider extends AbstractProvider { log.info("Successfully retrieved the Security Context"); } // This installed path is a mandetory field, because this could change based on the computing resource - JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, cluster); + JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster); jobDetails.setJobName(jobDescriptor.getJobName()); log.info(jobDescriptor.toXML()); @@ -172,14 +169,14 @@ public class SSHProvider extends AbstractProvider { } delegateToMonitorHandlers(jobExecutionContext); } catch (SSHApiException e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); 
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); throw new GFacProviderException(error, e); } catch (Exception e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); @@ -199,8 +196,6 @@ public class SSHProvider extends AbstractProvider { public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { JobDetails jobDetails = jobExecutionContext.getJobDetails(); - HostDescriptionType host = jobExecutionContext.getApplicationContext(). - getHostDescription().getType(); StringBuffer data = new StringBuffer(); if (!hpcType) { throw new NotImplementedException(); @@ -225,14 +220,14 @@ public class SSHProvider extends AbstractProvider { } GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); } catch (SSHApiException e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); throw new GFacProviderException(error, e); } catch (Exception e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() 
+ " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); @@ -279,40 +274,28 @@ public class SSHProvider extends AbstractProvider { } } private File createShellScript(JobExecutionContext context) throws IOException { - ApplicationDeploymentDescriptionType app = context.getApplicationContext() - .getApplicationDeploymentDescription().getType(); - String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis() + String uniqueDir = jobExecutionContext.getApplicationName() + System.currentTimeMillis() + new Random().nextLong(); File shellScript = File.createTempFile(uniqueDir, "sh"); OutputStream out = new FileOutputStream(shellScript); out.write("#!/bin/bash\n".getBytes()); - out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes()); - out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes()); - out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n") + out.write(("cd " + jobExecutionContext.getWorkingDir() + "\n").getBytes()); + out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getInputDir() + "\n").getBytes()); + out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + jobExecutionContext.getOutputDir() + "\n") .getBytes()); // get the env of the host and the application - NameValuePairType[] env = app.getApplicationEnvironmentArray(); - - Map<String, String> nv = new HashMap<String, String>(); - if (env != null) { - for (int i = 0; i < env.length; i++) { - String key = env[i].getName(); - String value = env[i].getValue(); - nv.put(key, value); - } - } - for (Entry<String, String> entry : nv.entrySet()) { - log.debug("Env[" + entry.getKey() + "] = " + entry.getValue()); - out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes()); - + List<SetEnvPaths> envPathList = 
jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getSetEnvironment(); + for (SetEnvPaths setEnvPaths : envPathList) { + log.debug("Env[" + setEnvPaths.getName() + "] = " + setEnvPaths.getValue()); + out.write(("export " + setEnvPaths.getName() + "=" + setEnvPaths.getValue() + "\n").getBytes()); } // prepare the command final String SPACE = " "; StringBuffer cmd = new StringBuffer(); - cmd.append(app.getExecutableLocation()); + cmd.append(jobExecutionContext.getExecutablePath()); cmd.append(SPACE); MessageContext input = context.getInMessageContext(); @@ -338,11 +321,11 @@ public class SSHProvider extends AbstractProvider { cmd.append(SPACE); cmd.append("1>"); cmd.append(SPACE); - cmd.append(app.getStandardOutput()); + cmd.append(jobExecutionContext.getStandardOutput()); cmd.append(SPACE); cmd.append("2>"); cmd.append(SPACE); - cmd.append(app.getStandardError()); + cmd.append(jobExecutionContext.getStandardError()); String cmdStr = cmd.toString(); log.info("Command = " + cmdStr);
