Repository: airavata Updated Branches: refs/heads/master 6da674fdd -> 3a84ee8c3
Making GSISSHOutputProvider a RecoverableHandler Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3a84ee8c Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3a84ee8c Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3a84ee8c Branch: refs/heads/master Commit: 3a84ee8c3e953d3b364c6075d84b2fa506759529 Parents: 6da674f Author: lahiru <[email protected]> Authored: Thu Jun 26 15:25:19 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu Jun 26 15:25:19 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/core/cpi/BetterGfacImpl.java | 8 +- .../gsissh/handler/GSISSHOutputHandler.java | 103 ++++++++++++++----- 2 files changed, 84 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3a84ee8c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 5f2c78c..05d866b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -344,8 +344,10 @@ public class BetterGfacImpl implements GFac { } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) { // this is async mode where monitoring of jobs is hapenning, we have to recover reInvokeProvider(jobExecutionContext); - } else { - log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!"); + } else if( stateVal == 6){ + reInvokeOutFlowHandlers(jobExecutionContext); + } else{ + log.info("We skip invoking Handler, because the experiment:" + stateVal +" state is beyond the Provider Invocation !!!"); log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID()); } } catch (Exception e) { @@ -692,8 +694,6 @@ public class BetterGfacImpl implements GFac { handler.invoke(jobExecutionContext); GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED); } - handler.initProperties(handlerClassName.getProperties()); - handler.invoke(jobExecutionContext); } catch (ClassNotFoundException e) { log.error(e.getMessage()); throw new GFacException("Cannot load handler class " + handlerClassName, e); http://git-wip-us.apache.org/repos/asf/airavata/blob/3a84ee8c/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java index f3736f2..f8d8c52 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java @@ -35,6 +35,7 @@ import org.apache.airavata.commons.gfac.type.MappingFactory; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.core.utils.GFacUtils; @@ -53,11 +54,16 @@ import org.apache.xmlbeans.XmlException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GSISSHOutputHandler extends AbstractHandler { +public class GSISSHOutputHandler extends AbstractRecoverableHandler { private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class); public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - if(jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context + super.invoke(jobExecutionContext); + int index = 0; + int oldIndex = 0; + List<String> oldFiles = new ArrayList<String>(); + StringBuffer data = new StringBuffer("|"); + if (jobExecutionContext.getApplicationContext().getHostDescription().getType() instanceof GsisshHostType) { // this is because we don't have the right jobexecution context // so attempting to get it from the registry if (Constants.PUSH.equals(((GsisshHostType) jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode())) { log.warn("During the out handler chain jobExecution context came null, so trying to handler"); @@ -99,7 +105,6 @@ public class GSISSHOutputHandler extends AbstractHandler { log.error(e.getMessage()); throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } - super.invoke(jobExecutionContext); DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); @@ -119,6 +124,22 @@ public class GSISSHOutputHandler extends AbstractHandler { } // Get the Stdouts and StdErrs + String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName()); + if (pluginData != null) { + try { + oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); + oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); + if (oldIndex == oldFiles.size()) { + log.info("Old data looks good !!!!"); + } else { + oldIndex = 0; + oldFiles.clear(); + } + } catch (NumberFormatException e) { + log.error("Previously stored data " + pluginData + " is wrong so we continue the operations"); + } + } + String timeStampedServiceName = GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName()); TaskDetails taskData = jobExecutionContext.getTaskData(); @@ -136,13 +157,26 @@ public class GSISSHOutputHandler extends AbstractHandler { (new File(outputDataDir)).mkdirs(); - localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout"); - localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr"); -// cluster.makeDirectory(outputDataDir); - cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath()); - Thread.sleep(1000); - cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath()); - Thread.sleep(1000); + if (index < oldIndex) { + localStdOutFile = new File(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + localStdOutFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stdout"); + cluster.scpFrom(app.getStandardOutput(), localStdOutFile.getAbsolutePath()); + Thread.sleep(1000); + StringBuffer temp = new StringBuffer(data.append(localStdOutFile.getAbsolutePath()).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + if (index < oldIndex) { + localStdErrFile = new File(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + localStdErrFile = new File(outputDataDir + File.separator + timeStampedServiceName + "stderr"); + cluster.scpFrom(app.getStandardError(), localStdErrFile.getAbsolutePath()); + Thread.sleep(1000); + StringBuffer temp = new StringBuffer(data.append(localStdErrFile.getAbsolutePath()).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } String stdOutStr = GFacUtils.readFileToString(localStdOutFile.getAbsolutePath()); String stdErrStr = GFacUtils.readFileToString(localStdErrFile.getAbsolutePath()); @@ -169,16 +203,24 @@ public class GSISSHOutputHandler extends AbstractHandler { OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); Set<String> strings = output.keySet(); outputArray.clear(); - for(String key:strings) { + for (String key : strings) { ActualParameter actualParameter1 = (ActualParameter) output.get(key); - if("URI".equals(actualParameter1.getType().getType().toString())){ - String downloadFile = MappingFactory.toString(actualParameter1); - cluster.scpFrom(downloadFile, outputDataDir); - String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, downloadFile.length()); - String localFile = outputDataDir + File.separator +fileName; - jobExecutionContext.addOutputFile(localFile); - MappingFactory.fromString(actualParameter1, localFile); - DataObjectType dataObjectType = new DataObjectType(); + if ("URI".equals(actualParameter1.getType().getType().toString())) { + String downloadFile = MappingFactory.toString(actualParameter1); + String localFile; + if (index < oldIndex) { + localFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + cluster.scpFrom(downloadFile, outputDataDir); + String fileName = downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, downloadFile.length()); + localFile = outputDataDir + File.separator + fileName; + StringBuffer temp = new StringBuffer(data.append(localFile).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + jobExecutionContext.addOutputFile(localFile); + MappingFactory.fromString(actualParameter1, localFile); + DataObjectType dataObjectType = new DataObjectType(); dataObjectType.setValue(localFile); dataObjectType.setKey(key); dataObjectType.setType(DataType.URI); @@ -188,8 +230,18 @@ public class GSISSHOutputHandler extends AbstractHandler { break; } else { String valueList = outputList.get(0); - cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir); - jobExecutionContext.addOutputFile(outputDataDir + File.separator + valueList); + String outputFile; + if (index < oldIndex) { + outputFile = oldFiles.get(index); + data.append(oldFiles.get(index++)).append(","); + } else { + cluster.scpFrom(app.getOutputDataDirectory() + File.separator + valueList, outputDataDir); + outputFile = outputDataDir + File.separator + valueList; + jobExecutionContext.addOutputFile(outputFile); + StringBuffer temp = new StringBuffer(data.append(outputFile).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } + jobExecutionContext.addOutputFile(outputFile); DataObjectType dataObjectType = new DataObjectType(); dataObjectType.setValue(valueList); dataObjectType.setKey(paramName); @@ -197,14 +249,15 @@ public class GSISSHOutputHandler extends AbstractHandler { outputArray.add(dataObjectType); } } else { - OutputUtils.fillOutputFromStdout(output, stdOutStr,stdErrStr, outputArray); + OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); break; } } if (outputArray == null || outputArray.isEmpty()) { throw new GFacHandlerException( "Empty Output returned from the Application, Double check the application" - + "and ApplicationDescriptor output Parameter Names"); + + "and ApplicationDescriptor output Parameter Names" + ); } app.setStandardError(localStdErrFile.getAbsolutePath()); app.setStandardOutput(localStdOutFile.getAbsolutePath()); @@ -240,4 +293,8 @@ public class GSISSHOutputHandler extends AbstractHandler { public void initProperties(Properties properties) throws GFacHandlerException { } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.invoke(jobExecutionContext); + } }
