Repository: airavata Updated Branches: refs/heads/master 3721954dd -> f37dad87a
Added new method to get monitor command and create client without zookeeper. Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/f37dad87 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/f37dad87 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/f37dad87 Branch: refs/heads/master Commit: f37dad87a0d86692abbab89cf3abd6010d7302e0 Parents: 3721954 Author: raminder <[email protected]> Authored: Wed Aug 13 22:33:13 2014 -0400 Committer: raminder <[email protected]> Committed: Wed Aug 13 22:33:13 2014 -0400 ---------------------------------------------------------------------- .../core/impl/GFACServiceJobSubmitter.java | 155 ++++++++++--------- .../gsi/ssh/impl/GSISSHAbstractCluster.java | 13 +- .../airavata/gsi/ssh/impl/RawCommandInfo.java | 6 +- 3 files changed, 90 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java ---------------------------------------------------------------------- diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java index 1dd8dbb..a2c153e 100644 --- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java +++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java @@ -17,7 +17,7 @@ * specific language governing permissions and limitations * under the License. * -*/ + */ package org.apache.airavata.orchestrator.core.impl; import java.io.File; @@ -29,6 +29,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException; import org.apache.airavata.common.utils.AiravataZKUtils; import org.apache.airavata.common.utils.Constants; import org.apache.airavata.common.utils.ServerSettings; +import org.apache.airavata.gfac.core.cpi.GFac; +import org.apache.airavata.gfac.core.cpi.GFacImpl; import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.cpi.GfacService; import org.apache.airavata.orchestrator.core.context.OrchestratorContext; @@ -50,86 +52,87 @@ import org.slf4j.LoggerFactory; * gfac instance. */ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher { - private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class); - public static final String IP = "ip"; - - private OrchestratorContext orchestratorContext; - - private static Integer mutex = -1; - - public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { - this.orchestratorContext = orchestratorContext; - } - - public GFACInstance selectGFACInstance() throws OrchestratorException { - // currently we only support one instance but future we have to pick an instance - return null; - } + private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class); + public static final String IP = "ip"; + private OrchestratorContext orchestratorContext; - public boolean submit(String experimentID, String taskID) throws OrchestratorException { - return this.submit(experimentID, taskID, null); - } + private static Integer mutex = -1; + public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException { + this.orchestratorContext = orchestratorContext; + } - public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { - ZooKeeper zk = orchestratorContext.getZk(); - try { - if (zk==null || !zk.getState().isConnected()) { - String zkhostPort = AiravataZKUtils.getZKhostPort(); - zk = new ZooKeeper(zkhostPort, 6000, this); - synchronized (mutex) { - mutex.wait(); - } - } - String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); - String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); - List<String> children = zk.getChildren(gfacServer, this); -// System.out.println(children); - String pickedChild; - if(children.size() == 0){ - pickedChild = "gfac-node0"; - }else{ - pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); - } - // here we are not using an index because the getChildren does not return the same order everytime + public GFACInstance selectGFACInstance() throws OrchestratorException { + // currently we only support one instance but future we have to pick an + // instance + return null; + } - String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); - logger.info("GFAC instance node data: " + gfacNodeData); - String[] split = gfacNodeData.split(":"); - GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); - if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node - if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild,tokenId)) { - //FIXME:: The GatewayID is temporarily read from properties file. It should instead be inferred from the token. - return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME)); - } - } - } catch (TException e) { - throw new OrchestratorException(e); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (KeeperException e) { - e.printStackTrace(); - } catch (ApplicationSettingsException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - return false; - } + public boolean submit(String experimentID, String taskID) throws OrchestratorException { + return this.submit(experimentID, taskID, null); + } + public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException { + ZooKeeper zk = orchestratorContext.getZk(); + try { + if (zk == null || !zk.getState().isConnected()) { + String zkhostPort = AiravataZKUtils.getZKhostPort(); + zk = new ZooKeeper(zkhostPort, 6000, this); + synchronized (mutex) { + mutex.wait(); + } + } + String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"); + String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments"); + List<String> children = zk.getChildren(gfacServer, this); + + if (children.size() == 0) { + // Zookeeper data need cleaning + GfacService.Client localhost = GFacClientFactory.createGFacClient(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST), Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_HOST))); + return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME)); + } else { + String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size()); + // here we are not using an index because the getChildren does not return the same order everytime + String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null)); + logger.info("GFAC instance node data: " + gfacNodeData); + String[] split = gfacNodeData.split(":"); + GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1])); + if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { + // before submitting the job we check again the state of the node + if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) { + // FIXME:: The GatewayID is temporarily read from properties file. It should instead be inferred from the token. + return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME)); + } + } + } + } catch (TException e) { + throw new OrchestratorException(e); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (KeeperException e) { + e.printStackTrace(); + } catch (ApplicationSettingsException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } - synchronized public void process(WatchedEvent event) { - synchronized (mutex) { - switch (event.getState()) { - case SyncConnected: - mutex.notify(); - } - switch (event.getType()) { - case NodeCreated: - mutex.notify(); - break; - } - } - } + synchronized public void process(WatchedEvent event) { + synchronized (mutex) { + switch (event.getState()) { + case SyncConnected: + mutex.notify(); + } + switch (event.getType()) { + case NodeCreated: + mutex.notify(); + break; + } + } + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java index 937ed9c..cf65a7f 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/GSISSHAbstractCluster.java @@ -222,7 +222,7 @@ public class GSISSHAbstractCluster implements Cluster { StandardOutReader stdOutReader = new StandardOutReader(); CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader); - String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission",rawCommandInfo.getCommand()); + String outputifAvailable = getOutputifAvailable(stdOutReader, "Error reading output of job submission",rawCommandInfo.getBaseCommand()); // this might not be the case for all teh resources, if so Cluster implementation can override this method // because here after cancelling we try to get the job description and return it back JobDescriptor jobById = this.getJobDescriptorById(jobID); @@ -250,7 +250,7 @@ public class GSISSHAbstractCluster implements Cluster { //Check whether pbs submission is successful or not, if it failed throw and exception in submitJob method // with the error thrown in qsub command // - String outputifAvailable = getOutputifAvailable(standardOutReader,"Error reading output of job submission",rawCommandInfo.getCommand()); + String outputifAvailable = getOutputifAvailable(standardOutReader,"Error reading output of job submission",rawCommandInfo.getBaseCommand()); OutputParser outputParser = jobManagerConfiguration.getParser(); return outputParser.parse(outputifAvailable); } @@ -315,7 +315,7 @@ public class GSISSHAbstractCluster implements Cluster { RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID); StandardOutReader stdOutReader = new StandardOutReader(); CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader); - String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !",rawCommandInfo.getCommand()); + String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !",rawCommandInfo.getBaseCommand()); JobDescriptor jobDescriptor = new JobDescriptor(); jobManagerConfiguration.getParser().parse(jobDescriptor,result); return jobDescriptor; @@ -325,7 +325,7 @@ public class GSISSHAbstractCluster implements Cluster { RawCommandInfo rawCommandInfo = jobManagerConfiguration.getMonitorCommand(jobID); StandardOutReader stdOutReader = new StandardOutReader(); CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader); - String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !", rawCommandInfo.getCommand()); + String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !", rawCommandInfo.getBaseCommand()); return jobManagerConfiguration.getParser().parse(jobID, result); } @@ -424,11 +424,10 @@ public class GSISSHAbstractCluster implements Cluster { } public void getJobStatuses(String userName, Map<String,JobStatus> jobIDs)throws SSHApiException { -// RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName); - RawCommandInfo rawCommandInfo = new RawCommandInfo("qstat -u abc"); + RawCommandInfo rawCommandInfo = jobManagerConfiguration.getUserBasedMonitorCommand(userName); StandardOutReader stdOutReader = new StandardOutReader(); CommandExecutor.executeCommand(rawCommandInfo, this.getSession(), stdOutReader); - String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !", rawCommandInfo.getCommand()); + String result = getOutputifAvailable(stdOutReader, "Error getting job information from the resource !", rawCommandInfo.getBaseCommand()); jobManagerConfiguration.getParser().parse(userName,jobIDs, result); } http://git-wip-us.apache.org/repos/asf/airavata/blob/f37dad87/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java ---------------------------------------------------------------------- diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java index 0e9d16e..ab85925 100644 --- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java +++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/RawCommandInfo.java @@ -48,7 +48,11 @@ public class RawCommandInfo implements CommandInfo { public String getRawCommand() { return rawCommand; } - + + public String getBaseCommand() { + return rawCommand.substring(0, rawCommand.indexOf(" ")); + } + public void setRawCommand(String rawCommand) { this.rawCommand = rawCommand; }
