Hi Lahiru, This is in a separate branch, not in master. I do not think that should affect the error I'm getting in master.
On Tue, Nov 11, 2014 at 9:30 AM, Lahiru Gunathilake <[email protected]> wrote: > This commit broke the monitor. > > On Wed, Nov 5, 2014 at 1:29 PM, <[email protected]> wrote: > >> Repository: airavata >> Updated Branches: >> refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a >> >> >> fixing input/outhandler - AIRAVATA-1488 >> >> >> Project: http://git-wip-us.apache.org/repos/asf/airavata/repo >> Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516 >> Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516 >> Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516 >> >> Branch: refs/heads/gfac_appcatalog_int >> Commit: b3769516eec7d7127e17d2e24e4001718763c1ec >> Parents: 3b330c0 >> Author: lahiru <[email protected]> >> Authored: Thu Oct 30 11:27:12 2014 -0400 >> Committer: lahiru <[email protected]> >> Committed: Thu Oct 30 11:27:12 2014 -0400 >> >> ---------------------------------------------------------------------- >> .../impl/password/PasswordCredential.java | 3 +- >> .../gfac/core/context/JobExecutionContext.java | 2 +- >> .../airavata/gfac/core/utils/GFacUtils.java | 3 +- >> .../gfac/gsissh/util/GFACGSISSHUtils.java | 2 +- >> .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++--------- >> .../ssh/handler/AdvancedSCPInputHandler.java | 9 +- >> .../ssh/handler/AdvancedSCPOutputHandler.java | 12 +- >> .../airavata/gfac/ssh/util/GFACSSHUtils.java | 86 +++++++++++--- >> 8 files changed, 146 insertions(+), 85 deletions(-) >> ---------------------------------------------------------------------- >> >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java >> ---------------------------------------------------------------------- >> diff --git >> 
a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java >> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java >> index ee32ef4..a31c98b 100644 >> --- >> a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java >> +++ >> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java >> @@ -22,13 +22,14 @@ >> package org.apache.airavata.credential.store.credential.impl.password; >> >> import org.apache.airavata.credential.store.credential.Credential; >> +import >> org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; >> >> import java.util.Date; >> >> /** >> * User name password credentials. >> */ >> -public class PasswordCredential extends Credential { >> +public class PasswordCredential extends SSHCredential { >> >> private String userName; >> private String password; >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java >> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java >> index 9abab8d..2f94ec5 100644 >> --- >> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java >> +++ >> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java >> @@ -234,7 +234,7 @@ public class JobExecutionContext extends >> AbstractContext implements 
Serializable >> >> >> public SecurityContext getSecurityContext(String name) throws >> GFacException{ >> - SecurityContext secContext = securityContext.get(name); >> + SecurityContext secContext = >> securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress()); >> return secContext; >> } >> >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java >> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java >> index 729c1ee..eef44a4 100644 >> --- >> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java >> +++ >> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java >> @@ -1092,7 +1092,8 @@ public class GFacUtils { >> }else if(experimentEntry != null && >> GFacUtils.isCancelled(experimentID,taskID,zk) ){ >> // this happens when a cancel request comes to a differnt >> gfac node, in this case we do not move gfac experiment >> // node to gfac node specific location, because original >> request execution will fail with errors >> - return true; >> + log.error("This experiment is already cancelled and its >> already executing the cancel operation so cannot submit again !"); >> + return false; >> } else { >> log.error("ExperimentID: " + experimentID + " taskID: " + >> taskID >> + " is already running by this Gfac instance"); >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java >> ---------------------------------------------------------------------- >> diff --git >> 
a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java >> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java >> index 4d338e3..2f9dbc3 100644 >> --- >> a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java >> +++ >> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java >> @@ -163,7 +163,7 @@ public class GFACGSISSHUtils { >> } catch (Exception e) { >> throw new GFacException("An error occurred while >> creating GSI security context", e); >> } >> - >> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, >> context); >> + >> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), >> context); >> } >> } >> >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java >> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java >> index d3c3df8..952b30e 100644 >> --- >> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java >> +++ >> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java >> @@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor { >> HostDescription currentHostDescription = null; >> try { >> take = this.queue.take(); >> - Map<String,MonitorID> completedJobs = new >> HashMap<String,MonitorID>(); >> List<HostMonitorData> hostMonitorData = >> take.getHostMonitorData(); >> for (HostMonitorData iHostMonitorData 
: hostMonitorData) { >> if (iHostMonitorData.getHost().getType() instanceof >> GsisshHostType >> || iHostMonitorData.getHost().getType() >> instanceof SSHHostType) { >> - currentHostDescription = iHostMonitorData.getHost(); >> String hostName = >> iHostMonitorData.getHost().getType().getHostAddress(); >> ResourceConnection connection = null; >> if (connections.containsKey(hostName)) { >> @@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor { >> // before we get the statuses, we check the cancel >> job list and remove them permanently >> List<MonitorID> monitorID = >> iHostMonitorData.getMonitorIDs(); >> Iterator<String> iterator1 = >> cancelJobList.iterator(); >> - >> - for(MonitorID iMonitorID:monitorID){ >> + ListIterator<MonitorID> monitorIDListIterator = >> monitorID.listIterator(); >> + while (monitorIDListIterator.hasNext()){ >> + MonitorID iMonitorID = >> monitorIDListIterator.next(); >> while(iterator1.hasNext()) { >> String cancelMId = iterator1.next(); >> if >> (cancelMId.equals(iMonitorID.getExperimentID() + "+" + >> iMonitorID.getTaskID())) { >> iMonitorID.setStatus(JobState.CANCELED); >> - >> completedJobs.put(iMonitorID.getJobName(), iMonitorID); >> iterator1.remove(); >> logger.debugId(cancelMId, "Found a match >> in cancel monitor queue, hence moved to the " + >> "completed job queue, >> experiment {}, task {} , job {}", >> iMonitorID.getExperimentID(), >> iMonitorID.getTaskID(), iMonitorID.getJobID()); >> + logger.info("Job cancelled: marking the >> Job as ************CANCELLED************ experiment {}, task {}, job name >> {} .", >> + >> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); >> + sendNotification(iMonitorID); >> + monitorIDListIterator.remove(); >> + >> GFacThreadPoolExecutor.getFixedThreadPool().submit(new >> OutHandlerWorker(gfac, iMonitorID, publisher)); >> break; >> } >> } >> @@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor { >> } >> synchronized 
(completedJobsFromPush) { >> ListIterator<String> iterator = >> completedJobsFromPush.listIterator(); >> - for (MonitorID iMonitorID : monitorID) { >> + monitorIDListIterator = monitorID.listIterator(); >> + while (monitorIDListIterator.hasNext()) { >> + MonitorID iMonitorID = >> monitorIDListIterator.next(); >> String completeId = null; >> while (iterator.hasNext()) { >> completeId = iterator.next(); >> if >> (completeId.equals(iMonitorID.getUserName() + "," + >> iMonitorID.getJobName())) { >> logger.info("This job is finished >> because push notification came with <username,jobName> " + completeId); >> - >> completedJobs.put(iMonitorID.getJobName(), iMonitorID); >> >> iMonitorID.setStatus(JobState.COMPLETE); >> iterator.remove();//we have to make >> this empty everytime we iterate, otherwise this list will accumulate and >> will lead to a memory leak >> logger.debugId(completeId, "Push >> notification updated job {} status to {}. " + >> "experiment {} , >> task {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), >> >> iMonitorID.getExperimentID(), iMonitorID.getTaskID()); >> + logger.info("AMQP message recieved: >> marking the Job as ************COMPLETE************ experiment {}, task {}, >> job name {} .", >> + >> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); >> + >> + monitorIDListIterator.remove(); >> + sendNotification(iMonitorID); >> + >> GFacThreadPoolExecutor.getFixedThreadPool().submit(new >> OutHandlerWorker(gfac, iMonitorID, publisher)); >> break; >> } >> } >> iterator = >> completedJobsFromPush.listIterator(); >> } >> } >> + >> + // we have to get this again because we removed the >> already completed jobs with amqp messages >> + monitorID = iHostMonitorData.getMonitorIDs(); >> Map<String, JobState> jobStatuses = >> connection.getJobStatuses(monitorID); >> - Iterator<MonitorID> iterator = monitorID.iterator(); >> + Iterator<MonitorID> iterator = >> monitorID.listIterator(); >> while (iterator.hasNext()) { >> 
MonitorID iMonitorID = iterator.next(); >> currentMonitorID = iMonitorID; >> @@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor { >> >> !JobState.COMPLETE.equals(iMonitorID.getStatus())) { >> >> iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + >> iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we >> have a logic >> }else >> if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ >> - completedJobs.put(iMonitorID.getJobName(), >> iMonitorID); >> logger.debugId(iMonitorID.getJobID(), "Moved >> job {} to completed jobs map, experiment {}, " + >> "task {}", iMonitorID.getJobID(), >> iMonitorID.getExperimentID(), iMonitorID.getTaskID()); >> + iterator.remove(); >> + logger.info("PULL Notification is complete: >> marking the Job as ************COMPLETE************ experiment {}, task {}, >> job name {} .", >> + >> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); >> + >> GFacThreadPoolExecutor.getFixedThreadPool().submit(new >> OutHandlerWorker(gfac, iMonitorID, publisher)); >> } >> - jobStatus = new JobStatusChangeRequestEvent(); >> >> >> iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); >> //IMPORTANT this is not a simple setter we have a logic >> - >> + iMonitorID.setLastMonitored(new Timestamp((new >> Date()).getTime())); >> + sendNotification(iMonitorID); >> + >> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status >> change request, " + >> + "experiment {} , task {}", >> jobStatus.getJobIdentity().getExperimentId(), >> + jobStatus.getJobIdentity().getTaskId()); >> + // if the job is completed we do not have to put >> the job to the queue again >> + iMonitorID.setLastMonitored(new Timestamp((new >> Date()).getTime())); >> + } >> + iterator = monitorID.listIterator(); >> + while(iterator.hasNext()){ >> + MonitorID iMonitorID = iterator.next(); >> if (iMonitorID.getFailedCount() > FAILED_COUNT) { >> 
iMonitorID.setLastMonitored(new >> Timestamp((new Date()).getTime())); >> String outputDir = >> iMonitorID.getJobExecutionContext().getApplicationContext() >> @@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor { >> // this is because while we run >> output handler something failed and during exception >> // we store all the jobs in the >> monitor queue again >> logger.error("We know this job is >> already attempted to run out-handlers"); >> - >> CommonUtils.removeMonitorFromQueue(queue, iMonitorID); >> +// >> CommonUtils.removeMonitorFromQueue(queue, iMonitorID); >> } >> } >> if (stdOut != null && stdOut.size() > 0 && >> !stdOut.get(0).isEmpty()) { // have to be careful with this >> iMonitorID.setStatus(JobState.COMPLETE); >> - >> completedJobs.put(iMonitorID.getJobName(), iMonitorID); >> - logger.errorId(iMonitorID.getJobID(), >> "Job monitoring failed {} times, removed job {} from " + >> - "monitor queue. >> Experiment {} , task {}", iMonitorID.getFailedCount(), >> + logger.errorId(iMonitorID.getJobID(), >> "Job monitoring failed {} times, " + >> + " Experiment {} , task >> {}", iMonitorID.getFailedCount(), >> iMonitorID.getExperimentID(), >> iMonitorID.getTaskID()); >> + logger.info("Listing directory came as >> complete: marking the Job as ************COMPLETE************ experiment >> {}, task {}, job name {} .", >> + >> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); >> + sendNotification(iMonitorID); >> + iterator.remove(); >> + >> GFacThreadPoolExecutor.getFixedThreadPool().submit(new >> OutHandlerWorker(gfac, iMonitorID, publisher)); >> } else { >> iMonitorID.setFailedCount(0); >> } >> @@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor { >> // if the job is complete we remove it from >> the Map, if any of these maps >> // get empty this userMonitorData will get >> delete from the queue >> } >> - JobIdentifier jobIdentity = new >> JobIdentifier(iMonitorID.getJobID(), >> - 
iMonitorID.getTaskID(), >> - iMonitorID.getWorkflowNodeID(), >> - iMonitorID.getExperimentID(), >> - >> iMonitorID.getJobExecutionContext().getGatewayID()); >> - jobStatus.setJobIdentity(jobIdentity); >> - jobStatus.setState(iMonitorID.getStatus()); >> - // we have this JobStatus class to handle amqp >> monitoring >> - >> - publisher.publish(jobStatus); >> - >> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status >> change request, " + >> - "experiment {} , task {}", >> jobStatus.getJobIdentity().getExperimentId(), >> - jobStatus.getJobIdentity().getTaskId()); >> - // if the job is completed we do not have to put >> the job to the queue again >> - iMonitorID.setLastMonitored(new Timestamp((new >> Date()).getTime())); >> } >> + >> + >> } else { >> logger.debug("Qstat Monitor doesn't handle >> non-gsissh hosts , host {}", iHostMonitorData.getHost() >> .getType().getHostAddress()); >> @@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor { >> // We have finished all the HostMonitorData object in >> userMonitorData, now we need to put it back >> // now the userMonitorData goes back to the tail of the queue >> queue.put(take); >> - // cleaning up the completed jobs, this method will remove >> some of the userMonitorData from the queue if >> - // they become empty >> - Map<String, Integer> jobRemoveCountMap = new HashMap<String, >> Integer>(); >> - ZooKeeper zk = null; >> - Set<String> keys = completedJobs.keySet(); >> - for (String jobName: keys) { >> - MonitorID completedJob = completedJobs.get(jobName); >> - CommonUtils.removeMonitorFromQueue(queue, completedJob); >> -// >> gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext()); >> - GFacThreadPoolExecutor.getFixedThreadPool().submit(new >> OutHandlerWorker(gfac, completedJob, publisher)); >> - if (zk == null) { >> - zk = completedJob.getJobExecutionContext().getZk(); >> - } >> - String key = >> CommonUtils.getJobCountUpdatePath(completedJob); >> - int i = 0; >> - if 
(jobRemoveCountMap.containsKey(key)) { >> - i = Integer.valueOf(jobRemoveCountMap.get(key)); >> - } >> - jobRemoveCountMap.put(key, ++i); >> - } >> - if (completedJobs.size() > 0) { >> - // reduce completed job count from zookeeper >> - CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, >> false); >> - } >> } catch (InterruptedException e) { >> if (!this.queue.contains(take)) { >> try { >> @@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor { >> return true; >> } >> >> + private void sendNotification(MonitorID iMonitorID) { >> + JobStatusChangeRequestEvent jobStatus = new >> JobStatusChangeRequestEvent(); >> + JobIdentifier jobIdentity = new >> JobIdentifier(iMonitorID.getJobID(), >> + iMonitorID.getTaskID(), >> + iMonitorID.getWorkflowNodeID(), >> + iMonitorID.getExperimentID(), >> + iMonitorID.getJobExecutionContext().getGatewayID()); >> + jobStatus.setJobIdentity(jobIdentity); >> + jobStatus.setState(iMonitorID.getStatus()); >> + // we have this JobStatus class to handle amqp monitoring >> + >> + publisher.publish(jobStatus); >> + } >> + >> /** >> * This is the method to stop the polling process >> * >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java >> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java >> index ce296da..de4dd41 100644 >> --- >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java >> +++ >> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java >> @@ -72,6 +72,7 @@ import java.util.*; >> public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { >> 
private static final Logger log = >> LoggerFactory.getLogger(AdvancedSCPInputHandler.class); >> public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; >> + public static final int DEFAULT_SSH_PORT = 22; >> >> private String password = null; >> >> @@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends >> AbstractRecoverableHandler { >> this.passPhrase); >> } >> ServerInfo serverInfo = new ServerInfo(this.userName, >> this.hostName); >> - String key = this.userName + this.hostName; >> - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new >> SSHAuthWrapper(serverInfo,authenticationInfo,key)); >> - if >> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) >> == null) { >> + String key = this.userName + this.hostName + >> DEFAULT_SSH_PORT; >> + SSHAuthWrapper sshAuthWrapper = new >> SSHAuthWrapper(serverInfo, authenticationInfo, key); >> + if >> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) >> == null) { >> try { >> - GFACSSHUtils.addSecurityContext(jobExecutionContext); >> + >> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); >> } catch (ApplicationSettingsException e) { >> log.error(e.getMessage()); >> try { >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java >> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java >> index ad2131e..aed6e9f 100644 >> --- >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java >> +++ >> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java >> @@ -73,6 +73,8 @@ 
import java.util.Set; >> public class AdvancedSCPOutputHandler extends AbstractHandler { >> private static final Logger log = >> LoggerFactory.getLogger(AdvancedSCPOutputHandler.class); >> >> + public static final int DEFAULT_SSH_PORT = 22; >> + >> private String password = null; >> >> private String publicKeyPath; >> @@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends >> AbstractHandler { >> >> private String outputPath; >> >> - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; >> - >> >> public void initProperties(Properties properties) throws >> GFacHandlerException { >> password = (String)properties.get("password"); >> @@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends >> AbstractHandler { >> this.passPhrase); >> } >> ServerInfo serverInfo = new ServerInfo(this.userName, >> this.hostName); >> - String key = this.userName + this.hostName; >> - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new >> SSHAuthWrapper(serverInfo,authenticationInfo,key)); >> + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; >> + SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, >> authenticationInfo, key); >> try { >> - if >> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) >> == null) { >> + if >> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) >> == null) { >> try { >> - GFACSSHUtils.addSecurityContext(jobExecutionContext); >> + >> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); >> } catch (ApplicationSettingsException e) { >> log.error(e.getMessage()); >> try { >> >> >> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java >> ---------------------------------------------------------------------- >> diff --git >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java >> 
b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java >> index 7ee5d6a..94f07b1 100644 >> --- >> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java >> +++ >> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java >> @@ -62,9 +62,12 @@ public class GFACSSHUtils { >> >> public static int maxClusterCount = 5; >> >> - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; >> - >> - >> + /** >> + * This method is to add computing resource specific authentication, >> if its a third party machine, use the other addSecurityContext >> + * @param jobExecutionContext >> + * @throws GFacException >> + * @throws ApplicationSettingsException >> + */ >> public static void addSecurityContext(JobExecutionContext >> jobExecutionContext) throws GFacException, ApplicationSettingsException { >> HostDescription registeredHost = >> jobExecutionContext.getApplicationContext().getHostDescription(); >> if (registeredHost.getType() instanceof GlobusHostType || >> registeredHost.getType() instanceof UnicoreHostType) { >> @@ -77,8 +80,6 @@ public class GFACSSHUtils { >> requestData.setTokenId(credentialStoreToken); >> >> ServerInfo serverInfo = new ServerInfo(null, >> registeredHost.getType().getHostAddress()); >> - SSHAuthWrapper sshAuth = (SSHAuthWrapper) >> jobExecutionContext.getProperty(ADVANCED_SSH_AUTH); >> - >> Cluster pbsCluster = null; >> try { >> TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new >> TokenizedSSHAuthInfo(requestData); >> @@ -95,9 +96,6 @@ public class GFACSSHUtils { >> >> String key = credentials.getPortalUserName() + >> registeredHost.getType().getHostAddress() + >> serverInfo.getPort(); >> - if(sshAuth!=null){ >> - key=sshAuth.getKey(); >> - } >> boolean recreate = false; >> synchronized (clusters) { >> if (clusters.containsKey(key) && >> clusters.get(key).size() < maxClusterCount) { >> @@ -125,15 +123,8 @@ public class GFACSSHUtils { >> recreate 
= true; >> } >> if (recreate) { >> - if (sshAuth != null) { >> - pbsCluster = new >> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(), >> + pbsCluster = new PBSCluster(serverInfo, >> tokenizedSSHAuthInfo, >> >> CommonUtils.getPBSJobManager(installedParentPath)); >> - >> jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other >> provider might fail >> - key = sshAuth.getKey(); >> - } else { >> - pbsCluster = new PBSCluster(serverInfo, >> tokenizedSSHAuthInfo, >> - >> CommonUtils.getPBSJobManager(installedParentPath)); >> - } >> List<Cluster> pbsClusters = null; >> if (!(clusters.containsKey(key))) { >> pbsClusters = new ArrayList<Cluster>(); >> @@ -148,10 +139,71 @@ public class GFACSSHUtils { >> e.printStackTrace(); //To change body of catch >> statement use File | Settings | File Templates. >> } >> sshSecurityContext.setPbsCluster(pbsCluster); >> - >> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, >> sshSecurityContext); >> + >> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), >> sshSecurityContext); >> } >> } >> >> + /** >> + * This method can be used to add third party resource security >> contexts >> + * @param jobExecutionContext >> + * @param sshAuth >> + * @throws GFacException >> + * @throws ApplicationSettingsException >> + */ >> + public static void addSecurityContext(JobExecutionContext >> jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, >> ApplicationSettingsException { >> + try { >> + if(sshAuth== null) { >> + throw new GFacException("Error adding security >> Context, because sshAuthWrapper is null"); >> + } >> + SSHSecurityContext sshSecurityContext = new >> SSHSecurityContext(); >> + Cluster pbsCluster = null; >> + String key=sshAuth.getKey(); >> + boolean recreate = false; >> + synchronized (clusters) { >> + if (clusters.containsKey(key) && >> clusters.get(key).size() < maxClusterCount) { >> + recreate = 
true; >> + } else if (clusters.containsKey(key)) { >> + int i = new Random().nextInt(Integer.MAX_VALUE) >> % maxClusterCount; >> + if >> (clusters.get(key).get(i).getSession().isConnected()) { >> + pbsCluster = clusters.get(key).get(i); >> + } else { >> + clusters.get(key).remove(i); >> + recreate = true; >> + } >> + if (!recreate) { >> + try { >> + pbsCluster.listDirectory("~/"); // its >> hard to trust isConnected method, so we try to connect if it works we are >> good,else we recreate >> + } catch (Exception e) { >> + clusters.get(key).remove(i); >> + logger.info("Connection found the >> connection map is expired, so we create from the scratch"); >> + maxClusterCount++; >> + recreate = true; // we make the >> pbsCluster to create again if there is any exception druing connection >> + } >> + } >> + logger.info("Re-using the same connection used >> with the connection string:" + key); >> + } else { >> + recreate = true; >> + } >> + if (recreate) { >> + pbsCluster = new >> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),null); >> + key = sshAuth.getKey(); >> + List<Cluster> pbsClusters = null; >> + if (!(clusters.containsKey(key))) { >> + pbsClusters = new ArrayList<Cluster>(); >> + } else { >> + pbsClusters = clusters.get(key); >> + } >> + pbsClusters.add(pbsCluster); >> + clusters.put(key, pbsClusters); >> + } >> + } >> + sshSecurityContext.setPbsCluster(pbsCluster); >> + >> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key, >> sshSecurityContext); >> + } catch (Exception e) { >> + e.printStackTrace(); //To change body of catch >> statement use File | Settings | File Templates. >> + } >> + } >> + >> public static JobDescriptor createJobDescriptor(JobExecutionContext >> jobExecutionContext, >> >> ApplicationDeploymentDescriptionType app, Cluster cluster) { >> JobDescriptor jobDescriptor = new JobDescriptor(); >> >> > > > -- > Research Assistant > Science Gateways Group > Indiana University >
