Author: cziegeler
Date: Wed Feb 10 16:34:47 2010
New Revision: 908572

URL: http://svn.apache.org/viewvc?rev=908572&view=rev
Log:
SLING-1365 : Limit the number of parallel jobs
Add more debug logging.

Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908572&r1=908571&r2=908572&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
 Wed Feb 10 16:34:47 2010
@@ -156,6 +156,7 @@
     public static final String TOPIC_JOB_FINISHED = 
"org/apache/sling/event/notification/job/FINISHED";
 
     /** Asynchronous notification event when a job failed.
+     * If a job execution fails, it is rescheduled for another try.
      * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job 
event and the
      * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} 
contains the
      * timestamp of the event (as a Long).
@@ -163,6 +164,7 @@
     public static final String TOPIC_JOB_FAILED = 
"org/apache/sling/event/notification/job/FAILED";
 
     /** Asynchronous notification event when a job is cancelled.
+     * If a job execution is cancelled it is not rescheduled.
      * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job 
event and the
      * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} 
contains the
      * timestamp of the event (as a Long).
@@ -661,7 +663,9 @@
             return "<null>";
         }
         final StringBuilder buffer = new StringBuilder(e.getClass().getName());
-        buffer.append(" [topic=");
+        buffer.append('(');
+        buffer.append(e.hashCode());
+        buffer.append(") [topic=");
         buffer.append(e.getTopic());
         buffer.append(", properties=");
         final String[] names = e.getPropertyNames();

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908572&r1=908571&r2=908572&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
 Wed Feb 10 16:34:47 2010
@@ -404,7 +404,7 @@
                     process = this.processingEventsList.remove(info.nodePath) 
!= null;
                 }
                 if ( process ) {
-                    this.logger.info("No acknowledge received for job {} 
stored at {}. Requeueing job.", info.event, info.nodePath);
+                    this.logger.info("No acknowledge received for job {} 
stored at {}. Requeueing job.", EventUtil.toString(info.event), info.nodePath);
                     this.finishedJob(info.event, info.nodePath, true);
                 }
             }
@@ -451,7 +451,9 @@
                 this.ignoreException(e);
             }
             if ( event != null && this.running ) {
-                logger.debug("Persisting job {}", event);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Persisting job {}", 
EventUtil.toString(event));
+                }
                 final EventInfo info = new EventInfo();
                 info.event = event;
                 final String jobId = 
(String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
@@ -512,7 +514,7 @@
                             }
                         } catch (RepositoryException re ) {
                             // something went wrong, so let's log it
-                            this.logger.error("Exception during writing new 
job '" + event + "' to repository at " + nodePath, re);
+                            this.logger.error("Exception during writing new 
job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re);
                         }
                     }
                 }
@@ -570,13 +572,17 @@
             }
 
             if ( info != null && this.running ) {
-                logger.debug("Processing new job {}", info.event);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Received new job {}", 
EventUtil.toString(info.event));
+                }
                 // check for local only jobs and remove them from the queue if 
they're meant
                 // for another application node
                 final String appId = 
(String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION);
                 if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) 
!= null
                     && appId != null && !this.applicationId.equals(appId) ) {
-                    logger.debug("Discarding job {} : local job for a 
different application node.", info.event);
+                    if ( logger.isDebugEnabled() ) {
+                         logger.debug("Discarding job {} : local job for a 
different application node.", EventUtil.toString(info.event));
+                    }
                     info = null;
                 }
 
@@ -584,7 +590,9 @@
                 if ( info != null && 
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
                     final String queueName = 
(String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME);
                     synchronized ( this.jobQueues ) {
-                        logger.debug("Queuing job {} into queue {}.", 
info.event, queueName);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Queuing job {} into queue {}.", 
EventUtil.toString(info.event), queueName);
+                        }
                         BlockingQueue<EventInfo> jobQueue = 
this.jobQueues.get(queueName);
                         if ( jobQueue == null ) {
                             final boolean orderedQueue = 
info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
@@ -699,7 +707,9 @@
         boolean putback = false;
         boolean wait = false;
         synchronized (this.backgroundLock) {
-            logger.debug("Executing job {}.", info.event);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Executing job {}.", 
EventUtil.toString(info.event));
+            }
             try {
                 this.backgroundSession.refresh(false);
                 // check if the node still exists
@@ -738,7 +748,9 @@
                     }
                     // check number of parallel jobs for main queue
                     if ( process && jobQueue == null && this.parallelJobCount 
>= this.maximumParallelJobs ) {
-                        logger.debug("Rescheduling job {} - maximum parallel 
job count of {} reached!", info.event, this.maximumParallelJobs);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug("Rescheduling job {} - maximum 
parallel job count of {} reached!", EventUtil.toString(info.event), 
this.maximumParallelJobs);
+                        }
                         process = false;
                         wait = true;
                     }
@@ -826,12 +838,16 @@
      * @see 
org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
      */
     public void handleEvent(final Event event) {
-        logger.debug("Receiving event {}", event);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Receiving event {}", EventUtil.toString(event));
+        }
         // we ignore remote job events
         if ( EventUtil.isLocal(event) ) {
             // check for bundle event
             if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
-                logger.debug("Handling local job {}", event);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Handling local job {}", 
EventUtil.toString(event));
+                }
                 // job event
                 final String jobTopic = 
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
 
@@ -956,7 +972,9 @@
         final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event);
         final boolean parallelProcessing = parInfo.processParallel;
         final String jobTopic = 
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
-        logger.debug("Starting job {}", event);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Starting job {}", EventUtil.toString(event));
+        }
         boolean unlock = true;
         try {
             if ( isMainQueue ) {
@@ -1231,6 +1249,9 @@
      * @see 
org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
 String, boolean)
      */
     public boolean finishedJob(Event job, String eventNodePath, boolean 
shouldReschedule) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Received finish for job {}, 
shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+        }
         // let's remove the event from our processing list
         // this is just a sanity check, as usually the job should have been
         // removed during sendAcknowledge.
@@ -1259,11 +1280,20 @@
                 newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, 
retryCount);
                 newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
                 job = new Event(job.getTopic(), newProperties);
-                this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job);
-            } else {
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Failed job {}", 
EventUtil.toString(job));
+                }
                 this.sendNotification(EventUtil.TOPIC_JOB_FAILED, job);
+            } else {
+                if ( this.logger.isDebugEnabled() ) {
+                    this.logger.debug("Cancelled job {}", 
EventUtil.toString(job));
+                }
+                this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job);
             }
         } else {
+            if ( this.logger.isDebugEnabled() ) {
+                this.logger.debug("Finished job {}", EventUtil.toString(job));
+            }
             this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
         }
         final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
@@ -1401,7 +1431,9 @@
     }
 
     private void putBackIntoMainQueue(final EventInfo info, final boolean 
useSleepTime) {
-        logger.debug("Putting job {} back into the queue.", info.event);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug("Putting job {} back into the queue.", 
EventUtil.toString(info.event));
+        }
         final Date fireDate = new Date();
         if ( useSleepTime ) {
             fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 
1000);


Reply via email to