Repository: airavata Updated Branches: refs/heads/master 90c47ded4 -> 6da674fdd
Making GSISSH Input handler recoverable Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6da674fd Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6da674fd Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6da674fd Branch: refs/heads/master Commit: 6da674fdd7e415e07df88c3de0f106eec597542f Parents: 90c47de Author: lahiru <[email protected]> Authored: Thu Jun 26 12:47:44 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu Jun 26 12:47:44 2014 -0400 ---------------------------------------------------------------------- .../core/monitor/GfacInternalStatusUpdator.java | 4 +- .../gfac/gsissh/handler/GSISSHInputHandler.java | 61 ++++++++++++++++---- .../server/OrchestratorServerHandler.java | 32 +++++----- 3 files changed, 72 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java index 2047f20..97bb49d 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java @@ -81,10 +81,10 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc } switch (statusChangeRequest.getState()) { case COMPLETED: - ZKUtil.deleteRecursive(zk,experimentPath); +// ZKUtil.deleteRecursive(zk,experimentPath); break; case FAILED: - ZKUtil.deleteRecursive(zk,experimentPath); +// ZKUtil.deleteRecursive(zk,experimentPath); break; default: } http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java index 440009b..2ed4889 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java @@ -47,15 +47,39 @@ import java.io.File; import java.io.IOException; import java.util.*; +/** + * Recoverability for this handler assumes the same input values will come in the second + * run, and assume nobody is changing registry during the original submission and re-submission + */ public class GSISSHInputHandler extends AbstractRecoverableHandler { private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class); public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + super.invoke(jobExecutionContext); + int index = 0; + int oldIndex = 0; + List<String> oldFiles = new ArrayList<String>(); MessageContext inputNew = new MessageContext(); DataTransferDetails detail = new DataTransferDetails(); TransferStatus status = new TransferStatus(); + StringBuffer data = new StringBuffer("|"); try { + String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName()); + if (pluginData != null) { + try { + oldIndex = Integer.parseInt(pluginData.split("\\|")[0].trim()); + oldFiles = Arrays.asList(pluginData.split("\\|")[1].split(",")); + if (oldIndex == oldFiles.size()) { + log.info("Old data looks good !!!!"); + } else { + oldIndex = 0; + oldFiles.clear(); + } + } catch (NumberFormatException e) { + log.error("Previously stored data " + pluginData +" is wrong so we continue the operations"); + } + } if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) { try { GFACGSISSHUtils.addSecurityContext(jobExecutionContext); @@ -65,8 +89,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler { } } log.info("Invoking SCPInputHandler"); - super.invoke(jobExecutionContext); - MessageContext input = jobExecutionContext.getInMessageContext(); Set<String> parameters = input.getParameters().keySet(); @@ -75,17 +97,36 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler { String paramValue = MappingFactory.toString(actualParameter); //TODO: Review this with type if ("URI".equals(actualParameter.getType().getType().toString())) { - ((URIParameterType) actualParameter.getType()).setValue(stageInputFiles(jobExecutionContext, paramValue)); + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); // we get already transfered file and increment the index + } else { + String s = stageInputFiles(jobExecutionContext, paramValue); + ((URIParameterType) actualParameter.getType()).setValue(s); + StringBuffer temp = new StringBuffer(data.append(s).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + } } else if ("URIArray".equals(actualParameter.getType().getType().toString())) { List<String> split = Arrays.asList(StringUtil.getElementsFromString(paramValue)); List<String> newFiles = new ArrayList<String>(); for (String paramValueEach : split) { - String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach); - status.setTransferState(TransferState.UPLOAD); - detail.setTransferStatus(status); - detail.setTransferDescription("Input Data Staged: " + stageInputFiles); - registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); - newFiles.add(stageInputFiles); + if (index < oldIndex) { + log.info("Input File: " + paramValue + " is already transfered, so we skip this operation !!!"); + ((URIParameterType) actualParameter.getType()).setValue(oldFiles.get(index)); + data.append(oldFiles.get(index++)).append(","); + } else { + String stageInputFiles = stageInputFiles(jobExecutionContext, paramValueEach); + GFacUtils.savePluginData(jobExecutionContext, new StringBuffer(String.valueOf(index++)), this.getClass().getName()); + status.setTransferState(TransferState.UPLOAD); + detail.setTransferStatus(status); + detail.setTransferDescription("Input Data Staged: " + stageInputFiles); + registry.add(ChildDataType.DATA_TRANSFER_DETAIL, detail, jobExecutionContext.getTaskData().getTaskID()); + StringBuffer temp = new StringBuffer(data.append(stageInputFiles).append(",").toString()); + GFacUtils.savePluginData(jobExecutionContext, temp.insert(0, ++index), this.getClass().getName()); + newFiles.add(stageInputFiles); + } + } ((URIArrayType) actualParameter.getType()).setValueArray(newFiles.toArray(new String[newFiles.size()])); } @@ -139,6 +180,6 @@ public class GSISSHInputHandler extends AbstractRecoverableHandler { } public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - + this.invoke(jobExecutionContext); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/6da674fd/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java index a3ed956..abf6a2c 100644 --- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java +++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java @@ -123,8 +123,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat } - - /** * * After creating the experiment Data user have the * * experimentID as the handler to the experiment, during the launchExperiment @@ -239,7 +237,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat return true; } - /** This method gracefully handler gfac node failures */ + /** + * This method gracefully handler gfac node failures + */ synchronized public void process(WatchedEvent watchedEvent) { synchronized (mutex) { try { @@ -270,12 +270,18 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat final OrchestratorServerHandler handler = this; (new Thread() { public void run() { - try { - (new OrchestratorRecoveryHandler(handler, event.getPath())).recover(); // run this task in a separate thread - } catch (Exception e) { - e.printStackTrace(); - log.error("error recovering the jobs for gfac-node: " + event.getPath()); + int retry = 0; + while (retry < 3) { + try { + (new OrchestratorRecoveryHandler(handler, event.getPath())).recover(); + break; + } catch (Exception e) { + e.printStackTrace(); + log.error("error recovering the jobs for gfac-node: " + event.getPath()); + log.error("Retrying again to recover jobs and retry attempt: " + ++retry); + } } + } }).start(); break; @@ -290,9 +296,9 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat } - @Override - public boolean launchTask(String taskId) throws TException { - // TODO Auto-generated method stub - return false; - } + @Override + public boolean launchTask(String taskId) throws TException { + // TODO Auto-generated method stub + return false; + } }
