Repository: airavata
Updated Branches:
  refs/heads/master 0599edf0b -> 6b90e6427


Fixing Localprovider support


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/6b90e642
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/6b90e642
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/6b90e642

Branch: refs/heads/master
Commit: 6b90e6427b9093ead5baf0cda3f599fa1aad363f
Parents: 0599edf
Author: lahiru <lah...@apache.org>
Authored: Thu Mar 20 13:00:04 2014 -0400
Committer: lahiru <lah...@apache.org>
Committed: Thu Mar 20 13:00:04 2014 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  | 47 +++++++++-
 .../main/resources/airavata-client.properties   |  8 +-
 .../airavata/client/tools/DocumentCreator.java  |  5 +-
 .../org/apache/airavata/gfac/cpi/GFacImpl.java  | 18 ++--
 .../gfac/provider/impl/AbstractProvider.java    |  6 +-
 .../gfac/provider/impl/GSISSHProvider.java      | 68 +-------------
 .../gfac/provider/impl/LocalProvider.java       | 25 ++---
 .../apache/airavata/gfac/utils/GFacUtils.java   | 96 ++++++++++++++++++--
 .../server/OrchestratorServerHandler.java       |  6 +-
 .../job/monitor/AiravataJobStatusUpdator.java   |  8 +-
 .../airavata/job/monitor/MonitorManager.java    | 29 +++++-
 .../job/monitor/impl/LocalJobMonitor.java       | 58 ++++++++++++
 .../airavata/job/monitor/state/JobStatus.java   |  9 ++
 13 files changed, 265 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index efcf4a0..7c20dff 100644
--- 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -49,7 +49,7 @@ import java.util.Set;
 public class CreateLaunchExperiment {
 
     //FIXME: Read from a config file
-    public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
+    public static final String THRIFT_SERVER_HOST = "localhost";
     public static final int THRIFT_SERVER_PORT = 8930;
     private final static Logger logger = 
LoggerFactory.getLogger(CreateLaunchExperiment.class);
     private static final String DEFAULT_USER = "defauly.registry.user";
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
             final Airavata.Client airavata = 
AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, 
THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavata.GetAPIVersion());
             addDescriptors();
-            final String expId = createExperimentForTrestles(airavata);
+            final String expId = createExperimentForLocalHost(airavata);
 //            final String expId = createUS3ExperimentForTrestles(airavata);
 //            final String expId = createExperimentForStampede(airavata);
             System.out.println("Experiment ID : " + expId);
@@ -194,7 +194,48 @@ public class CreateLaunchExperiment {
             throw new TException(e);
         }
     }
-   
+    public static String createExperimentForLocalHost(Airavata.Client client) 
throws TException  {
+        try{
+            List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
+            DataObjectType input = new DataObjectType();
+            input.setKey("echo_input");
+            input.setType(DataType.STRING.toString());
+            input.setValue("echo_output=Hello World");
+            exInputs.add(input);
+
+            List<DataObjectType> exOut = new ArrayList<DataObjectType>();
+            DataObjectType output = new DataObjectType();
+            output.setKey("echo_output");
+            output.setType(DataType.STRING.toString());
+            output.setValue("");
+            exOut.add(output);
+
+            Experiment simpleExperiment =
+                    ExperimentModelUtil.createSimpleExperiment("project1", 
"admin", "echoExperiment", "SimpleEcho0", "SimpleEcho0", exInputs);
+            simpleExperiment.setExperimentOutputs(exOut);
+
+            ComputationalResourceScheduling scheduling = 
ExperimentModelUtil.createComputationResourceScheduling("localhost", 1, 1, 1, 
"normal", 0, 0, 1, "sds128");
+            scheduling.setResourceHostId("localhost");
+            UserConfigurationData userConfigurationData = new 
UserConfigurationData();
+            userConfigurationData.setAiravataAutoSchedule(false);
+            userConfigurationData.setOverrideManualScheduledParams(false);
+            
userConfigurationData.setComputationalResourceScheduling(scheduling);
+            simpleExperiment.setUserConfigurationData(userConfigurationData);
+            return client.createExperiment(simpleExperiment);
+        } catch (AiravataSystemException e) {
+            logger.error("Error occured while creating the experiment...", 
e.getMessage());
+            throw new AiravataSystemException(e);
+        } catch (InvalidRequestException e) {
+            logger.error("Error occured while creating the experiment...", 
e.getMessage());
+            throw new InvalidRequestException(e);
+        } catch (AiravataClientException e) {
+            logger.error("Error occured while creating the experiment...", 
e.getMessage());
+            throw new AiravataClientException(e);
+        }catch (TException e) {
+            logger.error("Error occured while creating the experiment...", 
e.getMessage());
+            throw new TException(e);
+        }
+    }
     public static String createExperimentForStampede(Airavata.Client client) 
