Repository: airavata Updated Branches: refs/heads/master 39370c526 -> 14c1841ca
Fixing the build and more issues related to gfac Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/14c1841c Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/14c1841c Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/14c1841c Branch: refs/heads/master Commit: 14c1841ca20d08bc1e9da254dbce5cb8a7266b26 Parents: 39370c5 Author: lahiru <[email protected]> Authored: Mon May 5 17:06:10 2014 -0400 Committer: lahiru <[email protected]> Committed: Mon May 5 17:06:10 2014 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 3 +- .../airavata/common/utils/ServerSettings.java | 5 ++ .../server/src/main/resources/gfac-config.xml | 6 +- .../org/apache/airavata/gfac/core/cpi/GFac.java | 7 --- .../apache/airavata/gfac/core/cpi/GFacImpl.java | 46 +++++++++++++-- .../gfac/core/handler/AbstractHandler.java | 25 ++++++++- .../core/monitor/AiravataJobStatusUpdator.java | 18 ------ .../gfac/core/provider/AbstractProvider.java | 19 ++++++- .../gsissh/provider/impl/GSISSHProvider.java | 2 +- .../gfac/local/provider/impl/LocalProvider.java | 10 ++++ .../gfac/services/impl/LocalProviderTest.java | 7 +++ .../airavata/gfac/monitor/HPCMonitorID.java | 2 +- .../handlers/GridPullMonitorHandler.java | 29 +--------- .../gfac/monitor/impl/LocalJobMonitor.java | 59 -------------------- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 7 +++ .../apache/airavata/job/AMQPMonitorTest.java | 2 +- .../core/impl/EmbeddedGFACJobSubmitter.java | 8 --- .../orchestrator/core/job/JobSubmitter.java | 7 --- 18 files changed, 123 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index 89bab76..f207720 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -64,8 +64,9 @@ public class CreateLaunchExperiment { // addDescriptors(); // final String expId = createExperimentForSSHHost(airavata); // final String expId = createExperimentForSSHHost(airavata); - final String expId = createExperimentForTrestles(airavata); +// final String expId = createExperimentForTrestles(airavata); // final String expId = createExperimentForStampede(airavata); + final String expId = createExperimentForLocalHost(airavata); System.out.println("Experiment ID : " + expId); String clonedExpId = cloneExperiment(airavata, expId); System.out.println("Cloned Experiment ID : " + clonedExpId); http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java ---------------------------------------------------------------------- diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java index 1380955..71e1437 100644 --- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java +++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java @@ -65,6 +65,8 @@ public class ServerSettings extends ApplicationSettings{ private static final String MY_PROXY_USER = "myproxy.user"; private static final String MY_PROXY_PASSWORD = "myproxy.password"; private static final String MY_PROXY_LIFETIME = "myproxy.life"; + private static final String ACTIVITY_LISTENERS = "activity.listeners"; + private static boolean stopAllThreads = false; public static String getDefaultUser() throws ApplicationSettingsException{ @@ -213,4 +215,7 @@ public class ServerSettings extends ApplicationSettings{ public static int getMyProxyLifetime() throws ApplicationSettingsException { return Integer.parseInt(getSetting(MY_PROXY_LIFETIME)); } + public static String[] getActivityListeners() throws ApplicationSettingsException { + return getSetting(ACTIVITY_LISTENERS).split(","); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/configuration/server/src/main/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml index 29ef0f8..6d81a59 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.xml +++ b/modules/configuration/server/src/main/resources/gfac-config.xml @@ -12,13 +12,11 @@ <GFac> <DaemonHandlers> - <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"> - <property name="listeners" value="org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator"/> - </Handler> + <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"/> </DaemonHandlers> <GlobalHandlers> <InHandlers> - <Handler class="org.apache.airavata.gfac.core.handlerAppDescriptorCheckHandler"> + <Handler class="org.apache.airavata.gfac.core.handler.AppDescriptorCheckHandler"> <property name="name" value="value"/> </Handler> </InHandlers> http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java index c0f7984..60e499f 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java @@ -39,11 +39,4 @@ public interface GFac { */ public JobExecutionContext submitJob(String experimentID,String taskID) throws GFacException; - /** - * This method has to be invoked after submitting the job and have to make sure job is properly finished - * @param jobExecutionContext - * @throws GFacException - */ - public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException; - } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java index 15b0d72..1d5c6f9 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java @@ -28,7 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import com.google.common.eventbus.EventBus; import org.apache.airavata.client.api.AiravataAPI; +import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.commons.gfac.type.ApplicationDescription; import org.apache.airavata.commons.gfac.type.HostDescription; @@ -40,6 +42,8 @@ import org.apache.airavata.gfac.Scheduler; import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.monitor.AbstractActivityListener; +import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent; import org.apache.airavata.gfac.core.notification.listeners.LoggingListener; import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener; @@ -81,6 +85,11 @@ public class GFacImpl implements GFac { private static List<ThreadedHandler> daemonHandlers; private File gfacConfigFile; + + private List<AbstractActivityListener> activityListeners; + + private static MonitorPublisher monitorPublisher; + /** * Constructor for GFac * @@ -93,9 +102,32 @@ public class GFacImpl implements GFac { this.airavataAPI = airavataAPI; this.airavataRegistry2 = airavataRegistry2; daemonHandlers = new ArrayList<ThreadedHandler>(); + activityListeners = new ArrayList<AbstractActivityListener>(); + monitorPublisher = new MonitorPublisher(new EventBus()); // This is a EventBus common for gfac + startStatusUpdators(); startDaemonHandlers(); } + private void startStatusUpdators() { + try { + String[] listenerClassList = ServerSettings.getActivityListeners(); + for (String listenerClass : listenerClassList) { + Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); + AbstractActivityListener abstractActivityListener = aClass.newInstance(); + activityListeners.add(abstractActivityListener); + abstractActivityListener.setup(getMonitorPublisher(),registry); + getMonitorPublisher().registerListener(abstractActivityListener); + } + }catch (ClassNotFoundException e) { + log.error("Error loading the listener classes configured in airavata-server.properties",e); + } catch (InstantiationException e) { + log.error("Error loading the listener classes configured in airavata-server.properties",e); + } catch (IllegalAccessException e) { + log.error("Error loading the listener classes configured in airavata-server.properties",e); + } catch (ApplicationSettingsException e){ + log.error("Error loading the listener classes configured in airavata-server.properties",e); + } + } private void startDaemonHandlers() { List<GFacHandlerConfig> daemonHandlerConfig = null; URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); @@ -205,15 +237,15 @@ public class GFacImpl implements GFac { // start constructing jobexecutioncontext jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName); + + // setting experiment/task/workflownode related information Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentID); jobExecutionContext.setExperiment(experiment); jobExecutionContext.setExperimentID(experimentID); - + jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0)); jobExecutionContext.setTaskData(taskData); - - - + // setting the registry jobExecutionContext.setRegistry(registry); ApplicationContext applicationContext = new ApplicationContext(); @@ -399,5 +431,11 @@ public class GFacImpl implements GFac { return gfacConfigFile; } + public static MonitorPublisher getMonitorPublisher() { + return monitorPublisher; + } + public Registry getRegistry() { + return registry; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java index dcbc79d..0049c3c 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java @@ -21,17 +21,40 @@ package org.apache.airavata.gfac.core.handler; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.cpi.GFacImpl; +import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; import org.apache.airavata.registry.cpi.Registry; public abstract class AbstractHandler implements GFacHandler { protected Registry registry = null; - public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { + protected MonitorPublisher publisher = null; + + protected AbstractHandler() { + publisher = GFacImpl.getMonitorPublisher(); // This will not be null because this will be initialize in GFacIml + } + + public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { registry = jobExecutionContext.getRegistry(); if(registry == null){ registry = RegistryFactory.getDefaultRegistry(); } } + public MonitorPublisher getPublisher() { + return publisher; + } + + public void setPublisher(MonitorPublisher publisher) { + this.publisher = publisher; + } + + public Registry getRegistry() { + return registry; + } + + public void setRegistry(Registry registry) { + this.registry = registry; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java index aaf7084..254d7fc 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java @@ -43,8 +43,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { private MonitorPublisher monitorPublisher; - private BlockingQueue<MonitorID> jobsToMonitor; - public Registry getAiravataRegistry() { return airavataRegistry; @@ -54,13 +52,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { this.airavataRegistry = airavataRegistry; } - public BlockingQueue<MonitorID> getJobsToMonitor() { - return jobsToMonitor; - } - - public void setJobsToMonitor(BlockingQueue<MonitorID> jobsToMonitor) { - this.jobsToMonitor = jobsToMonitor; - } @Subscribe public void updateRegistry(JobStatusChangeRequest jobStatus) { @@ -77,13 +68,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { logger.error("Error persisting data" + e.getLocalizedMessage(), e); } logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString()); - switch (state) { - case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED: - jobsToMonitor.remove(jobStatus.getMonitorID()); - break; - default: - break; - } } } @@ -135,8 +119,6 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener { for (Object configuration : configurations) { if (configuration instanceof Registry){ this.airavataRegistry=(Registry)configuration; - } else if (configuration instanceof BlockingQueue<?>){ - this.jobsToMonitor=(BlockingQueue<MonitorID>) configuration; } else if (configuration instanceof MonitorPublisher){ this.monitorPublisher=(MonitorPublisher) configuration; } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java index 4d28d72..5b65185 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java @@ -21,8 +21,11 @@ package org.apache.airavata.gfac.core.provider; +import com.google.common.eventbus.EventBus; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.cpi.GFacImpl; +import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobStatus; import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory; @@ -38,11 +41,25 @@ public abstract class AbstractProvider implements GFacProvider{ protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider protected JobExecutionContext jobExecutionContext; - public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { + protected MonitorPublisher monitorPublisher; + + protected AbstractProvider() { + this.monitorPublisher = GFacImpl.getMonitorPublisher(); + } + + public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException { log.debug("Initializing " + this.getClass().getName()); registry = RegistryFactory.getDefaultRegistry(); details = new JobDetails(); status = new JobStatus(); this.jobExecutionContext=jobExecutionContext; } + + public MonitorPublisher getMonitorPublisher() { + return monitorPublisher; + } + + public void setMonitorPublisher(MonitorPublisher monitorPublisher) { + this.monitorPublisher = monitorPublisher; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index 9e66b71..880cfbd 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -103,7 +103,7 @@ public class GSISSHProvider extends AbstractProvider { pullMonitorHandler = threadedHandler; } } - // we know this hos is type GsiSSHHostType + // we know this host is type GsiSSHHostType String monitorMode = ((GsisshHostType) host).getMonitorMode(); if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 5494629..8e78f5a 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -29,6 +29,9 @@ import java.util.Map; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.monitor.JobIdentity; +import org.apache.airavata.gfac.core.monitor.MonitorID; +import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent; import org.apache.airavata.gfac.core.provider.AbstractProvider; import org.apache.airavata.gfac.core.provider.GFacProviderException; @@ -163,6 +166,13 @@ public class LocalProvider extends AbstractProvider { .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ") .append(String.valueOf(returnValue)); log.info(buf.toString()); + MonitorID monitorID = new MonitorID(jobExecutionContext.getApplicationContext().getHostDescription(),jobId, + jobExecutionContext.getTaskData().getTaskID(), + jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),jobExecutionContext.getExperimentID(), + jobExecutionContext.getExperiment().getUserName()); + JobStatusChangeRequest jobStatusChangeRequest = new JobStatusChangeRequest(monitorID); + jobStatusChangeRequest.setState(JobState.COMPLETE); + this.getMonitorPublisher().publish(jobStatusChangeRequest); } catch (IOException io) { throw new GFacProviderException(io.getMessage(), io); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java index df628e4..a6f689f 100644 --- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java +++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java @@ -20,16 +20,20 @@ */ package org.apache.airavata.core.gfac.services.impl; +import com.google.common.eventbus.EventBus; import org.apache.airavata.commons.gfac.type.*; import org.apache.airavata.gfac.GFacConfiguration; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; +import org.apache.airavata.gfac.core.notification.MonitorPublisher; import org.apache.airavata.gfac.core.provider.GFacProviderException; import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler; import org.apache.airavata.gfac.local.provider.impl.LocalProvider; +import org.apache.airavata.model.workspace.experiment.Experiment; import org.apache.airavata.model.workspace.experiment.TaskDetails; +import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl; import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.airavata.schemas.gfac.InputParameterType; @@ -141,8 +145,10 @@ public class LocalProviderTest { jobExecutionContext.setOutMessageContext(outMessage); jobExecutionContext.setExperimentID("test123"); + jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp")); jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID())); jobExecutionContext.setRegistry(new LoggingRegistryImpl()); + jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none")); } @@ -165,6 +171,7 @@ public class LocalProviderTest { LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); localDirectorySetupHandler.invoke(jobExecutionContext); LocalProvider localProvider = new LocalProvider(); + localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus())); localProvider.initialize(jobExecutionContext); localProvider.execute(jobExecutionContext); localProvider.dispose(jobExecutionContext); http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java index da4ff88..942f6ae 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java @@ -80,7 +80,7 @@ public class HPCMonitorID extends MonitorID { jobID = jobExecutionContext.getJobDetails().getJobID(); taskID = jobExecutionContext.getTaskData().getTaskID(); experimentID = jobExecutionContext.getExperiment().getExperimentID(); - workflowNodeID = jobExecutionContext.getExperiment().getWorkflowNodeDetailsList().get(0).getNodeInstanceId();// at this point we only have one node todo: fix this + workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index a87b8a5..a9d2e73 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -23,6 +23,7 @@ package org.apache.airavata.gfac.monitor.handlers; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.core.context.JobExecutionContext; +import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.handler.GFacHandlerException; import org.apache.airavata.gfac.core.handler.ThreadedHandler; import org.apache.airavata.gfac.core.monitor.AbstractActivityListener; @@ -53,11 +54,7 @@ public class GridPullMonitorHandler extends ThreadedHandler { private AuthenticationInfo authenticationInfo; - private List<AbstractActivityListener> activityListeners; - - boolean registrySet = false; public void initProperties(Map<String, String> properties) throws GFacHandlerException { - activityListeners = new ArrayList<AbstractActivityListener>(); String myProxyUser = null; try { myProxyUser = ServerSettings.getSetting("myproxy.username"); @@ -66,24 +63,9 @@ public class GridPullMonitorHandler extends ThreadedHandler { String myProxyServer = ServerSettings.getSetting("myproxy.server"); setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer, 7512, 17280000, certPath)); - hpcPullMonitor = new HPCPullMonitor(); - String listeners = properties.get("listeners"); - String[] split = listeners.split(","); - for (String listenerClass : split) { - Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); - AbstractActivityListener abstractActivityListener = aClass.newInstance(); - activityListeners.add(abstractActivityListener); - abstractActivityListener.setup(hpcPullMonitor.getQueue(), hpcPullMonitor.getPublisher()); - hpcPullMonitor.getPublisher().registerListener(abstractActivityListener); - } + hpcPullMonitor = new HPCPullMonitor(GFacImpl.getMonitorPublisher()); } catch (ApplicationSettingsException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } catch (ClassNotFoundException e) { - logger.error("Error loading the listener classes configured in gfac-config.xml"); - } catch (InstantiationException e) { - logger.error("Error loading the listener classes configured in gfac-config.xml"); - } catch (IllegalAccessException e) { - logger.error("Error loading the listener classes configured in gfac-config.xml"); } } @@ -92,13 +74,8 @@ public class GridPullMonitorHandler extends ThreadedHandler { } public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { - if(!registrySet){ - for(AbstractActivityListener listener:activityListeners){ - listener.setup(jobExecutionContext.getRegistry()); - } - } super.invoke(jobExecutionContext); - MonitorID monitorID = new HPCMonitorID(authenticationInfo, jobExecutionContext); + MonitorID monitorID = new HPCMonitorID(getAuthenticationInfo(), jobExecutionContext); try { CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); } catch (AiravataMonitorException e) { http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java deleted file mode 100644 index 3c87d7d..0000000 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/LocalJobMonitor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.gfac.monitor.impl; - -import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest; -import org.apache.airavata.gfac.core.monitor.JobIdentity; -import org.apache.airavata.gfac.monitor.core.AiravataAbstractMonitor; -import org.apache.airavata.model.workspace.experiment.JobState; - -import java.util.concurrent.BlockingQueue; - -/** - * This monitor can be used to monitor a job which runs locally, - * Since its a local job job doesn't have states, once it get executed - * then the job starts running - */ -public class LocalJobMonitor extends AiravataAbstractMonitor { - // Though we have a qeuue here, it not going to be used in local jobs - BlockingQueue<MonitorID> jobQueue; - - public void run() { - do { - try { - MonitorID take = jobQueue.take(); - getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE)); - } catch (Exception e) { - e.printStackTrace(); - } - } while (!ServerSettings.isStopAllThreads()); - } - - public BlockingQueue<MonitorID> getJobQueue() { - return jobQueue; - } - - public void setJobQueue(BlockingQueue<MonitorID> jobQueue) { - this.jobQueue = jobQueue; - } -} http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 38453e8..38b631c 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -63,6 +63,13 @@ public class HPCPullMonitor extends PullMonitor { this.queue = new LinkedBlockingDeque<UserMonitorData>(); publisher = new MonitorPublisher(new EventBus()); } + + public HPCPullMonitor(MonitorPublisher monitorPublisher){ + connections = new HashMap<String, ResourceConnection>(); + this.queue = new LinkedBlockingDeque<UserMonitorData>(); + publisher = monitorPublisher; + } + public HPCPullMonitor(BlockingQueue<UserMonitorData> queue, MonitorPublisher publisher) { this.queue = queue; this.publisher = publisher; http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java index 2c34df9..9c3d08b 100644 --- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java +++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java @@ -62,7 +62,7 @@ public class AMQPMonitorTest { @Before public void setUp() throws Exception { System.setProperty("myproxy.username", "ogce"); - System.setProperty("myproxy.password", "OpenGwy14"); + System.setProperty("myproxy.password", ""); System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh"); System.setProperty("gsi.working.directory", "/home1/01437/ogce"); System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates"); http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java index cbc564d..90b07a2 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/EmbeddedGFACJobSubmitter.java @@ -82,12 +82,4 @@ public class EmbeddedGFACJobSubmitter implements JobSubmitter { public void setOrchestratorContext(OrchestratorContext orchestratorContext) { this.orchestratorContext = orchestratorContext; } - - public void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException { - try { - gfac.invokeOutFlowHandlers(jobExecutionContext); - } catch (GFacException e) { - throw new OrchestratorException(e); - } - } } http://git-wip-us.apache.org/repos/asf/airavata/blob/14c1841c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java index b1787a3..1fd5269 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/job/JobSubmitter.java @@ -52,11 +52,4 @@ public interface JobSubmitter { * @return JobExecutionContext return the jobExecutionContext from GFac */ JobExecutionContext submit(String experimentID, String taskID) throws OrchestratorException; - - /** - * This can be use to handle any after Jobsubmission task - * @param jobExecutionContext - * @throws OrchestratorException - */ - void runAfterJobTask(JobExecutionContext jobExecutionContext) throws OrchestratorException; }
