Repository: airavata Updated Branches: refs/heads/master dfa2f22e2 -> d3d04cd17
fixing issues in SSH job submission with app catalog - AIRAVATA-1511 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d3d04cd1 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d3d04cd1 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d3d04cd1 Branch: refs/heads/master Commit: d3d04cd1778e9d8235ed4aa57398f7c22676a6df Parents: dfa2f22 Author: Chathuri Wimalasena <[email protected]> Authored: Wed Nov 12 16:52:27 2014 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 12 16:52:27 2014 -0500 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 6 +- .../airavata/gfac/server/GfacServerHandler.java | 6 +- .../org/apache/airavata/gfac/Scheduler.java | 6 +- .../gfac/core/context/JobExecutionContext.java | 22 ++++++ .../airavata/gfac/core/cpi/BetterGfacImpl.java | 6 +- .../gsissh/provider/impl/GSISSHProvider.java | 4 +- .../gfac/gsissh/util/GFACGSISSHUtils.java | 3 +- .../gfac/ssh/handler/SSHInputHandler.java | 69 ++++++++++--------- .../gfac/ssh/provider/impl/SSHProvider.java | 70 ++++++++++++-------- .../airavata/gfac/ssh/util/GFACSSHUtils.java | 67 +++++++++++++------ .../core/impl/GFACEmbeddedJobSubmitter.java | 2 +- 11 files changed, 167 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index cc2307e..f6eb175 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -58,7 +58,7 @@ public class CreateLaunchExperiment { private static final String DEFAULT_GATEWAY = "default.registry.gateway"; private static Airavata.Client airavataClient; - private static String echoAppId = "Echo_3f2bb10b-2ee4-48c0-a526-c7c3c53f0a97"; + private static String echoAppId = "Echo_36fbb479-5b41-4f48-a9c5-382ee910ac6b"; private static String wrfAppId = "WRF_a37fb0be-a252-4185-a31c-ff43f585f88f"; private static String amberAppId = "Amber_a8187edf-7be0-4dad-9c8a-d0cc9075e6a0"; @@ -93,11 +93,11 @@ public class CreateLaunchExperiment { // final String expId = createEchoExperimentForTrestles(airavataClient); // final String expId = createExperimentEchoForLocalHost(airavataClient); // final String expId = createExperimentWRFTrestles(airavataClient); -// final String expId = createExperimentForBR2(airavataClient); + final String expId = createExperimentForBR2(airavataClient); // final String expId = createExperimentForBR2Amber(airavataClient); // final String expId = createExperimentWRFStampede(airavataClient); // final String expId = createExperimentForStampedeAmber(airavataClient); - final String expId = createExperimentForTrestlesAmber(airavataClient); +// final String expId = createExperimentForTrestlesAmber(airavataClient); System.out.println("Experiment ID : " + expId); // updateExperiment(airavata, expId); http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index cbc0c86..3faa9eb 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -21,6 +21,8 @@ package org.apache.airavata.gfac.server; import com.google.common.eventbus.EventBus; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; @@ -52,6 +54,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class); private Registry registry; + private AppCatalog appCatalog; private String registryURL; @@ -104,6 +107,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ publisher = new MonitorPublisher(new EventBus()); BetterGfacImpl.setMonitorPublisher(publisher); registry = RegistryFactory.getDefaultRegistry(); + appCatalog = AppCatalogFactory.getAppCatalog(); setGatewayProperties(); BetterGfacImpl.startDaemonHandlers(); BetterGfacImpl.startStatusUpdators(registry,zk,publisher); @@ -261,7 +265,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ private GFac getGfac()throws TException{ try { - return new BetterGfacImpl(registry,zk,publisher); + return new BetterGfacImpl(registry, appCatalog, zk,publisher); } catch (Exception e) { throw new TException("Error initializing gfac instance",e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java index fb5899d..d8c7663 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java @@ -123,7 +123,7 @@ public class Scheduler { LOCALSubmission localSubmission; String securityProtocol = null; try { - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); if (jobSubmissionProtocol == JobSubmissionProtocol.SSH) { sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission( jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); @@ -136,9 +136,9 @@ public class Scheduler { List<Element> elements = GFacUtils.getElementList(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_SUBMISSION + jobSubmissionProtocol + "']"); for (Element element : elements) { String security = element.getAttribute(Constants.GFAC_CONFIG_SECURITY_ATTRIBUTE); - if (securityProtocol == null && security == null) { + if (security.equals("")) { providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); - }else if (securityProtocol.equals(security)) { + }else if (securityProtocol != null && securityProtocol.equals(security)) { providerClassName = element.getAttribute(Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index ff764a0..4dbde7e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -27,6 +27,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.gfac.GFacConfiguration; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.SecurityContext; @@ -43,9 +46,12 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.registry.cpi.Registry; import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JobExecutionContext extends AbstractContext implements Serializable{ + private static final Logger log = LoggerFactory.getLogger(JobExecutionContext.class); private GFacConfiguration gfacConfiguration; private ApplicationContext applicationContext; @@ -147,6 +153,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable private String experimentID; + private AppCatalog appCatalog; + public String getGatewayID() { return gatewayID; } @@ -177,6 +185,20 @@ public class JobExecutionContext extends AbstractContext implements Serializable outputFileList = new ArrayList<String>(); } + public AppCatalog getAppCatalog() { + return appCatalog; + } + + public void setAppCatalog(AppCatalog appCatalog) { + if (appCatalog == null){ + try { + this.appCatalog = AppCatalogFactory.getAppCatalog(); + } catch (AppCatalogException e) { + log.error("Unable to create app catalog instance" , e); + } + } + this.appCatalog = appCatalog; + } public String getExperimentID() { return experimentID; http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 814efb3..666190e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -89,6 +89,7 @@ public class BetterGfacImpl implements GFac,Watcher { public static final String ERROR_SENT = "ErrorSent"; private Registry registry; + private AppCatalog appCatalog; // we are not storing zk instance in to jobExecution context private ZooKeeper zk; @@ -109,11 +110,12 @@ public class BetterGfacImpl implements GFac,Watcher { * @param registry * @param zooKeeper */ - public BetterGfacImpl(Registry registry, ZooKeeper zooKeeper, + public BetterGfacImpl(Registry registry, AppCatalog appCatalog, ZooKeeper zooKeeper, MonitorPublisher publisher) { this.registry = registry; monitorPublisher = publisher; // This is a EventBus common for gfac this.zk = zooKeeper; + this.appCatalog = appCatalog; } public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) { @@ -280,7 +282,7 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0)); jobExecutionContext.setTaskData(taskData); jobExecutionContext.setGatewayID(gatewayID); - + jobExecutionContext.setAppCatalog(appCatalog); List<JobDetails> jobDetailsList = taskData.getJobDetailsList(); //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index d26d31b..9918f42 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -94,7 +94,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { Cluster cluster = null; try { - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission( jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); if (jobExecutionContext.getSecurityContext(jobExecutionContext.getHostName()) != null) { @@ -319,7 +319,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } } - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission( jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId); http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java index 45ef1b8..73b2462 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java @@ -70,11 +70,12 @@ public class GFACGSISSHUtils { public static final String SUN_GRID_ENGINE_JOB_MANAGER = "UGE"; public static int maxClusterCount = 5; public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>(); + public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); try { - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId()); if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) { http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java index dd27d6b..0aec7cc 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java @@ -38,6 +38,8 @@ import org.apache.airavata.gfac.ssh.security.SSHSecurityContext; import org.apache.airavata.gfac.ssh.util.GFACSSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.SSHApiException; +import org.apache.airavata.model.appcatalog.appinterface.DataType; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.DataTransferDetails; import org.apache.airavata.model.workspace.experiment.ErrorCategory; @@ -87,47 +89,48 @@ public class SSHInputHandler extends AbstractHandler { MessageContext input = jobExecutionContext.getInMessageContext(); Set<String> parameters = input.getParameters().keySet(); for (String paramName : parameters) { - ActualParameter actualParameter = (ActualParameter) input.getParameters().get(paramName); - String paramValue = MappingFactory.toString(actualParameter); + InputDataObjectType inputParamType = (InputDataObjectType) input.getParameters().get(paramName); + String paramValue = inputParamType.getValue(); //TODO: Review this with type - if ("URI".equals(actualParameter.getType().getType().toString())) { - if (index < oldIndex) { + if (inputParamType.getType() == DataType.URI) { + if (index < oldIndex) { log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + inputParamType.setValue(oldFiles.get(index)); data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - }else{ - String stageInputFile = stageInputFiles(cluster,jobExecutionContext, paramValue); - ((URIParameterType) actualParameter.getType()).setValue(stageInputFile); - StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); - - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFile); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); - } - } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { - if (index < oldIndex) { - log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); - ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); - data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index - }else{ - List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); - List<String> newFiles = new ArrayList<String>(); - for (String paramValueEach : split) { - String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach); + } else { + String stageInputFile = stageInputFiles(cluster, jobExecutionContext, paramValue); + inputParamType.setValue(stageInputFile); + StringBuffer temp = new StringBuffer(data.append(stageInputFile).append(",").toString()); status.setTransferState(TransferState.UPLOAD); detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFiles); + detail.setTransferDescription("Input Data Staged: " + stageInputFile); registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - newFiles.add(stageInputFiles); - StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); } - ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); - } - } - inputNew.getParameters().put(paramName, actualParameter); + }// FIXME: what is the thrift model DataType equivalent for URIArray type? +// else if ("URIArray".equals(actualParameter.getType().getType().toString())) { +// if (index < oldIndex) { +// log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); +// ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); +// data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index +// }else{ +// List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); +// List<String> newFiles = new ArrayList<String>(); +// for (String paramValueEach : split) { +// String stageInputFiles = stageInputFiles(cluster,jobExecutionContext, paramValueEach); +// status.setTransferState(TransferState.UPLOAD); +// detail.setTransferStatus(status); +// detail.setTransferDescription("Input Data Staged: " + stageInputFiles); +// registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); +// newFiles.add(stageInputFiles); +// StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); +// GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); +// } +// ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); +// } +// } + inputNew.getParameters().put(paramName, inputParamType); } } catch (Exception e) { log.error(e.getMessage()); http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java index ff2267c..fd618e4 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java @@ -28,6 +28,9 @@ import java.io.OutputStream; import java.util.*; import java.util.Map.Entry; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.commons.gfac.type.ActualParameter; import org.apache.airavata.commons.gfac.type.MappingFactory; @@ -52,7 +55,10 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.gsi.ssh.impl.RawCommandInfo; import org.apache.airavata.gsi.ssh.impl.StandardOutReader; import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; import org.apache.airavata.model.workspace.experiment.JobDetails; @@ -77,40 +83,46 @@ public class SSHProvider extends AbstractProvider { private boolean hpcType = false; public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { - super.initialize(jobExecutionContext); - String hostAddress = jobExecutionContext.getHostName(); - if (jobExecutionContext.getSecurityContext(hostAddress) == null) { - try { + try { + super.initialize(jobExecutionContext); + String hostAddress = jobExecutionContext.getHostName(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); + JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId()); + ResourceJobManagerType resourceJobManagerType = sshJobSubmission.getResourceJobManager().getResourceJobManagerType(); + if (jobExecutionContext.getSecurityContext(hostAddress) == null) { GFACSSHUtils.addSecurityContext(jobExecutionContext); - } catch (ApplicationSettingsException e) { - log.error(e.getMessage()); - throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } - } - taskID = jobExecutionContext.getTaskData().getTaskID(); - - JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); - if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) { - jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); - cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); - - String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; - details.setJobID(taskID); - details.setJobDescription(remoteFile); - jobExecutionContext.setJobDetails(details); - JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, null); - details.setJobDescription(jobDescriptor.toXML()); - - GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP); - log.info(remoteFile); - try { + taskID = jobExecutionContext.getTaskData().getTaskID(); + + JobSubmissionProtocol preferredJobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH && resourceJobManagerType == ResourceJobManagerType.FORK) { + jobID = "SSH_" + jobExecutionContext.getHostName() + "_" + Calendar.getInstance().getTimeInMillis(); + cluster = ((SSHSecurityContext) jobExecutionContext.getSecurityContext(hostAddress)).getPbsCluster(); + + String remoteFile = jobExecutionContext.getWorkingDir() + File.separatorChar + Constants.EXECUTABLE_NAME; + details.setJobID(taskID); + details.setJobDescription(remoteFile); + jobExecutionContext.setJobDetails(details); + // FIXME : Why cluster is passed as null + JobDescriptor jobDescriptor = GFACSSHUtils.createJobDescriptor(jobExecutionContext, cluster); + details.setJobDescription(jobDescriptor.toXML()); + + GFacUtils.saveJobStatus(jobExecutionContext, details, JobState.SETUP); + log.info(remoteFile); File runscript = createShellScript(jobExecutionContext); cluster.scpTo(remoteFile, runscript.getAbsolutePath()); - } catch (Exception e) { - throw new GFacProviderException(e.getLocalizedMessage(), e); + } else { + hpcType = true; } - } else { - hpcType = true; + } catch (AppCatalogException e) { + log.error("Error while creating app catalog", e); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } catch (ApplicationSettingsException e) { + log.error(e.getMessage()); + throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); + } catch (Exception e) { + throw new GFacProviderException(e.getLocalizedMessage(), e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java index 05cdf31..cec4d28 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@ -43,14 +43,18 @@ import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.ServerInfo; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster; import org.apache.airavata.gsi.ssh.impl.PBSCluster; import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol; +import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; +import org.apache.airavata.model.workspace.experiment.TaskDetails; import org.apache.airavata.schemas.gfac.FileArrayType; import org.apache.airavata.schemas.gfac.StringArrayType; import org.apache.airavata.schemas.gfac.URIArrayType; @@ -79,9 +83,10 @@ public class GFACSSHUtils { logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); } else if (preferredJobSubmissionProtocol == JobSubmissionProtocol.SSH) { try { - AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + AppCatalog appCatalog = jobExecutionContext.getAppCatalog(); SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionInterface.getJobSubmissionInterfaceId()); - if (sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) { + SecurityProtocol securityProtocol = sshJobSubmission.getSecurityProtocol(); + if (securityProtocol == SecurityProtocol.GSI || securityProtocol == SecurityProtocol.SSH_KEYS) { SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); @@ -218,6 +223,7 @@ public class GFACSSHUtils { public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) { JobDescriptor jobDescriptor = new JobDescriptor(); + TaskDetails taskData = jobExecutionContext.getTaskData(); // this is common for any application descriptor jobDescriptor.setCallBackIp(ServerSettings.getIp()); jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); @@ -236,26 +242,49 @@ public class GFACSSHUtils { Map<String, Object> inputs = input.getParameters(); Set<String> keys = inputs.keySet(); for (String paramName : keys) { - ActualParameter actualParameter = (ActualParameter) inputs.get(paramName); - if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString()) - || "FileArray".equals(actualParameter.getType().getType().toString())) { - String[] values = null; - if (actualParameter.getType() instanceof URIArrayType) { - values = ((URIArrayType) actualParameter.getType()).getValueArray(); - } else if (actualParameter.getType() instanceof StringArrayType) { - values = ((StringArrayType) actualParameter.getType()).getValueArray(); - } else if (actualParameter.getType() instanceof FileArrayType) { - values = ((FileArrayType) actualParameter.getType()).getValueArray(); - } - String value = StringUtil.createDelimiteredString(values, " "); - inputValues.add(value); - } else { - String paramValue = MappingFactory.toString(actualParameter); - inputValues.add(paramValue); - } + InputDataObjectType inputDataObjectType = (InputDataObjectType) inputs.get(paramName); + inputValues.add(inputDataObjectType.getValue()); } jobDescriptor.setInputValues(inputValues); + jobDescriptor.setUserName(((GSISSHAbstractCluster) cluster).getServerInfo().getUserName()); + jobDescriptor.setShellName("/bin/bash"); + jobDescriptor.setAllEnvExport(true); + jobDescriptor.setMailOptions("n"); + jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); + + ComputationalResourceScheduling taskScheduling = taskData.getTaskScheduling(); + if (taskScheduling != null) { + int totalNodeCount = taskScheduling.getNodeCount(); + int totalCPUCount = taskScheduling.getTotalCPUCount(); + +// jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand()); + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + if (totalNodeCount > 0) { + jobDescriptor.setNodes(totalNodeCount); + } + if (taskScheduling.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(taskScheduling.getComputationalProjectAccount()); + } + if (taskScheduling.getQueueName() != null) { + jobDescriptor.setQueueName(taskScheduling.getQueueName()); + } + if (totalCPUCount > 0) { + int ppn = totalCPUCount / totalNodeCount; + jobDescriptor.setProcessesPerNode(ppn); + jobDescriptor.setCPUCount(totalCPUCount); + } + if (taskScheduling.getWallTimeLimit() > 0) { + jobDescriptor.setMaxWallTime(String.valueOf(taskScheduling.getWallTimeLimit())); + } + } else { + logger.error("Task scheduling cannot be null at this point.."); + } return jobDescriptor; } http://git-wip-us.apache.org/repos/asf/airavata/blob/d3d04cd1/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java index cacbc37..5341623 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java @@ -49,7 +49,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter { public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { this.orchestratorContext = orchestratorContext; - gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(), null, new MonitorPublisher(new EventBus())); + gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus())); } public GFACInstance selectGFACInstance() throws OrchestratorException {