throws TException  {
         try{
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
----------------------------------------------------------------------
diff --git 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
index 83be989..9786a1a 100644
--- 
a/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
+++ 
b/airavata-api/airavata-client-sdks/java-client-samples/src/main/resources/airavata-client.properties
@@ -34,12 +34,12 @@ 
class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.Airava
 
 ########################Registry JPA Implementation 
Settings########################
 #for mysql [AiravataJPARegistry]
-registry.jdbc.driver=com.mysql.jdbc.Driver
-registry.jdbc.url=jdbc:mysql://gw111.iu.xsede.org:3306/airavata
+#registry.jdbc.driver=com.mysql.jdbc.Driver
+#registry.jdbc.url=jdbc:mysql://gw111.iu.xsede.org:3306/airavata
 
 #for derby [AiravataJPARegistry]
-#registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
-#registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
+registry.jdbc.driver=org.apache.derby.jdbc.ClientDriver
+registry.jdbc.url=jdbc:derby://localhost:1527/persistent_data;create=true;user=airavata;password=airavata
 registry.jdbc.user=airavata
 registry.jdbc.password=airavata12
 start.derby.server.mode=true

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git 
a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
 
b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index 7964291..9fbcbd2 100644
--- 
a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ 
b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -64,10 +64,11 @@ public class DocumentCreator {
             e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
         }
 
+        String serviceName = "SimpleEcho0";
         ServiceDescription serviceDescription = new ServiceDescription();
         List<InputParameterType> inputParameters = new 
ArrayList<InputParameterType>();
         List<OutputParameterType> outputParameters = new 
ArrayList<OutputParameterType>();
-        serviceDescription.getType().setName("Echo");
+        serviceDescription.getType().setName(serviceName);
         serviceDescription.getType().setDescription("Echo service");
         // Creating input parameters
         InputParameterType parameter = 
InputParameterType.Factory.newInstance();
@@ -105,7 +106,7 @@ public class DocumentCreator {
         
applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
 
         try {
-            
airavataAPI.getApplicationManager().saveApplicationDescription("Echo", 
"localhost", applicationDeploymentDescription);
+            
airavataAPI.getApplicationManager().saveApplicationDescription(serviceName, 
"localhost", applicationDeploymentDescription);
         } catch (AiravataAPIInvocationException e) {
             e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
index f82fb7f..f79561c 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/cpi/GFacImpl.java
@@ -294,14 +294,16 @@ public class GFacImpl implements GFac {
             TaskDetails taskData = (TaskDetails) 
registry.get(DataType.TASK_DETAIL, taskID);
             JobDetails jobDetails = taskData.getJobDetailsList().get(0);
             String jobDescription = jobDetails.getJobDescription();
-            JobDescriptor jobDescriptor = 
JobDescriptor.fromXML(jobDescription);
-            
applicationDeploymentDescription.getType().setScratchWorkingDirectory(
-                    
jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
-            
applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
-            
applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
-            
applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
-            
applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
-        } catch (Exception e) {
+            if(jobDescription != null) {
+                JobDescriptor jobDescriptor = 
JobDescriptor.fromXML(jobDescription);
+                
applicationDeploymentDescription.getType().setScratchWorkingDirectory(
+                        
jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getWorkingDirectory());
+                
applicationDeploymentDescription.getType().setInputDataDirectory(jobDescriptor.getInputDirectory());
+                
applicationDeploymentDescription.getType().setOutputDataDirectory(jobDescriptor.getOutputDirectory());
+                
applicationDeploymentDescription.getType().setStandardError(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardErrorFile());
+                
applicationDeploymentDescription.getType().setStandardOutput(jobDescriptor.getJobDescriptorDocument().getJobDescriptor().getStandardOutFile());
+            }
+            } catch (Exception e) {
             throw new GFacException(e);
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index e49c5dd..dbbcb62 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -30,9 +30,9 @@ import 
org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 
 public abstract class AbstractProvider{
-       protected Registry registry = null;
-       protected JobDetails details;
-       protected JobStatus status;
+    protected Registry registry = null;
+       protected JobDetails details;     //todo we need to remove this and add 
methods to fill Job details, this is not a property of a provider
+       protected JobStatus status;   //todo we need to remove this and add 
methods to fill Job details, this is not a property of a provider
 
        public void initialize(JobExecutionContext jobExecutionContext) throws 
GFacProviderException, GFacException {
                registry = RegistryFactory.getDefaultRegistry();

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
index e8c7ff4..c20447c 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GSISSHProvider.java
@@ -89,73 +89,7 @@ public class GSISSHProvider extends AbstractProvider 
implements GFacProvider{
                 log.info("Successfully retrieved the Security Context");
             }
             // This installed path is a mandetory field, because this could 
change based on the computing resource
-            JobDescriptor jobDescriptor = new JobDescriptor();
-            jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
-            jobDescriptor.setShellName("/bin/bash");
-            Random random = new Random();
-            int i = random.nextInt();
-            jobDescriptor.setJobName(app.getApplicationName().getStringValue() 
+ String.valueOf(i));
-            jobDescriptor.setExecutablePath(app.getExecutableLocation());
-            jobDescriptor.setAllEnvExport(true);
-            jobDescriptor.setMailOptions("n");
-            jobDescriptor.setStandardOutFile(app.getStandardOutput());
-            jobDescriptor.setStandardErrorFile(app.getStandardError());
-            jobDescriptor.setNodes(app.getNodeCount());
-            jobDescriptor.setProcessesPerNode(app.getProcessorsPerNode());
-            jobDescriptor.setMaxWallTime(String.valueOf(app.getMaxWallTime()));
-            jobDescriptor.setJobSubmitter(app.getJobSubmitterCommand());
-            if (app.getProjectAccount().getProjectAccountNumber() != null) {
-                
jobDescriptor.setAcountString(app.getProjectAccount().getProjectAccountNumber());
-            }
-            if (app.getQueue().getQueueName() != null) {
-                jobDescriptor.setQueueName(app.getQueue().getQueueName());
-            }
-            jobDescriptor.setOwner(((PBSCluster) 
cluster).getServerInfo().getUserName());
-            jobDescriptor.setInputDirectory(app.getInputDataDirectory());
-            jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
-            TaskDetails taskData = jobExecutionContext.getTaskData();
-            if(taskData != null && taskData.isSetTaskScheduling()){
-               ComputationalResourceScheduling computionnalResource = 
taskData.getTaskScheduling();
-                if(computionnalResource.getNodeCount() > 0){
-                       
jobDescriptor.setNodes(computionnalResource.getNodeCount());
-                }
-                if(computionnalResource.getComputationalProjectAccount() != 
null){
-                       
jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
-                }
-                if(computionnalResource.getQueueName() != null){
-                       
jobDescriptor.setQueueName(computionnalResource.getQueueName());
-                }
-                if(computionnalResource.getTotalCPUCount() > 0){
-                       
jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
-                }
-                if(computionnalResource.getWallTimeLimit() > 0){
-                       
jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
-                }
-            }
-            List<String> inputValues = new ArrayList<String>();
-            MessageContext input = jobExecutionContext.getInMessageContext();
-            Map<String, Object> inputs = input.getParameters();
-            Set<String> keys = inputs.keySet();
-            for (String paramName : keys) {
-                ActualParameter actualParameter = (ActualParameter) 
inputs.get(paramName);
-                if 
("URIArray".equals(actualParameter.getType().getType().toString()) || 
"StringArray".equals(actualParameter.getType().getType().toString())
-                        || 
"FileArray".equals(actualParameter.getType().getType().toString())) {
-                    String[] values = null;
-                    if (actualParameter.getType() instanceof URIArrayType) {
-                        values = ((URIArrayType) 
actualParameter.getType()).getValueArray();
-                    } else if (actualParameter.getType() instanceof 
StringArrayType) {
-                        values = ((StringArrayType) 
actualParameter.getType()).getValueArray();
-                    } else if (actualParameter.getType() instanceof 
FileArrayType) {
-                        values = ((FileArrayType) 
actualParameter.getType()).getValueArray();
-                    }
-                    String value = StringUtil.createDelimiteredString(values, 
" ");
-                    inputValues.add(value);
-                } else {
-                    String paramValue = 
MappingFactory.toString(actualParameter);
-                    inputValues.add(paramValue);
-                }
-            }
-            jobDescriptor.setInputValues(inputValues);
+            JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(jobExecutionContext, app, cluster);
 
             log.info(jobDescriptor.toXML());
             

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
index 3109080..e446614 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
@@ -41,6 +41,7 @@ import org.apache.airavata.gfac.utils.GFacUtils;
 import org.apache.airavata.gfac.utils.InputStreamToFileWriter;
 import org.apache.airavata.gfac.utils.InputUtils;
 import org.apache.airavata.gfac.utils.OutputUtils;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.registry.api.workflow.ApplicationJob;
@@ -131,17 +132,14 @@ public class LocalProvider extends AbstractProvider 
implements GFacProvider{
         try {
                jobId= jobExecutionContext.getTaskData().getTaskID();
             jobDetails.setJobID(jobId);
+            jobDetails.setJobDescription(app.toString());
             jobExecutionContext.setJobDetails(jobDetails);
-            details.setJobID(jobId);
-            GFacUtils.saveJobStatus(details, JobState.SETUP, 
jobExecutionContext.getTaskData().getTaskID());
+            JobDescriptor jobDescriptor = 
GFacUtils.createJobDescriptor(jobExecutionContext, app, null);
+            jobDetails.setJobDescription(jobDescriptor.toXML());
+            GFacUtils.saveJobStatus(jobDetails, JobState.SETUP, 
jobExecutionContext.getTaskData().getTaskID());
                // running cmd
             Process process = builder.start();
-            
-            //todo fix how to incoperate orchestrator with gfac
-//            if(jobExecutionContext.getGFacConfiguration().getAiravataAPI() 
!= null){
-//                     saveApplicationJob(jobExecutionContext);
-//             }
-//            GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, 
ApplicationJobStatus.INITIALIZE);
+
             Thread standardOutWriter = new 
InputStreamToFileWriter(process.getInputStream(), app.getStandardOutput());
             Thread standardErrorWriter = new 
InputStreamToFileWriter(process.getErrorStream(), app.getStandardError());
 
@@ -150,11 +148,8 @@ public class LocalProvider extends AbstractProvider 
implements GFacProvider{
             standardErrorWriter.setDaemon(true);
             standardOutWriter.start();
             standardErrorWriter.start();
-            GFacUtils.updateJobStatus(jobDetails, JobState.ACTIVE);
-            // wait for the process (application) to finish executing
+
             int returnValue = process.waitFor();
-            //todo fix how to incoperate orchestrator with gfac
-            GFacUtils.updateJobStatus(jobDetails, JobState.COMPLETE);
 
             // make sure other two threads are done
             standardOutWriter.join();
@@ -165,10 +160,8 @@ public class LocalProvider extends AbstractProvider 
implements GFacProvider{
              * just provide warning in the log messages
              */
             if (returnValue != 0) {
-//             GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, 
ApplicationJobStatus.FAILED);
                 log.error("Process finished with non zero return value. 
Process may have failed");
             } else {
-//             GFacUtils.updateApplicationJobStatus(jobExecutionContext,jobId, 
ApplicationJobStatus.FINISHED);
                 log.info("Process finished with return value of zero.");
             }
 
@@ -177,10 +170,7 @@ public class LocalProvider extends AbstractProvider 
implements GFacProvider{
                     .append(" on the localHost, working directory = 
").append(app.getStaticWorkingDirectory())
                     .append(" tempDirectory = 
").append(app.getScratchWorkingDirectory()).append(" With the status ")
                     .append(String.valueOf(returnValue));
-            details.setJobDescription(buf.toString());
-            GFacUtils.updateJobStatus(details, JobState.COMPLETE);
             log.info(buf.toString());
-
         } catch (IOException io) {
             throw new GFacProviderException(io.getMessage(), io);
         } catch (InterruptedException e) {
@@ -228,7 +218,6 @@ public class LocalProvider extends AbstractProvider 
implements GFacProvider{
         }
     }
 
-    @Override
     public void cancelJob(String jobId, JobExecutionContext 
jobExecutionContext) throws GFacException {
         throw new NotImplementedException();
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
index 6188de3..b4210b5 100644
--- 
a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
+++ 
b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/utils/GFacUtils.java
@@ -35,17 +35,15 @@ import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
 import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.MappingFactory;
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
-import org.apache.airavata.model.workspace.experiment.ActionableGroup;
-import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
-import org.apache.airavata.model.workspace.experiment.DataObjectType;
-import org.apache.airavata.model.workspace.experiment.ErrorCategory;
-import org.apache.airavata.model.workspace.experiment.ErrorDetails;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.api.workflow.ApplicationJob;
 import 
org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
@@ -691,4 +689,86 @@ public class GFacUtils {
         }
         return stringObjectHashMap;
     }
+
+    public static JobDescriptor createJobDescriptor(JobExecutionContext 
jobExecutionContext,
+                                                    
ApplicationDeploymentDescriptionType app, Cluster cluster) {
+        JobDescriptor jobDescriptor = new JobDescriptor();
+        // this is common for any application descriptor
+        jobDescriptor.setInputDirectory(app.getInputDataDirectory());
+        jobDescriptor.setOutputDirectory(app.getOutputDataDirectory());
+        jobDescriptor.setExecutablePath(app.getExecutableLocation());
+        jobDescriptor.setStandardOutFile(app.getStandardOutput());
+        jobDescriptor.setStandardErrorFile(app.getStandardError());
+        Random random = new Random();
+        int i = random.nextInt();
+        jobDescriptor.setJobName(app.getApplicationName().getStringValue() + 
String.valueOf(i));
+        jobDescriptor.setWorkingDirectory(app.getStaticWorkingDirectory());
+
+
+        List<String> inputValues = new ArrayList<String>();
+        MessageContext input = jobExecutionContext.getInMessageContext();
+        Map<String, Object> inputs = input.getParameters();
+        Set<String> keys = inputs.keySet();
+        for (String paramName : keys) {
+            ActualParameter actualParameter = (ActualParameter) 
inputs.get(paramName);
+            if 
("URIArray".equals(actualParameter.getType().getType().toString()) || 
"StringArray".equals(actualParameter.getType().getType().toString())
+                    || 
"FileArray".equals(actualParameter.getType().getType().toString())) {
+                String[] values = null;
+                if (actualParameter.getType() instanceof URIArrayType) {
+                    values = ((URIArrayType) 
actualParameter.getType()).getValueArray();
+                } else if (actualParameter.getType() instanceof 
StringArrayType) {
+                    values = ((StringArrayType) 
actualParameter.getType()).getValueArray();
+                } else if (actualParameter.getType() instanceof FileArrayType) 
{
+                    values = ((FileArrayType) 
actualParameter.getType()).getValueArray();
+                }
+                String value = StringUtil.createDelimiteredString(values, " ");
+                inputValues.add(value);
+            } else {
+                String paramValue = MappingFactory.toString(actualParameter);
+                inputValues.add(paramValue);
+            }
+        }
+        jobDescriptor.setInputValues(inputValues);
+
+        // this part will fill out the hpcApplicationDescriptor
+        if (app instanceof HpcApplicationDeploymentType) {
+            HpcApplicationDeploymentType applicationDeploymentType
+                    = (HpcApplicationDeploymentType) app;
+            jobDescriptor.setShellName("/bin/bash");
+            jobDescriptor.setAllEnvExport(true);
+            jobDescriptor.setMailOptions("n");
+            jobDescriptor.setNodes(applicationDeploymentType.getNodeCount());
+            
jobDescriptor.setProcessesPerNode(applicationDeploymentType.getProcessorsPerNode());
+            
jobDescriptor.setMaxWallTime(String.valueOf(applicationDeploymentType.getMaxWallTime()));
+            
jobDescriptor.setJobSubmitter(applicationDeploymentType.getJobSubmitterCommand());
+            if 
(applicationDeploymentType.getProjectAccount().getProjectAccountNumber() != 
null) {
+                
jobDescriptor.setAcountString(applicationDeploymentType.getProjectAccount().getProjectAccountNumber());
+            }
+            if (applicationDeploymentType.getQueue().getQueueName() != null) {
+                
jobDescriptor.setQueueName(applicationDeploymentType.getQueue().getQueueName());
+            }
+            jobDescriptor.setOwner(((PBSCluster) 
cluster).getServerInfo().getUserName());
+            TaskDetails taskData = jobExecutionContext.getTaskData();
+            if (taskData != null && taskData.isSetTaskScheduling()) {
+                ComputationalResourceScheduling computionnalResource = 
taskData.getTaskScheduling();
+                if (computionnalResource.getNodeCount() > 0) {
+                    
jobDescriptor.setNodes(computionnalResource.getNodeCount());
+                }
+                if (computionnalResource.getComputationalProjectAccount() != 
null) {
+                    
jobDescriptor.setAcountString(computionnalResource.getComputationalProjectAccount());
+                }
+                if (computionnalResource.getQueueName() != null) {
+                    
jobDescriptor.setQueueName(computionnalResource.getQueueName());
+                }
+                if (computionnalResource.getTotalCPUCount() > 0) {
+                    
jobDescriptor.setProcessesPerNode(computionnalResource.getTotalCPUCount());
+                }
+                if (computionnalResource.getWallTimeLimit() > 0) {
+                    
jobDescriptor.setMaxWallTime(String.valueOf(computionnalResource.getWallTimeLimit()));
+                }
+            }
+
+        }
+        return jobDescriptor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 6d6d6f9..b00fe4f 100644
--- 
a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ 
b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -32,6 +32,7 @@ import org.apache.airavata.job.monitor.core.Monitor;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.model.workspace.experiment.Experiment;
@@ -112,6 +113,8 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                             ((AMQPMonitor) monitor).initialize(proxyPath, 
connectionName, list);
                             monitorManager.addAMQPMonitor((AMQPMonitor) 
monitor);
                         }
+                    } else if(monitor instanceof LocalJobMonitor){
+                        
monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
                     } else {
                         log.error("Wrong class is given to primary Monitor");
                     }
@@ -191,7 +194,7 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
                         log.error("Job submission Failed, so we remove the job 
from monitoring");
 
                     }else{
-                        monitorManager.addAJobToMonitor(monitorID);
+                            monitorManager.addAJobToMonitor(monitorID);
                     }
                 }
             }
@@ -209,7 +212,6 @@ public class OrchestratorServerHandler implements 
OrchestratorService.Iface {
         this.monitorManager = monitorManager;
     }
 
-    @Override
     public boolean terminateExperiment(String experimentId) throws TException {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 601cc27..b755e16 100644
--- 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -27,6 +27,7 @@ import 
org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
+import org.apache.airavata.registry.cpi.DataType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,13 +116,16 @@ public class AiravataJobStatusUpdator{
         }
     }
     public  void updateJobStatus(String taskId, String jobID, JobState state) 
throws Exception {
-        JobDetails details = new JobDetails();
+        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
+        JobDetails details = 
(JobDetails)airavataRegistry.get(DataType.JOB_DETAIL, ids);
+        if(details == null) {
+            details = new JobDetails();
+        }
         org.apache.airavata.model.workspace.experiment.JobStatus status = new 
org.apache.airavata.model.workspace.experiment.JobStatus();
         status.setJobState(state);
         status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
         details.setJobStatus(status);
         details.setJobID(jobID);
-        CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
         
airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.JOB_DETAIL, 
details, ids);
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
----------------------------------------------------------------------
diff --git 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
index 3515a68..b819e2b 100644
--- 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
+++ 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorManager.java
@@ -22,15 +22,19 @@ package org.apache.airavata.job.monitor;
 
 import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.job.monitor.core.Monitor;
 import org.apache.airavata.job.monitor.core.PullMonitor;
 import org.apache.airavata.job.monitor.core.PushMonitor;
 import org.apache.airavata.job.monitor.event.MonitorPublisher;
 import org.apache.airavata.job.monitor.exception.AiravataMonitorException;
+import org.apache.airavata.job.monitor.impl.LocalJobMonitor;
 import org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.AMQPMonitor;
 import org.apache.airavata.job.monitor.impl.push.amqp.UnRegisterThread;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryImpl;
+import org.apache.airavata.schemas.gfac.GlobusHostType;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +66,8 @@ public class MonitorManager {
 
     private MonitorPublisher monitorPublisher;
 
+    private Monitor localJobMonitor;
+
     /**
      * This will initialize the major monitoring system.
      */
@@ -71,6 +77,7 @@ public class MonitorManager {
         pullQueue = new LinkedBlockingQueue<MonitorID>();
         pushQueue = new LinkedBlockingQueue<MonitorID>();
         finishQueue = new LinkedBlockingQueue<MonitorID>();
+        localJobQueue = new LinkedBlockingQueue<MonitorID>();
         monitorPublisher = new MonitorPublisher(new EventBus());
         registerListener(new AiravataJobStatusUpdator(new RegistryImpl(), 
finishQueue));
     }
@@ -88,6 +95,19 @@ public class MonitorManager {
         addPushMonitor(monitor);
     }
 
+
+    /**
+     * This can be use to add an empty AMQPMonitor object to the monitor system
+     * and tihs method will take care of the initialization
+     * todo may be we need to move this to some other class
+     * @param monitor
+     */
+    public void addLocalMonitor(LocalJobMonitor monitor) {
+        monitor.setPublisher(this.getMonitorPublisher());
+        monitor.setJobQueue(this.getLocalJobQueue());
+        localJobMonitor = monitor;
+    }
+
     /**
      * This can be used to adda a QstatMonitor and it will take care of
      * the initialization of QstatMonitor
@@ -145,8 +165,11 @@ public class MonitorManager {
             } else if (Constants.PUSH.equals(host.getMonitorMode())) {
                 pushQueue.add(monitorID);
             }
+        } else if(monitorID.getHost().getType() instanceof GlobusHostType){
+            logger.error("Monitoring does not support GlubusHostType 
resources");
         } else {
-            logger.error("We only support Gsissh host types currently");
+            // we assume this is a type of localJobtype
+            localJobQueue.add(monitorID);
         }
     }
 
@@ -163,6 +186,10 @@ public class MonitorManager {
     public void launchMonitor() throws AiravataMonitorException {
         //no push monitor is configured so we launch pull monitor
         int index = 0;
+        if(localJobMonitor != null){
+            (new Thread(localJobMonitor)).start();
+        }
+
         for (PullMonitor monitor : pullMonitors) {
             (new Thread(monitor)).start();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
new file mode 100644
index 0000000..c20eef2
--- /dev/null
+++ 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.impl;
+
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor;
+import org.apache.airavata.job.monitor.state.JobStatus;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * This monitor can be used to monitor a job which runs locally,
+ * Since its a local job job doesn't have states, once it get executed
+ * then the job starts running
+ */
+public class LocalJobMonitor extends AiravataAbstractMonitor {
+    // Though we have a qeuue here, it not going to be used in local jobs
+    BlockingQueue<MonitorID> jobQueue;
+
+    public void run() {
+        do {
+            try {
+                MonitorID take = jobQueue.take();
+                getPublisher().publish(new JobStatus(take, JobState.COMPLETE));
+            } catch (Exception e) {
+                e.printStackTrace();  //To change body of catch statement use 
File | Settings | File Templates.
+            }
+        } while (!ServerSettings.isStopAllThreads());
+    }
+
+    public BlockingQueue<MonitorID> getJobQueue() {
+        return jobQueue;
+    }
+
+    public void setJobQueue(BlockingQueue<MonitorID> jobQueue) {
+        this.jobQueue = jobQueue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/6b90e642/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
----------------------------------------------------------------------
diff --git 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
index fe623fb..8f05124 100644
--- 
a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
+++ 
b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatus.java
@@ -38,6 +38,15 @@ public class JobStatus {
     private MonitorID monitorID;
 
 
+    // this constructor can be used in Qstat monitor to handle errors
+    public JobStatus() {
+    }
+
+    public JobStatus(MonitorID monitorID, JobState state) {
+        this.monitorID = monitorID;
+        this.state = state;
+    }
+
     public JobState getState() {
         return state;
     }

Reply via email to