Author: samindaw
Date: Tue Jun 11 13:54:05 2013
New Revision: 1491791

URL: http://svn.apache.org/r1491791
Log:
application job data persistance for EC2Provider

Modified:
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
    
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
    
airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
    
airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/BESProvider.java
 Tue Jun 11 13:54:05 2013
@@ -202,7 +202,7 @@ public class BESProvider implements GFac
         appJob.setJobId(jobId);
         appJob.setJobData(jobDefinition.toString());
         appJob.setSubmittedTime(Calendar.getInstance().getTime());
-        appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+        appJob.setStatus(ApplicationJobStatus.SUBMITTED);
         appJob.setStatusUpdateTime(appJob.getSubmittedTime());
         GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
        }

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/EC2Provider.java
 Tue Jun 11 13:54:05 2013
@@ -37,6 +37,9 @@ import com.sshtools.j2ssh.transport.publ
 import com.sshtools.j2ssh.transport.publickey.SshPrivateKey;
 import com.sshtools.j2ssh.transport.publickey.SshPrivateKeyFile;
 import com.sshtools.j2ssh.transport.publickey.SshPublicKey;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.gfac.GFacException;
@@ -48,6 +51,9 @@ import org.apache.airavata.gfac.provider
 import org.apache.airavata.gfac.provider.utils.AmazonEC2Util;
 import org.apache.airavata.gfac.provider.utils.EC2ProviderUtil;
 import org.apache.airavata.gfac.provider.utils.ProviderUtils;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.registry.api.workflow.ApplicationJob;
+import 
org.apache.airavata.registry.api.workflow.ApplicationJob.ApplicationJobStatus;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.airavata.schemas.gfac.Ec2ApplicationDeploymentType;
 import org.apache.airavata.schemas.gfac.OutputParameterType;
@@ -58,6 +64,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 
@@ -74,9 +81,12 @@ public class EC2Provider implements GFac
     private Instance instance = null;
 
     private AmazonSecurityContext amazonSecurityContext;
+    
+    private String jobId;
 
     public void initialize(JobExecutionContext jobExecutionContext) throws 
GFacProviderException,GFacException{
         if (jobExecutionContext != null) {
+               
jobId="EC2_"+jobExecutionContext.getApplicationContext().getHostDescription().getType().getHostAddress()+"_"+Calendar.getInstance().getTimeInMillis();
             if 
(jobExecutionContext.getSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT)
                     instanceof AmazonSecurityContext) {
                 this.amazonSecurityContext = (AmazonSecurityContext) 
jobExecutionContext.
@@ -96,7 +106,8 @@ public class EC2Provider implements GFac
             log.debug("INS_TYPE:" + amazonSecurityContext.getInstanceType());
             log.debug("USERNAME:" + amazonSecurityContext.getUserName());
         }
-
+        saveApplicationJob(jobExecutionContext);
+//        job
         /* Validation */
         if (amazonSecurityContext.getAccessKey() == null || 
amazonSecurityContext.getAccessKey().isEmpty())
             throw new GFacProviderException("EC2 Access Key is empty", 
jobExecutionContext);
@@ -113,13 +124,31 @@ public class EC2Provider implements GFac
         AWSCredentials credential =
                 new BasicAWSCredentials(amazonSecurityContext.getAccessKey(), 
amazonSecurityContext.getSecretKey());
         AmazonEC2Client ec2client = new AmazonEC2Client(credential);
-
+        GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.AUTHENTICATE);
         initEc2Environment(jobExecutionContext, ec2client);
         checkConnection(instance, ec2client);
     }
 
+       private void saveApplicationJob(JobExecutionContext 
jobExecutionContext) {
+               ApplicationJob job = 
GFacUtils.createApplicationJob(jobExecutionContext);
+        job.setJobId(jobId);
+        job.setStatus(ApplicationJobStatus.VALIDATE_INPUT);
+        job.setSubmittedTime(Calendar.getInstance().getTime());
+        job.setStatusUpdateTime(job.getSubmittedTime());
+        GFacUtils.recordApplicationJob(jobExecutionContext, job);
+       }
+
     public void execute(JobExecutionContext jobExecutionContext) throws 
GFacProviderException {
+       GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.INITIALIZE);
         String shellCmd = createShellCmd(jobExecutionContext);
