Removed legacy descriptions from the MonitorID, GSISSH provider, GSISSH utils, and AMQPMonitor classes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/eb626fa7 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/eb626fa7 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/eb626fa7 Branch: refs/heads/gfac_appcatalog_int Commit: eb626fa754d75bcb6fc328507d9ff8c3bceb4bcf Parents: 5136157 Author: shamrath <[email protected]> Authored: Fri Oct 31 12:25:31 2014 -0400 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 5 11:23:05 2014 -0500 ---------------------------------------------------------------------- .../data/impl/GwyResourceProfileImpl.java | 8 +- .../data/util/AppCatalogThriftConversion.java | 4 +- .../app/catalog/test/GatewayProfileTest.java | 8 +- .../gfac/core/context/JobExecutionContext.java | 4 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 33 +++--- .../airavata/gfac/core/monitor/MonitorID.java | 19 ++-- .../gsissh/provider/impl/GSISSHProvider.java | 64 ++++++----- .../gfac/gsissh/util/GFACGSISSHUtils.java | 108 ++++++++++--------- .../monitor/impl/push/amqp/AMQPMonitor.java | 57 +++++----- .../apache/airavata/job/AMQPMonitorTest.java | 64 +++++++---- 10 files changed, 213 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java ---------------------------------------------------------------------- diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java index ed66bff..101b647 100644 --- 
a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java +++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/impl/GwyResourceProfileImpl.java @@ -66,8 +66,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile { resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId())); resource.setGatewayId(profileResource.getGatewayID()); resource.setOverrideByAiravata(preference.isOverridebyAiravata()); - resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol()); - resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol()); + resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString()); + resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString()); resource.setBatchQueue(preference.getPreferredBatchQueue()); resource.setProjectNumber(preference.getAllocationProjectNumber()); resource.setScratchLocation(preference.getScratchLocation()); @@ -100,8 +100,8 @@ public class GwyResourceProfileImpl implements GwyResourceProfile { resource.setComputeHostResource((ComputeResourceResource)computeHostResource.get(preference.getComputeResourceId())); resource.setGatewayId(gatewayId); resource.setOverrideByAiravata(preference.isOverridebyAiravata()); - resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol()); - resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol()); + resource.setPreferredJobProtocol(preference.getPreferredJobSubmissionProtocol().toString()); + resource.setPreferedDMProtocol(preference.getPreferredDataMovementProtocol().toString()); resource.setBatchQueue(preference.getPreferredBatchQueue()); resource.setProjectNumber(preference.getAllocationProjectNumber()); resource.setScratchLocation(preference.getScratchLocation()); 
http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java ---------------------------------------------------------------------- diff --git a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java index 14a0ab0..35549f4 100644 --- a/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java +++ b/modules/app-catalog/app-catalog-data/src/main/java/org/apache/aiaravata/application/catalog/data/util/AppCatalogThriftConversion.java @@ -670,8 +670,8 @@ public class AppCatalogThriftConversion { ComputeResourcePreference preference = new ComputeResourcePreference(); preference.setComputeResourceId(resource.getResourceId()); preference.setOverridebyAiravata(resource.getOverrideByAiravata()); - preference.setPreferredJobSubmissionProtocol(resource.getPreferredJobProtocol()); - preference.setPreferredDataMovementProtocol(resource.getPreferedDMProtocol()); + preference.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.valueOf(resource.getPreferredJobProtocol())); + preference.setPreferredDataMovementProtocol(DataMovementProtocol.valueOf(resource.getPreferedDMProtocol())); preference.setPreferredBatchQueue(resource.getBatchQueue()); preference.setScratchLocation(resource.getScratchLocation()); preference.setAllocationProjectNumber(resource.getProjectNumber()); http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java ---------------------------------------------------------------------- diff --git 
a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java index 66eb6bb..3593e11 100644 --- a/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java +++ b/modules/app-catalog/app-catalog-data/src/test/java/org/apache/airavata/app/catalog/test/GatewayProfileTest.java @@ -84,8 +84,8 @@ public class GatewayProfileTest { ComputeResourcePreference preference1 = new ComputeResourcePreference(); preference1.setComputeResourceId(hostId1); preference1.setOverridebyAiravata(true); - preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH.toString()); - preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP.toString()); + preference1.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.SSH); + preference1.setPreferredDataMovementProtocol(DataMovementProtocol.SCP); preference1.setPreferredBatchQueue("queue1"); preference1.setScratchLocation("/tmp"); preference1.setAllocationProjectNumber("project1"); @@ -93,8 +93,8 @@ public class GatewayProfileTest { ComputeResourcePreference preference2 = new ComputeResourcePreference(); preference2.setComputeResourceId(hostId2); preference2.setOverridebyAiravata(true); - preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL.toString()); - preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP.toString()); + preference2.setPreferredJobSubmissionProtocol(JobSubmissionProtocol.LOCAL); + preference2.setPreferredDataMovementProtocol(DataMovementProtocol.GridFTP); preference2.setPreferredBatchQueue("queue2"); preference2.setScratchLocation("/tmp"); preference2.setAllocationProjectNumber("project2"); 
http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index 3616b42..cade06b 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -430,4 +430,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable public void setPreferredJobSubmissionInterface(JobSubmissionInterface preferredJobSubmissionInterface) { this.preferredJobSubmissionInterface = preferredJobSubmissionInterface; } + + public String getHostName() { + return applicationContext.getComputeResourceDescription().getHostName(); + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 696b61b..e8e4c66 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -302,6 +302,20 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setGfac(this); jobExecutionContext.setZk(zk); jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID, taskID)); + + 
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); + if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){ + Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { + @Override + public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { + return jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); + } + }); + + jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces); + }else { + throw new GFacException("Compute resource should have at least one job submission interface defined..."); + } if (gatewayResourcePreferences != null ) { if (gatewayResourcePreferences.getScratchLocation() == null) { gatewayResourcePreferences.setScratchLocation("/tmp"); @@ -326,22 +340,11 @@ public class BetterGfacImpl implements GFac,Watcher { jobExecutionContext.setStandardError(workingDir + File.separator + applicationInterface.getApplicationName().replaceAll("\\s+", "") + ".stderr"); jobExecutionContext.setPreferredJobSubmissionProtocol(gatewayResourcePreferences.getPreferredJobSubmissionProtocol()); + if (gatewayResourcePreferences.getPreferredJobSubmissionProtocol() == null) { + jobExecutionContext.setPreferredJobSubmissionInterface(jobExecutionContext.getHostPrioritizedJobSubmissionInterfaces().get(0)); + jobExecutionContext.setPreferredJobSubmissionProtocol(jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionProtocol()); + } } - - List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces(); - if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){ - Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() { - @Override - public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) { - return 
jobSubmissionInterface.getPriorityOrder() - jobSubmissionInterface2.getPriorityOrder(); - } - }); - - jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces); - }else { - throw new GFacException("Compute resource should have at least one job submission interface defined..."); - } - return jobExecutionContext; } http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java index 6ea1839..55da288 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java @@ -22,7 +22,6 @@ package org.apache.airavata.gfac.core.monitor; import org.apache.airavata.common.logger.AiravataLogger; import org.apache.airavata.common.logger.AiravataLoggerFactory; -import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.JobState; @@ -44,7 +43,7 @@ public class MonitorID { private Timestamp lastMonitored; - private HostDescription host; + private ComputeResourceDescription computeResourceDescription; private Map<String, Object> parameters; @@ -67,7 +66,7 @@ public class MonitorID { public MonitorID() { } public MonitorID(MonitorID monitorID){ - this.host = monitorID.getHost(); + this.computeResourceDescription = monitorID.getComputeResourceDescription(); this.jobStartedTime = new Timestamp((new Date()).getTime()); this.userName = monitorID.getUserName(); this.jobID = 
monitorID.getJobID(); @@ -76,8 +75,8 @@ public class MonitorID { this.workflowNodeID = monitorID.getWorkflowNodeID(); this.jobName = monitorID.getJobName(); } - public MonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) { - this.host = host; + public MonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) { + this.computeResourceDescription = computeResourceDescription; this.jobStartedTime = new Timestamp((new Date()).getTime()); this.userName = userName; this.jobID = jobID; @@ -89,7 +88,7 @@ public class MonitorID { public MonitorID(JobExecutionContext jobExecutionContext) { this.jobExecutionContext = jobExecutionContext; - host = jobExecutionContext.getApplicationContext().getHostDescription(); + this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription(); userName = jobExecutionContext.getExperiment().getUserName(); taskID = jobExecutionContext.getTaskData().getTaskID(); experimentID = jobExecutionContext.getExperiment().getExperimentID(); @@ -102,12 +101,12 @@ public class MonitorID { } } - public HostDescription getHost() { - return host; + public ComputeResourceDescription getComputeResourceDescription() { + return computeResourceDescription; } - public void setHost(HostDescription host) { - this.host = host; + public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) { + this.computeResourceDescription = computeResourceDescription; } public Timestamp getLastMonitored() { http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java index b5a325a..92a50e4 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java @@ -20,6 +20,9 @@ */ package org.apache.airavata.gfac.gsissh.provider.impl; +import org.airavata.appcatalog.cpi.AppCatalog; +import org.airavata.appcatalog.cpi.AppCatalogException; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.gfac.ExecutionMode; import org.apache.airavata.gfac.GFacException; @@ -36,11 +39,16 @@ import org.apache.airavata.gfac.gsissh.util.GFACGSISSHUtils; import org.apache.airavata.gsi.ssh.api.Cluster; import org.apache.airavata.gsi.ssh.api.SSHApiException; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.MonitorMode; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; import org.apache.airavata.model.workspace.experiment.CorrectiveAction; import org.apache.airavata.model.workspace.experiment.ErrorCategory; import org.apache.airavata.model.workspace.experiment.JobDetails; import org.apache.airavata.model.workspace.experiment.JobState; -import org.apache.airavata.schemas.gfac.GsisshHostType; +//import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.airavata.schemas.gfac.HostDescriptionType; 
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; import org.apache.zookeeper.KeeperException; @@ -48,6 +56,7 @@ import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.management.monitor.Monitor; import java.util.List; import java.util.Map; @@ -76,14 +85,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.info("Invoking GSISSH Provider Invoke ..."); StringBuffer data = new StringBuffer(); jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); - HostDescriptionType host = jobExecutionContext.getApplicationContext(). - getHostDescription().getType(); - HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext(). - getApplicationDeploymentDescription().getType(); + ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() + .getComputeResourceDescription(); + ApplicationDeploymentDescription appDeployDesc = jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription(); JobDetails jobDetails = new JobDetails(); Cluster cluster = null; - + try { + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission( + jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); + if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) { cluster = ((GSISecurityContext) jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT)).getPbsCluster(); } @@ -93,7 +106,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { log.info("Successfully retrieved the Security Context"); } // This installed path is a mandetory field, because this could change based on the computing resource - JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, 
cluster); + JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, cluster); jobDetails.setJobName(jobDescriptor.getJobName()); log.info(jobDescriptor.toXML()); @@ -113,10 +126,10 @@ public class GSISSHProvider extends AbstractRecoverableProvider { // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler // to perform monitoring, daemon handlers can be accessed from anywhere - delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobDetails.getJobID()); + delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission , jobDetails.getJobID()); // we know this host is type GsiSSHHostType } catch (Exception e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + computeResourceDescription.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); @@ -130,18 +143,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } - public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException { + public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException { List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); if (daemonHandlers == null) { daemonHandlers = BetterGfacImpl.getDaemonHandlers(); } ThreadedHandler pullMonitorHandler = null; ThreadedHandler pushMonitorHandler = null; - String monitorMode = host.getMonitorMode(); + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); for (ThreadedHandler threadedHandler : daemonHandlers) { if 
("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { pullMonitorHandler = threadedHandler; - if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) { + if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID); pullMonitorHandler.invoke(jobExecutionContext); } else { @@ -150,7 +163,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { pushMonitorHandler = threadedHandler; - if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) { + if (monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID); pushMonitorHandler.invoke(jobExecutionContext); } else { @@ -166,18 +179,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } } - public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException { + public void removeFromMonitorHandlers(JobExecutionContext jobExecutionContext, SSHJobSubmission sshJobSubmission, String jobID) throws GFacHandlerException { List<ThreadedHandler> daemonHandlers = BetterGfacImpl.getDaemonHandlers(); if (daemonHandlers == null) { daemonHandlers = BetterGfacImpl.getDaemonHandlers(); } ThreadedHandler pullMonitorHandler = null; ThreadedHandler pushMonitorHandler = null; - String monitorMode = host.getMonitorMode(); + MonitorMode monitorMode = sshJobSubmission.getMonitorMode(); for (ThreadedHandler threadedHandler : daemonHandlers) { if 
("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) { pullMonitorHandler = threadedHandler; - if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) { + if (monitorMode == null || monitorMode == MonitorMode.POLL_JOB_MANAGER) { jobExecutionContext.setProperty("cancel","true"); pullMonitorHandler.invoke(jobExecutionContext); } else { @@ -186,7 +199,7 @@ public class GSISSHProvider extends AbstractRecoverableProvider { } } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) { pushMonitorHandler = threadedHandler; - if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) { + if ( monitorMode == null || monitorMode == MonitorMode.XSEDE_AMQP_SUBSCRIBE) { pushMonitorHandler.invoke(jobExecutionContext); } else { log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" + @@ -208,8 +221,6 @@ public class GSISSHProvider extends AbstractRecoverableProvider { public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { //To change body of implemented methods use File | Settings | File Templates. log.info("canceling the job status in GSISSHProvider!!!!!"); - HostDescriptionType host = jobExecutionContext.getApplicationContext(). 
- getHostDescription().getType(); JobDetails jobDetails = jobExecutionContext.getJobDetails(); try { Cluster cluster = null; @@ -236,14 +247,14 @@ public class GSISSHProvider extends AbstractRecoverableProvider { GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.CANCELED); // we know this host is type GsiSSHHostType } catch (SSHApiException e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR); throw new GFacProviderException(error, e); } catch (Exception e) { - String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage(); + String error = "Error submitting the job to host " + jobExecutionContext.getHostName() + " message: " + e.getMessage(); log.error(error); jobDetails.setJobID("none"); GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED); @@ -255,8 +266,8 @@ public class GSISSHProvider extends AbstractRecoverableProvider { public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { // have to implement the logic to recover a gfac failure log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID()); - HostDescriptionType host = jobExecutionContext.getApplicationContext(). 
- getHostDescription().getType(); + ComputeResourceDescription computeResourceDescription = jobExecutionContext.getApplicationContext() + .getComputeResourceDescription(); String jobId = ""; String jobDesc = ""; try { @@ -306,8 +317,11 @@ public class GSISSHProvider extends AbstractRecoverableProvider { throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage()); } } - delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobId); - } catch (GFacHandlerException e) { + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission( + jobExecutionContext.getPreferredJobSubmissionInterface().getJobSubmissionInterfaceId()); + delegateToMonitorHandlers(jobExecutionContext, sshJobSubmission, jobId); + } catch (Exception e) { throw new GFacProviderException("Error delegating already ran job to Monitoring", e); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java index 2f9dbc3..baca65c 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java @@ -20,21 +20,19 @@ */ package org.apache.airavata.gfac.gsissh.util; -import java.sql.SQLException; -import java.util.*; - +import org.airavata.appcatalog.cpi.AppCatalog; +import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.exception.ApplicationSettingsException; import 
org.apache.airavata.common.utils.ServerSettings; import org.apache.airavata.common.utils.StringUtil; import org.apache.airavata.commons.gfac.type.ActualParameter; -import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.commons.gfac.type.MappingFactory; -import org.apache.airavata.credential.store.credential.Credential; import org.apache.airavata.credential.store.credential.impl.certificate.CertificateCredential; import org.apache.airavata.credential.store.store.CredentialReader; import org.apache.airavata.gfac.Constants; import org.apache.airavata.gfac.GFacException; import org.apache.airavata.gfac.RequestData; +import org.apache.airavata.gfac.core.context.ApplicationContext; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.context.MessageContext; import org.apache.airavata.gfac.core.utils.GFacUtils; @@ -47,22 +45,26 @@ import org.apache.airavata.gsi.ssh.api.job.JobManagerConfiguration; import org.apache.airavata.gsi.ssh.impl.GSISSHAbstractCluster; import org.apache.airavata.gsi.ssh.impl.PBSCluster; import org.apache.airavata.gsi.ssh.util.CommonUtils; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol; import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling; import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.apache.airavata.schemas.gfac.FileArrayType; -import org.apache.airavata.schemas.gfac.GlobusHostType; -import org.apache.airavata.schemas.gfac.GsisshHostType; import 
org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType; -import org.apache.airavata.schemas.gfac.SSHHostType; import org.apache.airavata.schemas.gfac.StringArrayType; import org.apache.airavata.schemas.gfac.URIArrayType; -import org.apache.airavata.schemas.gfac.UnicoreHostType; -import org.apache.openjpa.lib.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.validation.constraints.Max; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; public class GFACGSISSHUtils { @@ -74,32 +76,35 @@ public class GFACGSISSHUtils { public static int maxClusterCount = 5; public static Map<String, List<Cluster>> clusters = new HashMap<String, List<Cluster>>(); public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { - HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription(); - if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType - || registeredHost.getType() instanceof SSHHostType) { - logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); - } else if (registeredHost.getType() instanceof GsisshHostType) { - String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework - RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); - requestData.setTokenId(credentialStoreToken); - PBSCluster pbsCluster = null; - GSISecurityContext context = null; - try { + JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface(); + JobSubmissionProtocol jobProtocol = jobSubmissionInterface.getJobSubmissionProtocol(); + try { + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + SSHJobSubmission sshJobSubmission = 
appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId()); + if (jobProtocol == JobSubmissionProtocol.GLOBUS || jobProtocol == JobSubmissionProtocol.UNICORE + || jobProtocol == JobSubmissionProtocol.CLOUD || jobProtocol == JobSubmissionProtocol.LOCAL) { + logger.error("This is a wrong method to invoke to non ssh host types,please check your gfac-config.xml"); + } else if (jobProtocol == JobSubmissionProtocol.SSH && sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) { + String credentialStoreToken = jobExecutionContext.getCredentialStoreToken(); // this is set by the framework + RequestData requestData = new RequestData(ServerSettings.getDefaultUserGateway()); + requestData.setTokenId(credentialStoreToken); + PBSCluster pbsCluster = null; + GSISecurityContext context = null; + TokenizedMyProxyAuthInfo tokenizedMyProxyAuthInfo = new TokenizedMyProxyAuthInfo(requestData); CredentialReader credentialReader = GFacUtils.getCredentialReader(); - if(credentialReader != null){ - CertificateCredential credential = null; - try { - credential = (CertificateCredential)credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken); - requestData.setMyProxyUserName(credential.getCommunityUser().getUserName()); - } catch (Exception e) { - logger.error(e.getLocalizedMessage()); - } + if (credentialReader != null) { + CertificateCredential credential = null; + try { + credential = (CertificateCredential) credentialReader.getCredential(ServerSettings.getDefaultUserGateway(), credentialStoreToken); + requestData.setMyProxyUserName(credential.getCommunityUser().getUserName()); + } catch (Exception e) { + logger.error(e.getLocalizedMessage()); + } } - GsisshHostType gsisshHostType = (GsisshHostType) registeredHost.getType(); - String key = requestData.getMyProxyUserName() + registeredHost.getType().getHostAddress() + - gsisshHostType.getPort(); + String key = requestData.getMyProxyUserName() + 
jobExecutionContext.getHostName()+ + sshJobSubmission.getSshPort(); boolean recreate = false; synchronized (clusters) { if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { @@ -112,7 +117,7 @@ public class GFACGSISSHUtils { clusters.get(key).remove(i); recreate = true; } - if(!recreate) { + if (!recreate) { try { pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate } catch (Exception e) { @@ -129,13 +134,12 @@ public class GFACGSISSHUtils { } if (recreate) { - ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress(), - gsisshHostType.getPort()); + ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), jobExecutionContext.getHostName(), + sshJobSubmission.getSshPort()); JobManagerConfiguration jConfig = null; - String installedParentPath = ((HpcApplicationDeploymentType) - jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath(); - String jobManager = ((GsisshHostType) registeredHost.getType()).getJobManager(); + String installedParentPath = sshJobSubmission.getResourceJobManager().getJobManagerBinPath(); + String jobManager = sshJobSubmission.getResourceJobManager().getResourceJobManagerType().toString(); if (jobManager == null) { logger.error("No Job Manager is configured, so we are picking pbs as the default job manager"); jConfig = CommonUtils.getPBSJobManager(installedParentPath); @@ -160,28 +164,30 @@ public class GFACGSISSHUtils { clusters.put(key, pbsClusters); } } - } catch (Exception e) { - throw new GFacException("An error occurred while creating GSI security context", e); + + jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context); } - jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), context); + } catch (Exception e) { + 
throw new GFacException("An error occurred while creating GSI security context", e); } } - public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, - ApplicationDeploymentDescriptionType app, Cluster cluster) { + public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, Cluster cluster) { JobDescriptor jobDescriptor = new JobDescriptor(); + ApplicationContext applicationContext = jobExecutionContext.getApplicationContext(); + ApplicationDeploymentDescription app = applicationContext.getApplicationDeploymentDescription(); // this is common for any application descriptor jobDescriptor.setCallBackIp(ServerSettings.getIp()); jobDescriptor.setCallBackPort(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.GFAC_SERVER_PORT, "8950")); - jobDescriptor.setInputDirectory(app.getInputDataDirectory()); - jobDescriptor.setOutputDirectory(app.getOutputDataDirectory()); - jobDescriptor.setExecutablePath(app.getExecutableLocation()); - jobDescriptor.setStandardOutFile(app.getStandardOutput()); - jobDescriptor.setStandardErrorFile(app.getStandardError()); + jobDescriptor.setInputDirectory(jobExecutionContext.getInputDir()); + jobDescriptor.setOutputDirectory(jobExecutionContext.getOutputDir()); + jobDescriptor.setExecutablePath(app.getExecutablePath()); + jobDescriptor.setStandardOutFile(jobExecutionContext.getStandardOutput()); + jobDescriptor.setStandardErrorFile(jobExecutionContext.getStandardError()); Random random = new Random(); int i = random.nextInt(Integer.MAX_VALUE); // We always set the job name jobDescriptor.setJobName("A" + String.valueOf(i+99999999)); - jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory()); + jobDescriptor.setWorkingDirectory(jobExecutionContext.getWorkingDir()); List<String> inputValues = new ArrayList<String>(); MessageContext input = jobExecutionContext.getInMessageContext(); 
http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java index baab7b4..28d13f2 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java @@ -30,12 +30,12 @@ import java.util.concurrent.BlockingQueue; import org.apache.airavata.common.utils.MonitorPublisher; import org.apache.airavata.common.utils.ServerSettings; -import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.core.PushMonitor; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil; import org.apache.airavata.gfac.monitor.util.CommonUtils; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import org.apache.airavata.model.workspace.experiment.JobState; @@ -107,30 +107,37 @@ public class AMQPMonitor extends PushMonitor { @Override public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException { // we subscribe to read user-host based subscription - HostDescription host = monitorID.getHost(); - String hostAddress = host.getType().getHostAddress(); - // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it - // will be 
picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue - String channelID = CommonUtils.getChannelID(monitorID); - if(availableChannels.get(channelID) == null){ - try { - //todo need to fix this rather getting it from a file - Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath); - Channel channel = null; - channel = connection.createChannel(); - availableChannels.put(channelID, channel); - String queueName = channel.queueDeclare().getQueue(); - - BasicConsumer consumer = new - BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher - channel.basicConsume(queueName, true, consumer); - String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress); - // here we queuebind to a particular user in a particular machine - channel.queueBind(queueName, "glue2.computing_activity", filterString); - logger.info("Using filtering string to monitor: " + filterString); - } catch (IOException e) { - logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName()); - } + ComputeResourceDescription computeResourceDescription = monitorID.getComputeResourceDescription(); + if (computeResourceDescription.isSetIpAddresses() && computeResourceDescription.getIpAddresses().size() > 0) { + // we get first ip address for the moment + String hostAddress = computeResourceDescription.getIpAddresses().get(0); + // in amqp case there are no multiple jobs per each host, because once a job is put in to the queue it + // will be picked by the Monitor, so jobs will not stay in this queueu but jobs will stay in finishQueue + String channelID = CommonUtils.getChannelID(monitorID); + if (availableChannels.get(channelID) == null) { + try { + //todo need to fix this rather getting it from a file + Connection connection = AMQPConnectionUtil.connect(amqpHosts, connectionName, proxyPath); + Channel channel = null; + channel = 
connection.createChannel(); + availableChannels.put(channelID, channel); + String queueName = channel.queueDeclare().getQueue(); + + BasicConsumer consumer = new + BasicConsumer(new JSONMessageParser(), localPublisher); // here we use local publisher + channel.basicConsume(queueName, true, consumer); + String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress); + // here we queuebind to a particular user in a particular machine + channel.queueBind(queueName, "glue2.computing_activity", filterString); + logger.info("Using filtering string to monitor: " + filterString); + } catch (IOException e) { + logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName()); + } + } + } else { + throw new AiravataMonitorException("Couldn't register monitor for jobId :" + monitorID.getJobID() + + " , ComputeResourceDescription " + computeResourceDescription.getHostName() + " doesn't has an " + + "IpAddress with it"); } return true; } http://git-wip-us.apache.org/repos/asf/airavata/blob/eb626fa7/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java index 94528b9..a979890 100644 --- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java +++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java @@ -20,15 +20,11 @@ */ package org.apache.airavata.job; -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import org.airavata.appcatalog.cpi.AppCatalog; 
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory; import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.commons.gfac.type.HostDescription; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor; import org.apache.airavata.gsi.ssh.api.Cluster; @@ -38,14 +34,29 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo; import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; import org.apache.airavata.gsi.ssh.impl.PBSCluster; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface; +import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; +import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager; +import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType; +import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission; +import org.apache.airavata.model.appcatalog.computeresource.SecurityProtocol; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.schemas.gfac.GsisshHostType; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import 
java.util.concurrent.LinkedBlockingQueue; public class AMQPMonitorTest { @@ -54,12 +65,13 @@ public class AMQPMonitorTest { private String certificateLocation; private String pbsFilePath; private String workingDirectory; - private HostDescription hostDescription; private MonitorPublisher monitorPublisher; private BlockingQueue<MonitorID> finishQueue; private BlockingQueue<MonitorID> pushQueue; private Thread pushThread; private String proxyFilePath; + private ComputeResourceDescription computeResourceDescription; + @Before public void setUp() throws Exception { System.setProperty("myproxy.username", "ogce"); @@ -98,14 +110,26 @@ public class AMQPMonitorTest { } catch (Exception e) { e.printStackTrace(); } + computeResourceDescription = new ComputeResourceDescription("TestComputerResoruceId", "TestHostName"); + computeResourceDescription.setHostName("stampede-host"); + computeResourceDescription.addToIpAddresses("login1.stampede.tacc.utexas.edu"); + ResourceJobManager resourceJobManager = new ResourceJobManager("1234", ResourceJobManagerType.SLURM); + Map<JobManagerCommand, String> commandMap = new HashMap<JobManagerCommand, String>(); + commandMap.put(JobManagerCommand.SUBMISSION, "test"); + resourceJobManager.setJobManagerCommands(commandMap); + resourceJobManager.setJobManagerBinPath("/usr/bin/"); + resourceJobManager.setPushMonitoringEndpoint("push"); // TODO - add monitor mode + SSHJobSubmission sshJobSubmission = new SSHJobSubmission("TestSSHJobSubmissionInterfaceId", SecurityProtocol.GSI, + resourceJobManager); + + AppCatalog appCatalog = AppCatalogFactory.getAppCatalog(); + String jobSubmissionID = appCatalog.getComputeResource().addSSHJobSubmission(sshJobSubmission); + + JobSubmissionInterface jobSubmissionInterface = new JobSubmissionInterface(jobSubmissionID, JobSubmissionProtocol.SSH, 1); + + computeResourceDescription.addToJobSubmissionInterfaces(jobSubmissionInterface); + computeResourceDescription.addToDataMovementInterfaces(new 
DataMovementInterface("4532", DataMovementProtocol.SCP, 1)); - hostDescription = new HostDescription(GsisshHostType.type); - hostDescription.getType().setHostAddress("login1.stampede.tacc.utexas.edu"); - hostDescription.getType().setHostName("stampede-host"); - ((GsisshHostType) hostDescription.getType()).setJobManager("slurm"); - ((GsisshHostType) hostDescription.getType()).setInstalledPath("/usr/bin/"); - ((GsisshHostType) hostDescription.getType()).setPort(2222); - ((GsisshHostType) hostDescription.getType()).setMonitorMode("push"); } @Test @@ -151,7 +175,7 @@ public class AMQPMonitorTest { String jobID = pbsCluster.submitBatchJob(jobDescriptor); System.out.println(jobID); try { - pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce", jobName)); + pushQueue.add(new MonitorID(computeResourceDescription, jobID,null,null,null, "ogce", jobName)); } catch (Exception e) { e.printStackTrace(); }
