Repository: airavata
Updated Branches:
  refs/heads/master 6da674fdd -> 3a84ee8c3


Making GSISSHOutputHandler a RecoverableHandler


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3a84ee8c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3a84ee8c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3a84ee8c

Branch: refs/heads/master
Commit: 3a84ee8c3e953d3b364c6075d84b2fa506759529
Parents: 6da674f
Author: lahiru <[email protected]>
Authored: Thu Jun 26 15:25:19 2014 -0400
Committer: lahiru <[email protected]>
Committed: Thu Jun 26 15:25:19 2014 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |   8 +-
 .../gsissh/handler/GSISSHOutputHandler.java     | 103 ++++++++++++++-----
 2 files changed, 84 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/3a84ee8c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 5f2c78c..05d866b 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -344,8 +344,10 @@ public class BetterGfacImpl implements GFac {
             } else if (stateVal == 5 && 
!GFacUtils.isSynchronousMode(jobExecutionContext)) {
                 // this is async mode where monitoring of jobs is hapenning, 
we have to recover
                 reInvokeProvider(jobExecutionContext);
-            } else {
-                log.info("We skip invoking Handler, because the experiment 
state is beyond the Provider Invocation !!!");
+            } else if( stateVal == 6){
+                reInvokeOutFlowHandlers(jobExecutionContext);
+            } else{
+                log.info("We skip invoking Handler, because the experiment:" + 
stateVal +" state is beyond the Provider Invocation !!!");
                 log.info("ExperimentId: " + experimentID + " taskId: " + 
jobExecutionContext.getTaskData().getTaskID());
             }
         } catch (Exception e) {
@@ -692,8 +694,6 @@ public class BetterGfacImpl implements GFac {
                     handler.invoke(jobExecutionContext);
                     GFacUtils.updatePluginState(zk, jobExecutionContext, 
handlerClassName.getClassName(), GfacPluginState.COMPLETED);
                 }
-                handler.initProperties(handlerClassName.getProperties());
-                handler.invoke(jobExecutionContext);
             } catch (ClassNotFoundException e) {
                 log.error(e.getMessage());
                 throw new GFacException("Cannot load handler class " + 
handlerClassName, e);

http://git-wip-us.apache.org/repos/asf/airavata/blob/3a84ee8c/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
index f3736f2..f8d8c52 100644
--- 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
+++ 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
@@ -35,6 +35,7 @@ import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.handler.AbstractHandler;
+import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
@@ -53,11 +54,16 @@ import org.apache.xmlbeans.XmlException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GSISSHOutputHandler extends AbstractHandler {
+public class GSISSHOutputHandler extends AbstractRecoverableHandler {
     private static final Logger log = 
LoggerFactory.getLogger(GSISSHOutputHandler.class);
 
     public void invoke(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
-        
if(jobExecutionContext.getApplicationContext().getHostDescription().getType() 
instanceof GsisshHostType) { // this is because we don't have the right 
jobexecution context
+        super.invoke(jobExecutionContext);
+        int index = 0;
+        int oldIndex = 0;
+        List<String> oldFiles = new ArrayList<String>();
+        StringBuffer data = new StringBuffer("|");
+        if 
(jobExecutionContext.getApplicationContext().getHostDescription().getType() 
instanceof GsisshHostType) { // this is because we don't have the right 
jobexecution context
             // so attempting to get it from the registry
             if (Constants.PUSH.equals(((GsisshHostType) 
jobExecutionContext.getApplicationContext().getHostDescription().getType()).getMonitorMode()))
 {
                 log.warn("During the out handler chain jobExecution context 
came null, so trying to handler");
@@ -99,7 +105,6 @@ public class GSISSHOutputHandler extends AbstractHandler {
             log.error(e.getMessage());
             throw new GFacHandlerException("Error while creating 
SSHSecurityContext", e, e.getLocalizedMessage());
         }
-        super.invoke(jobExecutionContext);
         DataTransferDetails detail = new DataTransferDetails();
         TransferStatus status = new TransferStatus();
 
@@ -119,6 +124,22 @@ public class GSISSHOutputHandler extends AbstractHandler {
             }
 
             // Get the Stdouts and StdErrs
+            String pluginData = GFacUtils.getPluginData(jobExecutionContext, 
this.getClass().getName());
+            if (pluginData != null) {
+                try {
+                    oldIndex = 
Integer.parseInt(pluginData.split("\\|")[0].trim());
+                    oldFiles = 
Arrays.asList(pluginData.split("\\|")[1].split(","));
+                    if (oldIndex == oldFiles.size()) {
+                        log.info("Old data looks good !!!!");
+                    } else {
+                        oldIndex = 0;
+                        oldFiles.clear();
+                    }
+                } catch (NumberFormatException e) {
+                    log.error("Previously stored data " + pluginData + " is 
wrong so we continue the operations");
+                }
+            }
+
             String timeStampedServiceName = 
GFacUtils.createUniqueNameForService(jobExecutionContext.getServiceName());
 
             TaskDetails taskData = jobExecutionContext.getTaskData();
@@ -136,13 +157,26 @@ public class GSISSHOutputHandler extends AbstractHandler {
             (new File(outputDataDir)).mkdirs();
 
 
-            localStdOutFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stdout");
-            localStdErrFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stderr");
-//            cluster.makeDirectory(outputDataDir);
-            cluster.scpFrom(app.getStandardOutput(), 
localStdOutFile.getAbsolutePath());
-            Thread.sleep(1000);
-            cluster.scpFrom(app.getStandardError(), 
localStdErrFile.getAbsolutePath());
-            Thread.sleep(1000);
+            if (index < oldIndex) {
+                localStdOutFile = new File(oldFiles.get(index));
+                data.append(oldFiles.get(index++)).append(",");
+            } else {
+                localStdOutFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stdout");
+                cluster.scpFrom(app.getStandardOutput(), 
localStdOutFile.getAbsolutePath());
+                Thread.sleep(1000);
+                StringBuffer temp = new 
StringBuffer(data.append(localStdOutFile.getAbsolutePath()).append(",").toString());
+                GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, 
++index), this.getClass().getName());
+            }
+            if (index < oldIndex) {
+                localStdErrFile = new File(oldFiles.get(index));
+                data.append(oldFiles.get(index++)).append(",");
+            } else {
+                localStdErrFile = new File(outputDataDir + File.separator + 
timeStampedServiceName + "stderr");
+                cluster.scpFrom(app.getStandardError(), 
localStdErrFile.getAbsolutePath());
+                Thread.sleep(1000);
+                StringBuffer temp = new 
StringBuffer(data.append(localStdErrFile.getAbsolutePath()).append(",").toString());
+                GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, 
++index), this.getClass().getName());
+            }
 
             String stdOutStr = 
GFacUtils.readFileToString(localStdOutFile.getAbsolutePath());
             String stdErrStr = 
GFacUtils.readFileToString(localStdErrFile.getAbsolutePath());
@@ -169,16 +203,24 @@ public class GSISSHOutputHandler extends AbstractHandler {
                         OutputUtils.fillOutputFromStdout(output, stdOutStr, 
stdErrStr, outputArray);
                         Set<String> strings = output.keySet();
                         outputArray.clear();
-                        for(String key:strings) {
+                        for (String key : strings) {
                             ActualParameter actualParameter1 = 
(ActualParameter) output.get(key);
-                            
if("URI".equals(actualParameter1.getType().getType().toString())){
-                               String downloadFile = 
MappingFactory.toString(actualParameter1);
-                               cluster.scpFrom(downloadFile, outputDataDir);
-                               String fileName = 
downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar)+1, 
downloadFile.length());
-                               String localFile = outputDataDir +  
File.separator +fileName;
-                                                               
jobExecutionContext.addOutputFile(localFile);
-                                                               
MappingFactory.fromString(actualParameter1, localFile);
-                                                               DataObjectType 
dataObjectType = new DataObjectType();
+                            if 
("URI".equals(actualParameter1.getType().getType().toString())) {
+                                String downloadFile = 
MappingFactory.toString(actualParameter1);
+                                String localFile;
+                                if (index < oldIndex) {
+                                    localFile = oldFiles.get(index);
+                                    
data.append(oldFiles.get(index++)).append(",");
+                                } else {
+                                    cluster.scpFrom(downloadFile, 
outputDataDir);
+                                    String fileName = 
downloadFile.substring(downloadFile.lastIndexOf(File.separatorChar) + 1, 
downloadFile.length());
+                                    localFile = outputDataDir + File.separator 
+ fileName;
+                                    StringBuffer temp = new 
StringBuffer(data.append(localFile).append(",").toString());
+                                    
GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), 
this.getClass().getName());
+                                }
+                                jobExecutionContext.addOutputFile(localFile);
+                                MappingFactory.fromString(actualParameter1, 
localFile);
+                                DataObjectType dataObjectType = new 
DataObjectType();
                                 dataObjectType.setValue(localFile);
                                 dataObjectType.setKey(key);
                                 dataObjectType.setType(DataType.URI);
@@ -188,8 +230,18 @@ public class GSISSHOutputHandler extends AbstractHandler {
                         break;
                     } else {
                         String valueList = outputList.get(0);
-                        cluster.scpFrom(app.getOutputDataDirectory() + 
File.separator + valueList, outputDataDir);
-                        jobExecutionContext.addOutputFile(outputDataDir + 
File.separator + valueList);
+                        String outputFile;
+                        if (index < oldIndex) {
+                            outputFile = oldFiles.get(index);
+                            data.append(oldFiles.get(index++)).append(",");
+                        } else {
+                            cluster.scpFrom(app.getOutputDataDirectory() + 
File.separator + valueList, outputDataDir);
+                            outputFile = outputDataDir + File.separator + 
valueList;
+                            jobExecutionContext.addOutputFile(outputFile);
+                            StringBuffer temp = new 
StringBuffer(data.append(outputFile).append(",").toString());
+                            GFacUtils.savePluginData(jobExecutionContext, 
temp.insert(0, ++index), this.getClass().getName());
+                        }
+                        jobExecutionContext.addOutputFile(outputFile);
                         DataObjectType dataObjectType = new DataObjectType();
                         dataObjectType.setValue(valueList);
                         dataObjectType.setKey(paramName);
@@ -197,14 +249,15 @@ public class GSISSHOutputHandler extends AbstractHandler {
                         outputArray.add(dataObjectType);
                     }
                 } else {
-                    OutputUtils.fillOutputFromStdout(output, 
stdOutStr,stdErrStr, outputArray);
+                    OutputUtils.fillOutputFromStdout(output, stdOutStr, 
stdErrStr, outputArray);
                     break;
                 }
             }
             if (outputArray == null || outputArray.isEmpty()) {
                 throw new GFacHandlerException(
                         "Empty Output returned from the Application, Double 
check the application"
-                                + "and ApplicationDescriptor output Parameter 
Names");
+                                + "and ApplicationDescriptor output Parameter 
Names"
+                );
             }
             app.setStandardError(localStdErrFile.getAbsolutePath());
             app.setStandardOutput(localStdOutFile.getAbsolutePath());
@@ -240,4 +293,8 @@ public class GSISSHOutputHandler extends AbstractHandler {
     public void initProperties(Properties properties) throws 
GFacHandlerException {
 
     }
+
+    public void recover(JobExecutionContext jobExecutionContext) throws 
GFacHandlerException {
+        this.invoke(jobExecutionContext);
+    }
 }

Reply via email to