Repository: airavata Updated Branches: refs/heads/master 553caa087 -> b203064df
fixing issues with new changes Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b203064d Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b203064d Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b203064d Branch: refs/heads/master Commit: b203064dfa23e9e5cba0e9716327f398a2595cbf Parents: 553caa0 Author: lahiru <[email protected]> Authored: Thu May 1 16:50:56 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu May 1 16:50:56 2014 -0400 ---------------------------------------------------------------------- .../client/samples/CreateLaunchExperiment.java | 2 +- .../server/src/main/resources/gfac-config.xml | 6 ++- .../org/apache/airavata/gfac/cpi/GFacImpl.java | 2 +- modules/gfac/gfac-gsissh/pom.xml | 5 -- .../gfac/provider/impl/GSISSHProvider.java | 8 ++- modules/gfac/gfac-monitor/pom.xml | 7 ++- .../handlers/GridPullMonitorHandler.java | 15 ++++++ .../monitor/impl/pull/qstat/HPCPullMonitor.java | 5 +- .../impl/pull/qstat/ResourceConnection.java | 54 +++++--------------- .../airavata/gfac/monitor/util/CommonUtils.java | 35 +++++++++++++ 10 files changed, 80 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java index eac1350..89bab76 100644 --- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java +++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java @@ -61,7 +61,7 @@ public class CreateLaunchExperiment { AiravataUtils.setExecutionAsClient(); final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT); System.out.println("API version is " + airavata.GetAPIVersion()); - addDescriptors(); +// addDescriptors(); // final String expId = createExperimentForSSHHost(airavata); // final String expId = createExperimentForSSHHost(airavata); final String expId = createExperimentForTrestles(airavata); http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/configuration/server/src/main/resources/gfac-config.xml ---------------------------------------------------------------------- diff --git a/modules/configuration/server/src/main/resources/gfac-config.xml b/modules/configuration/server/src/main/resources/gfac-config.xml index f44fbec..5b80367 100644 --- a/modules/configuration/server/src/main/resources/gfac-config.xml +++ b/modules/configuration/server/src/main/resources/gfac-config.xml @@ -12,7 +12,9 @@ <GFac> <DaemonHandlers> - <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"/> + <Handler class="org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler"> + <property name="listeners" value="org.apache.airavata.gfac.monitor.AiravataJobStatusUpdator"/> + </Handler> </DaemonHandlers> <GlobalHandlers> <InHandlers> @@ -74,7 +76,7 @@ <Handler class="org.apache.airavata.gfac.handler.SSHOutputHandler"/> </OutHandlers> </Provider> - <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl"> + <Provider class="org.apache.airavata.gfac.provider.impl.GSISSHProvider" host="org.apache.airavata.schemas.gfac.impl.GsisshHostTypeImpl" executionMode="async"> <InHandlers> <Handler class="org.apache.airavata.gfac.handler.GSISSHDirectorySetupHandler"/> <!--Handler class="org.apache.airavata.gfac.handler.AdvancedSCPInputHandler"> http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java index 642d659..2241cb2 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java @@ -136,7 +136,7 @@ public class GFacImpl implements GFac { } } for(ThreadedHandler tHandler:daemonHandlers){ - tHandler.run(); + (new Thread(tHandler)).start(); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-gsissh/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/pom.xml b/modules/gfac/gfac-gsissh/pom.xml index 402619b..efba132 100644 --- a/modules/gfac/gfac-gsissh/pom.xml +++ b/modules/gfac/gfac-gsissh/pom.xml @@ -36,11 +36,6 @@ <artifactId>airavata-gfac-core</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.airavata</groupId> - <artifactId>airavata-gfac-hpc-monitor</artifactId> - <version>${project.version}</version> - </dependency> <!-- Credential Store --> <dependency> <groupId>org.apache.airavata</groupId> http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java index 3cfe599..da5b330 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java @@ -20,13 +20,11 @@ */ package org.apache.airavata.gfac.provider.impl; -import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gfac.context.security.GSISecurityContext; import org.apache.airavata.gfac.cpi.GFacImpl; import org.apache.airavata.gfac.handler.ThreadedHandler; -import org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler; import org.apache.airavata.gfac.notification.events.StartExecutionEvent; import org.apache.airavata.gfac.provider.AbstractProvider; import org.apache.airavata.gfac.provider.GFacProviderException; @@ -99,10 +97,10 @@ public class GSISSHProvider extends AbstractProvider { // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler // to perform monitoring, daemon handlers can be accessed from anywhere List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers(); - GridPullMonitorHandler pullMonitorHandler = null; + ThreadedHandler pullMonitorHandler = null; for(ThreadedHandler threadedHandler:daemonHandlers){ - if(threadedHandler instanceof GridPullMonitorHandler){ - pullMonitorHandler = (GridPullMonitorHandler)threadedHandler; + if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){ + pullMonitorHandler = threadedHandler; } } // we know this hos is type GsiSSHHostType http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-monitor/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/pom.xml b/modules/gfac/gfac-monitor/pom.xml index 8991dcd..30382a2 100644 --- a/modules/gfac/gfac-monitor/pom.xml +++ b/modules/gfac/gfac-monitor/pom.xml @@ -36,13 +36,18 @@ <artifactId>airavata-client-api</artifactId> <version>${project.version}</version> </dependency> - <dependency> + <dependency> <groupId>org.apache.airavata</groupId> <artifactId>airavata-gfac-core</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.airavata</groupId> + <artifactId>airavata-gfac-gsissh</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.airavata</groupId> <artifactId>airavata-workflow-execution-context</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index b3420b8..452f536 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.gfac.context.JobExecutionContext; import org.apache.airavata.gfac.handler.GFacHandlerException; import org.apache.airavata.gfac.handler.ThreadedHandler; +import org.apache.airavata.gfac.monitor.AbstractActivityListener; import org.apache.airavata.gfac.monitor.MonitorID; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; @@ -59,8 +60,22 @@ public class GridPullMonitorHandler extends ThreadedHandler { setAuthenticationInfo(new MyProxyAuthenticationInfo(myProxyUser, myProxyPass, myProxyServer, 7512, 17280000, certPath)); hpcPullMonitor = new HPCPullMonitor(); + String listeners = properties.get("listeners"); + String[] split = listeners.split(","); + for(String listenerClass:split) { + Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class); + AbstractActivityListener abstractActivityListener = aClass.newInstance(); + abstractActivityListener.setup(hpcPullMonitor.getQueue(),hpcPullMonitor.getPublisher()); + hpcPullMonitor.getPublisher().registerListener(abstractActivityListener); + } } catch (ApplicationSettingsException e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } catch (ClassNotFoundException e) { + logger.error("Error loading the listener classes configured in gfac-config.xml"); + } catch (InstantiationException e) { + logger.error("Error loading the listener classes configured in gfac-config.xml"); + } catch (IllegalAccessException e) { + logger.error("Error loading the listener classes configured in gfac-config.xml"); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index edd6ce0..0a1e985 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -132,11 +132,11 @@ public class HPCPullMonitor extends PullMonitor { logger.debug("We already have this connection so not going to create one"); connection = connections.get(hostName); } else { - connection = new ResourceConnection(take.getUserName(), iHostMonitorData, gsisshHostType.getInstalledPath()); + connection = new ResourceConnection(iHostMonitorData); connections.put(hostName, connection); } List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); - Map<String, JobState> jobStatuses = connection.getJobStatuses(take.getUserName(), monitorID); + Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); for (MonitorID iMonitorID : monitorID) { currentMonitorID = iMonitorID; iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID())); @@ -150,6 +150,7 @@ public class HPCPullMonitor extends PullMonitor { // After successful monitoring perform following actions to cleanup the queue, if necessary if (jobStatus.getState().equals(JobState.COMPLETE)) { completedJobs.add(iMonitorID); + CommonUtils.invokeOutFlowHandlers(iMonitorID.getJobExecutionContext()); } else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) { logger.error("Tried to monitor the job with ID " + iMonitorID.getJobID() + " But failed 3 times, so skip this Job from Monitor"); iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java index 7a37b88..ec14c08 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/ResourceConnection.java @@ -20,6 +20,9 @@ */ package org.apache.airavata.gfac.monitor.impl.pull.qstat; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.SecurityContext; +import org.apache.airavata.gfac.context.security.GSISecurityContext; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.MonitorID; import org.apache.airavata.gsi.ssh.api.SSHApiException; @@ -44,49 +47,15 @@ public class ResourceConnection { private PBSCluster cluster; - public ResourceConnection(MonitorID monitorID, String installedPath) throws SSHApiException { - AuthenticationInfo authenticationInfo = monitorID.getAuthenticationInfo(); - String hostAddress = monitorID.getHost().getType().getHostAddress(); - String userName = monitorID.getUserName(); - String jobManager = ((GsisshHostType)monitorID.getHost().getType()).getJobManager(); - JobManagerConfiguration jConfig = null; - if (jobManager == null) { - log.error("No Job Manager is configured, so we are picking pbs as the default job manager"); - jConfig = CommonUtils.getPBSJobManager(installedPath); - } else { - if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(monitorID.getHost())) { - jConfig = CommonUtils.getPBSJobManager(installedPath); - } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(monitorID.getHost())) { - jConfig = CommonUtils.getSLURMJobManager(installedPath); - } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(monitorID.getHost())) { - jConfig = CommonUtils.getSGEJobManager(installedPath); - } - //todo support br2 etc - } - ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)monitorID.getHost().getType()).getPort()); - cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig); - } - public ResourceConnection(String userName, HostMonitorData hostMonitorData, String installedPath) throws SSHApiException { - AuthenticationInfo authenticationInfo = hostMonitorData.getMonitorIDs().get(0).getAuthenticationInfo(); - String hostAddress = hostMonitorData.getHost().getType().getHostAddress(); - String jobManager = ((GsisshHostType)hostMonitorData.getHost().getType()).getJobManager(); - JobManagerConfiguration jConfig = null; - if (jobManager == null) { - log.error("No Job Manager is configured, so we are picking pbs as the default job manager"); - jConfig = CommonUtils.getPBSJobManager(installedPath); - } else { - if (org.apache.airavata.gfac.monitor.util.CommonUtils.isPBSHost(hostMonitorData.getHost())) { - jConfig = CommonUtils.getPBSJobManager(installedPath); - } else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSlurm(hostMonitorData.getHost())) { - jConfig = CommonUtils.getSLURMJobManager(installedPath); - }else if(org.apache.airavata.gfac.monitor.util.CommonUtils.isSGE(hostMonitorData.getHost())) { - jConfig = CommonUtils.getSGEJobManager(installedPath); - } - //todo support br2 etc + public ResourceConnection(HostMonitorData hostMonitorData) throws SSHApiException { + MonitorID monitorID = hostMonitorData.getMonitorIDs().get(0); + try { + GSISecurityContext securityContext = (GSISecurityContext)monitorID.getJobExecutionContext().getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT); + cluster = (PBSCluster)securityContext.getPbsCluster(); + } catch (GFacException e) { + log.error("Error reading data from job ExecutionContext"); } - ServerInfo serverInfo = new ServerInfo(userName, hostAddress, ((GsisshHostType)hostMonitorData.getHost().getType()).getPort()); - cluster = new PBSCluster(serverInfo, authenticationInfo, jConfig); } public JobState getJobStatus(MonitorID monitorID) throws SSHApiException { String jobID = monitorID.getJobID(); @@ -95,7 +64,7 @@ public class ResourceConnection { return getStatusFromString(cluster.getJobStatus(jobID).toString()); } - public Map<String,JobState> getJobStatuses(String userName,List<MonitorID> monitorIDs) throws SSHApiException { + public Map<String,JobState> getJobStatuses(List<MonitorID> monitorIDs) throws SSHApiException { Map<String,JobStatus> treeMap = new TreeMap<String,JobStatus>(); Map<String,JobState> treeMap1 = new TreeMap<String,JobState>(); // creating a sorted map with all the jobIds and with the predefined @@ -103,6 +72,7 @@ public class ResourceConnection { for (MonitorID monitorID : monitorIDs) { treeMap.put(monitorID.getJobID(), JobStatus.U); } + String userName = cluster.getServerInfo().getUserName(); //todo so currently we execute the qstat for each job but we can use user based monitoring //todo or we should concatenate all the commands and execute them in one go and parse the response cluster.getJobStatuses(userName,treeMap); http://git-wip-us.apache.org/repos/asf/airavata/blob/b203064d/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 30f1ae4..2071c5d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -21,6 +21,11 @@ package org.apache.airavata.gfac.monitor.util; import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.GFacConfiguration; +import org.apache.airavata.gfac.GFacException; +import org.apache.airavata.gfac.context.JobExecutionContext; +import org.apache.airavata.gfac.handler.GFacHandler; +import org.apache.airavata.gfac.handler.GFacHandlerConfig; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.MonitorID; import org.apache.airavata.gfac.monitor.UserMonitorData; @@ -169,4 +174,34 @@ public class CommonUtils { } return false; } + + public static void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException { + GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration(); + List<GFacHandlerConfig> handlers = null; + + for (GFacHandlerConfig handlerClassName : handlers) { + Class<? extends GFacHandler> handlerClass; + GFacHandler handler; + try { + handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class); + handler = handlerClass.newInstance(); + handler.initProperties(handlerClassName.getProperties()); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot load handler class " + handlerClassName, e); + } catch (InstantiationException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } catch (IllegalAccessException e) { + logger.error(e.getMessage()); + throw new GFacException("Cannot instantiate handler class " + handlerClassName, e); + } + try { + handler.invoke(jobExecutionContext); + } catch (Exception e) { + // TODO: Better error reporting. + throw new GFacException("Error Executing a OutFlow Handler", e); + } + } + } }
