Repository: airavata Updated Branches: refs/heads/master b4e3c33fd -> 85fb6b694
check monitor mode before create monitoring task Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/687d8126 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/687d8126 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/687d8126 Branch: refs/heads/master Commit: 687d812617a0fb236a7c44a2a894d71c641e51d6 Parents: 5eb3c26 Author: Shameera Rathnayaka <[email protected]> Authored: Fri Nov 6 17:25:22 2015 -0500 Committer: Shameera Rathnayaka <[email protected]> Committed: Fri Nov 6 17:25:22 2015 -0500 ---------------------------------------------------------------------- .../cpi/impl/SimpleOrchestratorImpl.java | 61 ++++++++++++-------- .../server/OrchestratorServerHandler.java | 4 +- 2 files changed, 39 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/687d8126/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index 1e2ad58..ee4d2c6 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -20,14 +20,11 @@ */ package org.apache.airavata.orchestrator.cpi.impl; -import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataUtils; -import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.ThriftUtils; import org.apache.airavata.gfac.core.task.TaskException; import org.apache.airavata.model.appcatalog.computeresource.*; import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference; -import org.apache.airavata.model.appcatalog.gatewayprofile.DataStoragePreference; import org.apache.airavata.model.application.io.DataType; import org.apache.airavata.model.application.io.InputDataObjectType; import org.apache.airavata.model.application.io.OutputDataObjectType; @@ -288,6 +285,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog); taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId)); + JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); if (autoSchedule) { List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues(); for (BatchQueue batchQueue : definedBatchQueues) { @@ -298,19 +296,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ // need to create more job submissions int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime)); for (int i = 1; i <= numOfMaxWallTimeJobs; i++) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, maxRunTime)); + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime)); } int leftWallTime = userGivenWallTime % maxRunTime; if (leftWallTime != 0) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, leftWallTime)); + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime)); } } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, userGivenWallTime)); + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); } } } } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, processModel, userGivenWallTime)); + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); } taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId)); // update process scheduling @@ -442,8 +440,19 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ return dataStagingTaskIds; } - private List<String> createAndSaveSubmissionTasks(String gatewayId, ProcessModel processModel, int wallTime) - throws TException, RegistryException { + private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime) + throws TException, RegistryException, OrchestratorException { + + JobSubmissionProtocol jobSubmissionProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); + MonitorMode monitorMode = null; + if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) { + SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId()); + monitorMode = sshJobSubmission.getMonitorMode(); + }else { + logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.", + processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name()); + throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name()); + } List<String> submissionTaskIds = new ArrayList<>(); TaskModel taskModel = new TaskModel(); taskModel.setParentProcessId(processModel.getProcessId()); @@ -454,7 +463,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskModel.setTaskStatus(taskStatus); taskModel.setTaskType(TaskTypes.JOB_SUBMISSION); JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel(); - submissionSubTask.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); + submissionSubTask.setMonitorMode(monitorMode); submissionSubTask.setJobSubmissionProtocol( OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext, processModel, gatewayId)); submissionSubTask.setWallTime(wallTime); @@ -465,21 +474,23 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskModel.setTaskId(taskId); submissionTaskIds.add(taskModel.getTaskId()); - // create monitor task for this job - TaskModel monitorTaskModel = new TaskModel(); - monitorTaskModel.setParentProcessId(processModel.getProcessId()); - monitorTaskModel.setCreationTime(new Date().getTime()); - monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime()); - TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED); - monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); - monitorTaskModel.setTaskStatus(monitorTaskStatus); - monitorTaskModel.setTaskType(TaskTypes.MONITORING); - MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel(); - monitorSubTaskModel.setMonitorMode(MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR); - monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel)); - String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, monitorTaskModel, processModel.getProcessId()); - monitorTaskModel.setTaskId(mTaskId); - submissionTaskIds.add(monitorTaskModel.getTaskId()); + // create monitor task for this Email based monitor mode job + if (monitorMode == MonitorMode.JOB_EMAIL_NOTIFICATION_MONITOR) { + TaskModel monitorTaskModel = new TaskModel(); + monitorTaskModel.setParentProcessId(processModel.getProcessId()); + monitorTaskModel.setCreationTime(new Date().getTime()); + monitorTaskModel.setLastUpdateTime(monitorTaskModel.getCreationTime()); + TaskStatus monitorTaskStatus = new TaskStatus(TaskState.CREATED); + monitorTaskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime()); + monitorTaskModel.setTaskStatus(monitorTaskStatus); + monitorTaskModel.setTaskType(TaskTypes.MONITORING); + MonitorTaskModel monitorSubTaskModel = new MonitorTaskModel(); + monitorSubTaskModel.setMonitorMode(monitorMode); + monitorTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(monitorSubTaskModel)); + String mTaskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, monitorTaskModel, processModel.getProcessId()); + monitorTaskModel.setTaskId(mTaskId); + submissionTaskIds.add(monitorTaskModel.getTaskId()); + } return submissionTaskIds; } http://git-wip-us.apache.org/repos/asf/airavata/blob/687d8126/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index f75c91e..c1e9d65 100644 --- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -154,7 +154,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface { log.error("Validating process fails for given experiment Id : {}", experimentId); return false; } - ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile().getComputeResourcePreference(gatewayId, experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId()); + ComputeResourcePreference computeResourcePreference = appCatalog.getGatewayProfile(). + getComputeResourcePreference(gatewayId, + experiment.getUserConfigurationData().getComputationalResourceScheduling().getResourceHostId()); String token = computeResourcePreference.getResourceSpecificCredentialStoreToken(); if (token == null || token.isEmpty()){ // try with gateway profile level token