+        AiravataAPI airavataAPI = 
jobExecutionContext.getGFacConfiguration().getAiravataAPI();
+        if (airavataAPI!=null){
+               try {
+                               
airavataAPI.getProvenanceManager().updateApplicationJobData(jobId, shellCmd);
+                       } catch (AiravataAPIInvocationException e) {
+                               log.error("Error in saving EC2 shell 
command!!!", e);
+                       }
+        }
         SshClient sshClient = new SshClient();
         sshClient.setSocketTimeout(SOCKET_TIMEOUT);
         SshConnectionProperties properties = new SshConnectionProperties();
@@ -145,7 +174,7 @@ public class EC2Provider implements GFac
                     return true;
                 }
             });
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.AUTHENTICATE);
             // Initialize the authentication data.
             PublicKeyAuthenticationClient publicKeyAuth = new 
PublicKeyAuthenticationClient();
             publicKeyAuth.setUsername(amazonSecurityContext.getUserName());
@@ -164,11 +193,13 @@ public class EC2Provider implements GFac
             } else if(result==AuthenticationProtocolState.COMPLETE) {
                 log.info("ssh client authentication is complete...");
             }
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.SUBMITTED);
             SessionChannelClient session = sshClient.openSessionChannel();
             log.info("ssh session successfully opened...");
             session.requestPseudoTerminal("vt100", 80, 25, 0, 0, "");
             session.startShell();
+            
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.EXECUTING);
             session.getOutputStream().write(shellCmd.getBytes());
 
             InputStream in = session.getInputStream();
@@ -185,6 +216,7 @@ public class EC2Provider implements GFac
                     break;
                 }
             }
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.RESULTS_RETRIEVE);
 
             executionResult = 
executionResult.replace("\r","").replace("\n","");
             log.info("Result of the job : " + executionResult);
@@ -198,7 +230,7 @@ public class EC2Provider implements GFac
                 ((StringParameterType) 
outParam.getType()).setValue(executionResult);
                 
jobExecutionContext.getOutMessageContext().addParameter(paramName, outParam);
             }
-
+            GFacUtils.updateApplicationJobStatus(jobExecutionContext, jobId, 
ApplicationJobStatus.FINISHED);
         } catch (InvalidSshKeyException e) {
             throw new GFacProviderException("Invalid SSH key", e);
         } catch (IOException e) {

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/GramProvider.java
 Tue Jun 11 13:54:05 2013
@@ -138,7 +138,7 @@ public class GramProvider implements GFa
                appJob.setJobId(job.getIDAsString());
                appJob.setJobData(job.getRSL());
                appJob.setSubmittedTime(Calendar.getInstance().getTime());
-               appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+               appJob.setStatus(ApplicationJobStatus.SUBMITTED);
                appJob.setStatusUpdateTime(appJob.getSubmittedTime());
                GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
        }

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/HadoopProvider.java
 Tue Jun 11 13:54:05 2013
@@ -21,6 +21,12 @@
 
 package org.apache.airavata.gfac.provider.impl;
 
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Map;
+
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.context.JobExecutionContext;
@@ -34,22 +40,16 @@ import org.apache.airavata.schemas.gfac.
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Executes hadoop job using the cluster configuration provided by handlers in
  * in-flow.

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/LocalProvider.java
 Tue Jun 11 13:54:05 2013
@@ -191,7 +191,7 @@ public class LocalProvider implements GF
                JAXB.marshal(data, stream);
                appJob.setJobData(stream.toString());
                appJob.setSubmittedTime(Calendar.getInstance().getTime());
-               appJob.setJobStatus(ApplicationJobStatus.SUBMITTED);
+               appJob.setStatus(ApplicationJobStatus.SUBMITTED);
                appJob.setStatusUpdateTime(appJob.getSubmittedTime());
                GFacUtils.recordApplicationJob(jobExecutionContext, appJob);
        }

Modified: 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
 (original)
+++ 
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/SSHProvider.java
 Tue Jun 11 13:54:05 2013
