Repository: airavata
Updated Branches:
  refs/heads/master 3721954dd -> f37dad87a


Added new method to get monitor command and create client without
zookeeper.

Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f37dad87
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f37dad87
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f37dad87

Branch: refs/heads/master
Commit: f37dad87a0d86692abbab89cf3abd6010d7302e0
Parents: 3721954
Author: raminder <[email protected]>
Authored: Wed Aug 13 22:33:13 2014 -0400
Committer: raminder <[email protected]>
Committed: Wed Aug 13 22:33:13 2014 -0400

----------------------------------------------------------------------
 .../core/impl/GFACServiceJobSubmitter.java      | 155 ++++++++++---------
 .../gsi/ssh/impl/GSISSHAbstractCluster.java     |  13 +-
 .../airavata/gsi/ssh/impl/RawCommandInfo.java   |   6 +-
 3 files changed, 90 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 1dd8dbb..a2c153e 100644
--- 
a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ 
b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -17,7 +17,7 @@
  * specific language governing permissions and limitations
  * under the License.
  *
-*/
+ */
 package org.apache.airavata.orchestrator.core.impl;
 
 import java.io.File;
@@ -29,6 +29,8 @@ import 
org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.cpi.GFac;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
@@ -50,86 +52,87 @@ import org.slf4j.LoggerFactory;
  * gfac instance.
  */
 public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
-    private final static Logger logger = 
LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
-    public static final String IP = "ip";
-
-    private OrchestratorContext orchestratorContext;
-
-    private static Integer mutex = -1;
-
-    public void initialize(OrchestratorContext orchestratorContext) throws 
OrchestratorException {
-        this.orchestratorContext = orchestratorContext;
-    }
-
-    public GFACInstance selectGFACInstance() throws OrchestratorException {
-        // currently we only support one instance but future we have to pick 
an instance
-        return null;
-    }
+       private final static Logger logger = 
LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
+       public static final String IP = "ip";
 
+       private OrchestratorContext orchestratorContext;
 
-    public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
-        return this.submit(experimentID, taskID, null);
-    }
+       private static Integer mutex = -1;
 
+       public void initialize(OrchestratorContext orchestratorContext) throws 
OrchestratorException {
+               this.orchestratorContext = orchestratorContext;
+       }
 
-    public boolean submit(String experimentID, String taskID, String tokenId) 
throws OrchestratorException {
-        ZooKeeper zk = orchestratorContext.getZk();
-        try {
-            if (zk==null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, 6000, this);
-                synchronized (mutex) {
-                    mutex.wait();
-                }
-            }
-            String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-            String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
-            List<String> children = zk.getChildren(gfacServer, this);
-//            System.out.println(children);
-            String pickedChild;
-            if(children.size() == 0){
-               pickedChild = "gfac-node0";
-            }else{
-             pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
-            }
-            // here we are not using an index because the getChildren does not 
return the same order everytime
+       public GFACInstance selectGFACInstance() throws OrchestratorException {
+               // currently we only support one instance but future we have to 
pick an
+               // instance
+               return null;
+       }
 
-            String gfacNodeData = new String(zk.getData(gfacServer + 
File.separator + pickedChild, false, null));
-            logger.info("GFAC instance node data: " + gfacNodeData);
-            String[] split = gfacNodeData.split(":");
-            GfacService.Client localhost = 
GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-            if (zk.exists(gfacServer + File.separator + pickedChild, false) != 
null) {      // before submitting the job we check again the state of the node
-                if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, 
experimentNode, pickedChild,tokenId)) {
-                    //FIXME:: The GatewayID is temporarily read from 
properties file. It should instead be inferred from the token.
-                    return localhost.submitJob(experimentID, taskID, 
ServerSettings.getSetting(Constants.GATEWAY_NAME));
-                }
-            }
-        } catch (TException e) {
-            throw new OrchestratorException(e);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        return false;
-    }
+       public boolean submit(String experimentID, String taskID) throws 
OrchestratorException {
+               return this.submit(experimentID, taskID, null);
+       }
 
+       public boolean submit(String experimentID, String taskID, String 
tokenId) throws OrchestratorException {
+               ZooKeeper zk = orchestratorContext.getZk();
+               try {
+                       if (zk == null || !zk.getState().isConnected()) {
+                               String zkhostPort = 
AiravataZKUtils.getZKhostPort();
+                               zk = new ZooKeeper(zkhostPort, 6000, this);
+                               synchronized (mutex) {
+                                       mutex.wait();
+                               }
+                       }
+                       String gfacServer = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+                       String experimentNode = 
ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, 
"/gfac-experiments");
+                       List<String> children = zk.getChildren(gfacServer, 
this);
+                       
+                       if (children.size() == 0) {
+                               // Zookeeper data need cleaning
+                               GfacService.Client localhost = 
GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST),
 Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)));
