Integrated appCatalog model to GFac local and hpc monitor modules, commented out test classes
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/3e584f87 Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/3e584f87 Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/3e584f87 Branch: refs/heads/gfac_appcatalog_int Commit: 3e584f87d359c07bb2e4429884d8efa820135671 Parents: d94e8c9 Author: shamrath <[email protected]> Authored: Tue Nov 4 17:51:53 2014 -0500 Committer: Chathuri Wimalasena <[email protected]> Committed: Wed Nov 5 13:10:24 2014 -0500 ---------------------------------------------------------------------- .../gfac/core/context/JobExecutionContext.java | 12 + .../airavata/gfac/core/cpi/BetterGfacImpl.java | 4 + .../handler/LocalDirectorySetupHandler.java | 19 +- .../gfac/local/provider/impl/LocalProvider.java | 48 ++- .../gfac/local/utils/LocalProviderUtil.java | 15 +- .../gfac/services/impl/LocalProviderTest.java | 368 +++++++++---------- .../airavata/gfac/monitor/HPCMonitorID.java | 11 +- .../airavata/gfac/monitor/HostMonitorData.java | 38 +- .../handlers/GridPullMonitorHandler.java | 2 +- .../monitor/impl/pull/qstat/HPCPullMonitor.java | 24 +- .../airavata/gfac/monitor/util/CommonUtils.java | 31 +- .../job/QstatMonitorTestWithMyProxyAuth.java | 344 ++++++++--------- 12 files changed, 468 insertions(+), 448 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java index 2d1a975..30142f8 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java 
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java @@ -72,6 +72,10 @@ public class JobExecutionContext extends AbstractContext implements Serializable private String credentialStoreToken; /** + * User defined scratch/temp directory + */ + private String scratchLocation; + /** * User defined working directory. */ private String workingDir; @@ -359,6 +363,14 @@ public class JobExecutionContext extends AbstractContext implements Serializable this.credentialStoreToken = credentialStoreToken; } + public String getScratchLocation() { + return scratchLocation; + } + + public void setScratchLocation(String scratchLocation) { + this.scratchLocation = scratchLocation; + } + public String getWorkingDir() { return workingDir; } http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java index 0455f7e..d063dac 100644 --- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java +++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java @@ -372,6 +372,10 @@ public class BetterGfacImpl implements GFac,Watcher { } private void setUpWorkingLocation(JobExecutionContext jobExecutionContext, ApplicationInterfaceDescription applicationInterface, String scratchLocation) { + /** + * Scratch location + */ + jobExecutionContext.setScratchLocation(scratchLocation); /** * Working dir http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java index de516c0..394cfaa 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java @@ -20,12 +20,9 @@ */ package org.apache.airavata.gfac.local.handler; -import org.apache.airavata.commons.gfac.type.ApplicationDescription; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.handler.GFacHandler; import org.apache.airavata.gfac.core.handler.GFacHandlerException; -import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; -import org.apache.airavata.schemas.gfac.HostDescriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,18 +34,14 @@ public class LocalDirectorySetupHandler implements GFacHandler { public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException { log.info("Invoking LocalDirectorySetupHandler ..."); - HostDescriptionType type = jobExecutionContext.getApplicationContext().getHostDescription().getType(); - ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); - ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType(); - log.debug("working directory = " + app.getStaticWorkingDirectory()); - log.debug("temp directory = " + app.getScratchWorkingDirectory()); + log.debug("working directory = " + jobExecutionContext.getWorkingDir()); + log.debug("temp directory = " + jobExecutionContext.getWorkingDir()); - 
makeFileSystemDir(app.getStaticWorkingDirectory(),jobExecutionContext); - makeFileSystemDir(app.getScratchWorkingDirectory(),jobExecutionContext); - makeFileSystemDir(app.getInputDataDirectory(),jobExecutionContext); - makeFileSystemDir(app.getOutputDataDirectory(),jobExecutionContext); + makeFileSystemDir(jobExecutionContext.getWorkingDir()); + makeFileSystemDir(jobExecutionContext.getInputDir()); + makeFileSystemDir(jobExecutionContext.getOutputDir()); } - private void makeFileSystemDir(String dir, JobExecutionContext jobExecutionContext) throws GFacHandlerException { + private void makeFileSystemDir(String dir) throws GFacHandlerException { File f = new File(dir); if (f.isDirectory() && f.exists()) { return; http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java index 51da68a..4cdd0c0 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java @@ -37,6 +37,8 @@ import org.apache.airavata.gfac.core.utils.GFacUtils; import org.apache.airavata.gfac.core.utils.OutputUtils; import org.apache.airavata.gfac.local.utils.InputStreamToFileWriter; import org.apache.airavata.gfac.local.utils.InputUtils; +import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription; +import org.apache.airavata.model.appcatalog.appdeployment.SetEnvPaths; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; import 
org.apache.airavata.model.messaging.event.TaskIdentifier; @@ -104,18 +106,16 @@ public class LocalProvider extends AbstractProvider { public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException { super.initialize(jobExecutionContext); - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext(). - getApplicationDeploymentDescription().getType(); - buildCommand(app.getExecutableLocation(), ProviderUtils.getInputParameters(jobExecutionContext)); - initProcessBuilder(app); + buildCommand(jobExecutionContext.getExecutablePath(), ProviderUtils.getInputParameters(jobExecutionContext)); + initProcessBuilder(jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription()); // extra environment variables - builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, app.getInputDataDirectory()); - builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, app.getOutputDataDirectory()); + builder.environment().put(Constants.INPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getInputDir()); + builder.environment().put(Constants.OUTPUT_DATA_DIR_VAR_NAME, jobExecutionContext.getOutputDir()); // set working directory - builder.directory(new File(app.getStaticWorkingDirectory())); + builder.directory(new File(jobExecutionContext.getWorkingDir())); // log info log.info("Command = " + InputUtils.buildCommand(cmdList)); @@ -127,21 +127,19 @@ public class LocalProvider extends AbstractProvider { public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException { jobExecutionContext.getNotifier().publish(new StartExecutionEvent()); - ApplicationDeploymentDescriptionType app = jobExecutionContext. 
- getApplicationContext().getApplicationDeploymentDescription().getType(); JobDetails jobDetails = new JobDetails(); try { jobId = jobExecutionContext.getTaskData().getTaskID(); jobDetails.setJobID(jobId); - jobDetails.setJobDescription(app.toString()); + jobDetails.setJobDescription(jobExecutionContext.getApplicationContext() + .getApplicationDeploymentDescription().getAppDeploymentDescription()); jobExecutionContext.setJobDetails(jobDetails); - jobDetails.setJobDescription(app.toString()); GFacUtils.saveJobStatus(jobExecutionContext,jobDetails, JobState.SETUP); // running cmd Process process = builder.start(); - Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput()); - Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), app.getStandardError()); + Thread standardOutWriter = new InputStreamToFileWriter(process.getInputStream(), jobExecutionContext.getStandardOutput()); + Thread standardErrorWriter = new InputStreamToFileWriter(process.getErrorStream(), jobExecutionContext.getStandardError()); // start output threads standardOutWriter.setDaemon(true); @@ -167,9 +165,10 @@ public class LocalProvider extends AbstractProvider { StringBuffer buf = new StringBuffer(); buf.append("Executed ").append(InputUtils.buildCommand(cmdList)) - .append(" on the localHost, working directory = ").append(app.getStaticWorkingDirectory()) - .append(" tempDirectory = ").append(app.getScratchWorkingDirectory()).append(" With the status ") + .append(" on the localHost, working directory = ").append(jobExecutionContext.getWorkingDir()) + .append(" tempDirectory = ").append(jobExecutionContext.getScratchLocation()).append(" With the status ") .append(String.valueOf(returnValue)); + log.info(buf.toString()); // updating the job status to complete because there's nothing to monitor in local jobs @@ -219,12 +218,10 @@ public class LocalProvider extends AbstractProvider { // } public void 
dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException { - ApplicationDeploymentDescriptionType app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType(); - try { List<DataObjectType> outputArray = new ArrayList<DataObjectType>(); - String stdOutStr = GFacUtils.readFileToString(app.getStandardOutput()); - String stdErrStr = GFacUtils.readFileToString(app.getStandardError()); + String stdOutStr = GFacUtils.readFileToString(jobExecutionContext.getStandardOutput()); + String stdErrStr = GFacUtils.readFileToString(jobExecutionContext.getStandardError()); Map<String, Object> output = jobExecutionContext.getOutMessageContext().getParameters(); OutputUtils.fillOutputFromStdout(output, stdOutStr, stdErrStr, outputArray); TaskDetails taskDetails = (TaskDetails)registry.get(RegistryModelType.TASK_DETAIL, jobExecutionContext.getTaskData().getTaskID()); @@ -257,15 +254,14 @@ public class LocalProvider extends AbstractProvider { cmdList.addAll(inputParameterList); } - private void initProcessBuilder(ApplicationDeploymentDescriptionType app){ + private void initProcessBuilder(ApplicationDeploymentDescription app){ builder = new ProcessBuilder(cmdList); - NameValuePairType[] env = app.getApplicationEnvironmentArray(); - - if(env != null && env.length > 0){ - Map<String,String> builderEnv = builder.environment(); - for (NameValuePairType entry : env) { - builderEnv.put(entry.getName(), entry.getValue()); + List<SetEnvPaths> setEnvironment = app.getSetEnvironment(); + if (setEnvironment != null) { + for (SetEnvPaths envPath : setEnvironment) { + Map<String,String> builderEnv = builder.environment(); + builderEnv.put(envPath.getName(), envPath.getValue()); } } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java index 932c693..2b45df7 100644 --- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java +++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/utils/LocalProviderUtil.java @@ -22,7 +22,6 @@ package org.apache.airavata.gfac.local.utils; import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,14 +40,12 @@ public class LocalProviderUtil { } public void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacProviderException { - ApplicationDeploymentDescriptionType app = jobExecutionContext. - getApplicationContext().getApplicationDeploymentDescription().getType(); - log.info("working diectroy = " + app.getStaticWorkingDirectory()); - log.info("temp directory = " + app.getScratchWorkingDirectory()); - makeFileSystemDir(app.getStaticWorkingDirectory()); - makeFileSystemDir(app.getScratchWorkingDirectory()); - makeFileSystemDir(app.getInputDataDirectory()); - makeFileSystemDir(app.getOutputDataDirectory()); + log.info("working diectroy = " + jobExecutionContext.getWorkingDir()); + log.info("temp directory = " + jobExecutionContext.getScratchLocation()); + makeFileSystemDir(jobExecutionContext.getWorkingDir()); + makeFileSystemDir(jobExecutionContext.getScratchLocation()); + makeFileSystemDir(jobExecutionContext.getInputDir()); + makeFileSystemDir(jobExecutionContext.getOutputDir()); } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java 
---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java index 343b4bf..aeb8158 100644 --- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java +++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java @@ -1,184 +1,184 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ -package org.apache.airavata.core.gfac.services.impl; - -import java.io.File; -import java.net.URL; -import java.util.ArrayList; -import java.util.List; - -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.commons.gfac.type.ActualParameter; -import org.apache.airavata.commons.gfac.type.ApplicationDescription; -import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.commons.gfac.type.ServiceDescription; -import org.apache.airavata.gfac.GFacConfiguration; -import org.apache.airavata.gfac.GFacException; -import org.apache.airavata.gfac.core.context.ApplicationContext; -import org.apache.airavata.gfac.core.context.JobExecutionContext; -import org.apache.airavata.gfac.core.context.MessageContext; -import org.apache.airavata.gfac.core.provider.GFacProviderException; -import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler; -import org.apache.airavata.gfac.local.provider.impl.LocalProvider; -import org.apache.airavata.model.workspace.experiment.ExecutionUnit; -import org.apache.airavata.model.workspace.experiment.Experiment; -import org.apache.airavata.model.workspace.experiment.TaskDetails; -import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; -import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl; -import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; -import org.apache.airavata.schemas.gfac.InputParameterType; -import org.apache.airavata.schemas.gfac.OutputParameterType; -import org.apache.airavata.schemas.gfac.StringParameterType; -import org.apache.commons.lang.SystemUtils; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; - -import com.google.common.eventbus.EventBus; - -public class LocalProviderTest { - private JobExecutionContext jobExecutionContext; - @BeforeTest - public void setUp() throws Exception { - - URL resource = 
this.getClass().getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); - File configFile = new File(resource.getPath()); - GFacConfiguration gFacConfiguration = GFacConfiguration.create(configFile, null); - //have to set InFlwo Handlers and outFlowHandlers - ApplicationContext applicationContext = new ApplicationContext(); - HostDescription host = new HostDescription(); - host.getType().setHostName("localhost"); - host.getType().setHostAddress("localhost"); - applicationContext.setHostDescription(host); - /* - * App - */ - ApplicationDescription appDesc = new ApplicationDescription(); - ApplicationDeploymentDescriptionType app = appDesc.getType(); - ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance(); - name.setStringValue("EchoLocal"); - app.setApplicationName(name); - - /* - * Use bat file if it is compiled on Windows - */ - if (SystemUtils.IS_OS_WINDOWS) { - URL url = this.getClass().getClassLoader().getResource("echo.bat"); - app.setExecutableLocation(url.getFile()); - } else { - //for unix and Mac - app.setExecutableLocation("/bin/echo"); - } - - /* - * Default tmp location - */ - String tempDir = System.getProperty("java.io.tmpdir"); - if (tempDir == null) { - tempDir = "/tmp"; - } - - app.setScratchWorkingDirectory(tempDir); - app.setStaticWorkingDirectory(tempDir); - app.setInputDataDirectory(tempDir + File.separator + "input"); - app.setOutputDataDirectory(tempDir + File.separator + "output"); - app.setStandardOutput(tempDir + File.separator + "echo.stdout"); - app.setStandardError(tempDir + File.separator + "echo.stderr"); - - applicationContext.setApplicationDeploymentDescription(appDesc); - - /* - * Service - */ - ServiceDescription serv = new ServiceDescription(); - serv.getType().setName("SimpleEcho"); - - List<InputParameterType> inputList = new ArrayList<InputParameterType>(); - InputParameterType input = 
InputParameterType.Factory.newInstance(); - input.setParameterName("echo_input"); - input.setParameterType(StringParameterType.Factory.newInstance()); - inputList.add(input); - InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList - .size()]); - - List<OutputParameterType> outputList = new ArrayList<OutputParameterType>(); - OutputParameterType output = OutputParameterType.Factory.newInstance(); - output.setParameterName("echo_output"); - output.setParameterType(StringParameterType.Factory.newInstance()); - outputList.add(output); - OutputParameterType[] outputParamList = outputList - .toArray(new OutputParameterType[outputList.size()]); - - serv.getType().setInputParametersArray(inputParamList); - serv.getType().setOutputParametersArray(outputParamList); - - jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName()); - jobExecutionContext.setApplicationContext(applicationContext); - /* - * Host - */ - applicationContext.setServiceDescription(serv); - - MessageContext inMessage = new MessageContext(); - ActualParameter echo_input = new ActualParameter(); - ((StringParameterType) echo_input.getType()).setValue("echo_output=hello"); - inMessage.addParameter("echo_input", echo_input); - - jobExecutionContext.setInMessageContext(inMessage); - - MessageContext outMessage = new MessageContext(); - ActualParameter echo_out = new ActualParameter(); - outMessage.addParameter("echo_output", echo_out); - - jobExecutionContext.setOutMessageContext(outMessage); - - jobExecutionContext.setExperimentID("test123"); - jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp")); - jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID())); - jobExecutionContext.setRegistry(new LoggingRegistryImpl()); - jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none", ExecutionUnit.APPLICATION)); - - - } - - @Test 
- public void testLocalDirectorySetupHandler() throws GFacException { - LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); - localDirectorySetupHandler.invoke(jobExecutionContext); - - ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); - ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType(); - junit.framework.Assert.assertTrue(new File(app.getStaticWorkingDirectory()).exists()); - junit.framework.Assert.assertTrue(new File(app.getScratchWorkingDirectory()).exists()); - junit.framework.Assert.assertTrue(new File(app.getInputDataDirectory()).exists()); - junit.framework.Assert.assertTrue(new File(app.getOutputDataDirectory()).exists()); - } - - @Test - public void testLocalProvider() throws GFacException,GFacProviderException { - LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); - localDirectorySetupHandler.invoke(jobExecutionContext); - LocalProvider localProvider = new LocalProvider(); - localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus())); - localProvider.initialize(jobExecutionContext); - localProvider.execute(jobExecutionContext); - localProvider.dispose(jobExecutionContext); - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. 
You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.core.gfac.services.impl; +// +//import java.io.File; +//import java.net.URL; +//import java.util.ArrayList; +//import java.util.List; +// +//import org.apache.airavata.common.utils.MonitorPublisher; +//import org.apache.airavata.commons.gfac.type.ActualParameter; +//import org.apache.airavata.commons.gfac.type.ApplicationDescription; +//import org.apache.airavata.commons.gfac.type.HostDescription; +//import org.apache.airavata.commons.gfac.type.ServiceDescription; +//import org.apache.airavata.gfac.GFacConfiguration; +//import org.apache.airavata.gfac.GFacException; +//import org.apache.airavata.gfac.core.context.ApplicationContext; +//import org.apache.airavata.gfac.core.context.JobExecutionContext; +//import org.apache.airavata.gfac.core.context.MessageContext; +//import org.apache.airavata.gfac.core.provider.GFacProviderException; +//import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler; +//import org.apache.airavata.gfac.local.provider.impl.LocalProvider; +//import org.apache.airavata.model.workspace.experiment.ExecutionUnit; +//import org.apache.airavata.model.workspace.experiment.Experiment; +//import org.apache.airavata.model.workspace.experiment.TaskDetails; +//import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails; +//import org.apache.airavata.persistance.registry.jpa.impl.LoggingRegistryImpl; +//import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType; +//import org.apache.airavata.schemas.gfac.InputParameterType; 
+//import org.apache.airavata.schemas.gfac.OutputParameterType; +//import org.apache.airavata.schemas.gfac.StringParameterType; +//import org.apache.commons.lang.SystemUtils; +//import org.testng.annotations.BeforeTest; +//import org.testng.annotations.Test; +// +//import com.google.common.eventbus.EventBus; +// +//public class LocalProviderTest { +// private JobExecutionContext jobExecutionContext; +// @BeforeTest +// public void setUp() throws Exception { +// +// URL resource = this.getClass().getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML); +// File configFile = new File(resource.getPath()); +// GFacConfiguration gFacConfiguration = GFacConfiguration.create(configFile, null); +// //have to set InFlwo Handlers and outFlowHandlers +// ApplicationContext applicationContext = new ApplicationContext(); +// HostDescription host = new HostDescription(); +// host.getType().setHostName("localhost"); +// host.getType().setHostAddress("localhost"); +// applicationContext.setHostDescription(host); +// /* +// * App +// */ +// ApplicationDescription appDesc = new ApplicationDescription(); +// ApplicationDeploymentDescriptionType app = appDesc.getType(); +// ApplicationDeploymentDescriptionType.ApplicationName name = ApplicationDeploymentDescriptionType.ApplicationName.Factory.newInstance(); +// name.setStringValue("EchoLocal"); +// app.setApplicationName(name); +// +// /* +// * Use bat file if it is compiled on Windows +// */ +// if (SystemUtils.IS_OS_WINDOWS) { +// URL url = this.getClass().getClassLoader().getResource("echo.bat"); +// app.setExecutableLocation(url.getFile()); +// } else { +// //for unix and Mac +// app.setExecutableLocation("/bin/echo"); +// } +// +// /* +// * Default tmp location +// */ +// String tempDir = System.getProperty("java.io.tmpdir"); +// if (tempDir == null) { +// tempDir = "/tmp"; +// } +// +// app.setScratchWorkingDirectory(tempDir); +// app.setStaticWorkingDirectory(tempDir); +// 
app.setInputDataDirectory(tempDir + File.separator + "input"); +// app.setOutputDataDirectory(tempDir + File.separator + "output"); +// app.setStandardOutput(tempDir + File.separator + "echo.stdout"); +// app.setStandardError(tempDir + File.separator + "echo.stderr"); +// +// applicationContext.setApplicationDeploymentDescription(appDesc); +// +// /* +// * Service +// */ +// ServiceDescription serv = new ServiceDescription(); +// serv.getType().setName("SimpleEcho"); +// +// List<InputParameterType> inputList = new ArrayList<InputParameterType>(); +// InputParameterType input = InputParameterType.Factory.newInstance(); +// input.setParameterName("echo_input"); +// input.setParameterType(StringParameterType.Factory.newInstance()); +// inputList.add(input); +// InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList +// .size()]); +// +// List<OutputParameterType> outputList = new ArrayList<OutputParameterType>(); +// OutputParameterType output = OutputParameterType.Factory.newInstance(); +// output.setParameterName("echo_output"); +// output.setParameterType(StringParameterType.Factory.newInstance()); +// outputList.add(output); +// OutputParameterType[] outputParamList = outputList +// .toArray(new OutputParameterType[outputList.size()]); +// +// serv.getType().setInputParametersArray(inputParamList); +// serv.getType().setOutputParametersArray(outputParamList); +// +// jobExecutionContext = new JobExecutionContext(gFacConfiguration, serv.getType().getName()); +// jobExecutionContext.setApplicationContext(applicationContext); +// /* +// * Host +// */ +// applicationContext.setServiceDescription(serv); +// +// MessageContext inMessage = new MessageContext(); +// ActualParameter echo_input = new ActualParameter(); +// ((StringParameterType) echo_input.getType()).setValue("echo_output=hello"); +// inMessage.addParameter("echo_input", echo_input); +// +// jobExecutionContext.setInMessageContext(inMessage); +// +// MessageContext 
outMessage = new MessageContext(); +// ActualParameter echo_out = new ActualParameter(); +// outMessage.addParameter("echo_output", echo_out); +// +// jobExecutionContext.setOutMessageContext(outMessage); +// +// jobExecutionContext.setExperimentID("test123"); +// jobExecutionContext.setExperiment(new Experiment("test123","project1","admin","testExp")); +// jobExecutionContext.setTaskData(new TaskDetails(jobExecutionContext.getExperimentID())); +// jobExecutionContext.setRegistry(new LoggingRegistryImpl()); +// jobExecutionContext.setWorkflowNodeDetails(new WorkflowNodeDetails(jobExecutionContext.getExperimentID(),"none", ExecutionUnit.APPLICATION)); +// +// +// } +// +// @Test +// public void testLocalDirectorySetupHandler() throws GFacException { +// LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); +// localDirectorySetupHandler.invoke(jobExecutionContext); +// +// ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription(); +// ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType(); +// junit.framework.Assert.assertTrue(new File(app.getStaticWorkingDirectory()).exists()); +// junit.framework.Assert.assertTrue(new File(app.getScratchWorkingDirectory()).exists()); +// junit.framework.Assert.assertTrue(new File(app.getInputDataDirectory()).exists()); +// junit.framework.Assert.assertTrue(new File(app.getOutputDataDirectory()).exists()); +// } +// +// @Test +// public void testLocalProvider() throws GFacException,GFacProviderException { +// LocalDirectorySetupHandler localDirectorySetupHandler = new LocalDirectorySetupHandler(); +// localDirectorySetupHandler.invoke(jobExecutionContext); +// LocalProvider localProvider = new LocalProvider(); +// localProvider.setMonitorPublisher(new MonitorPublisher(new EventBus())); +// localProvider.initialize(jobExecutionContext); +// localProvider.execute(jobExecutionContext); 
+// localProvider.dispose(jobExecutionContext); +// } +//} http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java index a4a131d..c788ace 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HPCMonitorID.java @@ -31,6 +31,7 @@ import org.apache.airavata.gfac.ssh.security.TokenizedSSHAuthInfo; import org.apache.airavata.gsi.ssh.api.ServerInfo; import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.model.workspace.experiment.JobState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +46,10 @@ public class HPCMonitorID extends MonitorID { private AuthenticationInfo authenticationInfo = null; - public HPCMonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, + public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName,String jobName) { - super(host, jobID, taskID, workflowNodeID, experimentID, userName,jobName); - setHost(host); + super(computeResourceDescription, jobID, taskID, workflowNodeID, experimentID, userName,jobName); + setComputeResourceDescription(computeResourceDescription); setJobStartedTime(new Timestamp((new Date()).getTime())); setUserName(userName); setJobID(jobID); @@ -84,8 +85,8 @@ public class 
HPCMonitorID extends MonitorID { } } - public HPCMonitorID(HostDescription host, String jobID, String taskID, String workflowNodeID, String experimentID, String userName, AuthenticationInfo authenticationInfo) { - setHost(host); + public HPCMonitorID(ComputeResourceDescription computeResourceDescription, String jobID, String taskID, String workflowNodeID, String experimentID, String userName, AuthenticationInfo authenticationInfo) { + setComputeResourceDescription(computeResourceDescription); setJobStartedTime(new Timestamp((new Date()).getTime())); this.authenticationInfo = authenticationInfo; // if we give myproxyauthenticationInfo, so we try to use myproxy user as the user http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java index 0480925..c2017a0 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/HostMonitorData.java @@ -20,34 +20,36 @@ */ package org.apache.airavata.gfac.monitor; -import org.apache.airavata.commons.gfac.type.HostDescription; +import org.apache.airavata.gfac.core.context.JobExecutionContext; import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; +import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; -import java.util.ArrayList; import java.util.List; public class 
HostMonitorData { - private HostDescription host; +// private HostDescription host; + private ComputeResourceDescription computeResourceDescription; + private JobSubmissionProtocol jobSubmissionProtocol; + private DataMovementProtocol dataMovementProtocol; private List<MonitorID> monitorIDs; - public HostMonitorData(HostDescription host) { - this.host = host; - monitorIDs = new ArrayList<MonitorID>(); - } + public HostMonitorData(JobExecutionContext jobExecutionContext) { + this.computeResourceDescription = jobExecutionContext.getApplicationContext().getComputeResourceDescription(); + this.jobSubmissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol(); + this.dataMovementProtocol = jobExecutionContext.getPreferredDataMovementProtocol(); - public HostMonitorData(HostDescription host, List<MonitorID> monitorIDs) { - this.host = host; - this.monitorIDs = monitorIDs; } - public HostDescription getHost() { - return host; + public ComputeResourceDescription getComputeResourceDescription() { + return computeResourceDescription; } - public void setHost(HostDescription host) { - this.host = host; + public void setComputeResourceDescription(ComputeResourceDescription computeResourceDescription) { + this.computeResourceDescription = computeResourceDescription; } public List<MonitorID> getMonitorIDs() { @@ -67,4 +69,12 @@ public class HostMonitorData { public void addMonitorIDForHost(MonitorID monitorID)throws AiravataMonitorException { monitorIDs.add(monitorID); } + + public JobSubmissionProtocol getJobSubmissionProtocol() { + return jobSubmissionProtocol; + } + + public DataMovementProtocol getDataMovementProtocol() { + return dataMovementProtocol; + } } http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java ---------------------------------------------------------------------- diff --git 
a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java index ceb440c..3a0e44d 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java @@ -99,7 +99,7 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{ } catch (InterruptedException e) { e.printStackTrace(); } - CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID); + CommonUtils.addMonitortoQueue(hpcPullMonitor.getQueue(), monitorID, jobExecutionContext); CommonUtils.increaseZkJobCount(monitorID); // update change job count to zookeeper } catch (AiravataMonitorException e) { logger.errorId(monitorID.getJobID(), "Error adding job {} monitorID object to the queue with experiment {}", http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java index 952b30e..122d1e2 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java @@ -38,6 +38,7 @@ import org.apache.airavata.gfac.monitor.impl.push.amqp.SimpleJobFinishConsumer; import org.apache.airavata.gfac.monitor.util.CommonUtils; import org.apache.airavata.gsi.ssh.api.SSHApiException; import 
org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo; +import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol; import org.apache.airavata.model.messaging.event.JobIdentifier; import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent; import org.apache.airavata.model.workspace.experiment.JobState; @@ -159,20 +160,19 @@ public class HPCPullMonitor extends PullMonitor { take = this.queue.take(); List<HostMonitorData> hostMonitorData = take.getHostMonitorData(); for (HostMonitorData iHostMonitorData : hostMonitorData) { - if (iHostMonitorData.getHost().getType() instanceof GsisshHostType - || iHostMonitorData.getHost().getType() instanceof SSHHostType) { - String hostName = iHostMonitorData.getHost().getType().getHostAddress(); + if (iHostMonitorData.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) { + String hostName = iHostMonitorData.getComputeResourceDescription().getHostName(); ResourceConnection connection = null; if (connections.containsKey(hostName)) { - if(!connections.get(hostName).isConnected()){ - connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + if (!connections.get(hostName).isConnected()) { + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); - }else{ + } else { logger.debug("We already have this connection so not going to create one"); connection = connections.get(hostName); } } else { - connection = new ResourceConnection(iHostMonitorData,getAuthenticationInfo()); + connection = new ResourceConnection(iHostMonitorData, getAuthenticationInfo()); connections.put(hostName, connection); } @@ -207,7 +207,7 @@ public class HPCPullMonitor extends PullMonitor { MonitorID iMonitorID = monitorIDListIterator.next(); String completeId = null; while (iterator.hasNext()) { - completeId = iterator.next(); + completeId = iterator.next(); if (completeId.equals(iMonitorID.getUserName() + "," + 
iMonitorID.getJobName())) { logger.info("This job is finished because push notification came with <username,jobName> " + completeId); iMonitorID.setStatus(JobState.COMPLETE); @@ -239,6 +239,7 @@ public class HPCPullMonitor extends PullMonitor { !JobState.COMPLETE.equals(iMonitorID.getStatus())) { iMonitorID.setStatus(jobStatuses.get(iMonitorID.getJobID() + "," + iMonitorID.getJobName())); //IMPORTANT this is NOT a simple setter we have a logic }else if(JobState.COMPLETE.equals(iMonitorID.getStatus())){ + completedJobs.put(iMonitorID.getJobName(), iMonitorID); logger.debugId(iMonitorID.getJobID(), "Moved job {} to completed jobs map, experiment {}, " + "task {}", iMonitorID.getJobID(), iMonitorID.getExperimentID(), iMonitorID.getTaskID()); iterator.remove(); @@ -260,8 +261,7 @@ public class HPCPullMonitor extends PullMonitor { MonitorID iMonitorID = iterator.next(); if (iMonitorID.getFailedCount() > FAILED_COUNT) { iMonitorID.setLastMonitored(new Timestamp((new Date()).getTime())); - String outputDir = iMonitorID.getJobExecutionContext().getApplicationContext() - .getApplicationDeploymentDescription().getType().getOutputDataDirectory(); + String outputDir = iMonitorID.getJobExecutionContext().getOutputDir(); List<String> stdOut = null; try { stdOut = connection.getCluster().listDirectory(outputDir); // check the outputs directory @@ -296,8 +296,8 @@ public class HPCPullMonitor extends PullMonitor { } else { - logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData.getHost() - .getType().getHostAddress()); + logger.debug("Qstat Monitor doesn't handle non-gsissh hosts , host {}", iHostMonitorData. 
+ getComputeResourceDescription().getHostName()); } } // We have finished all the HostMonitorData object in userMonitorData, now we need to put it back http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java index 3abcf1d..a503154 100644 --- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java +++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java @@ -34,6 +34,7 @@ import org.apache.airavata.gfac.core.monitor.MonitorID; import org.apache.airavata.gfac.monitor.HostMonitorData; import org.apache.airavata.gfac.monitor.UserMonitorData; import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException; +import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription; import org.apache.airavata.schemas.gfac.GsisshHostType; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -79,11 +80,11 @@ public class CommonUtils { } } public static String getChannelID(MonitorID monitorID) { - return monitorID.getUserName() + "-" + monitorID.getHost().getType().getHostName(); + return monitorID.getUserName() + "-" + monitorID.getComputeResourceDescription().getHostName(); } public static String getRoutingKey(MonitorID monitorID) { - return "*." + monitorID.getUserName() + "." + monitorID.getHost().getType().getHostAddress(); + return "*." + monitorID.getUserName() + "." 
+ monitorID.getComputeResourceDescription().getIpAddresses().get(0); } public static String getChannelID(String userName,String hostAddress) { @@ -94,7 +95,7 @@ public class CommonUtils { return "*." + userName + "." + hostAddress; } - public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID) throws AiravataMonitorException { + public static void addMonitortoQueue(BlockingQueue<UserMonitorData> queue, MonitorID monitorID, JobExecutionContext jobExecutionContext) throws AiravataMonitorException { synchronized (queue) { Iterator<UserMonitorData> iterator = queue.iterator(); while (iterator.hasNext()) { @@ -103,7 +104,7 @@ public class CommonUtils { // then this is the right place to update List<HostMonitorData> monitorIDs = next.getHostMonitorData(); for (HostMonitorData host : monitorIDs) { - if (host.getHost().toXML().equals(monitorID.getHost().toXML())) { + if (isEqual(host.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) { // ok we found right place to add this monitorID host.addMonitorIDForHost(monitorID); logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," + @@ -113,7 +114,7 @@ public class CommonUtils { } // there is a userMonitor object for this user name but no Hosts for this host // so we have to create new Hosts - HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost()); + HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext); hostMonitorData.addMonitorIDForHost(monitorID); next.addHostMonitorData(hostMonitorData); logger.debugId(monitorID.getJobID(), "Added new job to the monitoring queue, experiment {}," + @@ -121,7 +122,7 @@ public class CommonUtils { return; } } - HostMonitorData hostMonitorData = new HostMonitorData(monitorID.getHost()); + HostMonitorData hostMonitorData = new HostMonitorData(jobExecutionContext); hostMonitorData.addMonitorIDForHost(monitorID); UserMonitorData userMonitorData = new 
UserMonitorData(monitorID.getUserName()); @@ -135,11 +136,18 @@ public class CommonUtils { } } } + + private static boolean isEqual(ComputeResourceDescription comRes_1, ComputeResourceDescription comRes_2) { + return comRes_1.getComputeResourceId().equals(comRes_2.getComputeResourceId()) && + comRes_1.getHostName().equals(comRes_2.getHostName()); + } + public static boolean isTheLastJobInQueue(BlockingQueue<MonitorID> queue,MonitorID monitorID){ Iterator<MonitorID> iterator = queue.iterator(); while(iterator.hasNext()){ MonitorID next = iterator.next(); - if(monitorID.getUserName().equals(next.getUserName()) && CommonUtils.isEqual(monitorID.getHost(), next.getHost())){ + if (monitorID.getUserName().equals(next.getUserName()) && + CommonUtils.isEqual(monitorID.getComputeResourceDescription(), next.getComputeResourceDescription())) { return false; } } @@ -162,7 +170,7 @@ public class CommonUtils { Iterator<HostMonitorData> iterator1 = hostMonitorData.iterator(); while (iterator1.hasNext()) { HostMonitorData iHostMonitorID = iterator1.next(); - if (iHostMonitorID.getHost().toXML().equals(monitorID.getHost().toXML())) { + if (isEqual(iHostMonitorID.getComputeResourceDescription(), monitorID.getComputeResourceDescription())) { Iterator<MonitorID> iterator2 = iHostMonitorID.getMonitorIDs().iterator(); while (iterator2.hasNext()) { MonitorID iMonitorID = iterator2.next(); @@ -172,11 +180,10 @@ public class CommonUtils { // could be different, thats why we check the jobID iterator2.remove(); logger.infoId(monitorID.getJobID(), "Removed the jobId: {} JobName: {} from monitoring last " + - "status:{}", monitorID.getJobID(),monitorID.getJobName(), monitorID.getStatus().toString()); + "status:{}", monitorID.getJobID(), monitorID.getJobName(), monitorID.getStatus().toString()); if (iHostMonitorID.getMonitorIDs().size() == 0) { iterator1.remove(); - logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getHost() - .getType().getHostAddress()); + 
logger.debug("Removed host {} from monitoring queue", iHostMonitorID.getComputeResourceDescription().getHostName()); if (hostMonitorData.size() == 0) { // no useful data so we have to remove the element from the queue queue.remove(next); @@ -330,7 +337,7 @@ public class CommonUtils { */ public static String getJobCountUpdatePath(MonitorID monitorID){ return new StringBuilder("/").append(Constants.STAT).append("/").append(monitorID.getUserName()) - .append("/").append(monitorID.getHost().getType().getHostAddress()).append("/").append(Constants.JOB).toString(); + .append("/").append(monitorID.getComputeResourceDescription().getHostName()).append("/").append(Constants.JOB).toString(); } /** http://git-wip-us.apache.org/repos/asf/airavata/blob/3e584f87/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java ---------------------------------------------------------------------- diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java index 537d8bb..610934e 100644 --- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java +++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java @@ -1,172 +1,172 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ -package org.apache.airavata.job; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.airavata.common.utils.MonitorPublisher; -import org.apache.airavata.commons.gfac.type.HostDescription; -import org.apache.airavata.gfac.core.monitor.MonitorID; -import org.apache.airavata.gfac.monitor.HPCMonitorID; -import org.apache.airavata.gfac.monitor.UserMonitorData; -import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; -import org.apache.airavata.gsi.ssh.api.Cluster; -import org.apache.airavata.gsi.ssh.api.SSHApiException; -import org.apache.airavata.gsi.ssh.api.ServerInfo; -import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo; -import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; -import org.apache.airavata.gsi.ssh.impl.PBSCluster; -import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; -import org.apache.airavata.gsi.ssh.util.CommonUtils; -import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; -import org.apache.airavata.schemas.gfac.GsisshHostType; -import org.junit.Assert; -import org.testng.annotations.Test; - -import com.google.common.eventbus.EventBus; -import com.google.common.eventbus.Subscribe; - -public class QstatMonitorTestWithMyProxyAuth { - private String myProxyUserName; - private String myProxyPassword; - private String certificateLocation; - private String pbsFilePath; - private String workingDirectory; - 
private HostDescription hostDescription; - private MonitorPublisher monitorPublisher; - private BlockingQueue<UserMonitorData> pullQueue; - private Thread monitorThread; - - @org.testng.annotations.BeforeClass - public void setUp() throws Exception { -// System.setProperty("myproxy.username", "ogce"); -// System.setProperty("myproxy.password", ""); -// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh"); -// System.setProperty("gsi.working.directory", "/home/ogce"); -// System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates"); - myProxyUserName = System.getProperty("myproxy.username"); - myProxyPassword = System.getProperty("myproxy.password"); - workingDirectory = System.getProperty("gsi.working.directory"); - certificateLocation = System.getProperty("trusted.cert.location"); - if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) { - System.out.println(">>>>>> Please run tests with my proxy user name and password. 
" + - "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<"); - throw new Exception("Need my proxy user name password to run tests."); - } - - monitorPublisher = new MonitorPublisher(new EventBus()); - class InnerClassQstat { - - @Subscribe - private void getStatus(JobStatusChangeEvent status) { - Assert.assertNotNull(status); - System.out.println(status.getState().toString()); - monitorThread.interrupt(); - } - } - monitorPublisher.registerListener(this); - pullQueue = new LinkedBlockingQueue<UserMonitorData>(); - final HPCPullMonitor qstatMonitor = new - HPCPullMonitor(pullQueue, monitorPublisher); - try { - (new Thread(){ - public void run(){ - qstatMonitor.run(); - } - }).start(); - } catch (Exception e) { - e.printStackTrace(); - } - - hostDescription = new HostDescription(GsisshHostType.type); - hostDescription.getType().setHostAddress("trestles.sdsc.edu"); - hostDescription.getType().setHostName("gsissh-gordon"); - ((GsisshHostType) hostDescription.getType()).setPort(22); - ((GsisshHostType)hostDescription.getType()).setInstalledPath("/opt/torque/bin/"); - } - - @Test - public void testQstatMonitor() throws SSHApiException { - /* now have to submit a job to some machine and add that job to the queue */ - //Create authentication - GSIAuthenticationInfo authenticationInfo - = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org", - 7512, 17280000, certificateLocation); - - // Server info - ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress()); - - - Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/")); - - - // Execute command - System.out.println("Target PBS file path: " + workingDirectory); - // constructing the job object - JobDescriptor jobDescriptor = new JobDescriptor(); - jobDescriptor.setWorkingDirectory(workingDirectory); - jobDescriptor.setShellName("/bin/bash"); 
- jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB"); - jobDescriptor.setExecutablePath("/bin/echo"); - jobDescriptor.setAllEnvExport(true); - jobDescriptor.setMailOptions("n"); - jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out"); - jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err"); - jobDescriptor.setNodes(1); - jobDescriptor.setProcessesPerNode(1); - jobDescriptor.setQueueName("normal"); - jobDescriptor.setMaxWallTime("60"); - jobDescriptor.setAcountString("sds128"); - List<String> inputs = new ArrayList<String>(); - jobDescriptor.setOwner("ogce"); - inputs.add("Hello World"); - jobDescriptor.setInputValues(inputs); - //finished construction of job object - System.out.println(jobDescriptor.toXML()); - for (int i = 0; i < 1; i++) { - String jobID = pbsCluster.submitBatchJob(jobDescriptor); - System.out.println("Job submitted successfully, Job ID: " + jobID); - MonitorID monitorID = new HPCMonitorID(hostDescription, jobID,null,null,null, "ogce",""); - ((HPCMonitorID)monitorID).setAuthenticationInfo(authenticationInfo); - try { - org.apache.airavata.gfac.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID); - } catch (Exception e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - try { - - monitorThread.join(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - @Subscribe - public void testCaseShutDown(JobStatusChangeEvent status) { - Assert.assertNotNull(status.getState()); - monitorThread.stop(); - } -} +///* +// * +// * Licensed to the Apache Software Foundation (ASF) under one +// * or more contributor license agreements. See the NOTICE file +// * distributed with this work for additional information +// * regarding copyright ownership. The ASF licenses this file +// * to you under the Apache License, Version 2.0 (the +// * "License"); you may not use this file except in compliance +// * with the License. 
You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, +// * software distributed under the License is distributed on an +// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// * KIND, either express or implied. See the License for the +// * specific language governing permissions and limitations +// * under the License. +// * +//*/ +//package org.apache.airavata.job; +// +//import java.io.File; +//import java.util.ArrayList; +//import java.util.List; +//import java.util.concurrent.BlockingQueue; +//import java.util.concurrent.LinkedBlockingQueue; +// +//import org.apache.airavata.common.utils.MonitorPublisher; +//import org.apache.airavata.commons.gfac.type.HostDescription; +//import org.apache.airavata.gfac.core.monitor.MonitorID; +//import org.apache.airavata.gfac.monitor.HPCMonitorID; +//import org.apache.airavata.gfac.monitor.UserMonitorData; +//import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor; +//import org.apache.airavata.gsi.ssh.api.Cluster; +//import org.apache.airavata.gsi.ssh.api.SSHApiException; +//import org.apache.airavata.gsi.ssh.api.ServerInfo; +//import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo; +//import org.apache.airavata.gsi.ssh.api.job.JobDescriptor; +//import org.apache.airavata.gsi.ssh.impl.PBSCluster; +//import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo; +//import org.apache.airavata.gsi.ssh.util.CommonUtils; +//import org.apache.airavata.model.messaging.event.JobStatusChangeEvent; +//import org.apache.airavata.schemas.gfac.GsisshHostType; +//import org.junit.Assert; +//import org.testng.annotations.Test; +// +//import com.google.common.eventbus.EventBus; +//import com.google.common.eventbus.Subscribe; +// +//public class QstatMonitorTestWithMyProxyAuth { +// private String myProxyUserName; +// private String myProxyPassword; +// private String 
certificateLocation; +// private String pbsFilePath; +// private String workingDirectory; +// private HostDescription hostDescription; +// private MonitorPublisher monitorPublisher; +// private BlockingQueue<UserMonitorData> pullQueue; +// private Thread monitorThread; +// +// @org.testng.annotations.BeforeClass +// public void setUp() throws Exception { +//// System.setProperty("myproxy.username", "ogce"); +//// System.setProperty("myproxy.password", ""); +//// System.setProperty("basedir", "/Users/lahirugunathilake/work/airavata/sandbox/gsissh"); +//// System.setProperty("gsi.working.directory", "/home/ogce"); +//// System.setProperty("trusted.cert.location", "/Users/lahirugunathilake/Downloads/certificates"); +// myProxyUserName = System.getProperty("myproxy.username"); +// myProxyPassword = System.getProperty("myproxy.password"); +// workingDirectory = System.getProperty("gsi.working.directory"); +// certificateLocation = System.getProperty("trusted.cert.location"); +// if (myProxyUserName == null || myProxyPassword == null || workingDirectory == null) { +// System.out.println(">>>>>> Please run tests with my proxy user name and password. 
" + +// "E.g :- mvn clean install -Dmyproxy.username=xxx -Dmyproxy.password=xxx -Dgsi.working.directory=/path<<<<<<<"); +// throw new Exception("Need my proxy user name password to run tests."); +// } +// +// monitorPublisher = new MonitorPublisher(new EventBus()); +// class InnerClassQstat { +// +// @Subscribe +// private void getStatus(JobStatusChangeEvent status) { +// Assert.assertNotNull(status); +// System.out.println(status.getState().toString()); +// monitorThread.interrupt(); +// } +// } +// monitorPublisher.registerListener(this); +// pullQueue = new LinkedBlockingQueue<UserMonitorData>(); +// final HPCPullMonitor qstatMonitor = new +// HPCPullMonitor(pullQueue, monitorPublisher); +// try { +// (new Thread(){ +// public void run(){ +// qstatMonitor.run(); +// } +// }).start(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// +// hostDescription = new HostDescription(GsisshHostType.type); +// hostDescription.getType().setHostAddress("trestles.sdsc.edu"); +// hostDescription.getType().setHostName("gsissh-gordon"); +// ((GsisshHostType) hostDescription.getType()).setPort(22); +// ((GsisshHostType)hostDescription.getType()).setInstalledPath("/opt/torque/bin/"); +// } +// +// @Test +// public void testQstatMonitor() throws SSHApiException { +// /* now have to submit a job to some machine and add that job to the queue */ +// //Create authentication +// GSIAuthenticationInfo authenticationInfo +// = new MyProxyAuthenticationInfo(myProxyUserName, myProxyPassword, "myproxy.teragrid.org", +// 7512, 17280000, certificateLocation); +// +// // Server info +// ServerInfo serverInfo = new ServerInfo("ogce", hostDescription.getType().getHostAddress()); +// +// +// Cluster pbsCluster = new PBSCluster(serverInfo, authenticationInfo, CommonUtils.getPBSJobManager("/opt/torque/bin/")); +// +// +// // Execute command +// System.out.println("Target PBS file path: " + workingDirectory); +// // constructing the job object +// JobDescriptor jobDescriptor = new 
JobDescriptor(); +// jobDescriptor.setWorkingDirectory(workingDirectory); +// jobDescriptor.setShellName("/bin/bash"); +// jobDescriptor.setJobName("GSI_SSH_SLEEP_JOB"); +// jobDescriptor.setExecutablePath("/bin/echo"); +// jobDescriptor.setAllEnvExport(true); +// jobDescriptor.setMailOptions("n"); +// jobDescriptor.setStandardOutFile(workingDirectory + File.separator + "application.out"); +// jobDescriptor.setStandardErrorFile(workingDirectory + File.separator + "application.err"); +// jobDescriptor.setNodes(1); +// jobDescriptor.setProcessesPerNode(1); +// jobDescriptor.setQueueName("normal"); +// jobDescriptor.setMaxWallTime("60"); +// jobDescriptor.setAcountString("sds128"); +// List<String> inputs = new ArrayList<String>(); +// jobDescriptor.setOwner("ogce"); +// inputs.add("Hello World"); +// jobDescriptor.setInputValues(inputs); +// //finished construction of job object +// System.out.println(jobDescriptor.toXML()); +// for (int i = 0; i < 1; i++) { +// String jobID = pbsCluster.submitBatchJob(jobDescriptor); +// System.out.println("Job submitted successfully, Job ID: " + jobID); +// MonitorID monitorID = new HPCMonitorID(hostDescription, jobID,null,null,null, "ogce",""); +// ((HPCMonitorID)monitorID).setAuthenticationInfo(authenticationInfo); +// try { +// org.apache.airavata.gfac.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID, jobExecutionContext); +// } catch (Exception e) { +// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. +// } +// } +// try { +// +// monitorThread.join(); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// } +// +// @Subscribe +// public void testCaseShutDown(JobStatusChangeEvent status) { +// Assert.assertNotNull(status.getState()); +// monitorThread.stop(); +// } +//}
