This commit broke the monitor. On Wed, Nov 5, 2014 at 1:29 PM, <[email protected]> wrote:
> Repository: airavata > Updated Branches: > refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a > > > fixing input/outhandler - AIRAVATA-1488 > > > Project: http://git-wip-us.apache.org/repos/asf/airavata/repo > Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516 > Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516 > Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516 > > Branch: refs/heads/gfac_appcatalog_int > Commit: b3769516eec7d7127e17d2e24e4001718763c1ec > Parents: 3b330c0 > Author: lahiru <[email protected]> > Authored: Thu Oct 30 11:27:12 2014 -0400 > Committer: lahiru <[email protected]> > Committed: Thu Oct 30 11:27:12 2014 -0400 > > ---------------------------------------------------------------------- > .../impl/password/PasswordCredential.java | 3 +- > .../gfac/core/context/JobExecutionContext.java | 2 +- > .../airavata/gfac/core/utils/GFacUtils.java | 3 +- > .../gfac/gsissh/util/GFACGSISSHUtils.java | 2 +- > .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++--------- > .../ssh/handler/AdvancedSCPInputHandler.java | 9 +- > .../ssh/handler/AdvancedSCPOutputHandler.java | 12 +- > .../airavata/gfac/ssh/util/GFACSSHUtils.java | 86 +++++++++++--- > 8 files changed, 146 insertions(+), 85 deletions(-) > ---------------------------------------------------------------------- > > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java > ---------------------------------------------------------------------- > diff --git > a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java > b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java > index ee32ef4..a31c98b 100644 > --- > a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java > +++ > b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java > @@ -22,13 +22,14 @@ > package org.apache.airavata.credential.store.credential.impl.password; > > import org.apache.airavata.credential.store.credential.Credential; > +import > org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential; > > import java.util.Date; > > /** > * User name password credentials. > */ > -public class PasswordCredential extends Credential { > +public class PasswordCredential extends SSHCredential { > > private String userName; > private String password; > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java > b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java > index 9abab8d..2f94ec5 100644 > --- > a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java > +++ > b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java > @@ -234,7 +234,7 @@ public class JobExecutionContext extends > AbstractContext implements Serializable > > > public SecurityContext getSecurityContext(String name) throws > GFacException{ > - SecurityContext secContext = securityContext.get(name); > + SecurityContext secContext = > securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress()); > return secContext; > } > > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java > b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java > index 729c1ee..eef44a4 100644 > --- > a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java > +++ > b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java > @@ -1092,7 +1092,8 @@ public class GFacUtils { > }else if(experimentEntry != null && > GFacUtils.isCancelled(experimentID,taskID,zk) ){ > // this happens when a cancel request comes to a differnt > gfac node, in this case we do not move gfac experiment > // node to gfac node specific location, because original > request execution will fail with errors > - return true; > + log.error("This experiment is already cancelled and its > already executing the cancel operation so cannot submit again !"); > + return false; > } else { > log.error("ExperimentID: " + experimentID + " taskID: " + > taskID > + " is already running by this Gfac instance"); > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java > b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java > index 4d338e3..2f9dbc3 100644 > --- > a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java > +++ > b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java > @@ -163,7 +163,7 @@ public class GFACGSISSHUtils { > } catch (Exception e) { > throw new GFacException("An error occurred while creating > GSI security context", e); > } > - > jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT, > context); > + > jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), > context); > } > } > > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java > b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java > index d3c3df8..952b30e 100644 > --- > a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java > +++ > b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java > @@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor { > HostDescription currentHostDescription = null; > try { > take = this.queue.take(); > - Map<String,MonitorID> completedJobs = new > HashMap<String,MonitorID>(); > List<HostMonitorData> hostMonitorData = > take.getHostMonitorData(); > for (HostMonitorData iHostMonitorData : hostMonitorData) { > if (iHostMonitorData.getHost().getType() instanceof > GsisshHostType > || iHostMonitorData.getHost().getType() > instanceof SSHHostType) { > - currentHostDescription = iHostMonitorData.getHost(); > String hostName = > iHostMonitorData.getHost().getType().getHostAddress(); > ResourceConnection connection = null; > if (connections.containsKey(hostName)) { > @@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor { > // before we get the statuses, we check the cancel > job list and remove them permanently > List<MonitorID> monitorID = > iHostMonitorData.getMonitorIDs(); > Iterator<String> iterator1 = cancelJobList.iterator(); > - > - for(MonitorID iMonitorID:monitorID){ > + ListIterator<MonitorID> monitorIDListIterator = > monitorID.listIterator(); > + while (monitorIDListIterator.hasNext()){ > + MonitorID iMonitorID = > monitorIDListIterator.next(); > while(iterator1.hasNext()) { > String cancelMId = iterator1.next(); > if > (cancelMId.equals(iMonitorID.getExperimentID() + "+" + > iMonitorID.getTaskID())) { > iMonitorID.setStatus(JobState.CANCELED); > - > completedJobs.put(iMonitorID.getJobName(), iMonitorID); > iterator1.remove(); > logger.debugId(cancelMId, "Found a match > in cancel monitor queue, hence moved to the " + > "completed job queue, > experiment {}, task {} , job {}", > iMonitorID.getExperimentID(), > iMonitorID.getTaskID(), iMonitorID.getJobID()); > + logger.info("Job cancelled: marking the > Job as ************CANCELLED************ experiment {}, task {}, job name > {} .", > + > iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); > + sendNotification(iMonitorID); > + monitorIDListIterator.remove(); > + > GFacThreadPoolExecutor.getFixedThreadPool().submit(new > OutHandlerWorker(gfac, iMonitorID, publisher)); > break; > } > } > @@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor { > } > synchronized (completedJobsFromPush) { > ListIterator<String> iterator = > completedJobsFromPush.listIterator(); > - for (MonitorID iMonitorID : monitorID) { > + monitorIDListIterator = monitorID.listIterator(); > + while (monitorIDListIterator.hasNext()) { > + MonitorID iMonitorID = > monitorIDListIterator.next(); > String completeId = null; > while (iterator.hasNext()) { > completeId = iterator.next(); > if > (completeId.equals(iMonitorID.getUserName() + "," + > iMonitorID.getJobName())) { > logger.info("This job is finished > because push notification came with <username,jobName> " + completeId); > - > completedJobs.put(iMonitorID.getJobName(), iMonitorID); > > iMonitorID.setStatus(JobState.COMPLETE); > iterator.remove();//we have to make > this empty everytime we iterate, otherwise this list will accumulate and > will lead to a memory leak > logger.debugId(completeId, "Push > notification updated job {} status to {}. " + > "experiment {} , task > {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(), > iMonitorID.getExperimentID(), > iMonitorID.getTaskID()); > + logger.info("AMQP message recieved: > marking the Job as ************COMPLETE************ experiment {}, task {}, > job name {} .", > + > iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); > + > + monitorIDListIterator.remove(); > + sendNotification(iMonitorID); > + > GFacThreadPoolExecutor.getFixedThreadPool().submit(new > OutHandlerWorker(gfac, iMonitorID, publisher)); > break; > } > } > iterator = > completedJobsFromPush.listIterator(); > } > } > + > + // we have to get this again because we removed the > already completed jobs with amqp messages > + monitorID = iHostMonitorData.getMonitorIDs(); > Map<String, JobState> jobStatuses = > connection.getJobStatuses(monitorID); > - Iterator<MonitorID> iterator = monitorID.iterator(); > + Iterator<MonitorID> iterator = > monitorID.listIterator(); > while (iterator.hasNext()) { > MonitorID iMonitorID = iterator.next(); > currentMonitorID = iMonitorID; > @@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor { > > !JobState.COMPLETE.equals(iMonitorID.getStatus())) { > > iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + > iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we > have a logic > }else > if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ > - completedJobs.put(iMonitorID.getJobName(), > iMonitorID); > logger.debugId(iMonitorID.getJobID(), "Moved > job {} to completed jobs map, experiment {}, " + > "task {}", iMonitorID.getJobID(), > iMonitorID.getExperimentID(), iMonitorID.getTaskID()); > + iterator.remove(); > + logger.info("PULL Notification is complete: > marking the Job as ************COMPLETE************ experiment {}, task {}, > job name {} .", > + > iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); > + > GFacThreadPoolExecutor.getFixedThreadPool().submit(new > OutHandlerWorker(gfac, iMonitorID, publisher)); > } > - jobStatus = new JobStatusChangeRequestEvent(); > > > iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName())); > //IMPORTANT this is not a simple setter we have a logic > - > + iMonitorID.setLastMonitored(new Timestamp((new > Date()).getTime())); > + sendNotification(iMonitorID); > + > logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status > change request, " + > + "experiment {} , task {}", > jobStatus.getJobIdentity().getExperimentId(), > + jobStatus.getJobIdentity().getTaskId()); > + // if the job is completed we do not have to put > the job to the queue again > + iMonitorID.setLastMonitored(new Timestamp((new > Date()).getTime())); > + } > + iterator = monitorID.listIterator(); > + while(iterator.hasNext()){ > + MonitorID iMonitorID = iterator.next(); > if (iMonitorID.getFailedCount() > FAILED_COUNT) { > iMonitorID.setLastMonitored(new > Timestamp((new Date()).getTime())); > String outputDir = > iMonitorID.getJobExecutionContext().getApplicationContext() > @@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor { > // this is because while we run > output handler something failed and during exception > // we store all the jobs in the > monitor queue again > logger.error("We know this job is > already attempted to run out-handlers"); > - > CommonUtils.removeMonitorFromQueue(queue, iMonitorID); > +// > CommonUtils.removeMonitorFromQueue(queue, iMonitorID); > } > } > if (stdOut != null && stdOut.size() > 0 && > !stdOut.get(0).isEmpty()) { // have to be careful with this > iMonitorID.setStatus(JobState.COMPLETE); > - > completedJobs.put(iMonitorID.getJobName(), iMonitorID); > - logger.errorId(iMonitorID.getJobID(), > "Job monitoring failed {} times, removed job {} from " + > - "monitor queue. > Experiment {} , task {}", iMonitorID.getFailedCount(), > + logger.errorId(iMonitorID.getJobID(), > "Job monitoring failed {} times, " + > + " Experiment {} , task > {}", iMonitorID.getFailedCount(), > iMonitorID.getExperimentID(), > iMonitorID.getTaskID()); > + logger.info("Listing directory came as > complete: marking the Job as ************COMPLETE************ experiment > {}, task {}, job name {} .", > + > iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName()); > + sendNotification(iMonitorID); > + iterator.remove(); > + > GFacThreadPoolExecutor.getFixedThreadPool().submit(new > OutHandlerWorker(gfac, iMonitorID, publisher)); > } else { > iMonitorID.setFailedCount(0); > } > @@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor { > // if the job is complete we remove it from > the Map, if any of these maps > // get empty this userMonitorData will get > delete from the queue > } > - JobIdentifier jobIdentity = new > JobIdentifier(iMonitorID.getJobID(), > - iMonitorID.getTaskID(), > - iMonitorID.getWorkflowNodeID(), > - iMonitorID.getExperimentID(), > - > iMonitorID.getJobExecutionContext().getGatewayID()); > - jobStatus.setJobIdentity(jobIdentity); > - jobStatus.setState(iMonitorID.getStatus()); > - // we have this JobStatus class to handle amqp > monitoring > - > - publisher.publish(jobStatus); > - > logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status > change request, " + > - "experiment {} , task {}", > jobStatus.getJobIdentity().getExperimentId(), > - jobStatus.getJobIdentity().getTaskId()); > - // if the job is completed we do not have to put > the job to the queue again > - iMonitorID.setLastMonitored(new Timestamp((new > Date()).getTime())); > } > + > + > } else { > logger.debug("Qstat Monitor doesn't handle non-gsissh > hosts , host {}", iHostMonitorData.getHost() > .getType().getHostAddress()); > @@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor { > // We have finished all the HostMonitorData object in > userMonitorData, now we need to put it back > // now the userMonitorData goes back to the tail of the queue > queue.put(take); > - // cleaning up the completed jobs, this method will remove > some of the userMonitorData from the queue if > - // they become empty > - Map<String, Integer> jobRemoveCountMap = new HashMap<String, > Integer>(); > - ZooKeeper zk = null; > - Set<String> keys = completedJobs.keySet(); > - for (String jobName: keys) { > - MonitorID completedJob = completedJobs.get(jobName); > - CommonUtils.removeMonitorFromQueue(queue, completedJob); > -// > gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext()); > - GFacThreadPoolExecutor.getFixedThreadPool().submit(new > OutHandlerWorker(gfac, completedJob, publisher)); > - if (zk == null) { > - zk = completedJob.getJobExecutionContext().getZk(); > - } > - String key = > CommonUtils.getJobCountUpdatePath(completedJob); > - int i = 0; > - if (jobRemoveCountMap.containsKey(key)) { > - i = Integer.valueOf(jobRemoveCountMap.get(key)); > - } > - jobRemoveCountMap.put(key, ++i); > - } > - if (completedJobs.size() > 0) { > - // reduce completed job count from zookeeper > - CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap, > false); > - } > } catch (InterruptedException e) { > if (!this.queue.contains(take)) { > try { > @@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor { > return true; > } > > + private void sendNotification(MonitorID iMonitorID) { > + JobStatusChangeRequestEvent jobStatus = new > JobStatusChangeRequestEvent(); > + JobIdentifier jobIdentity = new > JobIdentifier(iMonitorID.getJobID(), > + iMonitorID.getTaskID(), > + iMonitorID.getWorkflowNodeID(), > + iMonitorID.getExperimentID(), > + iMonitorID.getJobExecutionContext().getGatewayID()); > + jobStatus.setJobIdentity(jobIdentity); > + jobStatus.setState(iMonitorID.getStatus()); > + // we have this JobStatus class to handle amqp monitoring > + > + publisher.publish(jobStatus); > + } > + > /** > * This is the method to stop the polling process > * > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java > index ce296da..de4dd41 100644 > --- > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java > +++ > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java > @@ -72,6 +72,7 @@ import java.util.*; > public class AdvancedSCPInputHandler extends AbstractRecoverableHandler { > private static final Logger log = > LoggerFactory.getLogger(AdvancedSCPInputHandler.class); > public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; > + public static final int DEFAULT_SSH_PORT = 22; > > private String password = null; > > @@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends > AbstractRecoverableHandler { > this.passPhrase); > } > ServerInfo serverInfo = new ServerInfo(this.userName, > this.hostName); > - String key = this.userName + this.hostName; > - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new > SSHAuthWrapper(serverInfo,authenticationInfo,key)); > - if > (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) > == null) { > + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; > + SSHAuthWrapper sshAuthWrapper = new > SSHAuthWrapper(serverInfo, authenticationInfo, key); > + if > (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) > == null) { > try { > - GFACSSHUtils.addSecurityContext(jobExecutionContext); > + > GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); > } catch (ApplicationSettingsException e) { > log.error(e.getMessage()); > try { > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java > index ad2131e..aed6e9f 100644 > --- > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java > +++ > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java > @@ -73,6 +73,8 @@ import java.util.Set; > public class AdvancedSCPOutputHandler extends AbstractHandler { > private static final Logger log = > LoggerFactory.getLogger(AdvancedSCPOutputHandler.class); > > + public static final int DEFAULT_SSH_PORT = 22; > + > private String password = null; > > private String publicKeyPath; > @@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends > AbstractHandler { > > private String outputPath; > > - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; > - > > public void initProperties(Properties properties) throws > GFacHandlerException { > password = (String)properties.get("password"); > @@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends > AbstractHandler { > this.passPhrase); > } > ServerInfo serverInfo = new ServerInfo(this.userName, > this.hostName); > - String key = this.userName + this.hostName; > - jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new > SSHAuthWrapper(serverInfo,authenticationInfo,key)); > + String key = this.userName + this.hostName + DEFAULT_SSH_PORT; > + SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo, > authenticationInfo, key); > try { > - if > (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT) > == null) { > + if > (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key) > == null) { > try { > - GFACSSHUtils.addSecurityContext(jobExecutionContext); > + > GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper); > } catch (ApplicationSettingsException e) { > log.error(e.getMessage()); > try { > > > http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java > ---------------------------------------------------------------------- > diff --git > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java > index 7ee5d6a..94f07b1 100644 > --- > a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java > +++ > b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java > @@ -62,9 +62,12 @@ public class GFACSSHUtils { > > public static int maxClusterCount = 5; > > - public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth"; > - > - > + /** > + * This method is to add computing resource specific authentication, > if its a third party machine, use the other addSecurityContext > + * @param jobExecutionContext > + * @throws GFacException > + * @throws ApplicationSettingsException > + */ > public static void addSecurityContext(JobExecutionContext > jobExecutionContext) throws GFacException, ApplicationSettingsException { > HostDescription registeredHost = > jobExecutionContext.getApplicationContext().getHostDescription(); > if (registeredHost.getType() instanceof GlobusHostType || > registeredHost.getType() instanceof UnicoreHostType) { > @@ -77,8 +80,6 @@ public class GFACSSHUtils { > requestData.setTokenId(credentialStoreToken); > > ServerInfo serverInfo = new ServerInfo(null, > registeredHost.getType().getHostAddress()); > - SSHAuthWrapper sshAuth = (SSHAuthWrapper) > jobExecutionContext.getProperty(ADVANCED_SSH_AUTH); > - > Cluster pbsCluster = null; > try { > TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new > TokenizedSSHAuthInfo(requestData); > @@ -95,9 +96,6 @@ public class GFACSSHUtils { > > String key = credentials.getPortalUserName() + > registeredHost.getType().getHostAddress() + > serverInfo.getPort(); > - if(sshAuth!=null){ > - key=sshAuth.getKey(); > - } > boolean recreate = false; > synchronized (clusters) { > if (clusters.containsKey(key) && > clusters.get(key).size() < maxClusterCount) { > @@ -125,15 +123,8 @@ public class GFACSSHUtils { > recreate = true; > } > if (recreate) { > - if (sshAuth != null) { > - pbsCluster = new > PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(), > + pbsCluster = new PBSCluster(serverInfo, > tokenizedSSHAuthInfo, > > CommonUtils.getPBSJobManager(installedParentPath)); > - > jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other > provider might fail > - key = sshAuth.getKey(); > - } else { > - pbsCluster = new PBSCluster(serverInfo, > tokenizedSSHAuthInfo, > - > CommonUtils.getPBSJobManager(installedParentPath)); > - } > List<Cluster> pbsClusters = null; > if (!(clusters.containsKey(key))) { > pbsClusters = new ArrayList<Cluster>(); > @@ -148,10 +139,71 @@ public class GFACSSHUtils { > e.printStackTrace(); //To change body of catch statement > use File | Settings | File Templates. > } > sshSecurityContext.setPbsCluster(pbsCluster); > - > jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT, > sshSecurityContext); > + > jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(), > sshSecurityContext); > } > } > > + /** > + * This method can be used to add third party resource security > contexts > + * @param jobExecutionContext > + * @param sshAuth > + * @throws GFacException > + * @throws ApplicationSettingsException > + */ > + public static void addSecurityContext(JobExecutionContext > jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException, > ApplicationSettingsException { > + try { > + if(sshAuth== null) { > + throw new GFacException("Error adding security > Context, because sshAuthWrapper is null"); > + } > + SSHSecurityContext sshSecurityContext = new > SSHSecurityContext(); > + Cluster pbsCluster = null; > + String key=sshAuth.getKey(); > + boolean recreate = false; > + synchronized (clusters) { > + if (clusters.containsKey(key) && > clusters.get(key).size() < maxClusterCount) { > + recreate = true; > + } else if (clusters.containsKey(key)) { > + int i = new Random().nextInt(Integer.MAX_VALUE) % > maxClusterCount; > + if > (clusters.get(key).get(i).getSession().isConnected()) { > + pbsCluster = clusters.get(key).get(i); > + } else { > + clusters.get(key).remove(i); > + recreate = true; > + } > + if (!recreate) { > + try { > + pbsCluster.listDirectory("~/"); // its > hard to trust isConnected method, so we try to connect if it works we are > good,else we recreate > + } catch (Exception e) { > + clusters.get(key).remove(i); > + logger.info("Connection found the > connection map is expired, so we create from the scratch"); > + maxClusterCount++; > + recreate = true; // we make the > pbsCluster to create again if there is any exception druing connection > + } > + } > + logger.info("Re-using the same connection used > with the connection string:" + key); > + } else { > + recreate = true; > + } > + if (recreate) { > + pbsCluster = new > PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),null); > + key = sshAuth.getKey(); > + List<Cluster> pbsClusters = null; > + if (!(clusters.containsKey(key))) { > + pbsClusters = new ArrayList<Cluster>(); > + } else { > + pbsClusters = clusters.get(key); > + } > + pbsClusters.add(pbsCluster); > + clusters.put(key, pbsClusters); > + } > + } > + sshSecurityContext.setPbsCluster(pbsCluster); > + > jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key, > sshSecurityContext); > + } catch (Exception e) { > + e.printStackTrace(); //To change body of catch statement > use File | Settings | File Templates. > + } > + } > + > public static JobDescriptor createJobDescriptor(JobExecutionContext > jobExecutionContext, > > ApplicationDeploymentDescriptionType app, Cluster cluster) { > JobDescriptor jobDescriptor = new JobDescriptor(); > > -- Research Assistant Science Gateways Group Indiana University
