Repository: airavata Updated Branches: refs/heads/master 2bf83dd03 -> fea3325f7
Fixed the file transfer issues. AIRAVATA-1476 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6fb6644a Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6fb6644a Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6fb6644a Branch: refs/heads/master Commit: 6fb6644a41819dd59dbc01718dd83e72305ddd74 Parents: ea93cc1 Author: raminder <[email protected]> Authored: Wed May 13 16:39:40 2015 -0400 Committer: raminder <[email protected]> Committed: Wed May 13 16:39:40 2015 -0400 ---------------------------------------------------------------------- .../gfac/bes/provider/impl/BESProvider.java | 16 +++++++----- .../gfac/bes/utils/DataTransferrer.java | 26 ++++++++++++++++---- 2 files changed, 31 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6fb6644a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java index 73bf0fc..ce7c629 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java @@ -263,10 +263,12 @@ public class BESProvider extends AbstractProvider implements GFacProvider, return JobState.QUEUED; } else if (status.equalsIgnoreCase("Staging-In")) { return JobState.SUBMITTED; - } else if (status.equalsIgnoreCase("Staging-Out") - || status.equalsIgnoreCase("FINISHED")) { + } else if (status.equalsIgnoreCase("FINISHED")) { return JobState.COMPLETE; - } else if (status.equalsIgnoreCase("Executing")) { + }else if(status.equalsIgnoreCase("Staging-Out")){ + return JobState.ACTIVE; + } + else if (status.equalsIgnoreCase("Executing")) { return JobState.ACTIVE; } else if (status.equalsIgnoreCase("FAILED")) { return JobState.FAILED; @@ -419,12 +421,15 @@ public class BESProvider extends AbstractProvider implements GFacProvider, protected void waitUntilDone(FactoryClient factory, EndpointReferenceType activityEpr, JobDetails jobDetails) throws Exception { try { + JobState applicationJobStatus = null; + while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED) && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FAILED) - && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED)) { + && (factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.CANCELLED) + && (applicationJobStatus != JobState.COMPLETE)) { ActivityStatusType activityStatus = getStatus(factory, activityEpr); - JobState applicationJobStatus = getApplicationJobStatus(activityStatus); + applicationJobStatus = getApplicationJobStatus(activityStatus); sendNotification(jobExecutionContext,applicationJobStatus); @@ -435,7 +440,6 @@ public class BESProvider extends AbstractProvider implements GFacProvider, } catch (InterruptedException e) {} continue; } - return; } catch(Exception e) { log.error("Error monitoring job status.."); throw e; http://git-wip-us.apache.org/repos/asf/airavata/blob/6fb6644a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java index 453e45a..d70e4b1 100644 --- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java +++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/utils/DataTransferrer.java @@ -28,6 +28,8 @@ import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.core.context.JobExecutionContext; @@ -92,13 +94,23 @@ public class DataTransferrer { if(!file.exists()){ file.mkdirs(); } - List<String> outPrms = extractOutParams(jobContext); - - for (String outPrm : outPrms) { - if("".equals(outPrm)) continue; - FileDownloader fileDownloader = new FileDownloader(outPrm,downloadLocation, Mode.overwrite); + + Map<String, Object> output = jobContext.getOutMessageContext().getParameters(); + Set<String> keys = output.keySet(); + + for (String outPrm : keys) { + OutputDataObjectType actualParameter = (OutputDataObjectType) output.get(outPrm); + if (DataType.STDERR == actualParameter.getType()) continue; + if (DataType.STDOUT == actualParameter.getType()) continue; + + String value = actualParameter.getValue(); + FileDownloader fileDownloader = new FileDownloader(value,downloadLocation, Mode.overwrite); try { fileDownloader.perform(storageClient); + String outputPath = downloadLocation + File.separator + value.substring(value.lastIndexOf('/')+1); + actualParameter.setValue(outputPath); + actualParameter.setType(DataType.URI); + jobContext.addOutputFile(outputPath); } catch (Exception e) { throw new GFacProviderException(e.getLocalizedMessage(),e); } @@ -137,6 +149,8 @@ public class DataTransferrer { f1.perform(storageClient); log.info("Downloading stdout and stderr.."); String stdoutput = readFile(stdoutLocation); + jobContext.addOutputFile(stdoutLocation); + jobContext.setStandardOutput(stdoutLocation); log.info("Stdout downloaded to -> "+stdoutLocation); if(UASDataStagingProcessor.isUnicoreEndpoint(jobContext)) { String scriptExitCodeFName = "UNICORE_SCRIPT_EXIT_CODE"; @@ -151,6 +165,8 @@ public class DataTransferrer { f1.setTo(stderrLocation); f1.perform(storageClient); String stderror = readFile(stderrLocation); + jobContext.addOutputFile(stderrLocation); + jobContext.setStandardError(stderrLocation); log.info("Stderr downloaded to -> "+stderrLocation); } catch (Exception e) { throw new GFacProviderException(e.getLocalizedMessage(),e);
