Changing gfac-core to use app catalog
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a1e0ec81 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a1e0ec81 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a1e0ec81 Branch: refs/heads/gfac_appcatalog_int Commit: a1e0ec813969000d755e47ee77c2be5fbb401f2b Parents: 8abe8dc Author: chathuriw <[email protected]> Authored: Thu Oct 30 10:11:19 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 5 11:16:14 2014 -0500 ---------------------------------------------------------------------- .../org/apache/airavata/gfac/Scheduler.java | 35 ++++----- .../gfac/core/context/JobExecutionContext.java | 21 ++++++ .../airavata/gfac/core/cpi/BetterGfacImpl.java | 47 ++++++++++++ .../core/handler/AppDescriptorCheckHandler.java | 17 ----- .../airavata/gfac/core/monitor/MonitorID.java | 1 + .../apache/airavata/job/GFacConfigXmlTest.java | 78 ++++++++++++++++---- 6 files changed, 148 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java index 9b70fae..8f5847f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/Scheduler.java @@ -21,30 +21,26 @@ package org.apache.airavata.gfac; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.xpath.XPathExpressionException; - -import org.apache.airavata.commons.gfac.type.ApplicationDescription; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.provider.GFacProvider; import org.apache.airavata.gfac.core.provider.GFacProviderConfig; import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.xml.sax.SAXException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.xpath.XPathExpressionException; +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.List; + /** * Scheduler decides the execution order of handlers based on application description. In addition @@ -76,7 +72,6 @@ public class Scheduler { * @return GFacProvider instance. */ private static GFacProvider getProvider(JobExecutionContext jobExecutionContext) throws GFacException { - ComputeResourceDescription hostDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription(); String applicationName = jobExecutionContext.getServiceName(); URL resource = Scheduler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); @@ -113,8 +108,8 @@ public class Scheduler { // We give higher preference to applications specific provider if configured if (provider == null) { - jobExecutionContext.getApplicationContext().getComputeResourcePreference().getPreferredJobSubmissionProtocol() - String hostClass = hostDescription.getType().getClass().getName(); + List<JobSubmissionInterface> jobSubmissionInterfaces = jobExecutionContext.getApplicationContext().getComputeResourceDescription().getJobSubmissionInterfaces(); + String hostClass = jobExecutionContext.getPrefferedJobSubmissionProtocal(); providerClassName = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_CLASS_ATTRIBUTE); Class<? extends GFacProvider> aClass1 = Class.forName(providerClassName).asSubclass(GFacProvider.class); provider = aClass1.newInstance(); @@ -144,9 +139,7 @@ public class Scheduler { return provider; } public static ExecutionMode getExecutionMode(JobExecutionContext jobExecutionContext)throws GFacException{ - HostDescription hostDescription = jobExecutionContext.getApplicationContext().getHostDescription(); - String applicationName = jobExecutionContext.getServiceName(); - + String applicationName = jobExecutionContext.getApplicationContext().getApplicationInterfaceDescription().getApplicationName(); URL resource = Scheduler.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder = null; @@ -169,7 +162,7 @@ public class Scheduler { // This should be have a single element only. if (executionMode == null || "".equals(executionMode)) { - String hostClass = hostDescription.getType().getClass().getName(); + String hostClass = jobExecutionContext.getPrefferedJobSubmissionProtocal(); executionMode = GFacConfiguration.getAttributeValue(GFacConfiguration.getHandlerDoc(), Constants.XPATH_EXPR_PROVIDER_ON_HOST + hostClass + "']", Constants.GFAC_CONFIG_EXECUTION_MODE_ATTRIBUTE); } } catch (XPathExpressionException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index da716c5..2b2255f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -52,12 +52,14 @@ public class JobExecutionContext extends AbstractContext implements Serializable private GFacNotifier notifier; + //FIXME : not needed for gfac private Experiment experiment; private TaskDetails taskData; private JobDetails jobDetails; + // FIXME : not needed for gfac private WorkflowNodeDetails workflowNodeDetails; private GFac gfac; @@ -72,6 +74,9 @@ public class JobExecutionContext extends AbstractContext implements Serializable private String outputDir; private String standaredOutput; private String standaredError; + private String prefferedJobSubmissionProtocal; + private String prefferedDataMovementProtocal; + // private ContextHeaderDocument.ContextHeader contextHeader; @@ -364,4 +369,20 @@ public class JobExecutionContext extends AbstractContext implements Serializable public void setStandaredError(String standaredError) { this.standaredError = standaredError; } + + public String getPrefferedJobSubmissionProtocal() { + return prefferedJobSubmissionProtocal; + } + + public void setPrefferedJobSubmissionProtocal(String prefferedJobSubmissionProtocal) { + this.prefferedJobSubmissionProtocal = prefferedJobSubmissionProtocal; + } + + public String getPrefferedDataMovementProtocal() { + return prefferedDataMovementProtocal; + } + + public void setPrefferedDataMovementProtocal(String prefferedDataMovementProtocal) { + this.prefferedDataMovementProtocal = prefferedDataMovementProtocal; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 16c49e6..fd43c65 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -52,6 +52,7 @@ import org.apache.airavata.messaging.core.PublisherFactory; import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.apache.airavata.model.messaging.event.*; import org.apache.airavata.model.workspace.experiment.*; @@ -298,6 +299,52 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setGfac(this); jobExecutionContext.setZk(zk); jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID, taskID)); + if (gatewayResourcePreferences != null ) { + if (gatewayResourcePreferences.getScratchLocation() == null) { + gatewayResourcePreferences.setScratchLocation("/tmp"); + } + + /** + * Working dir + */ + String workingDir = gatewayResourcePreferences.getScratchLocation() + File.separator + jobExecutionContext.getExperimentID(); + jobExecutionContext.setWorkingDir(workingDir); + + /* + * Input and Output Directory + */ + jobExecutionContext.setInputDir(workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME); + jobExecutionContext.setOutputDir(workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME); + + /* + * Stdout and Stderr for Shell + */ + jobExecutionContext.setStandaredOutput(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout"); + jobExecutionContext.setStandaredError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr"); + } + + List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); + String preferredJobSubmissionProtocol = gatewayResourcePreferences.getPreferredJobSubmissionProtocol(); + String hostClass; + if (preferredJobSubmissionProtocol != null){ + hostClass = preferredJobSubmissionProtocol; + }else { + if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){ + int lowestPriority = jobSubmissionInterfaces.get(0).getPriorityOrder(); + String selectedHost = null; + for (int i = 0; i < jobSubmissionInterfaces.size() - 1; i++){ + if (jobSubmissionInterfaces.get(i+1).getPriorityOrder() < lowestPriority ){ + lowestPriority = jobSubmissionInterfaces.get(i+1).getPriorityOrder(); + selectedHost = jobSubmissionInterfaces.get(i+1).getJobSubmissionProtocol().toString(); + } + } + hostClass = selectedHost; + }else { + throw new GFacException("Compute resource should have atleast one job submission interface defined..."); + } + } + jobExecutionContext.setPrefferedJobSubmissionProtocal(hostClass); + return jobExecutionContext; } http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java index 676a15a..4627bf5 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java @@ -20,16 +20,13 @@ */ package org.apache.airavata.gfac.core.handler; -import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.states.GfacPluginState; import org.apache.airavata.gfac.core.utils.GFacUtils; -import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.util.Properties; public class AppDescriptorCheckHandler implements GFacRecoverableHandler { @@ -43,33 +40,19 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler { logger.info("Error saving plugin status to ZK"); } StringBuffer data = new StringBuffer(); - ApplicationInterfaceDescription appInterface = jobExecutionContext.getApplicationContext().getApplicationInterfaceDescription(); ComputeResourcePreference computeResourcePreference = jobExecutionContext.getApplicationContext().getComputeResourcePreference(); - if (computeResourcePreference.getScratchLocation() == null) { - computeResourcePreference.setScratchLocation("/tmp"); - } - /* - * Working dir - */ - - String workingDir = computeResourcePreference.getScratchLocation() + File.separator+ jobExecutionContext.getExperimentID(); - jobExecutionContext.setWorkingDir(workingDir); data.append(computeResourcePreference.getScratchLocation()); data.append(",").append(jobExecutionContext.getWorkingDir()); /* * Input and Output Directory */ - jobExecutionContext.setInputDir(workingDir + File.separator + Constants.INPUT_DATA_DIR_VAR_NAME ); - jobExecutionContext.setOutputDir(workingDir + File.separator + Constants.OUTPUT_DATA_DIR_VAR_NAME); data.append(",").append(jobExecutionContext.getInputDir()).append(",").append(jobExecutionContext.getOutputDir()); /* * Stdout and Stderr for Shell */ - jobExecutionContext.setStandaredOutput(workingDir + File.separator + appInterface.getApplicationName().replaceAll("\\s+", "") + ".stdout"); - jobExecutionContext.setStandaredError(workingDir + File.separator + appInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr"); data.append(",").append(jobExecutionContext.getStandaredOutput()).append(",").append(jobExecutionContext.getStandaredError()); http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java index fa4ecd2..6ea1839 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java @@ -24,6 +24,7 @@ import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.JobState; import java.sql.Timestamp; http://git-wip-us.apache.org/repos/asf/airavata/blob/a1e0ec81/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java index e32bd9b..7e6bc0d 100644 --- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java +++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/GFacConfigXmlTest.java @@ -21,6 +21,9 @@ package org.apache.airavata.job; import junit.framework.Assert; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.ExecutionMode; import org.apache.airavata.gfac.GFacConfiguration; @@ -29,6 +32,7 @@ import org.apache.airavata.gfac.Scheduler; import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.BetterGfacImpl; +import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -53,12 +57,34 @@ public class GFacConfigXmlTest { try { JobExecutionContext jec = new JobExecutionContext(GFacConfiguration.create(gfac.getGfacConfigFile(), null), "testService"); ApplicationContext applicationContext = new ApplicationContext(); - HostDescription host = new HostDescription(GsisshHostType.type); - host.getType().setHostAddress("trestles.sdsc.edu"); - host.getType().setHostName("trestles"); - ((GsisshHostType) host.getType()).setPort(22); - ((GsisshHostType) host.getType()).setInstalledPath("/opt/torque/bin/"); - applicationContext.setHostDescription(host); + ComputeResourceDescription computeResourceDescription = new ComputeResourceDescription(); + computeResourceDescription.setHostName("trestles.sdsc.xsede.org"); + computeResourceDescription.setResourceDescription("SDSC Trestles Cluster"); + + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + + ResourceJobManager resourceJobManager = new ResourceJobManager(); + resourceJobManager.setResourceJobManagerType(ResourceJobManagerType.PBS); + resourceJobManager.setPushMonitoringEndpoint("push"); + resourceJobManager.setJobManagerBinPath("/opt/torque/bin/"); + + SSHJobSubmission sshJobSubmission = new SSHJobSubmission(); + sshJobSubmission.setResourceJobManager(resourceJobManager); + sshJobSubmission.setSecurityProtocol(SecurityProtocol.GSI); + sshJobSubmission.setSshPort(22); + sshJobSubmission.setResourceJobManager(resourceJobManager); + + String jobSubmissionId = appCatalog.getComputeResource().addSSHJobSubmission(sshJobSubmission); + + JobSubmissionInterface submissionInterface = new JobSubmissionInterface(); + submissionInterface.setJobSubmissionInterfaceId(jobSubmissionId); + submissionInterface.setJobSubmissionProtocol(JobSubmissionProtocol.SSH); + submissionInterface.setPriorityOrder(0); + + computeResourceDescription.addToJobSubmissionInterfaces(submissionInterface); + + appCatalog.getComputeResource().addComputeResource(computeResourceDescription); + applicationContext.setComputeResourceDescription(computeResourceDescription); jec.setApplicationContext(applicationContext); Scheduler.schedule(jec); Assert.assertEquals(ExecutionMode.ASYNCHRONOUS, jec.getGFacConfiguration().getExecutionMode()); @@ -73,6 +99,8 @@ public class GFacConfigXmlTest { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } catch (GFacException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (AppCatalogException e) { + e.printStackTrace(); } } @Test @@ -82,12 +110,34 @@ public class GFacConfigXmlTest { try { JobExecutionContext jec = new JobExecutionContext(GFacConfiguration.create(gfac.getGfacConfigFile(), null), "UltraScan"); ApplicationContext applicationContext = new ApplicationContext(); - HostDescription host = new HostDescription(GsisshHostType.type); - host.getType().setHostAddress("trestles.sdsc.edu"); - host.getType().setHostName("trestles"); - ((GsisshHostType) host.getType()).setPort(22); - ((GsisshHostType) host.getType()).setInstalledPath("/opt/torque/bin/"); - applicationContext.setHostDescription(host); + ComputeResourceDescription computeResourceDescription = new ComputeResourceDescription(); + computeResourceDescription.setHostName("trestles.sdsc.xsede.org"); + computeResourceDescription.setResourceDescription("SDSC Trestles Cluster"); + + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + + ResourceJobManager resourceJobManager = new ResourceJobManager(); + resourceJobManager.setResourceJobManagerType(ResourceJobManagerType.PBS); + resourceJobManager.setPushMonitoringEndpoint("push"); + resourceJobManager.setJobManagerBinPath("/opt/torque/bin/"); + + SSHJobSubmission sshJobSubmission = new SSHJobSubmission(); + sshJobSubmission.setResourceJobManager(resourceJobManager); + sshJobSubmission.setSecurityProtocol(SecurityProtocol.GSI); + sshJobSubmission.setSshPort(22); + sshJobSubmission.setResourceJobManager(resourceJobManager); + + String jobSubmissionId = appCatalog.getComputeResource().addSSHJobSubmission(sshJobSubmission); + + JobSubmissionInterface submissionInterface = new JobSubmissionInterface(); + submissionInterface.setJobSubmissionInterfaceId(jobSubmissionId); + submissionInterface.setJobSubmissionProtocol(JobSubmissionProtocol.SSH); + submissionInterface.setPriorityOrder(0); + + computeResourceDescription.addToJobSubmissionInterfaces(submissionInterface); + + appCatalog.getComputeResource().addComputeResource(computeResourceDescription); + applicationContext.setComputeResourceDescription(computeResourceDescription); jec.setApplicationContext(applicationContext); Scheduler.schedule(jec); Assert.assertEquals(3, jec.getGFacConfiguration().getInHandlers().size()); @@ -106,8 +156,10 @@ public class GFacConfigXmlTest { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } catch (GFacException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (AppCatalogException e) { + e.printStackTrace(); } - } + } }
