Repository: airavata Updated Branches: refs/heads/develop fd9b64feb -> 253dee8cd
Make unicore works with local files Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/ef4329c5 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/ef4329c5 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/ef4329c5 Branch: refs/heads/develop Commit: ef4329c51c9d81c41c42c5dd77eb5ca34ef40675 Parents: 6c483bc Author: Shameera Rathnayaka <[email protected]> Authored: Fri Mar 25 14:11:07 2016 -0400 Committer: Shameera Rathnayaka <[email protected]> Committed: Fri Mar 25 14:11:07 2016 -0400 ---------------------------------------------------------------------- .../gfac/impl/task/BESJobSubmissionTask.java | 48 ++++++++------ .../impl/task/utils/bes/DataTransferrer.java | 70 +++++++++++--------- 2 files changed, 64 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/ef4329c5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java index 145cde9..678924f 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/BESJobSubmissionTask.java @@ -77,8 +77,9 @@ public class BESJobSubmissionTask implements JobSubmissionTask { public TaskStatus execute(TaskContext taskContext) { TaskStatus taskStatus = new TaskStatus(TaskState.CREATED); StorageClient sc = null; + // FIXME - use original output dir + taskContext.getParentProcessContext().setOutputDir(""); - //TODO - initialize securityContext secProperties try { if (secProperties == null) { secProperties = getSecurityConfig(taskContext.getParentProcessContext()); @@ -121,6 +122,8 @@ public class BESJobSubmissionTask implements JobSubmissionTask { dt.uploadLocalFiles(); JobModel jobDetails = new JobModel(); + jobDetails.setTaskId(taskContext.getTaskId()); + jobDetails.setProcessId(taskContext.getProcessId()); FactoryClient factory = new FactoryClient(eprt, secProperties); log.info(String.format("Activity Submitting to %s ... \n", @@ -143,6 +146,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { jobDetails.setJobDescription(activityEpr.toString()); jobDetails.setJobStatus(new JobStatus(JobState.SUBMITTED)); processContext.setJobModel(jobDetails); + GFacUtils.saveJobModel(processContext, jobDetails); GFacUtils.saveJobStatus(processContext, jobDetails); log.info(formatStatusMessage(activityEpr.getAddress() .getStringValue(), factory.getActivityStatus(activityEpr) @@ -164,7 +168,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { + "\n" + activityStatus.getFault().getFaultstring() + "\n EXITCODE: " + activityStatus.getExitCode(); - log.info(error); + log.error(error); JobState applicationJobStatus = JobState.FAILED; jobDetails.setJobStatus(new JobStatus(applicationJobStatus)); @@ -173,7 +177,7 @@ public class BESJobSubmissionTask implements JobSubmissionTask { //What if job is failed before execution and there are not stdouts generated yet? log.debug("Downloading any standard output and error files, if they were produced."); - dt.downloadStdOuts(); + dt.downloadRemoteFiles(); } else if (activityStatus.getState() == ActivityStateEnumeration.CANCELLED) { JobState applicationJobStatus = JobState.CANCELED; @@ -184,17 +188,19 @@ public class BESJobSubmissionTask implements JobSubmissionTask { } else if (activityStatus.getState() == ActivityStateEnumeration.FINISHED) { try { Thread.sleep(5000); - JobState applicationJobStatus = JobState.COMPLETE; - jobDetails.setJobStatus(new JobStatus(applicationJobStatus)); - GFacUtils.saveJobStatus(processContext, jobDetails); - - } catch (InterruptedException e) { - } - if (activityStatus.getExitCode() == 0) { - dt.downloadRemoteFiles(); - } else { - dt.downloadStdOuts(); + } catch (InterruptedException ignored) { } + JobState applicationJobStatus = JobState.COMPLETE; + jobDetails.setJobStatus(new JobStatus(applicationJobStatus)); + GFacUtils.saveJobStatus(processContext, jobDetails); + log.info("Job Id: {}, exit code: {}, exit status: {}", jobDetails.getJobId(), + activityStatus.getExitCode(), ActivityStateEnumeration.FINISHED.toString()); + +// if (activityStatus.getExitCode() == 0) { +// } else { +// dt.downloadStdOuts(); +// } + dt.downloadRemoteFiles(); } dt.publishFinalOutputs(); @@ -243,22 +249,22 @@ public class BESJobSubmissionTask implements JobSubmissionTask { FactoryClient factoryClient = new FactoryClient(factoryEpr, secProperties); JobState applicationJobStatus = null; - while ((factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) - && (factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) - && (factoryClient.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED) + ActivityStateEnumeration.Enum activityStatus = factoryClient.getActivityStatus(activityEpr); + while ((activityStatus != ActivityStateEnumeration.FINISHED) + && (activityStatus != ActivityStateEnumeration.FAILED) + && (activityStatus != ActivityStateEnumeration.CANCELLED) && (applicationJobStatus != JobState.COMPLETE)) { - ActivityStatusType activityStatus = getStatus(factoryClient, activityEpr); - applicationJobStatus = getApplicationJobStatus(activityStatus); - + ActivityStatusType activityStatusType = getStatus(factoryClient, activityEpr); + applicationJobStatus = getApplicationJobStatus(activityStatusType); sendNotification(processContext,processContext.getJobModel()); - // GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, // applicationJobStatus); try { Thread.sleep(5000); } catch (InterruptedException e) {} - continue; + + activityStatus = factoryClient.getActivityStatus(activityEpr); } } catch(Exception e) { log.error("Error monitoring job status.."); http://git-wip-us.apache.org/repos/asf/airavata/blob/ef4329c5/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java index 4a0cbbf..b4bf9ed 100644 --- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java +++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/utils/bes/DataTransferrer.java @@ -150,25 +150,32 @@ public class DataTransferrer { String stderrFileName = new File(stderrLocation).getName(); FileDownloader f1 = null; - try { - log.info("Downloading stdout and stderr.."); - log.info(stdoutFileName + " -> "+stdoutLocation); + log.info("Downloading stdout and stderr.."); + log.info(stdoutFileName + " -> " + stdoutLocation); - f1 = new FileDownloader(stdoutFileName,stdoutLocation, Mode.overwrite); + f1 = new FileDownloader(stdoutFileName, stdoutLocation, Mode.overwrite); + try { f1.perform(storageClient); - String stdoutput = readFile(stdoutLocation); +// String stdoutput = readFile(stdoutLocation); + } catch (Exception e) { + log.error("Error while downloading " + stdoutFileName + " to location " + stdoutLocation, e); + } - log.info(stderrFileName + " -> " + stderrLocation); - f1.setFrom(stderrFileName); - f1.setTo(stderrLocation); + log.info(stderrFileName + " -> " + stderrLocation); + f1.setFrom(stderrFileName); + f1.setTo(stderrLocation); + try { f1.perform(storageClient); - String stderror = readFile(stderrLocation); - - if(UASDataStagingProcessor.isUnicoreEndpoint(processContext)) { - String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE"; - String scriptCodeLocation = gatewayDownloadLocation+File.separator+scriptExitCodeFName; - f1.setFrom(scriptExitCodeFName); - f1.setTo(scriptCodeLocation); +// String stderror = readFile(stderrLocation); + } catch (Exception e) { + log.error("Error while downloading " + stderrFileName + " to location " + stderrLocation); + } + String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE"; + String scriptCodeLocation = gatewayDownloadLocation + File.separator + scriptExitCodeFName; + if (UASDataStagingProcessor.isUnicoreEndpoint(processContext)) { + f1.setFrom(scriptExitCodeFName); + f1.setTo(scriptCodeLocation); + try { f1.perform(storageClient); OutputDataObjectType output = new OutputDataObjectType(); output.setName(scriptExitCodeFName); @@ -176,13 +183,12 @@ public class DataTransferrer { output.setType(DataType.URI); output.setIsRequired(true); processContext.getProcessModel().getProcessOutputs().add(output); - log.info("UNICORE_SCRIPT_EXIT_CODE -> "+scriptCodeLocation); - log.info("EXIT CODE: "+ readFile(scriptCodeLocation)); + log.info("UNICORE_SCRIPT_EXIT_CODE -> " + scriptCodeLocation); + log.info("EXIT CODE: " + readFile(scriptCodeLocation)); + } catch (Exception e) { + log.error("Error downloading file " + scriptExitCodeFName + " to location " + scriptCodeLocation, e); } - } catch (Exception e) { - throw new GFacException(e.getLocalizedMessage(),e); } - } private String readFile(String localFile) throws IOException { @@ -253,7 +259,7 @@ public class DataTransferrer { public void downloadRemoteFiles() throws GFacException { if(log.isDebugEnabled()) { - log.debug("Download location is:"+gatewayDownloadLocation); + log.debug("Download location is:" + gatewayDownloadLocation); } List<OutputDataObjectType> applicationOutputs = processContext.getProcessModel().getProcessOutputs(); @@ -265,35 +271,33 @@ public class DataTransferrer { if(output.getType().equals(DataType.STDOUT)) { output.setValue(processContext.getStdoutLocation()); resultantOutputsLst.add(output); - } - - else if(output.getType().equals(DataType.STDERR)) { + } else if(output.getType().equals(DataType.STDERR)) { output.setValue(processContext.getStderrLocation()); resultantOutputsLst.add(output); - } - else if(output.getType().equals(DataType.STRING)) { + } else if (output.getType().equals(DataType.STRING)) { String value = null; - if(!output.getLocation().isEmpty()){ + if (!output.getLocation().isEmpty()) { value = output.getLocation() + File.separator + output.getValue(); - }else{ + } else { value = output.getValue(); } String outputPath = gatewayDownloadLocation + File.separator + output.getValue(); File f = new File(gatewayDownloadLocation); - if(!f.exists()) + if (!f.exists()) f.mkdirs(); - FileDownloader fileDownloader = new FileDownloader(value,outputPath, Mode.overwrite); + FileDownloader fileDownloader = new FileDownloader(value, outputPath, Mode.overwrite); try { fileDownloader.perform(storageClient); output.setType(DataType.URI); output.setValue(outputPath); - processContext.getProcessModel().getProcessOutputs().add(output); resultantOutputsLst.add(output); } catch (Exception e) { - log.error("Error downloading "+value+" from job working directory. "); - throw new GFacException(e.getLocalizedMessage(),e); + log.error("Error downloading " + value + " from job working directory. "); +// throw new GFacException(e.getLocalizedMessage(),e); } + } else { + log.info("Ignore output file {}, type {}", output.getValue(), output.getType().toString()); } }
