Author: adrianc
Date: Sun Aug  5 08:01:29 2012
New Revision: 1369536

URL: http://svn.apache.org/viewvc?rev=1369536&view=rev
Log:
Refactored PersistedServiceJob.java so that it contains the GenericValue it 
represents. This improves throughput because the original code hit the 
JobSandbox entity about 10 times for each job, and with this change, there are 
only two JobSandbox hits per job.

Modified:
    
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Modified: 
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: 
http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1369536&r1=1369535&r2=1369536&view=diff
==============================================================================
--- 
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
 (original)
+++ 
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
 Sun Aug  5 08:01:29 2012
@@ -54,17 +54,19 @@ import org.apache.commons.lang.StringUti
 /**
  * A {@link Job} that is backed by the entity engine. Job data is stored
  * in the JobSandbox entity.
+ * <p>When the job is queued, this object "owns" the entity value. Any 
external changes
+ * are ignored except the cancelDateTime field - jobs can be canceled after 
they are queued.</p>
  */
 @SuppressWarnings("serial")
 public class PersistedServiceJob extends GenericServiceJob {
 
     public static final String module = PersistedServiceJob.class.getName();
 
-    private transient Delegator delegator = null;
+    private final transient Delegator delegator;
     private long nextRecurrence = -1;
-    private long maxRetry = -1;
-    private long currentRetryCount = 0;
-    private boolean warningLogged = false;
+    private final long maxRetry;
+    private final long currentRetryCount;
+    private final GenericValue jobValue;
 
     /**
      * Creates a new PersistedServiceJob
@@ -75,6 +77,7 @@ public class PersistedServiceJob extends
     public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, 
GenericRequester req) {
         super(dctx, jobValue.getString("jobId"), 
jobValue.getString("jobName"), null, null, req);
         this.delegator = dctx.getDelegator();
+        this.jobValue = jobValue;
         Timestamp storedDate = jobValue.getTimestamp("runTime");
         this.runtime = storedDate.getTime();
         this.maxRetry = jobValue.get("maxRetry") != null ? 
jobValue.getLong("maxRetry").longValue() : -1;
@@ -83,21 +86,13 @@ public class PersistedServiceJob extends
             this.currentRetryCount = retryCount.longValue();
         } else {
             // backward compatibility
-            this.currentRetryCount = PersistedServiceJob.getRetries(jobValue, 
this.delegator);
+            this.currentRetryCount = getRetries(this.delegator);
         }
     }
 
     @Override
     public void queue() throws InvalidJobException {
         super.queue();
-        // refresh the job object
-        GenericValue jobValue = null;
-        try {
-            jobValue = this.getJob();
-            jobValue.refresh();
-        } catch (GenericEntityException e) {
-            throw new InvalidJobException("Unable to refresh JobSandbox 
value", e);
-        }
         Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
         Timestamp startTime = jobValue.getTimestamp("startDateTime");
         if (cancelTime != null || startTime != null) {
@@ -110,7 +105,7 @@ public class PersistedServiceJob extends
             try {
                 jobValue.store();
             } catch (GenericEntityException e) {
-                throw new InvalidJobException("Unable to set the startDateTime 
on the current job [" + getJobId() + "]; not running!", e);
+                throw new InvalidJobException("Unable to set the startDateTime 
and statusId on the current job [" + getJobId() + "]; not running!", e);
             }
         }
     }
@@ -118,45 +113,48 @@ public class PersistedServiceJob extends
     @Override
     protected void init() throws InvalidJobException {
         super.init();
-
-        // configure any addition recurrences
-        GenericValue job = this.getJob();
+        try {
+            // Job might have been canceled after it was placed in the queue.
+            jobValue.refresh();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to refresh JobSandbox 
value", e);
+        }
+        if (jobValue.getTimestamp("cancelDateTime") != null) {
+            // Job cancelled
+            throw new InvalidJobException("Job [" + getJobId() + "] was 
cancelled");
+        }
+        String instanceId = 
UtilProperties.getPropertyValue("general.properties", "unique.instanceId", 
"ofbiz0");
+        if (!instanceId.equals(jobValue.getString("runByInstanceId"))) {
+            // This condition isn't possible, but we will leave it here.
+            throw new InvalidJobException("Job has been accepted by a 
different instance!");
+        }
+        // configure any additional recurrences
         long maxRecurrenceCount = -1;
         long currentRecurrenceCount = 0;
         TemporalExpression expr = null;
-        RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(job);
+        RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(jobValue);
         if (recurrence != null) {
-            if (!this.warningLogged) {
-                Debug.logWarning("Persisted Job [" + getJobId() + "] 
references a RecurrenceInfo, recommend using TemporalExpression instead", 
module);
-                this.warningLogged = true;
-            }
+            Debug.logWarning("Persisted Job [" + getJobId() + "] references a 
RecurrenceInfo, recommend using TemporalExpression instead", module);
             currentRecurrenceCount = recurrence.getCurrentCount();
             expr = RecurrenceInfo.toTemporalExpression(recurrence);
         }
-        if (expr == null && 
UtilValidate.isNotEmpty(job.getString("tempExprId"))) {
+        if (expr == null && 
UtilValidate.isNotEmpty(jobValue.getString("tempExprId"))) {
             try {
-                expr = 
TemporalExpressionWorker.getTemporalExpression(this.delegator, 
job.getString("tempExprId"));
+                expr = 
TemporalExpressionWorker.getTemporalExpression(this.delegator, 
jobValue.getString("tempExprId"));
             } catch (GenericEntityException e) {
                 throw new RuntimeException(e.getMessage());
             }
         }
-
-        String instanceId = 
UtilProperties.getPropertyValue("general.properties", "unique.instanceId", 
"ofbiz0");
-        if (!instanceId.equals(job.getString("runByInstanceId"))) {
-            throw new InvalidJobException("Job has been accepted by a 
different instance!");
-        }
-
-        if (job.get("maxRecurrenceCount") != null) {
-            maxRecurrenceCount = job.getLong("maxRecurrenceCount").longValue();
+        if (jobValue.get("maxRecurrenceCount") != null) {
+            maxRecurrenceCount = 
jobValue.getLong("maxRecurrenceCount").longValue();
         }
-        if (job.get("currentRecurrenceCount") != null) {
-            currentRecurrenceCount = 
job.getLong("currentRecurrenceCount").longValue();
+        if (jobValue.get("currentRecurrenceCount") != null) {
+            currentRecurrenceCount = 
jobValue.getLong("currentRecurrenceCount").longValue();
         }
         if (maxRecurrenceCount != -1) {
             currentRecurrenceCount++;
-            job.set("currentRecurrenceCount", currentRecurrenceCount);
+            jobValue.set("currentRecurrenceCount", currentRecurrenceCount);
         }
-
         try {
             if (expr != null && (maxRecurrenceCount == -1 || 
currentRecurrenceCount <= maxRecurrenceCount)) {
                 if (recurrence != null) {
@@ -164,26 +162,25 @@ public class PersistedServiceJob extends
                 }
                 Calendar next = expr.next(Calendar.getInstance());
                 if (next != null) {
-                    createRecurrence(job, next.getTimeInMillis(), false);
+                    createRecurrence(next.getTimeInMillis(), false);
                 }
             }
         } catch (GenericEntityException e) {
-            throw new RuntimeException(e.getMessage());
+            throw new InvalidJobException(e);
         }
         if (Debug.infoOn()) Debug.logInfo("Job  [" + getJobName() + "] Id ["  
+ getJobId() + "] -- Next runtime: " + new Date(nextRecurrence), module);
     }
 
-    private void createRecurrence(GenericValue job, long next, boolean 
isRetryOnFailure) throws GenericEntityException {
+    private void createRecurrence(long next, boolean isRetryOnFailure) throws 
GenericEntityException {
         if (Debug.verboseOn()) Debug.logVerbose("Next runtime returned: " + 
next, module);
-
         if (next > runtime) {
-            String pJobId = job.getString("parentJobId");
+            String pJobId = jobValue.getString("parentJobId");
             if (pJobId == null) {
-                pJobId = job.getString("jobId");
+                pJobId = jobValue.getString("jobId");
             }
-            GenericValue newJob = GenericValue.create(job);
+            GenericValue newJob = GenericValue.create(jobValue);
             newJob.remove("jobId");
-            newJob.set("previousJobId", job.getString("jobId"));
+            newJob.set("previousJobId", jobValue.getString("jobId"));
             newJob.set("parentJobId", pJobId);
             newJob.set("statusId", "SERVICE_PENDING");
             newJob.set("startDateTime", null);
@@ -203,11 +200,9 @@ public class PersistedServiceJob extends
     @Override
     protected void finish(Map<String, Object> result) throws 
InvalidJobException {
         super.finish(result);
-
         // set the finish date
-        GenericValue job = getJob();
-        job.set("statusId", "SERVICE_FINISHED");
-        job.set("finishDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("statusId", "SERVICE_FINISHED");
+        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
         String jobResult = null;
         if (ServiceUtil.isError(result)) {
             jobResult = 
StringUtils.substring(ServiceUtil.getErrorMessage(result), 0, 255);
@@ -215,10 +210,10 @@ public class PersistedServiceJob extends
             jobResult = 
StringUtils.substring(ServiceUtil.makeSuccessMessage(result, "", "", "", ""), 
0, 255);
         }
         if (UtilValidate.isNotEmpty(jobResult)) {
-            job.set("jobResult", jobResult);
+            jobValue.set("jobResult", jobResult);
         }
         try {
-            job.store();
+            jobValue.store();
         } catch (GenericEntityException e) {
             Debug.logError(e, "Cannot update the job [" + getJobId() + "] 
sandbox", module);
         }
@@ -227,20 +222,17 @@ public class PersistedServiceJob extends
     @Override
     protected void failed(Throwable t) throws InvalidJobException {
         super.failed(t);
-
-        GenericValue job = getJob();
         // if the job has not been re-scheduled; we need to re-schedule and 
run again
         if (nextRecurrence == -1) {
             if (this.canRetry()) {
                 // create a recurrence
                 Calendar cal = Calendar.getInstance();
-                cal.setTime(new Date());
                 cal.add(Calendar.MINUTE, 
ServiceConfigUtil.getFailedRetryMin());
                 long next = cal.getTimeInMillis();
                 try {
-                    createRecurrence(job, next, true);
-                } catch (GenericEntityException gee) {
-                    Debug.logError(gee, "ERROR: Unable to re-schedule job [" + 
getJobId() + "] to re-run : " + job, module);
+                    createRecurrence(next, true);
+                } catch (GenericEntityException e) {
+                    Debug.logError(e, "Unable to re-schedule job [" + 
getJobId() + "]: ", module);
                 }
                 Debug.logInfo("Persisted Job [" + getJobId() + "] Failed 
Re-Scheduling : " + next, module);
             } else {
@@ -248,44 +240,40 @@ public class PersistedServiceJob extends
             }
         }
         // set the failed status
-        job.set("statusId", "SERVICE_FAILED");
-        job.set("finishDateTime", UtilDateTime.nowTimestamp());
-        job.set("jobResult", StringUtils.substring(t.getMessage(), 0, 255));
+        jobValue.set("statusId", "SERVICE_FAILED");
+        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("jobResult", StringUtils.substring(t.getMessage(), 0, 
255));
         try {
-            job.store();
+            jobValue.store();
         } catch (GenericEntityException e) {
-            Debug.logError(e, "Cannot update the job sandbox", module);
+            Debug.logError(e, "Cannot update the JobSandbox entity", module);
         }
     }
 
     @Override
     protected String getServiceName() throws InvalidJobException {
-        GenericValue jobObj = getJob();
-        if (jobObj == null || jobObj.get("serviceName") == null) {
+        if (jobValue == null || jobValue.get("serviceName") == null) {
             return null;
         }
-        return jobObj.getString("serviceName");
+        return jobValue.getString("serviceName");
     }
 
     @Override
     protected Map<String, Object> getContext() throws InvalidJobException {
         Map<String, Object> context = null;
         try {
-            GenericValue jobObj = getJob();
-            if (!UtilValidate.isEmpty(jobObj.getString("runtimeDataId"))) {
-                GenericValue contextObj = jobObj.getRelatedOne("RuntimeData", 
false);
+            if (!UtilValidate.isEmpty(jobValue.getString("runtimeDataId"))) {
+                GenericValue contextObj = 
jobValue.getRelatedOne("RuntimeData", false);
                 if (contextObj != null) {
                     context = 
UtilGenerics.checkMap(XmlSerializer.deserialize(contextObj.getString("runtimeInfo"),
 delegator), String.class, Object.class);
                 }
             }
-
             if (context == null) {
                 context = FastMap.newInstance();
             }
-
             // check the runAsUser
-            if (!UtilValidate.isEmpty(jobObj.getString("runAsUser"))) {
-                context.put("userLogin", ServiceUtil.getUserLogin(dctx, 
context, jobObj.getString("runAsUser")));
+            if (!UtilValidate.isEmpty(jobValue.getString("runAsUser"))) {
+                context.put("userLogin", ServiceUtil.getUserLogin(dctx, 
context, jobValue.getString("runAsUser")));
             }
         } catch (GenericEntityException e) {
             Debug.logError(e, "PersistedServiceJob.getContext(): Entity 
Exception", module);
@@ -301,42 +289,26 @@ public class PersistedServiceJob extends
         if (context == null) {
             Debug.logError("Job context is null", module);
         }
-
         return context;
     }
 
-    // gets the job value object
-    private GenericValue getJob() throws InvalidJobException {
-        try {
-            GenericValue jobObj = delegator.findOne("JobSandbox", false, 
"jobId", getJobId());
-            if (jobObj == null) {
-                throw new InvalidJobException("Job [" + getJobId() + "] came 
back null from datasource from delegator " + delegator.getDelegatorName());
-            }
-            return jobObj;
-        } catch (GenericEntityException e) {
-            throw new InvalidJobException("Cannot get job definition [" + 
getJobId() + "] from entity", e);
-        }
-    }
-
     // returns the number of current retries
-    private static long getRetries(GenericValue job, Delegator delegator) {
-        String pJobId = job.getString("parentJobId");
+    private long getRetries(Delegator delegator) {
+        String pJobId = jobValue.getString("parentJobId");
         if (pJobId == null) {
             return 0;
         }
-
         long count = 0;
         try {
             EntityFieldMap ecl = 
EntityCondition.makeConditionMap("parentJobId", pJobId, "statusId", 
"SERVICE_FAILED");
             count = delegator.findCountByCondition("JobSandbox", ecl, null, 
null);
         } catch (GenericEntityException e) {
-            Debug.logError(e, module);
+            Debug.logError(e, "Exception thrown while counting retries: ", 
module);
         }
-
         return count + 1; // add one for the parent
     }
 
-    private boolean canRetry() throws InvalidJobException {
+    private boolean canRetry() {
         if (maxRetry == -1) {
             return true;
         }


Reply via email to