Repository: airavata
Updated Branches:
  refs/heads/master 109b679ad -> 5e81196d1
Adding more recoverable Handlers Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/5e81196d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/5e81196d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/5e81196d Branch: refs/heads/master Commit: 5e81196d18310551b3a484f2cef2bc544ea27556 Parents: 109b679 Author: lahiru <[email protected]> Authored: Thu Jun 26 10:08:37 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu Jun 26 10:08:37 2014 -0400 ---------------------------------------------------------------------- .../airavata/gfac/server/GfacServerHandler.java | 1 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 8 +++++++- .../core/handler/AbstractRecoverableHandler.java | 16 +++++++++++++--- .../core/handler/AppDescriptorCheckHandler.java | 9 ++++++++- .../gfac/core/handler/GFacRecoverableHandler.java | 1 + .../gsissh/handler/GSISSHDirectorySetupHandler.java | 7 ++++++- .../gfac/gsissh/handler/GSISSHInputHandler.java | 7 ++++++- 7 files changed, 42 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java index 2343a40..622ba61 100644 --- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java +++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java @@ -159,6 +159,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{ } public boolean 
submitJob(String experimentId, String taskId) throws TException { + logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId); GFac gfac = getGfac(); try { return gfac.submitJob(experimentId, taskId); http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index dc635d1..5f2c78c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -613,7 +613,13 @@ public class BetterGfacImpl implements GFac { handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); handler = handlerClass.newInstance(); String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName()); - if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) { + int state = 0; + try { + state = Integer.valueOf(plState); + } catch (NumberFormatException e) { + + } + if (state >= GfacPluginState.INVOKED.getValue()) { if (handler instanceof GFacRecoverableHandler) { // if these already ran we re-run only recoverable handlers log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler"); http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java index 0c6810b..431c202 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java @@ -23,11 +23,16 @@ package org.apache.airavata.gfac.core.handler; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.notification.MonitorPublisher; +import org.apache.airavata.gfac.core.states.GfacPluginState; +import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; import org.apache.airavata.registry.cpi.RegistryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractRecoverableHandler implements GFacRecoverableHandler { + private static final Logger logger = LoggerFactory.getLogger(AppDescriptorCheckHandler.class); protected Registry registry = null; protected MonitorPublisher publisher = null; @@ -37,15 +42,20 @@ public abstract class AbstractRecoverableHandler implements GFacRecoverableHandl } public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - registry = jobExecutionContext.getRegistry(); - if(registry == null){ + try { + GFacUtils.updatePluginState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacPluginState.INVOKED); + } catch (Exception e) { + logger.error("Error saving Recoverable provider state", e); + } + registry = jobExecutionContext.getRegistry(); + if (registry == null) { try { registry = RegistryFactory.getDefaultRegistry(); } catch (RegistryException e) { throw new GFacHandlerException("unable to create registry instance", e); } } - } + } public 
MonitorPublisher getPublisher() { return publisher; http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java index 2183f59..b47bb8e 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java @@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.commons.gfac.type.ApplicationDescription; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.states.GfacPluginState; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.zookeeper.KeeperException; @@ -43,6 +44,12 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler { public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { logger.info("Invoking ApplicationDescriptorCheckHandler ..."); + try { + GFacUtils.updatePluginState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacPluginState.INVOKED); + } catch (Exception e) { + logger.info("Error saving plugin status to ZK"); + } + StringBuffer data = new StringBuffer(); ApplicationDescription app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); ApplicationDeploymentDescriptionType appDesc = app.getType(); @@ -106,7 +113,7 @@ public class 
AppDescriptorCheckHandler implements GFacRecoverableHandler { ApplicationDeploymentDescriptionType appDesc = app.getType(); try { String s = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName()); - String[] split = s.split(","); + String[] split = s.split(","); // this is ugly code but nobody else is saving or reading this data, so this is the fastest way appDesc.setScratchWorkingDirectory(split[0]); appDesc.setStaticWorkingDirectory(split[1]); appDesc.setInputDataDirectory(split[2]); http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java index cbae5e0..ab778b8 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java @@ -33,6 +33,7 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext; */ public interface GFacRecoverableHandler extends GFacHandler { + /** * This method can be used to implement recovering part of the stateful handler * If you do not want to recover an already ran handler you can simply implement http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java index 1b1a6b9..e676b4b 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java @@ -24,6 +24,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; @@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.Properties; -public class GSISSHDirectorySetupHandler extends AbstractHandler { +public class GSISSHDirectorySetupHandler extends AbstractRecoverableHandler { private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class); public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { @@ -98,6 +99,10 @@ public class GSISSHDirectorySetupHandler extends AbstractHandler { } } + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + this.invoke(jobExecutionContext); + } + public void initProperties(Properties properties) throws GFacHandlerException { } http://git-wip-us.apache.org/repos/asf/airavata/blob/5e81196d/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java 
b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java index 3ac4d5e..440009b 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java @@ -28,6 +28,7 @@ import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.handler.AbstractHandler; +import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.gsissh.security.GSISecurityContext; @@ -46,7 +47,7 @@ import java.io.File; import java.io.IOException; import java.util.*; -public class GSISSHInputHandler extends AbstractHandler { +public class GSISSHInputHandler extends AbstractRecoverableHandler { private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class); @@ -136,4 +137,8 @@ public class GSISSHInputHandler extends AbstractHandler { public void initProperties(Properties properties) throws GFacHandlerException { } + + public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + + } }
