This commit broke the monitor.

On Wed, Nov 5, 2014 at 1:29 PM, <[email protected]> wrote:

> Repository: airavata
> Updated Branches:
>   refs/heads/gfac_appcatalog_int e9ee22b97 -> 755273e1a
>
>
> fixing input/outhandler - AIRAVATA-1488
>
>
> Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
> Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b3769516
> Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b3769516
> Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b3769516
>
> Branch: refs/heads/gfac_appcatalog_int
> Commit: b3769516eec7d7127e17d2e24e4001718763c1ec
> Parents: 3b330c0
> Author: lahiru <[email protected]>
> Authored: Thu Oct 30 11:27:12 2014 -0400
> Committer: lahiru <[email protected]>
> Committed: Thu Oct 30 11:27:12 2014 -0400
>
> ----------------------------------------------------------------------
>  .../impl/password/PasswordCredential.java       |   3 +-
>  .../gfac/core/context/JobExecutionContext.java  |   2 +-
>  .../airavata/gfac/core/utils/GFacUtils.java     |   3 +-
>  .../gfac/gsissh/util/GFACGSISSHUtils.java       |   2 +-
>  .../monitor/impl/pull/qstat/HPCPullMonitor.java | 114 ++++++++++---------
>  .../ssh/handler/AdvancedSCPInputHandler.java    |   9 +-
>  .../ssh/handler/AdvancedSCPOutputHandler.java   |  12 +-
>  .../airavata/gfac/ssh/util/GFACSSHUtils.java    |  86 +++++++++++---
>  8 files changed, 146 insertions(+), 85 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
> index ee32ef4..a31c98b 100644
> ---
> a/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
> +++
> b/modules/credential-store-service/credential-store/src/main/java/org/apache/airavata/credential/store/credential/impl/password/PasswordCredential.java
> @@ -22,13 +22,14 @@
>  package org.apache.airavata.credential.store.credential.impl.password;
>
>  import org.apache.airavata.credential.store.credential.Credential;
> +import
> org.apache.airavata.credential.store.credential.impl.ssh.SSHCredential;
>
>  import java.util.Date;
>
>  /**
>   * User name password credentials.
>   */
> -public class PasswordCredential extends Credential {
> +public class PasswordCredential extends SSHCredential {
>
>      private String userName;
>      private String password;
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
> index 9abab8d..2f94ec5 100644
> ---
> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
> +++
> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
> @@ -234,7 +234,7 @@ public class JobExecutionContext extends
> AbstractContext implements Serializable
>
>
>         public SecurityContext getSecurityContext(String name) throws
> GFacException{
> -               SecurityContext secContext = securityContext.get(name);
> +               SecurityContext secContext =
> securityContext.get(name+"-"+this.getApplicationContext().getHostDescription().getType().getHostAddress());
>                 return secContext;
>         }
>
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
> index 729c1ee..eef44a4 100644
> ---
> a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
> +++
> b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
> @@ -1092,7 +1092,8 @@ public class GFacUtils {
>                 }else if(experimentEntry != null &&
> GFacUtils.isCancelled(experimentID,taskID,zk) ){
>              // this happens when a cancel request comes to a differnt
> gfac node, in this case we do not move gfac experiment
>              // node to gfac node specific location, because original
> request execution will fail with errors
> -            return true;
> +            log.error("This experiment is already cancelled and its
> already executing the cancel operation so cannot submit again !");
> +            return false;
>          } else {
>              log.error("ExperimentID: " + experimentID + " taskID: " +
> taskID
>                      + " is already running by this Gfac instance");
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
> index 4d338e3..2f9dbc3 100644
> ---
> a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
> +++
> b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/util/GFACGSISSHUtils.java
> @@ -163,7 +163,7 @@ public class GFACGSISSHUtils {
>              } catch (Exception e) {
>                  throw new GFacException("An error occurred while creating
> GSI security context", e);
>              }
> -
> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT,
> context);
> +
> jobExecutionContext.addSecurityContext(Constants.GSI_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
> context);
>          }
>      }
>
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
> index d3c3df8..952b30e 100644
> ---
> a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
> +++
> b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
> @@ -157,12 +157,10 @@ public class HPCPullMonitor extends PullMonitor {
>          HostDescription currentHostDescription = null;
>          try {
>              take = this.queue.take();
> -            Map<String,MonitorID> completedJobs = new
> HashMap<String,MonitorID>();
>              List<HostMonitorData> hostMonitorData =
> take.getHostMonitorData();
>              for (HostMonitorData iHostMonitorData : hostMonitorData) {
>                  if (iHostMonitorData.getHost().getType() instanceof
> GsisshHostType
>                          || iHostMonitorData.getHost().getType()
> instanceof SSHHostType) {
> -                    currentHostDescription = iHostMonitorData.getHost();
>                      String hostName =
> iHostMonitorData.getHost().getType().getHostAddress();
>                      ResourceConnection connection = null;
>                      if (connections.containsKey(hostName)) {
> @@ -181,17 +179,22 @@ public class HPCPullMonitor extends PullMonitor {
>                      // before we get the statuses, we check the cancel
> job list and remove them permanently
>                      List<MonitorID> monitorID =
> iHostMonitorData.getMonitorIDs();
>                      Iterator<String> iterator1 = cancelJobList.iterator();
> -
> -                    for(MonitorID iMonitorID:monitorID){
> +                    ListIterator<MonitorID> monitorIDListIterator =
> monitorID.listIterator();
> +                    while (monitorIDListIterator.hasNext()){
> +                        MonitorID iMonitorID =
> monitorIDListIterator.next();
>                          while(iterator1.hasNext()) {
>                              String cancelMId = iterator1.next();
>                              if
> (cancelMId.equals(iMonitorID.getExperimentID() + "+" +
> iMonitorID.getTaskID())) {
>                                  iMonitorID.setStatus(JobState.CANCELED);
> -
> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
>                                  iterator1.remove();
>                                  logger.debugId(cancelMId, "Found a match
> in cancel monitor queue, hence moved to the " +
>                                                  "completed job queue,
> experiment {}, task {} , job {}",
>                                          iMonitorID.getExperimentID(),
> iMonitorID.getTaskID(), iMonitorID.getJobID());
> +                                logger.info("Job cancelled: marking the
> Job as ************CANCELLED************ experiment {}, task {}, job name
> {} .",
> +
> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
> +                                sendNotification(iMonitorID);
> +                                monitorIDListIterator.remove();
> +
> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
> OutHandlerWorker(gfac, iMonitorID, publisher));
>                                  break;
>                              }
>                          }
> @@ -199,26 +202,36 @@ public class HPCPullMonitor extends PullMonitor {
>                      }
>                      synchronized (completedJobsFromPush) {
>                          ListIterator<String> iterator =
> completedJobsFromPush.listIterator();
> -                        for (MonitorID iMonitorID : monitorID) {
> +                        monitorIDListIterator = monitorID.listIterator();
> +                        while (monitorIDListIterator.hasNext()) {
> +                            MonitorID iMonitorID =
> monitorIDListIterator.next();
>                              String completeId = null;
>                              while (iterator.hasNext()) {
>                                   completeId = iterator.next();
>                                  if
> (completeId.equals(iMonitorID.getUserName() + "," +
> iMonitorID.getJobName())) {
>                                      logger.info("This job is finished
> because push notification came with <username,jobName> " + completeId);
> -
> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
>
>  iMonitorID.setStatus(JobState.COMPLETE);
>                                      iterator.remove();//we have to make
> this empty everytime we iterate, otherwise this list will accumulate and
> will lead to a memory leak
>                                      logger.debugId(completeId, "Push
> notification updated job {} status to {}. " +
>                                                      "experiment {} , task
> {}.", iMonitorID.getJobID(), JobState.COMPLETE.toString(),
>                                              iMonitorID.getExperimentID(),
> iMonitorID.getTaskID());
> +                                    logger.info("AMQP message recieved:
> marking the Job as ************COMPLETE************ experiment {}, task {},
> job name {} .",
> +
> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
> +
> +                                    monitorIDListIterator.remove();
> +                                    sendNotification(iMonitorID);
> +
> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
> OutHandlerWorker(gfac, iMonitorID, publisher));
>                                      break;
>                                  }
>                              }
>                              iterator =
> completedJobsFromPush.listIterator();
>                          }
>                      }
> +
> +                    // we have to get this again because we removed the
> already completed jobs with amqp messages
> +                    monitorID = iHostMonitorData.getMonitorIDs();
>                      Map<String, JobState> jobStatuses =
> connection.getJobStatuses(monitorID);
> -                    Iterator<MonitorID> iterator = monitorID.iterator();
> +                    Iterator<MonitorID> iterator =
> monitorID.listIterator();
>                      while (iterator.hasNext()) {
>                          MonitorID iMonitorID = iterator.next();
>                          currentMonitorID = iMonitorID;
> @@ -226,13 +239,25 @@ public class HPCPullMonitor extends PullMonitor {
>
>  !JobState.COMPLETE.equals(iMonitorID.getStatus())) {
>
>  iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," +
> iMonitorID.getJobName()));    //IMPORTANT this is NOT a simple setter we
> have a logic
>                          }else
> if(JobState.COMPLETE.equals(iMonitorID.getStatus())){
> -                            completedJobs.put(iMonitorID.getJobName(),
> iMonitorID);
>                              logger.debugId(iMonitorID.getJobID(), "Moved
> job {} to completed jobs map, experiment {}, " +
>                                      "task {}", iMonitorID.getJobID(),
> iMonitorID.getExperimentID(), iMonitorID.getTaskID());
> +                            iterator.remove();
> +                            logger.info("PULL Notification is complete:
> marking the Job as ************COMPLETE************ experiment {}, task {},
> job name {} .",
> +
> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
> +
> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
> OutHandlerWorker(gfac, iMonitorID, publisher));
>                          }
> -                        jobStatus = new JobStatusChangeRequestEvent();
>
>  
> iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID()+","+iMonitorID.getJobName()));
>   //IMPORTANT this is not a simple setter we have a logic
> -
> +                        iMonitorID.setLastMonitored(new Timestamp((new
> Date()).getTime()));
> +                        sendNotification(iMonitorID);
> +
> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status
> change request, " +
> +                                        "experiment {} , task {}",
> jobStatus.getJobIdentity().getExperimentId(),
> +                                jobStatus.getJobIdentity().getTaskId());
> +                        // if the job is completed we do not have to put
> the job to the queue again
> +                        iMonitorID.setLastMonitored(new Timestamp((new
> Date()).getTime()));
> +                    }
> +                    iterator = monitorID.listIterator();
> +                    while(iterator.hasNext()){
> +                        MonitorID iMonitorID = iterator.next();
>                          if (iMonitorID.getFailedCount() > FAILED_COUNT) {
>                              iMonitorID.setLastMonitored(new
> Timestamp((new Date()).getTime()));
>                              String outputDir =
> iMonitorID.getJobExecutionContext().getApplicationContext()
> @@ -245,15 +270,19 @@ public class HPCPullMonitor extends PullMonitor {
>                                      // this is because while we run
> output handler something failed and during exception
>                                      // we store all the jobs in the
> monitor queue again
>                                      logger.error("We know this  job is
> already attempted to run out-handlers");
> -
> CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
> +//
> CommonUtils.removeMonitorFromQueue(queue, iMonitorID);
>                                  }
>                              }
>                              if (stdOut != null && stdOut.size() > 0 &&
> !stdOut.get(0).isEmpty()) { // have to be careful with this
>                                  iMonitorID.setStatus(JobState.COMPLETE);
> -
> completedJobs.put(iMonitorID.getJobName(), iMonitorID);
> -                                logger.errorId(iMonitorID.getJobID(),
> "Job monitoring failed {} times, removed job {} from " +
> -                                                "monitor queue.
> Experiment {} , task {}", iMonitorID.getFailedCount(),
> +                                logger.errorId(iMonitorID.getJobID(),
> "Job monitoring failed {} times, " +
> +                                                " Experiment {} , task
> {}", iMonitorID.getFailedCount(),
>                                          iMonitorID.getExperimentID(),
> iMonitorID.getTaskID());
> +                                logger.info("Listing directory came as
> complete: marking the Job as ************COMPLETE************ experiment
> {}, task {}, job name {} .",
> +
> iMonitorID.getExperimentID(),iMonitorID.getTaskID(),iMonitorID.getJobName());
> +                                sendNotification(iMonitorID);
> +                                iterator.remove();
> +
> GFacThreadPoolExecutor.getFixedThreadPool().submit(new
> OutHandlerWorker(gfac, iMonitorID, publisher));
>                              } else {
>                                  iMonitorID.setFailedCount(0);
>                              }
> @@ -263,22 +292,9 @@ public class HPCPullMonitor extends PullMonitor {
>                              // if the job is complete we remove it from
> the Map, if any of these maps
>                              // get empty this userMonitorData will get
> delete from the queue
>                          }
> -                        JobIdentifier jobIdentity = new
> JobIdentifier(iMonitorID.getJobID(),
> -                                iMonitorID.getTaskID(),
> -                                iMonitorID.getWorkflowNodeID(),
> -                                iMonitorID.getExperimentID(),
> -
> iMonitorID.getJobExecutionContext().getGatewayID());
> -                        jobStatus.setJobIdentity(jobIdentity);
> -                        jobStatus.setState(iMonitorID.getStatus());
> -                        // we have this JobStatus class to handle amqp
> monitoring
> -
> -                        publisher.publish(jobStatus);
> -
> logger.debugId(jobStatus.getJobIdentity().getJobId(), "Published job status
> change request, " +
> -                                        "experiment {} , task {}",
> jobStatus.getJobIdentity().getExperimentId(),
> -                                jobStatus.getJobIdentity().getTaskId());
> -                        // if the job is completed we do not have to put
> the job to the queue again
> -                        iMonitorID.setLastMonitored(new Timestamp((new
> Date()).getTime()));
>                      }
> +
> +
>                  } else {
>                      logger.debug("Qstat Monitor doesn't handle non-gsissh
> hosts , host {}", iHostMonitorData.getHost()
>                              .getType().getHostAddress());
> @@ -287,30 +303,6 @@ public class HPCPullMonitor extends PullMonitor {
>              // We have finished all the HostMonitorData object in
> userMonitorData, now we need to put it back
>              // now the userMonitorData goes back to the tail of the queue
>              queue.put(take);
> -            // cleaning up the completed jobs, this method will remove
> some of the userMonitorData from the queue if
> -            // they become empty
> -            Map<String, Integer> jobRemoveCountMap = new HashMap<String,
> Integer>();
> -            ZooKeeper zk = null;
> -            Set<String> keys = completedJobs.keySet();
> -            for (String jobName: keys) {
> -                MonitorID completedJob = completedJobs.get(jobName);
> -                CommonUtils.removeMonitorFromQueue(queue, completedJob);
> -//
> gfac.invokeOutFlowHandlers(completedJob.getJobExecutionContext());
> -                  GFacThreadPoolExecutor.getFixedThreadPool().submit(new
> OutHandlerWorker(gfac, completedJob, publisher));
> -                if (zk == null) {
> -                    zk = completedJob.getJobExecutionContext().getZk();
> -                }
> -                String key =
> CommonUtils.getJobCountUpdatePath(completedJob);
> -                int i = 0;
> -                if (jobRemoveCountMap.containsKey(key)) {
> -                    i = Integer.valueOf(jobRemoveCountMap.get(key));
> -                }
> -                jobRemoveCountMap.put(key, ++i);
> -            }
> -            if (completedJobs.size() > 0) {
> -                // reduce completed job count from zookeeper
> -                CommonUtils.updateZkWithJobCount(zk, jobRemoveCountMap,
> false);
> -            }
>          } catch (InterruptedException e) {
>              if (!this.queue.contains(take)) {
>                  try {
> @@ -357,6 +349,20 @@ public class HPCPullMonitor extends PullMonitor {
>          return true;
>      }
>
> +    private void sendNotification(MonitorID iMonitorID) {
> +        JobStatusChangeRequestEvent jobStatus = new
> JobStatusChangeRequestEvent();
> +        JobIdentifier jobIdentity = new
> JobIdentifier(iMonitorID.getJobID(),
> +                iMonitorID.getTaskID(),
> +                iMonitorID.getWorkflowNodeID(),
> +                iMonitorID.getExperimentID(),
> +                iMonitorID.getJobExecutionContext().getGatewayID());
> +        jobStatus.setJobIdentity(jobIdentity);
> +        jobStatus.setState(iMonitorID.getStatus());
> +        // we have this JobStatus class to handle amqp monitoring
> +
> +        publisher.publish(jobStatus);
> +    }
> +
>      /**
>       * This is the method to stop the polling process
>       *
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
> index ce296da..de4dd41 100644
> ---
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
> +++
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
> @@ -72,6 +72,7 @@ import java.util.*;
>  public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
>      private static final Logger log =
> LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
>      public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
> +    public static final int DEFAULT_SSH_PORT = 22;
>
>      private String password = null;
>
> @@ -131,11 +132,11 @@ public class AdvancedSCPInputHandler extends
> AbstractRecoverableHandler {
>                          this.passPhrase);
>              }
>              ServerInfo serverInfo = new ServerInfo(this.userName,
> this.hostName);
> -            String key = this.userName + this.hostName;
> -            jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new
> SSHAuthWrapper(serverInfo,authenticationInfo,key));
> -            if
> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
> == null) {
> +            String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
> +            SSHAuthWrapper sshAuthWrapper = new
> SSHAuthWrapper(serverInfo, authenticationInfo, key);
> +            if
> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
> == null) {
>                  try {
> -                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
> +
> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
>                  } catch (ApplicationSettingsException e) {
>                      log.error(e.getMessage());
>                      try {
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
> index ad2131e..aed6e9f 100644
> ---
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
> +++
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
> @@ -73,6 +73,8 @@ import java.util.Set;
>  public class AdvancedSCPOutputHandler extends AbstractHandler {
>      private static final Logger log =
> LoggerFactory.getLogger(AdvancedSCPOutputHandler.class);
>
> +    public static final int DEFAULT_SSH_PORT = 22;
> +
>      private String password = null;
>
>      private String publicKeyPath;
> @@ -87,8 +89,6 @@ public class AdvancedSCPOutputHandler extends
> AbstractHandler {
>
>      private String outputPath;
>
> -    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
> -
>
>      public void initProperties(Properties properties) throws
> GFacHandlerException {
>          password = (String)properties.get("password");
> @@ -111,12 +111,12 @@ public class AdvancedSCPOutputHandler extends
> AbstractHandler {
>                      this.passPhrase);
>          }
>          ServerInfo serverInfo = new ServerInfo(this.userName,
> this.hostName);
> -        String key = this.userName + this.hostName;
> -        jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,new
> SSHAuthWrapper(serverInfo,authenticationInfo,key));
> +        String key = this.userName + this.hostName + DEFAULT_SSH_PORT;
> +        SSHAuthWrapper sshAuthWrapper = new SSHAuthWrapper(serverInfo,
> authenticationInfo, key);
>          try {
> -            if
> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT)
> == null) {
> +            if
> (jobExecutionContext.getSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT+key)
> == null) {
>                  try {
> -                    GFACSSHUtils.addSecurityContext(jobExecutionContext);
> +
> GFACSSHUtils.addSecurityContext(jobExecutionContext,sshAuthWrapper);
>                  } catch (ApplicationSettingsException e) {
>                      log.error(e.getMessage());
>                      try {
>
>
> http://git-wip-us.apache.org/repos/asf/airavata/blob/b3769516/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
> ----------------------------------------------------------------------
> diff --git
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
> index 7ee5d6a..94f07b1 100644
> ---
> a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
> +++
> b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/util/GFACSSHUtils.java
> @@ -62,9 +62,12 @@ public class GFACSSHUtils {
>
>      public static int maxClusterCount = 5;
>
> -    public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
> -
> -
> +    /**
> +     * This method is to add computing resource specific authentication,
> if its a third party machine, use the other addSecurityContext
> +     * @param jobExecutionContext
> +     * @throws GFacException
> +     * @throws ApplicationSettingsException
> +     */
>      public static void addSecurityContext(JobExecutionContext
> jobExecutionContext) throws GFacException, ApplicationSettingsException {
>          HostDescription registeredHost =
> jobExecutionContext.getApplicationContext().getHostDescription();
>          if (registeredHost.getType() instanceof GlobusHostType ||
> registeredHost.getType() instanceof UnicoreHostType) {
> @@ -77,8 +80,6 @@ public class GFACSSHUtils {
>              requestData.setTokenId(credentialStoreToken);
>
>              ServerInfo serverInfo = new ServerInfo(null,
> registeredHost.getType().getHostAddress());
> -            SSHAuthWrapper sshAuth = (SSHAuthWrapper)
> jobExecutionContext.getProperty(ADVANCED_SSH_AUTH);
> -
>              Cluster pbsCluster = null;
>              try {
>                  TokenizedSSHAuthInfo tokenizedSSHAuthInfo = new
> TokenizedSSHAuthInfo(requestData);
> @@ -95,9 +96,6 @@ public class GFACSSHUtils {
>
>                  String key = credentials.getPortalUserName() +
> registeredHost.getType().getHostAddress() +
>                          serverInfo.getPort();
> -                if(sshAuth!=null){
> -                    key=sshAuth.getKey();
> -                }
>                  boolean recreate = false;
>                  synchronized (clusters) {
>                      if (clusters.containsKey(key) &&
> clusters.get(key).size() < maxClusterCount) {
> @@ -125,15 +123,8 @@ public class GFACSSHUtils {
>                          recreate = true;
>                      }
>                      if (recreate) {
> -                        if (sshAuth != null) {
> -                            pbsCluster = new
> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),
> +                        pbsCluster = new PBSCluster(serverInfo,
> tokenizedSSHAuthInfo,
>
>  CommonUtils.getPBSJobManager(installedParentPath));
> -
> jobExecutionContext.setProperty(ADVANCED_SSH_AUTH,null); // some other
> provider might fail
> -                            key = sshAuth.getKey();
> -                        } else {
> -                            pbsCluster = new PBSCluster(serverInfo,
> tokenizedSSHAuthInfo,
> -
> CommonUtils.getPBSJobManager(installedParentPath));
> -                        }
>                          List<Cluster> pbsClusters = null;
>                          if (!(clusters.containsKey(key))) {
>                              pbsClusters = new ArrayList<Cluster>();
> @@ -148,10 +139,71 @@ public class GFACSSHUtils {
>                  e.printStackTrace();  //To change body of catch statement
> use File | Settings | File Templates.
>              }
>              sshSecurityContext.setPbsCluster(pbsCluster);
> -
> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT,
> sshSecurityContext);
> +
> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+"-"+registeredHost.getType().getHostAddress(),
> sshSecurityContext);
>          }
>      }
>
> +    /**
> +     * This method can be used to add third party resource security
> contexts
> +     * @param jobExecutionContext
> +     * @param sshAuth
> +     * @throws GFacException
> +     * @throws ApplicationSettingsException
> +     */
> +    public static void addSecurityContext(JobExecutionContext
> jobExecutionContext,SSHAuthWrapper sshAuth) throws GFacException,
> ApplicationSettingsException {
> +            try {
> +                if(sshAuth== null) {
> +                    throw new GFacException("Error adding security
> Context, because sshAuthWrapper is null");
> +                }
> +                SSHSecurityContext sshSecurityContext = new
> SSHSecurityContext();
> +                Cluster pbsCluster = null;
> +                String key=sshAuth.getKey();
> +                boolean recreate = false;
> +                synchronized (clusters) {
> +                    if (clusters.containsKey(key) &&
> clusters.get(key).size() < maxClusterCount) {
> +                        recreate = true;
> +                    } else if (clusters.containsKey(key)) {
> +                        int i = new Random().nextInt(Integer.MAX_VALUE) %
> maxClusterCount;
> +                        if
> (clusters.get(key).get(i).getSession().isConnected()) {
> +                            pbsCluster = clusters.get(key).get(i);
> +                        } else {
> +                            clusters.get(key).remove(i);
> +                            recreate = true;
> +                        }
> +                        if (!recreate) {
> +                            try {
> +                                pbsCluster.listDirectory("~/"); // its
> hard to trust isConnected method, so we try to connect if it works we are
> good,else we recreate
> +                            } catch (Exception e) {
> +                                clusters.get(key).remove(i);
> +                                logger.info("Connection found the
> connection map is expired, so we create from the scratch");
> +                                maxClusterCount++;
> +                                recreate = true; // we make the
> pbsCluster to create again if there is any exception druing connection
> +                            }
> +                        }
> +                        logger.info("Re-using the same connection used
> with the connection string:" + key);
> +                    } else {
> +                        recreate = true;
> +                    }
> +                    if (recreate) {
> +                        pbsCluster = new
> PBSCluster(sshAuth.getServerInfo(), sshAuth.getAuthenticationInfo(),null);
> +                        key = sshAuth.getKey();
> +                        List<Cluster> pbsClusters = null;
> +                        if (!(clusters.containsKey(key))) {
> +                            pbsClusters = new ArrayList<Cluster>();
> +                        } else {
> +                            pbsClusters = clusters.get(key);
> +                        }
> +                        pbsClusters.add(pbsCluster);
> +                        clusters.put(key, pbsClusters);
> +                    }
> +                }
> +                sshSecurityContext.setPbsCluster(pbsCluster);
> +
> jobExecutionContext.addSecurityContext(Constants.SSH_SECURITY_CONTEXT+key,
> sshSecurityContext);
> +            } catch (Exception e) {
> +                e.printStackTrace();  //To change body of catch statement
> use File | Settings | File Templates.
> +            }
> +    }
> +
>      public static JobDescriptor createJobDescriptor(JobExecutionContext
> jobExecutionContext,
>
>  ApplicationDeploymentDescriptionType app, Cluster cluster) {
>          JobDescriptor jobDescriptor = new JobDescriptor();
>
>


-- 
Research Assistant
Science Gateways Group
Indiana University

Reply via email to