Repository: airavata Updated Branches: refs/heads/master cf6304c6e -> 7e7e3a969
Add task creation logic for unicore jobs Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6d320248 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6d320248 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6d320248 Branch: refs/heads/master Commit: 6d320248f7a0c12b651e55aa520503ca08a04d2f Parents: 8496ae6 Author: Shameera Rathnayaka <[email protected]> Authored: Mon Mar 21 15:45:38 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Mon Mar 21 15:45:38 2016 -0400 ---------------------------------------------------------------------- .../cpi/impl/SimpleOrchestratorImpl.java | 60 +++++++++++--------- 1 file changed, 34 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6d320248/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java index ca7be48..427be5b 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java @@ -283,36 +283,43 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ throw new OrchestratorException("Compute Resource Id cannot be null at this point"); } ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(resourceHostId); - + JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); List<String> taskIdList = createAndSaveEnvSetupTask(gatewayId, processModel, experimentCatalog); - taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId)); - JobSubmissionInterface preferredJobSubmissionInterface = OrchestratorUtils.getPreferredJobSubmissionInterface(orchestratorContext, processModel, gatewayId); - if (autoSchedule) { - List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues(); - for (BatchQueue batchQueue : definedBatchQueues) { - if (batchQueue.getQueueName().equals(userGivenQueueName)) { - int maxRunTime = batchQueue.getMaxRunTime(); - if (maxRunTime < userGivenWallTime) { - resourceSchedule.setWallTimeLimit(maxRunTime); - // need to create more job submissions - int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime)); - for (int i = 1; i <= numOfMaxWallTimeJobs; i++) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime)); - } - int leftWallTime = userGivenWallTime % maxRunTime; - if (leftWallTime != 0) { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime)); + ComputeResourcePreference resourcePreference = OrchestratorUtils.getComputeResourcePreference(orchestratorContext, processModel, gatewayId); + if (resourcePreference.getPreferredJobSubmissionProtocol() == JobSubmissionProtocol.UNICORE) { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId, preferredJobSubmissionInterface, processModel, userGivenWallTime)); + } else { + taskIdList.addAll(createAndSaveInputDataStagingTasks(processModel, gatewayId)); + + if (autoSchedule) { + List<BatchQueue> definedBatchQueues = computeResource.getBatchQueues(); + for (BatchQueue batchQueue : definedBatchQueues) { + if (batchQueue.getQueueName().equals(userGivenQueueName)) { + int maxRunTime = batchQueue.getMaxRunTime(); + if (maxRunTime < userGivenWallTime) { + resourceSchedule.setWallTimeLimit(maxRunTime); + // need to create more job submissions + int numOfMaxWallTimeJobs = ((int) Math.floor(userGivenWallTime / maxRunTime)); + for (int i = 1; i <= numOfMaxWallTimeJobs; i++) { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, maxRunTime)); + } + int leftWallTime = userGivenWallTime % maxRunTime; + if (leftWallTime != 0) { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, leftWallTime)); + } + } else { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); } - } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); } } + } else { + taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); } - } else { - taskIdList.addAll(createAndSaveSubmissionTasks(gatewayId,preferredJobSubmissionInterface, processModel, userGivenWallTime)); + + taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId)); } - taskIdList.addAll(createAndSaveOutputDataStagingTasks(processModel, gatewayId)); + // update process scheduling experimentCatalog.update(ExperimentCatalogModelType.PROCESS, processModel, processModel.getProcessId()); return getTaskDag(taskIdList); @@ -466,7 +473,9 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ if (jobSubmissionProtocol == JobSubmissionProtocol.SSH || jobSubmissionProtocol == JobSubmissionProtocol.SSH_FORK) { SSHJobSubmission sshJobSubmission = OrchestratorUtils.getSSHJobSubmission(orchestratorContext, jobSubmissionInterface.getJobSubmissionInterfaceId()); monitorMode = sshJobSubmission.getMonitorMode(); - }else { + } else if (jobSubmissionProtocol == JobSubmissionProtocol.UNICORE) { + monitorMode = MonitorMode.FORK; + } else { logger.error("expId : {}, processId : {} :- Unsupported Job submission protocol {}.", processModel.getExperimentId(), processModel.getProcessId(), jobSubmissionProtocol.name()); throw new OrchestratorException("Unsupported Job Submission Protocol " + jobSubmissionProtocol.name()); @@ -482,8 +491,7 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{ taskModel.setTaskType(TaskTypes.JOB_SUBMISSION); JobSubmissionTaskModel submissionSubTask = new JobSubmissionTaskModel(); submissionSubTask.setMonitorMode(monitorMode); - submissionSubTask.setJobSubmissionProtocol( - OrchestratorUtils.getPreferredJobSubmissionProtocol(orchestratorContext, processModel, gatewayId)); + submissionSubTask.setJobSubmissionProtocol(jobSubmissionProtocol); submissionSubTask.setWallTime(wallTime); byte[] bytes = ThriftUtils.serializeThriftObject(submissionSubTask); taskModel.setSubTaskModel(bytes);
