Repository: airavata Updated Branches: refs/heads/master 0599edf0b -> 6b90e6427
Fixing Localprovider support Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6b90e642 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6b90e642 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6b90e642 Branch: refs/heads/master Commit: 6b90e6427b9093ead5baf0cda3f599fa1aad363f Parents: 0599edf Author: lahiru <lah...@apache.org> Authored: Thu Mar 20 13:00:04 2014 -0400 Committer: lahiru <lah...@apache.org> Committed: Thu Mar 20 13:00:04 2014 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 47 +++++++++- .../main/resources/airavata-client.properties | 8 +- .../airavata/client/tools/DocumentCreator.java | 5 +- .../org/apache/airavata/gfac/cpi/GFacImpl.java | 18 ++-- .../gfac/provider/impl/AbstractProvider.java | 6 +- .../gfac/provider/impl/GSISSHProvider.java | 68 +------------- .../gfac/provider/impl/LocalProvider.java | 25 ++--- .../apache/airavata/gfac/utils/GFacUtils.java | 96 ++++++++++++++++++-- .../server/OrchestratorServerHandler.java | 6 +- .../job/monitor/AiravataJobStatusUpdator.java | 8 +- .../airavata/job/monitor/MonitorManager.java | 29 +++++- .../job/monitor/impl/LocalJobMonitor.java | 58 ++++++++++++ .../airavata/job/monitor/state/JobStatus.java | 9 ++ 13 files changed, 265 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index efcf4a0..7c20dff 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -49,7 +49,7 @@ import java.util.Set; public class CreateLaunchExperiment { //FIXME: Read from a config file - public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org"; + public static final String THRIFT_SERVER_HOST = "localhost"; public static final int THRIFT_SERVER_PORT = 8930; private final static Logger logger = LoggerFactory.getLogger(CreateLaunchExperiment.class); private static final String DEFAULT_USER = "defauly.registry.user"; @@ -61,7 +61,7 @@ public class CreateLaunchExperiment { final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavata.GetAPIVersion()); addDescriptors(); - final String expId = createExperimentForTrestles(airavata); + final String expId = createExperimentForLocalHost(airavata); // final String expId = createUS3ExperimentForTrestles(airavata); // final String expId = createExperimentForStampede(airavata); System.out.println("Experiment ID : " + expId); @@ -194,7 +194,48 @@ public class CreateLaunchExperiment { throw new TException(e); } } - + public static String createExperimentForLocalHost(Airavata.Client client) throws TException { + try{ + List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); + DataObjectType input = new DataObjectType(); + input.setKey("echo_input"); + input.setType(DataType.STRING.toString()); + input.setValue("echo_output=Hello World"); + exInputs.add(input); + + List<DataObjectType> exOut = new ArrayList<DataObjectType>(); + DataObjectType output = new DataObjectType(); + output.setKey("echo_output"); + output.setType(DataType.STRING.toString()); + output.setValue(""); + exOut.add(output); + + Experiment simpleExperiment = + ExperimentModelUtil.createSimpleExperiment("project1", "admin", "echoExperiment", "SimpleEcho0", "SimpleEcho0", exInputs); + simpleExperiment.setExperimentOutputs(exOut); + + ComputationalResourceScheduling scheduling = ExperimentModelUtil.createComputationResourceScheduling("localhost", 1, 1, 1, "normal", 0, 0, 1, "sds128"); + scheduling.setResourceHostId("localhost"); + UserConfigurationData userConfigurationData = new UserConfigurationData(); + userConfigurationData.setAiravataAutoSchedule(false); + userConfigurationData.setOverrideManualScheduledParams(false); + userConfigurationData.setComputationalResourceScheduling(scheduling); + simpleExperiment.setUserConfigurationData(userConfigurationData); + return client.createExperiment(simpleExperiment); + } catch (AiravataSystemException e) { + logger.error("Error occured while creating the experiment...", e.getMessage()); + throw new AiravataSystemException(e); + } catch (InvalidRequestException e) { + logger.error("Error occured while creating the experiment...", e.getMessage()); + throw new InvalidRequestException(e); + } catch (AiravataClientException e) { + logger.error("Error occured while creating the experiment...", e.getMessage()); + throw new AiravataClientException(e); + }catch (TException e) { + logger.error("Error occured while creating the experiment...", e.getMessage()); + throw new TException(e); + } + } public static String createExperimentForStampede(Airavata.Client client) throws TException { try{ List<DataObjectType> exInputs = new ArrayList<DataObjectType>(); http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties index 83be989..9786a1a 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties @@ -34,12 +34,12 @@ class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.Airava ########################Registry JPA Implementation Settings######################## #for mysql [AiravataJPARegistry] -registry.jdbc.driver=com.mysql.jdbc.Driver -registry.jdbc.url=jdbc:mysql://gw111.iu.xsede.org:3306/airavata +#registry.jdbc.driver=com.mysql.jdbc.Driver +#registry.jdbc.url=jdbc:mysql://gw111.iu.xsede.org:3306/airavata #for derby [AiravataJPARegistry] -#registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver -#registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata +registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver +registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata registry.jdbc.user=airavata registry.jdbc.password=airavata12 start.derby.server.mode=true http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java ---------------------------------------------------------------------- diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java index 7964291..9fbcbd2 100644 --- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java +++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java @@ -64,10 +64,11 @@ public class DocumentCreator { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } + String serviceName = "SimpleEcho0"; ServiceDescription serviceDescription = new ServiceDescription(); List<InputParameterType> inputParameters = new ArrayList<InputParameterType>(); List<OutputParameterType> outputParameters = new ArrayList<OutputParameterType>(); - serviceDescription.getType().setName("Echo"); + serviceDescription.getType().setName(serviceName); serviceDescription.getType().setDescription("Echo service"); // Creating input parameters InputParameterType parameter = InputParameterType.Factory.newInstance(); @@ -105,7 +106,7 @@ public class DocumentCreator { applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp"); try { - airavataAPI.getApplicationManager().saveApplicationDescription("Echo", "localhost", applicationDeploymentDescription); + airavataAPI.getApplicationManager().saveApplicationDescription(serviceName, "localhost", applicationDeploymentDescription); } catch (AiravataAPIInvocationException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java index f82fb7f..f79561c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java @@ -294,14 +294,16 @@ public class GFacImpl implements GFac { TaskDetails taskData = (TaskDetails) registry.get(DataType.TASK_DETAIL, taskID); JobDetails jobDetails = taskData.getJobDetailsList().get(0); String jobDescription = jobDetails.getJobDescription(); - JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDescription); - applicationDeploymentDescription.getType().setScratchWorkingDirectory( - jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory()); - applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory()); - applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory()); - applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile()); - applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile()); - } catch (Exception e) { + if(jobDescription != null) { + JobDescriptor jobDescriptor = JobDescriptor.fromXML(jobDescription); + applicationDeploymentDescription.getType().setScratchWorkingDirectory( + jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory()); + applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory()); + applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory()); + applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile()); + applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile()); + } + } catch (Exception e) { throw new GFacException(e); } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java index e49c5dd..dbbcb62 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java @@ -30,9 +30,9 @@ import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; public abstract class AbstractProvider{ - protected Registry registry = null; - protected JobDetails details; - protected JobStatus status; + protected Registry registry = null; + protected JobDetails details; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider + protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { registry = RegistryFactory.getDefaultRegistry(); http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java index e8c7ff4..c20447c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java @@ -89,73 +89,7 @@ public class GSISSHProvider extends AbstractProvider implements GFacProvider{ log.info("Successfully retrieved the Security Context"); } // This installed path is a mandetory field, because this could change based on the computing resource - JobDescriptor jobDescriptor = new JobDescriptor(); - jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory()); - jobDescriptor.setShellName("/bin/bash"); - Random random = new Random(); - int i = random.nextInt(); - jobDescriptor.setJobName(app.getApplicationName().getStringValue() + String.valueOf(i)); - jobDescriptor.setExecutablePath(app.getExecutableLocation()); - jobDescriptor.setAllEnvExport(true); - jobDescriptor.setMailOptions("n"); - jobDescriptor.setStandardOutFile(app.getStandardOutput()); - jobDescriptor.setStandardErrorFile(app.getStandardError()); - jobDescriptor.setNodes(app.getNodeCount()); - jobDescriptor.setProcessesPerNode(app.getProcessorsPerNode()); - jobDescriptor.setMaxWallTime(String.valueOf(app.getMaxWallTime())); - jobDescriptor.setJobSubmitter(app.getJobSubmitterCommand()); - if (app.getProjectAccount().getProjectAccountNumber() != null) { - jobDescriptor.setAcountString(app.getProjectAccount().getProjectAccountNumber()); - } - if (app.getQueue().getQueueName() != null) { - jobDescriptor.setQueueName(app.getQueue().getQueueName()); - } - jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); - jobDescriptor.setInputDirectory(app.getInputDataDirectory()); - jobDescriptor.setOutputDirectory(app.getOutputDataDirectory()); - TaskDetails taskData = jobExecutionContext.getTaskData(); - if(taskData != null && taskData.isSetTaskScheduling()){ - ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling(); - if(computionnalResource.getNodeCount() > 0){ - jobDescriptor.setNodes(computionnalResource.getNodeCount()); - } - if(computionnalResource.getComputationalProjectAccount() != null){ - jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount()); - } - if(computionnalResource.getQueueName() != null){ - jobDescriptor.setQueueName(computionnalResource.getQueueName()); - } - if(computionnalResource.getTotalCPUCount() > 0){ - jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount()); - } - if(computionnalResource.getWallTimeLimit() > 0){ - jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit())); - } - } - List<String> inputValues = new ArrayList<String>(); - MessageContext input = jobExecutionContext.getInMessageContext(); - Map<String, Object> inputs = input.getParameters(); - Set<String> keys = inputs.keySet(); - for (String paramName : keys) { - ActualParameter actualParameter = (ActualParameter) inputs.get(paramName); - if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString()) - || "FileArray".equals(actualParameter.getType().getType().toString())) { - String[] values = null; - if (actualParameter.getType() instanceof URIArrayType) { - values = ((URIArrayType) actualParameter.getType()).getValueArray(); - } else if (actualParameter.getType() instanceof StringArrayType) { - values = ((StringArrayType) actualParameter.getType()).getValueArray(); - } else if (actualParameter.getType() instanceof FileArrayType) { - values = ((FileArrayType) actualParameter.getType()).getValueArray(); - } - String value = StringUtil.createDelimiteredString(values, " "); - inputValues.add(value); - } else { - String paramValue = MappingFactory.toString(actualParameter); - inputValues.add(paramValue); - } - } - jobDescriptor.setInputValues(inputValues); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, cluster); log.info(jobDescriptor.toXML()); http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java index 3109080..e446614 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java @@ -41,6 +41,7 @@ import org.apache.airavata.gfac.utils.GFacUtils; import org.apache.airavata.gfac.utils.InputStreamToFileWriter; import org.apache.airavata.gfac.utils.InputUtils; import org.apache.airavata.gfac.utils.OutputUtils; +import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.registry.api.workflow.ApplicationJob; @@ -131,17 +132,14 @@ public class LocalProvider extends AbstractProvider implements GFacProvider{ try { jobId= jobExecutionContext.getTaskData().getTaskID(); jobDetails.setJobID(jobId); + jobDetails.setJobDescription(app.toString()); jobExecutionContext.setJobDetails(jobDetails); - details.setJobID(jobId); - GFacUtils.saveJobStatus(details, JobState.SETUP, jobExecutionContext.getTaskData().getTaskID()); + JobDescriptor jobDescriptor = GFacUtils.createJobDescriptor(jobExecutionContext, app, null); + jobDetails.setJobDescription(jobDescriptor.toXML()); + GFacUtils.saveJobStatus(jobDetails, JobState.SETUP, jobExecutionContext.getTaskData().getTaskID()); // running cmd Process process = builder.start(); - - //todo fix how to incoperate orchestrator with gfac -// if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() != null){ -// saveApplicationJob(jobExecutionContext); -// } -// GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, ApplicationJobStatus.INITIALIZE); + Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput()); Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError()); @@ -150,11 +148,8 @@ public class LocalProvider extends AbstractProvider implements GFacProvider{ standardErrorWriter.setDaemon(true); standardOutWriter.start(); standardErrorWriter.start(); - GFacUtils.updateJobStatus(jobDetails, JobState.ACTIVE); - // wait for the process (application) to finish executing + int returnValue = process.waitFor(); - //todo fix how to incoperate orchestrator with gfac - GFacUtils.updateJobStatus(jobDetails, JobState.COMPLETE); // make sure other two threads are done standardOutWriter.join(); @@ -165,10 +160,8 @@ public class LocalProvider extends AbstractProvider implements GFacProvider{ * just provide warning in the log messages */ if (returnValue != 0) { -// GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, ApplicationJobStatus.FAILED); log.error("Process finished with non zero return value. Process may have failed"); } else { -// GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, ApplicationJobStatus.FINISHED); log.info("Process finished with return value of zero."); } @@ -177,10 +170,7 @@ public class LocalProvider extends AbstractProvider implements GFacProvider{ .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory()) .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ") .append(String.valueOf(returnValue)); - details.setJobDescription(buf.toString()); - GFacUtils.updateJobStatus(details, JobState.COMPLETE); log.info(buf.toString()); - } catch (IOException io) { throw new GFacProviderException(io.getMessage(), io); } catch (InterruptedException e) { @@ -228,7 +218,6 @@ public class LocalProvider extends AbstractProvider implements GFacProvider{ } } - @Override public void cancelJob(String jobId, JobExecutionContext jobExecutionContext) throws GFacException { throw new NotImplementedException(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java index 6188de3..b4210b5 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java @@ -35,17 +35,15 @@ import org.apache.airavata.client.api.AiravataAPI; import org.apache.airavata.client.api.exception.AiravataAPIInvocationException; import org.apache.airavata.common.utils.StringUtil; import org.apache.airavata.commons.gfac.type.ActualParameter; +import org.apache.airavata.commons.gfac.type.MappingFactory; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.context.JobExecutionContext; -import org.apache.airavata.model.workspace.experiment.ActionableGroup; -import org.apache.airavata.model.workspace.experiment.CorrectiveAction; -import org.apache.airavata.model.workspace.experiment.DataObjectType; -import org.apache.airavata.model.workspace.experiment.ErrorCategory; -import org.apache.airavata.model.workspace.experiment.ErrorDetails; -import org.apache.airavata.model.workspace.experiment.JobDetails; -import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.model.workspace.experiment.JobStatus; +import org.apache.airavata.gfac.context.MessageContext; +import org.apache.airavata.gsi.ssh.api.Cluster; +import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +import org.apache.airavata.gsi.ssh.impl.PBSCluster; +import org.apache.airavata.model.workspace.experiment.*; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.api.workflow.ApplicationJob; import org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus; @@ -691,4 +689,86 @@ public class GFacUtils { } return stringObjectHashMap; } + + public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, + ApplicationDeploymentDescriptionType app, Cluster cluster) { + JobDescriptor jobDescriptor = new JobDescriptor(); + // this is common for any application descriptor + jobDescriptor.setInputDirectory(app.getInputDataDirectory()); + jobDescriptor.setOutputDirectory(app.getOutputDataDirectory()); + jobDescriptor.setExecutablePath(app.getExecutableLocation()); + jobDescriptor.setStandardOutFile(app.getStandardOutput()); + jobDescriptor.setStandardErrorFile(app.getStandardError()); + Random random = new Random(); + int i = random.nextInt(); + jobDescriptor.setJobName(app.getApplicationName().getStringValue() + String.valueOf(i)); + jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory()); + + + List<String> inputValues = new ArrayList<String>(); + MessageContext input = jobExecutionContext.getInMessageContext(); + Map<String, Object> inputs = input.getParameters(); + Set<String> keys = inputs.keySet(); + for (String paramName : keys) { + ActualParameter actualParameter = (ActualParameter) inputs.get(paramName); + if ("URIArray".equals(actualParameter.getType().getType().toString()) || "StringArray".equals(actualParameter.getType().getType().toString()) + || "FileArray".equals(actualParameter.getType().getType().toString())) { + String[] values = null; + if (actualParameter.getType() instanceof URIArrayType) { + values = ((URIArrayType) actualParameter.getType()).getValueArray(); + } else if (actualParameter.getType() instanceof StringArrayType) { + values = ((StringArrayType) actualParameter.getType()).getValueArray(); + } else if (actualParameter.getType() instanceof FileArrayType) { + values = ((FileArrayType) actualParameter.getType()).getValueArray(); + } + String value = StringUtil.createDelimiteredString(values, " "); + inputValues.add(value); + } else { + String paramValue = MappingFactory.toString(actualParameter); + inputValues.add(paramValue); + } + } + jobDescriptor.setInputValues(inputValues); + + // this part will fill out the hpcApplicationDescriptor + if (app instanceof HpcApplicationDeploymentType) { + HpcApplicationDeploymentType applicationDeploymentType + = (HpcApplicationDeploymentType) app; + jobDescriptor.setShellName("/bin/bash"); + jobDescriptor.setAllEnvExport(true); + jobDescriptor.setMailOptions("n"); + jobDescriptor.setNodes(applicationDeploymentType.getNodeCount()); + jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode()); + jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime())); + jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand()); + if (applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != null) { + jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber()); + } + if (applicationDeploymentType.getQueue().getQueueName() != null) { + jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName()); + } + jobDescriptor.setOwner(((PBSCluster) cluster).getServerInfo().getUserName()); + TaskDetails taskData = jobExecutionContext.getTaskData(); + if (taskData != null && taskData.isSetTaskScheduling()) { + ComputationalResourceScheduling computionnalResource = taskData.getTaskScheduling(); + if (computionnalResource.getNodeCount() > 0) { + jobDescriptor.setNodes(computionnalResource.getNodeCount()); + } + if (computionnalResource.getComputationalProjectAccount() != null) { + jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount()); + } + if (computionnalResource.getQueueName() != null) { + jobDescriptor.setQueueName(computionnalResource.getQueueName()); + } + if (computionnalResource.getTotalCPUCount() > 0) { + jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount()); + } + if (computionnalResource.getWallTimeLimit() > 0) { + jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit())); + } + } + + } + return jobDescriptor; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index 6d6d6f9..b00fe4f 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -32,6 +32,7 @@ import org.apache.airavata.job.monitor.core.Monitor; import org.apache.airavata.job.monitor.core.PullMonitor; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.exception.AiravataMonitorException; +import org.apache.airavata.job.monitor.impl.LocalJobMonitor; import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor; import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor; import org.apache.airavata.model.workspace.experiment.Experiment; @@ -112,6 +113,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list); monitorManager.addAMQPMonitor((AMQPMonitor) monitor); } + } else if(monitor instanceof LocalJobMonitor){ + monitorManager.addLocalMonitor((LocalJobMonitor)monitor); } else { log.error("Wrong class is given to primary Monitor"); } @@ -191,7 +194,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { log.error("Job submission Failed, so we remove the job from monitoring"); }else{ - monitorManager.addAJobToMonitor(monitorID); + monitorManager.addAJobToMonitor(monitorID); } } } @@ -209,7 +212,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { this.monitorManager = monitorManager; } - @Override public boolean terminateExperiment(String experimentId) throws TException { return false; } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java index 601cc27..b755e16 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java @@ -27,6 +27,7 @@ import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.CompositeIdentifier; +import org.apache.airavata.registry.cpi.DataType; import org.apache.airavata.registry.cpi.Registry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,13 +116,16 @@ public class AiravataJobStatusUpdator{ } } public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception { - JobDetails details = new JobDetails(); + CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); + JobDetails details = (JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids); + if(details == null) { + details = new JobDetails(); + } org.apache.airavata.model.workspace.experiment.JobStatus status = new org.apache.airavata.model.workspace.experiment.JobStatus(); status.setJobState(state); status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis()); details.setJobStatus(status); details.setJobID(jobID); - CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID); airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, details, ids); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java index 3515a68..b819e2b 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java @@ -22,15 +22,19 @@ package org.apache.airavata.job.monitor; import com.google.common.eventbus.EventBus; import org.apache.airavata.common.utils.Constants; +import org.apache.airavata.job.monitor.core.Monitor; import org.apache.airavata.job.monitor.core.PullMonitor; import org.apache.airavata.job.monitor.core.PushMonitor; import org.apache.airavata.job.monitor.event.MonitorPublisher; import org.apache.airavata.job.monitor.exception.AiravataMonitorException; +import org.apache.airavata.job.monitor.impl.LocalJobMonitor; import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor; import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor; import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread; import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl; +import org.apache.airavata.schemas.gfac.GlobusHostType; import org.apache.airavata.schemas.gfac.GsisshHostType; +import org.apache.airavata.schemas.gfac.HostDescriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +66,8 @@ public class MonitorManager { private MonitorPublisher monitorPublisher; + private Monitor localJobMonitor; + /** * This will initialize the major monitoring system. */ @@ -71,6 +77,7 @@ public class MonitorManager { pullQueue = new LinkedBlockingQueue<MonitorID>(); pushQueue = new LinkedBlockingQueue<MonitorID>(); finishQueue = new LinkedBlockingQueue<MonitorID>(); + localJobQueue = new LinkedBlockingQueue<MonitorID>(); monitorPublisher = new MonitorPublisher(new EventBus()); registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), finishQueue)); } @@ -88,6 +95,19 @@ public class MonitorManager { addPushMonitor(monitor); } + + /** + * This can be use to add an empty AMQPMonitor object to the monitor system + * and tihs method will take care of the initialization + * todo may be we need to move this to some other class + * @param monitor + */ + public void addLocalMonitor(LocalJobMonitor monitor) { + monitor.setPublisher(this.getMonitorPublisher()); + monitor.setJobQueue(this.getLocalJobQueue()); + localJobMonitor = monitor; + } + /** * This can be used to adda a QstatMonitor and it will take care of * the initialization of QstatMonitor @@ -145,8 +165,11 @@ public class MonitorManager { } else if (Constants.PUSH.equals(host.getMonitorMode())) { pushQueue.add(monitorID); } + } else if(monitorID.getHost().getType() instanceof GlobusHostType){ + logger.error("Monitoring does not support GlubusHostType resources"); } else { - logger.error("We only support Gsissh host types currently"); + // we assume this is a type of localJobtype + localJobQueue.add(monitorID); } } @@ -163,6 +186,10 @@ public class MonitorManager { public void launchMonitor() throws AiravataMonitorException { //no push monitor is configured so we launch pull monitor int index = 0; + if(localJobMonitor != null){ + (new Thread(localJobMonitor)).start(); + } + for (PullMonitor monitor : pullMonitors) { (new Thread(monitor)).start(); } http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java new file mode 100644 index 0000000..c20eef2 --- /dev/null +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * +*/ +package org.apache.airavata.job.monitor.impl; + +import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.job.monitor.MonitorID; +import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor; +import org.apache.airavata.job.monitor.state.JobStatus; +import org.apache.airavata.model.workspace.experiment.JobState; + +import java.util.concurrent.BlockingQueue; + +/** + * This monitor can be used to monitor a job which runs locally, + * Since its a local job job doesn't have states, once it get executed + * then the job starts running + */ +public class LocalJobMonitor extends AiravataAbstractMonitor { + // Though we have a qeuue here, it not going to be used in local jobs + BlockingQueue<MonitorID> jobQueue; + + public void run() { + do { + try { + MonitorID take = jobQueue.take(); + getPublisher().publish(new JobStatus(take, JobState.COMPLETE)); + } catch (Exception e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } while (!ServerSettings.isStopAllThreads()); + } + + public BlockingQueue<MonitorID> getJobQueue() { + return jobQueue; + } + + public void setJobQueue(BlockingQueue<MonitorID> jobQueue) { + this.jobQueue = jobQueue; + } +} http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java ---------------------------------------------------------------------- diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java index fe623fb..8f05124 100644 --- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java +++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java @@ -38,6 +38,15 @@ public class JobStatus { private MonitorID monitorID; + // this constructor can be used in Qstat monitor to handle errors + public JobStatus() { + } + + public JobStatus(MonitorID monitorID, JobState state) { + this.monitorID = monitorID; + this.state = state; + } + public JobState getState() { return state; }