+                               return localhost.submitJob(experimentID, 
taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
+                       } else {
+                               String pickedChild = children.get(new 
Random().nextInt(Integer.MAX_VALUE) % children.size());
+                               // here we are not using an index because the 
getChildren does not return the same order everytime
+                               String gfacNodeData = new 
String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
+                               logger.info("GFAC instance node data: " + 
gfacNodeData);
+                               String[] split = gfacNodeData.split(":");
+                               GfacService.Client localhost = 
GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+                               if (zk.exists(gfacServer + File.separator + 
pickedChild, false) != null) {
+                                       // before submitting the job we check 
again the state of the node
+                                       if 
(GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, 
pickedChild, tokenId)) {
+                                               // FIXME:: The GatewayID is 
temporarily read from properties file. It should instead be inferred from the 
token.
+                                               return 
localhost.submitJob(experimentID, taskID, 
ServerSettings.getSetting(Constants.GATEWAY_NAME));
+                                       }
+                               }
+                       }
+               } catch (TException e) {
+                       throw new OrchestratorException(e);
+               } catch (InterruptedException e) {
+                       e.printStackTrace();
+               } catch (KeeperException e) {
+                       e.printStackTrace();
+               } catch (ApplicationSettingsException e) {
+                       e.printStackTrace();
+               } catch (IOException e) {
+                       e.printStackTrace();
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+               return false;
+       }
 
-    synchronized public void process(WatchedEvent event) {
-        synchronized (mutex) {
-            switch (event.getState()) {
-                case SyncConnected:
-                    mutex.notify();
-            }
-            switch (event.getType()) {
-                case NodeCreated:
-                    mutex.notify();
-                    break;
-            }
-        }
-    }
+       synchronized public void process(WatchedEvent event) {
+               synchronized (mutex) {
+                       switch (event.getState()) {
+                       case SyncConnected:
+                               mutex.notify();
+                       }
+                       switch (event.getType()) {
+                       case NodeCreated:
+                               mutex.notify();
+                               break;
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
----------------------------------------------------------------------
diff --git 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
index 937ed9c..cf65a7f 100644
--- 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
+++ 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java
@@ -222,7 +222,7 @@ public class GSISSHAbstractCluster implements Cluster {
 
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), 
stdOutReader);
-        String outputifAvailable = getOutputifAvailable(stdOutReader, "Error 
reading output of job submission",rawCommandInfo.getCommand());
+        String outputifAvailable = getOutputifAvailable(stdOutReader, "Error 
reading output of job submission",rawCommandInfo.getBaseCommand());
         // this might not be the case for all teh resources, if so Cluster 
implementation can override this method
         // because here after cancelling we try to get the job description and 
return it back
         JobDescriptor jobById = this.getJobDescriptorById(jobID);
@@ -250,7 +250,7 @@ public class GSISSHAbstractCluster implements Cluster {
         //Check whether pbs submission is successful or not, if it failed 
throw and exception in submitJob method
         // with the error thrown in qsub command
         //
-        String outputifAvailable = 
getOutputifAvailable(standardOutReader,"Error reading output of job 
submission",rawCommandInfo.getCommand());
+        String outputifAvailable = 
getOutputifAvailable(standardOutReader,"Error reading output of job 
submission",rawCommandInfo.getBaseCommand());
         OutputParser outputParser = jobManagerConfiguration.getParser();
         return  outputParser.parse(outputifAvailable);
     }
@@ -315,7 +315,7 @@ public class GSISSHAbstractCluster implements Cluster {
         RawCommandInfo rawCommandInfo = 
jobManagerConfiguration.getMonitorCommand(jobID);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), 
stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !",rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !",rawCommandInfo.getBaseCommand());
         JobDescriptor jobDescriptor = new JobDescriptor();
         jobManagerConfiguration.getParser().parse(jobDescriptor,result);
         return jobDescriptor;
@@ -325,7 +325,7 @@ public class GSISSHAbstractCluster implements Cluster {
         RawCommandInfo rawCommandInfo = 
jobManagerConfiguration.getMonitorCommand(jobID);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), 
stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !", rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !", rawCommandInfo.getBaseCommand());
         return jobManagerConfiguration.getParser().parse(jobID, result);
     }
 
@@ -424,11 +424,10 @@ public class GSISSHAbstractCluster implements Cluster {
     }
 
     public void getJobStatuses(String userName, Map<String,JobStatus> 
jobIDs)throws SSHApiException {
-//        RawCommandInfo rawCommandInfo = 
jobManagerConfiguration.getUserBasedMonitorCommand(userName);
-        RawCommandInfo rawCommandInfo = new RawCommandInfo("qstat -u abc");
+        RawCommandInfo rawCommandInfo = 
jobManagerConfiguration.getUserBasedMonitorCommand(userName);
         StandardOutReader stdOutReader = new StandardOutReader();
         CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), 
stdOutReader);
-        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !", rawCommandInfo.getCommand());
+        String result = getOutputifAvailable(stdOutReader, "Error getting job 
information from the resource !", rawCommandInfo.getBaseCommand());
         jobManagerConfiguration.getParser().parse(userName,jobIDs, result);
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
----------------------------------------------------------------------
diff --git 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
index 0e9d16e..ab85925 100644
--- 
a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
+++ 
b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java
@@ -48,7 +48,11 @@ public class RawCommandInfo implements CommandInfo {
     public String getRawCommand() {
         return rawCommand;
     }
-
+    
+    public String getBaseCommand() {
+        return rawCommand.substring(0, rawCommand.indexOf(" "));
+    }
+    
     public void setRawCommand(String rawCommand) {
         this.rawCommand = rawCommand;
     }

Reply via email to