@@ -86,7 +86,7 @@ public class SSHProvider implements GFac
        private void saveApplicationJob(JobExecutionContext 
jobExecutionContext, String executableName) {
                ApplicationJob job = 
GFacUtils.createApplicationJob(jobExecutionContext);
                job.setJobId(jobID);
-               job.setJobStatus(ApplicationJobStatus.INITIALIZE);
+               job.setStatus(ApplicationJobStatus.INITIALIZE);
                job.setSubmittedTime(Calendar.getInstance().getTime());
                job.setStatusUpdateTime(job.getSubmittedTime());
                job.setJobData(executableName);

Modified: 
airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
 (original)
+++ 
airavata/trunk/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/impl/AiravataJPARegistry.java
 Tue Jun 11 13:54:05 2013
@@ -2273,7 +2273,7 @@ public class AiravataJPARegistry extends
                gfacJob.setJobData(job.getJobData());
                gfacJob.setMetadata(job.getMetadata());
                gfacJob.setServiceDescID(job.getServiceDescriptionId());
-               gfacJob.setStatus(job.getJobStatus().toString());
+               gfacJob.setStatus(job.getStatus().toString());
                gfacJob.setSubmittedTime(new 
Timestamp(job.getSubmittedTime().getTime()));
        }
 
@@ -2353,7 +2353,7 @@ public class AiravataJPARegistry extends
                job.setHostDescriptionId(gfacJob.getHostDescID());
                job.setJobData(gfacJob.getJobData());
                job.setJobId(gfacJob.getLocalJobID());
-               
job.setJobStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
+               
job.setStatus(ApplicationJobStatus.valueOf(gfacJob.getStatus()));
                job.setMetadata(gfacJob.getMetadata());
                job.setNodeId(gfacJob.getNodeID());
                job.setServiceDescriptionId(gfacJob.getServiceDescID());

Modified: 
airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
URL: 
http://svn.apache.org/viewvc/airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java?rev=1491791&r1=1491790&r2=1491791&view=diff
==============================================================================
--- 
airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
 (original)
+++ 
airavata/trunk/modules/registry/registry-api/src/main/java/org/apache/airavata/registry/api/workflow/ApplicationJob.java
 Tue Jun 11 13:54:05 2013
@@ -37,6 +37,10 @@ public class ApplicationJob {
         */
        public static enum ApplicationJobStatus{
                /**
+                * Validating the application job input data and configurations
+                */
+               VALIDATE_INPUT,
+               /**
                 * Input data/files is being staged for the application job.
                 */
                STAGING,
@@ -45,14 +49,14 @@ public class ApplicationJob {
                 */
                AUTHENTICATE,
                /**
-                * Application job is submitted, possibly waiting to start 
executing.
-                */
-               SUBMITTED,
-               /**
                 * Application job is being initialized.
                 */
                INITIALIZE, 
                /**
+                * Application job is submitted, possibly waiting to start 
executing.
+                */
+               SUBMITTED,
+               /**
                 * Application job is waiting to start/continue its executing.
                 */
                PENDING,
@@ -67,7 +71,7 @@ public class ApplicationJob {
                /**
                 * Application job is waiting for data or a trigger to continue 
its execution.
                 */
-               WAITING_FOR_DATA,
+               WAIT_FOR_DATA,
                /**
                 * Finalizing the execution of the application job.
                 */
@@ -81,6 +85,10 @@ public class ApplicationJob {
                 */
                RESULTS_RETRIEVE,
                /**
+                * Validating the application job execution results
+                */
+               VALIDATE_OUTPUT,
+               /**
                 * Application job completed successfully.
                 */
                FINISHED,
@@ -113,7 +121,7 @@ public class ApplicationJob {
        
        private Date submittedTime;
        private Date statusUpdateTime;
-       private ApplicationJobStatus jobStatus;
+       private ApplicationJobStatus status;
        
        private String metadata;
 
@@ -251,12 +259,12 @@ public class ApplicationJob {
         * Get the currently recorded status of the application job. 
         * @return
         */
-       public ApplicationJobStatus getJobStatus() {
-               return jobStatus;
+       public ApplicationJobStatus getStatus() {
+               return status;
        }
 
-       public void setJobStatus(ApplicationJobStatus jobStatus) {
-               this.jobStatus = jobStatus;
+       public void setStatus(ApplicationJobStatus status) {
+               this.status = status;
        }
 
        /**


Reply via email to