Repository: airavata Updated Branches: refs/heads/master 49b6987f3 -> 6209ee096
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java new file mode 100644 index 0000000..b0fd9eb --- /dev/null +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -0,0 +1,310 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */
package org.apache.airavata.gfac.ssh.provider.impl;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;

import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
import org.apache.airavata.gfac.handler.GFacHandlerException;
import org.apache.airavata.gfac.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.provider.AbstractProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.ssh.util.GFACSSHUtils;
import org.apache.airavata.gfac.utils.GFacUtils;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.CommandExecutor;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.RawCommandInfo;
import org.apache.airavata.gsi.ssh.impl.StandardOutReader;
import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
import org.apache.airavata.model.workspace.experiment.ErrorCategory;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes an application on a remote host over SSH.
 *
 * <p>Two modes are supported, selected in {@link #initialize(JobExecutionContext)} from the
 * host description's HPC flag:
 * <ul>
 *   <li><b>plain SSH host</b> - a wrapper shell script is generated locally, copied into the
 *       application's static working directory on the remote host, and run directly by
 *       {@link #execute(JobExecutionContext)};</li>
 *   <li><b>HPC resource</b> - a job descriptor is built and handed to
 *       {@code Cluster.submitBatchJob} by {@link #execute(JobExecutionContext)}.</li>
 * </ul>
 */
public class SSHProvider extends AbstractProvider {
    private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);

    /**
     * Separator used when building paths on the remote host. The remote side is driven with
     * {@code /bin/chmod} and a bash script, so the path must be POSIX-style regardless of the
     * platform this JVM runs on (hence not {@code File.separatorChar}).
     */
    private static final char REMOTE_PATH_SEPARATOR = '/';

    private Cluster cluster;
    private String jobID = null;
    private String taskID = null;
    // True when the host is flagged as an HPC resource; execute() then submits a batch job
    // through the cluster API instead of running the generated script directly.
    private boolean hpcType = false;

    /**
     * Prepares the provider for a job.
     *
     * <p>Ensures an SSH security context exists on the job context, records the task id and,
     * for a non-HPC host, generates the wrapper script and copies it to the remote working
     * directory. For an HPC host nothing else is done here; submission happens in
     * {@link #execute(JobExecutionContext)}.
     *
     * @param jobExecutionContext the context of the job being set up
     * @throws GFacProviderException if the wrapper script cannot be created or copied
     * @throws GFacException if the security context cannot be created or the job status
     *         cannot be saved
     */
    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
        super.initialize(jobExecutionContext);
        if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) {
            try {
                GFACSSHUtils.addSecurityContext(jobExecutionContext);
            } catch (ApplicationSettingsException e) {
                log.error(e.getMessage(), e);
                throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
            }
        }
        taskID = jobExecutionContext.getTaskData().getTaskID();

        HostDescriptionType host = jobExecutionContext.getApplicationContext().getHostDescription().getType();
        // Assigned on every call (not only set to true) so a re-initialised instance never
        // keeps a stale flag from a previous job.
        hpcType = ((SSHHostType) host).getHpcResource();
        if (hpcType) {
            return;
        }

        jobID = "SSH_" + host.getHostAddress() + "_" + Calendar.getInstance().getTimeInMillis();
        cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();

        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
        String remoteFile = remoteExecutablePath(app);
        details.setJobID(taskID);
        details.setJobDescription(remoteFile);
        jobExecutionContext.setJobDetails(details);
        JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, null);
        details.setJobDescription(jobDescriptor.toXML());

        GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP);
        log.info(remoteFile);
        try {
            // NOTE(review): the local temp script is never deleted after the copy; consider
            // removing it once scpTo is confirmed to be synchronous.
            File runscript = createShellScript(jobExecutionContext);
            cluster.scpTo(remoteFile, runscript.getAbsolutePath());
        } catch (Exception e) {
            throw new GFacProviderException(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Runs the job: directly over SSH for a plain host, or as a batch submission for an HPC
     * resource, depending on what {@link #initialize(JobExecutionContext)} detected.
     *
     * @param jobExecutionContext the context of the job to run
     * @throws GFacProviderException if execution or submission fails
     */
    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
        if (hpcType) {
            submitBatchJob(jobExecutionContext);
        } else {
            runScriptOverSsh(jobExecutionContext);
        }
    }

    /** Makes the previously uploaded wrapper script executable, runs it, and disconnects. */
    private void runScriptOverSsh(JobExecutionContext jobExecutionContext) throws GFacProviderException {
        ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
        boolean succeeded = false;
        try {
            String executable = remoteExecutablePath(app);
            details.setJobDescription(executable);

            RawCommandInfo rawCommandInfo = new RawCommandInfo("/bin/chmod 755 " + executable + "; " + executable);
            StandardOutReader commandOutput = new StandardOutReader();

            CommandExecutor.executeCommand(rawCommandInfo, cluster.getSession(), commandOutput);
            String stdOutputString = getOutputifAvailable(commandOutput, "Error submitting job to resource");

            log.info("stdout=" + stdOutputString);
            succeeded = true;
        } catch (Exception e) {
            throw new GFacProviderException(e.getMessage(), e);
        } finally {
            if (cluster != null) {
                try {
                    cluster.disconnect();
                } catch (SSHApiException e) {
                    if (succeeded) {
                        throw new GFacProviderException(e.getMessage(), e);
                    }
                    // The command already failed: throwing from finally would replace that
                    // failure with the less useful disconnect error, so only log it.
                    log.error("Error disconnecting from cluster after failed execution: " + e.getMessage(), e);
                }
            }
        }
    }

    /**
     * Builds a job descriptor and submits it as a batch job, recording SUBMITTED or FAILED
     * job status (and error details on failure).
     */
    private void submitBatchJob(JobExecutionContext jobExecutionContext) throws GFacProviderException {
        try {
            jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
            HostDescriptionType host = jobExecutionContext.getApplicationContext().getHostDescription().getType();
            HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext()
                    .getApplicationDeploymentDescription().getType();
            JobDetails jobDetails = new JobDetails();
            try {
                Cluster pbsCluster = null;
                if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) != null) {
                    pbsCluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)).getPbsCluster();
                }
                if (pbsCluster == null) {
                    throw new GFacProviderException("Security context is not set properly");
                }
                log.info("Successfully retrieved the Security Context");

                // The installed path is a mandatory field, because it can change based on the
                // computing resource.
                JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, app, pbsCluster);
                String descriptorXml = jobDescriptor.toXML();
                log.info(descriptorXml);
                jobDetails.setJobDescription(descriptorXml);

                String submittedJobId = pbsCluster.submitBatchJob(jobDescriptor);
                jobExecutionContext.setJobDetails(jobDetails);
                if (submittedJobId == null) {
                    jobDetails.setJobID("none");
                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
                } else {
                    jobDetails.setJobID(submittedJobId);
                    GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
                }
            } catch (Exception e) {
                // One handler for every failure (SSH API or otherwise): the recovery steps
                // are the same in all cases.
                String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
                log.error(error, e);
                jobDetails.setJobID("none");
                GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
                GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                throw new GFacProviderException(error, e);
            }
        } catch (GFacException e) {
            throw new GFacProviderException(e.getMessage(), e);
        }
    }

    /** No resources are released here; this implementation is intentionally empty. */
    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {

    }

    /**
     * Cancelling is not implemented for this provider.
     *
     * @throws UnsupportedOperationException always
     */
    public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException {
        // Standard unchecked exception instead of the JDK-internal
        // sun.reflect.generics.reflectiveObjects.NotImplementedException.
        throw new UnsupportedOperationException("cancelJob is not implemented for SSHProvider");
    }

    /** Returns the path of the wrapper script inside the remote static working directory. */
    private static String remoteExecutablePath(ApplicationDeploymentDescriptionType app) {
        return app.getStaticWorkingDirectory() + REMOTE_PATH_SEPARATOR + Constants.EXECUTABLE_NAME;
    }

    /**
     * Writes a bash wrapper script to a local temporary file.
     *
     * <p>The script changes into the static working directory, exports the input/output
     * directory variables and the application environment, runs the executable with all
     * input parameters appended, redirects stdout/stderr to the application's configured
     * files, and finally echoes a marker line.
     *
     * @param context the job context supplying the deployment description and inputs
     * @return the local temporary script file
     * @throws IOException if the file cannot be created or written
     */
    private File createShellScript(JobExecutionContext context) throws IOException {
        ApplicationDeploymentDescriptionType app = context.getApplicationContext()
                .getApplicationDeploymentDescription().getType();
        String uniqueDir = app.getApplicationName().getStringValue() + System.currentTimeMillis()
                + new Random().nextLong();

        // createTempFile appends the suffix verbatim, so the dot must be part of it.
        File shellScript = File.createTempFile(uniqueDir, ".sh");
        OutputStream out = new FileOutputStream(shellScript);
        try {
            out.write("#!/bin/bash\n".getBytes());
            out.write(("cd " + app.getStaticWorkingDirectory() + "\n").getBytes());
            out.write(("export " + Constants.INPUT_DATA_DIR_VAR_NAME + "=" + app.getInputDataDirectory() + "\n").getBytes());
            out.write(("export " + Constants.OUTPUT_DATA_DIR_VAR_NAME + "=" + app.getOutputDataDirectory() + "\n")
                    .getBytes());

            // Collect the application environment; a later duplicate name overrides an earlier one.
            NameValuePairType[] env = app.getApplicationEnvironmentArray();
            Map<String, String> nv = new HashMap<String, String>();
            if (env != null) {
                for (NameValuePairType pair : env) {
                    nv.put(pair.getName(), pair.getValue());
                }
            }
            for (Entry<String, String> entry : nv.entrySet()) {
                log.debug("Env[" + entry.getKey() + "] = " + entry.getValue());
                out.write(("export " + entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
            }

            // Prepare the command line: executable followed by every input value.
            // NOTE(review): values are concatenated unquoted into a shell command; if inputs
            // can come from untrusted users this allows shell injection - confirm and escape.
            final String SPACE = " ";
            StringBuilder cmd = new StringBuilder();
            cmd.append(app.getExecutableLocation());
            cmd.append(SPACE);

            MessageContext input = context.getInMessageContext();
            Map<String, Object> inputs = input.getParameters();
            for (Object rawParameter : inputs.values()) {
                ActualParameter actualParameter = (ActualParameter) rawParameter;
                if ("URIArray".equals(actualParameter.getType().getType().toString())) {
                    String[] values = ((URIArrayType) actualParameter.getType()).getValueArray();
                    for (String value : values) {
                        cmd.append(value);
                        cmd.append(SPACE);
                    }
                } else {
                    cmd.append(MappingFactory.toString(actualParameter));
                    cmd.append(SPACE);
                }
            }
            // Redirect stdout and stderr to remote files; they are read back later.
            cmd.append(SPACE);
            cmd.append("1>");
            cmd.append(SPACE);
            cmd.append(app.getStandardOutput());
            cmd.append(SPACE);
            cmd.append("2>");
            cmd.append(SPACE);
            cmd.append(app.getStandardError());

            String cmdStr = cmd.toString();
            log.info("Command = " + cmdStr);
            out.write((cmdStr + "\n").getBytes());
            String message = "\"execuationSuceeded\"";
            out.write(("echo " + message + "\n").getBytes());
        } finally {
            // Always release the file handle, including when a write fails.
            out.close();
        }

        return shellScript;
    }

    /** No provider properties are consumed; this implementation is intentionally empty. */
    public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {

    }

    /**
     * Returns the captured standard output of a remote command, failing when the command
     * produced no standard output or wrote anything to standard error.
     *
     * @param jobIDReaderCommandOutput reader holding the command's stdout and stderr
     * @param errorMsg prefix for the exception message; the stderr text is appended to it
     * @return the non-empty standard output
     * @throws SSHApiException if stdout is null/empty or stderr is non-empty
     */
    private String getOutputifAvailable(StandardOutReader jobIDReaderCommandOutput, String errorMsg) throws SSHApiException {
        String stdOutputString = jobIDReaderCommandOutput.getStdOutputString();
        String stdErrorString = jobIDReaderCommandOutput.getStdErrorString();

        boolean noOutput = stdOutputString == null || stdOutputString.isEmpty();
        boolean hasError = stdErrorString != null && !stdErrorString.isEmpty();
        if (noOutput || hasError) {
            log.error("Standard Error output : " + stdErrorString);
            throw new SSHApiException(errorMsg + stdErrorString);
        }
        return stdOutputString;
    }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/integration-tests/src/test/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/integration-tests/src/test/resources/gfac-config.xml b/modules/integration-tests/src/test/resources/gfac-config.xml index 06afc6e..9ebee37 100644 --- a/modules/integration-tests/src/test/resources/gfac-config.xml +++ b/modules/integration-tests/src/test/resources/gfac-config.xml @@ -24,7 +24,7 @@ </InHandlers> <OutHandlers></OutHandlers> </GlobalHandlers> - <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> + <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> <InHandlers> <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/> </InHandlers> http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml index 06afc6e..9ebee37 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/resources/gfac-config.xml @@ -24,7 +24,7 @@ </InHandlers> <OutHandlers></OutHandlers> </GlobalHandlers> - <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> + <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" 
host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> <InHandlers> <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/> </InHandlers> http://git-wip-us.apache.org/repos/asf/airavata/blob/6209ee09/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml index 06afc6e..9ebee37 100644 --- a/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml +++ b/modules/orchestrator/orchestrator-core/src/test/resources/gfac-config.xml @@ -24,7 +24,7 @@ </InHandlers> <OutHandlers></OutHandlers> </GlobalHandlers> - <Provider class="org.apache.airavata.gfac.local.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> + <Provider class="org.apache.airavata.gfac.local.provider.impl.LocalProvider" host="org.apache.airavata.schemas.gfac.impl.HostDescriptionTypeImpl"> <InHandlers> <Handler class="org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler"/> </InHandlers>
