Repository: airavata Updated Branches: refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a
fixing input/outhandler - AIRAVATA-1488 Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516 Branch: refs/heads/gfac_appcatalog_int Commit: b3769516eec7d7127e17d2e24e4001718763c1ec Parents: 3b330c0 Author: lahiru <[email protected]> Authored: Thu Oct 30 11:27:12 2014 -0400 Committer: lahiru <[email protected]> Committed: Thu Oct 30 11:27:12 2014 -0400 ---------------------------------------------------------------------- .../impl/password/PasswordCredential.java | 3 +- .../gfac/core/context/JobExecutionContext.java | 2 +- .../airavata/gfac/core/utils/GFacUtils.java | 3 +- .../gfac/gsissh/util/GFACGSISSHUtils.java | 2 +- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++--------- .../ssh/handler/AdvancedSCPInputHandler.java | 9 +- .../ssh/handler/AdvancedSCPOutputHandler.java | 12 +- .../airavata/gfac/ssh/util/GFACSSHUtils.java | 86 +++++++++++--- 8 files changed, 146 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java ---------------------------------------------------------------------- diff --git a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java index ee32ef4..a31c98b 100644 --- a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java +++ b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java @@ -22,13 +22,14 @@ package org.apache.airavata.credential.store.credential.impl.password; import org.apache.airavata.credential.store.credential.Credential; +import org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; import java.util.Date; /** * User name password credentials. */ -public class PasswordCredential extends Credential { +public class PasswordCredential extends SSHCredential { private String userName; private String password; http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index 9abab8d..2f94ec5 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -234,7 +234,7 @@ public class JobExecutionContext extends AbstractContext implements Serializable public SecurityContext getSecurityContext(String name) throws GFacException{ - SecurityContext secContext = securityContext.get(name); + SecurityContext secContext = securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress()); return secContext; } http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java index 729c1ee..eef44a4 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java @@ -1092,7 +1092,8 @@ public class GFacUtils { }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){ // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment // node to gfac node specific location, because original request execution will fail with errors - return true; + log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !"); + return false; } else { log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " is already running by this Gfac instance"); http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java index 4d338e3..2f9dbc3 100644 --- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java +++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java @@ -163,7 +163,7 @@ public class GFACGSISSHUtils { } catch (Exception e) { throw new GFacException("An error occurred while creating GSI security context", e); } - jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, context); + jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), context); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index d3c3df8..952b30e 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor { HostDescription currentHostDescription = null; try { take = this.queue.take(); - Map<String,MonitorID> completedJobs = new HashMap<String,MonitorID>(); List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); for (HostMonitorData iHostMonitorData : hostMonitorData) { if (iHostMonitorData.getHost().getType() instanceof GsisshHostType || iHostMonitorData.getHost().getType() instanceof SSHHostType) { - currentHostDescription = iHostMonitorData.getHost(); String hostName = iHostMonitorData.getHost().getType().getHostAddress(); ResourceConnection connection = null; if (connections.containsKey(hostName)) { @@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor { // before we get the statuses, we check the cancel job list and remove them permanently List<MonitorID> monitorID = iHostMonitorData.getMonitorIDs(); Iterator<String> iterator1 = cancelJobList.iterator(); - - for(MonitorID iMonitorID:monitorID){ + ListIterator<MonitorID> monitorIDListIterator = monitorID.listIterator(); + while (monitorIDListIterator.hasNext()){ + MonitorID iMonitorID = monitorIDListIterator.next(); while(iterator1.hasNext()) { String cancelMId = iterator1.next(); if (cancelMId.equals(iMonitorID.getExperimentID() + "+" + iMonitorID.getTaskID())) { iMonitorID.setStatus(JobState.CANCELED); - completedJobs.put(iMonitorID.getJobName(), iMonitorID); iterator1.remove(); logger.debugId(cancelMId, "Found a match in cancel monitor queue, hence moved to the " + "completed job queue, experiment {}, task {} , job {}", iMonitorID.getExperimentID(), iMonitorID.getTaskID(), iMonitorID.getJobID()); + logger.info("Job cancelled: marking the Job as ************CANCELLED************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + sendNotification(iMonitorID); + monitorIDListIterator.remove(); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } @@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor { } synchronized (completedJobsFromPush) { ListIterator<String> iterator = completedJobsFromPush.listIterator(); - for (MonitorID iMonitorID : monitorID) { + monitorIDListIterator = monitorID.listIterator(); + while (monitorIDListIterator.hasNext()) { + MonitorID iMonitorID = monitorIDListIterator.next(); String completeId = null; while (iterator.hasNext()) { completeId = iterator.next(); if (completeId.equals(iMonitorID.getUserName() + "," + iMonitorID.getJobName())) { logger.info("This job is finished because push notification came with <username,jobName> " + completeId); - completedJobs.put(iMonitorID.getJobName(), iMonitorID); iMonitorID.setStatus(JobState.COMPLETE); iterator.remove();//we have to make this empty everytime we iterate, otherwise this list will accumulate and will lead to a memory leak logger.debugId(completeId, "Push notification updated job {} status to {}. " + "experiment {} , task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + logger.info("AMQP message recieved: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + + monitorIDListIterator.remove(); + sendNotification(iMonitorID); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); break; } } iterator = completedJobsFromPush.listIterator(); } } + + // we have to get this again because we removed the already completed jobs with amqp messages + monitorID = iHostMonitorData.getMonitorIDs(); Map<String, JobState> jobStatuses = connection.getJobStatuses(monitorID); - Iterator<MonitorID> iterator = monitorID.iterator(); + Iterator<MonitorID> iterator = monitorID.listIterator(); while (iterator.hasNext()) { MonitorID iMonitorID = iterator.next(); currentMonitorID = iMonitorID; @@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor { !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ - completedJobs.put(iMonitorID.getJobName(), iMonitorID); logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + iterator.remove(); + logger.info("PULL Notification is complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } - jobStatus = new JobStatusChangeRequestEvent(); iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); //IMPORTANT this is not a simple setter we have a logic - + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + sendNotification(iMonitorID); + logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + + "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), + jobStatus.getJobIdentity().getTaskId()); + // if the job is completed we do not have to put the job to the queue again + iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); + } + iterator = monitorID.listIterator(); + while(iterator.hasNext()){ + MonitorID iMonitorID = iterator.next(); if (iMonitorID.getFailedCount() > FAILED_COUNT) { iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); String outputDir = iMonitorID.getJobExecutionContext().getApplicationContext() @@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor { // this is because while we run output handler something failed and during exception // we store all the jobs in the monitor queue again logger.error("We know this job is already attempted to run out-handlers"); - CommonUtils.removeMonitorFromQueue(queue, iMonitorID); +// CommonUtils.removeMonitorFromQueue(queue, iMonitorID); } } if (stdOut != null && stdOut.size() > 0 && !stdOut.get(0).isEmpty()) { // have to be careful with this iMonitorID.setStatus(JobState.COMPLETE); - completedJobs.put(iMonitorID.getJobName(), iMonitorID); - logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, removed job {} from " + - "monitor queue. Experiment {} , task {}", iMonitorID.getFailedCount(), + logger.errorId(iMonitorID.getJobID(), "Job monitoring failed {} times, " + + " Experiment {} , task {}", iMonitorID.getFailedCount(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); + logger.info("Listing directory came as complete: marking the Job as ************COMPLETE************ experiment {}, task {}, job name {} .", + iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); + sendNotification(iMonitorID); + iterator.remove(); + GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, iMonitorID, publisher)); } else { iMonitorID.setFailedCount(0); } @@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor { // if the job is complete we remove it from the Map, if any of these maps // get empty this userMonitorData will get delete from the queue } - JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), - iMonitorID.getTaskID(), - iMonitorID.getWorkflowNodeID(), - iMonitorID.getExperimentID(), - iMonitorID.getJobExecutionContext().getGatewayID()); - jobStatus.setJobIdentity(jobIdentity); - jobStatus.setState(iMonitorID.getStatus()); - // we have this JobStatus class to handle amqp monitoring - - publisher.publish(jobStatus); - logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status change request, " + - "experiment {} , task {}", jobStatus.getJobIdentity().getExperimentId(), - jobStatus.getJobIdentity().getTaskId()); - // if the job is completed we do not have to put the job to the queue again - iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); } + + } else { logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.getHost() .getType().getHostAddress()); @@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor { // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back // now the userMonitorData goes back to the tail of the queue queue.put(take); - // cleaning up the completed jobs, this method will remove some of the userMonitorData from the queue if - // they become empty - Map<String, Integer> jobRemoveCountMap = new HashMap<String, Integer>(); - ZooKeeper zk = null; - Set<String> keys = completedJobs.keySet(); - for (String jobName: keys) { - MonitorID completedJob = completedJobs.get(jobName); - CommonUtils.removeMonitorFromQueue(queue, completedJob); -// gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext()); - GFacThreadPoolExecutor.getFixedThreadPool().submit(new OutHandlerWorker(gfac, completedJob, publisher)); - if (zk == null) { - zk = completedJob.getJobExecutionContext().getZk(); - } - String key = CommonUtils.getJobCountUpdatePath(completedJob); - int i = 0; - if (jobRemoveCountMap.containsKey(key)) { - i = Integer.valueOf(jobRemoveCountMap.get(key)); - } - jobRemoveCountMap.put(key, ++i); - } - if (completedJobs.size() > 0) { - // reduce completed job count from zookeeper - CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, false); - } } catch (InterruptedException e) { if (!this.queue.contains(take)) { try { @@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor { return true; } + private void sendNotification(MonitorID iMonitorID) { + JobStatusChangeRequestEvent jobStatus = new JobStatusChangeRequestEvent(); + JobIdentifier jobIdentity = new JobIdentifier(iMonitorID.getJobID(), + iMonitorID.getTaskID(), + iMonitorID.getWorkflowNodeID(), + iMonitorID.getExperimentID(), + iMonitorID.getJobExecutionContext().getGatewayID()); + jobStatus.setJobIdentity(jobIdentity); + jobStatus.setState(iMonitorID.getStatus()); + // we have this JobStatus class to handle amqp monitoring + + publisher.publish(jobStatus); + } + /** * This is the method to stop the polling process * http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java index ce296da..de4dd41 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java @@ -72,6 +72,7 @@ import java.util.*; public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class); public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; + public static final int DEFAULT_SSH_PORT = 22; private String password = null; @@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { this.passPhrase); } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - String key = this.userName + this.hostName; - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new SSHAuthWrapper(serverInfo,authenticationInfo,key)); - if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; + SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key); + if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) == null) { try { - GFACSSHUtils.addSecurityContext(jobExecutionContext); + GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); } catch (ApplicationSettingsException e) { log.error(e.getMessage()); try { http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java index ad2131e..aed6e9f 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java @@ -73,6 +73,8 @@ import java.util.Set; public class AdvancedSCPOutputHandler extends AbstractHandler { private static final Logger log = LoggerFactory.getLogger(AdvancedSCPOutputHandler.class); + public static final int DEFAULT_SSH_PORT = 22; + private String password = null; private String publicKeyPath; @@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { private String outputPath; - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; - public void initProperties(Properties properties) throws GFacHandlerException { password = (String)properties.get("password"); @@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends AbstractHandler { this.passPhrase); } ServerInfo serverInfo = new ServerInfo(this.userName, this.hostName); - String key = this.userName + this.hostName; - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new SSHAuthWrapper(serverInfo,authenticationInfo,key)); + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; + SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, authenticationInfo, key); try { - if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) == null) { + if (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) == null) { try { - GFACSSHUtils.addSecurityContext(jobExecutionContext); + GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); } catch (ApplicationSettingsException e) { log.error(e.getMessage()); try { http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java index 7ee5d6a..94f07b1 100644 --- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java +++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java @@ -62,9 +62,12 @@ public class GFACSSHUtils { public static int maxClusterCount = 5; - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; - - + /** + * This method is to add computing resource specific authentication, if its a third party machine, use the other addSecurityContext + * @param jobExecutionContext + * @throws GFacException + * @throws ApplicationSettingsException + */ public static void addSecurityContext(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException { HostDescription registeredHost = jobExecutionContext.getApplicationContext().getHostDescription(); if (registeredHost.getType() instanceof GlobusHostType || registeredHost.getType() instanceof UnicoreHostType) { @@ -77,8 +80,6 @@ public class GFACSSHUtils { requestData.setTokenId(credentialStoreToken); ServerInfo serverInfo = new ServerInfo(null, registeredHost.getType().getHostAddress()); - SSHAuthWrapper sshAuth = (SSHAuthWrapper) jobExecutionContext.getProperty(ADVANCED_SSH_AUTH); - Cluster pbsCluster = null; try { TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new TokenizedSSHAuthInfo(requestData); @@ -95,9 +96,6 @@ public class GFACSSHUtils { String key = credentials.getPortalUserName() + registeredHost.getType().getHostAddress() + serverInfo.getPort(); - if(sshAuth!=null){ - key=sshAuth.getKey(); - } boolean recreate = false; synchronized (clusters) { if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { @@ -125,15 +123,8 @@ public class GFACSSHUtils { recreate = true; } if (recreate) { - if (sshAuth != null) { - pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(), + pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo, CommonUtils.getPBSJobManager(installedParentPath)); - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other provider might fail - key = sshAuth.getKey(); - } else { - pbsCluster = new PBSCluster(serverInfo, tokenizedSSHAuthInfo, - CommonUtils.getPBSJobManager(installedParentPath)); - } List<Cluster> pbsClusters = null; if (!(clusters.containsKey(key))) { pbsClusters = new ArrayList<Cluster>(); @@ -148,10 +139,71 @@ public class GFACSSHUtils { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } sshSecurityContext.setPbsCluster(pbsCluster); - jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, sshSecurityContext); + jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), sshSecurityContext); } } + /** + * This method can be used to add third party resource security contexts + * @param jobExecutionContext + * @param sshAuth + * @throws GFacException + * @throws ApplicationSettingsException + */ + public static void addSecurityContext(JobExecutionContext jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, ApplicationSettingsException { + try { + if(sshAuth== null) { + throw new GFacException("Error adding security Context, because sshAuthWrapper is null"); + } + SSHSecurityContext sshSecurityContext = new SSHSecurityContext(); + Cluster pbsCluster = null; + String key=sshAuth.getKey(); + boolean recreate = false; + synchronized (clusters) { + if (clusters.containsKey(key) && clusters.get(key).size() < maxClusterCount) { + recreate = true; + } else if (clusters.containsKey(key)) { + int i = new Random().nextInt(Integer.MAX_VALUE) % maxClusterCount; + if (clusters.get(key).get(i).getSession().isConnected()) { + pbsCluster = clusters.get(key).get(i); + } else { + clusters.get(key).remove(i); + recreate = true; + } + if (!recreate) { + try { + pbsCluster.listDirectory("~/"); // its hard to trust isConnected method, so we try to connect if it works we are good,else we recreate + } catch (Exception e) { + clusters.get(key).remove(i); + logger.info("Connection found the connection map is expired, so we create from the scratch"); + maxClusterCount++; + recreate = true; // we make the pbsCluster to create again if there is any exception druing connection + } + } + logger.info("Re-using the same connection used with the connection string:" + key); + } else { + recreate = true; + } + if (recreate) { + pbsCluster = new PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),null); + key = sshAuth.getKey(); + List<Cluster> pbsClusters = null; + if (!(clusters.containsKey(key))) { + pbsClusters = new ArrayList<Cluster>(); + } else { + pbsClusters = clusters.get(key); + } + pbsClusters.add(pbsCluster); + clusters.put(key, pbsClusters); + } + } + sshSecurityContext.setPbsCluster(pbsCluster); + jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key, sshSecurityContext); + } catch (Exception e) { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + } + } + public static JobDescriptor createJobDescriptor(JobExecutionContext jobExecutionContext, ApplicationDeploymentDescriptionType app, Cluster cluster) { JobDescriptor jobDescriptor = new JobDescriptor();
