Author: cziegeler Date: Wed Feb 10 16:34:47 2010 New Revision: 908572 URL: http://svn.apache.org/viewvc?rev=908572&view=rev Log: SLING-1365 : Limit the number of parallel jobs Add more debug logging.
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=908572&r1=908571&r2=908572&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Wed Feb 10 16:34:47 2010 @@ -156,6 +156,7 @@ public static final String TOPIC_JOB_FINISHED = "org/apache/sling/event/notification/job/FINISHED"; /** Asynchronous notification event when a job failed. + * If a job execution fails, it is rescheduled for another try. * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the * timestamp of the event (as a Long). @@ -163,6 +164,7 @@ public static final String TOPIC_JOB_FAILED = "org/apache/sling/event/notification/job/FAILED"; /** Asynchronous notification event when a job is cancelled. + * If a job execution is cancelled it is not rescheduled. * The property {@link #PROPERTY_NOTIFICATION_JOB} contains the job event and the * property {@link org.osgi.service.event.EventConstants#TIMESTAMP} contains the * timestamp of the event (as a Long). 
@@ -661,7 +663,9 @@ return "<null>"; } final StringBuilder buffer = new StringBuilder(e.getClass().getName()); - buffer.append(" [topic="); + buffer.append('('); + buffer.append(e.hashCode()); + buffer.append(") [topic="); buffer.append(e.getTopic()); buffer.append(", properties="); final String[] names = e.getPropertyNames(); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=908572&r1=908571&r2=908572&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Wed Feb 10 16:34:47 2010 @@ -404,7 +404,7 @@ process = this.processingEventsList.remove(info.nodePath) != null; } if ( process ) { - this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", info.event, info.nodePath); + this.logger.info("No acknowledge received for job {} stored at {}. 
Requeueing job.", EventUtil.toString(info.event), info.nodePath); this.finishedJob(info.event, info.nodePath, true); } } @@ -451,7 +451,9 @@ this.ignoreException(e); } if ( event != null && this.running ) { - logger.debug("Persisting job {}", event); + if ( logger.isDebugEnabled() ) { + logger.debug("Persisting job {}", EventUtil.toString(event)); + } final EventInfo info = new EventInfo(); info.event = event; final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID); @@ -512,7 +514,7 @@ } } catch (RepositoryException re ) { // something went wrong, so let's log it - this.logger.error("Exception during writing new job '" + event + "' to repository at " + nodePath, re); + this.logger.error("Exception during writing new job '" + EventUtil.toString(event) + "' to repository at " + nodePath, re); } } } @@ -570,13 +572,17 @@ } if ( info != null && this.running ) { - logger.debug("Processing new job {}", info.event); + if ( logger.isDebugEnabled() ) { + logger.debug("Received new job {}", EventUtil.toString(info.event)); + } // check for local only jobs and remove them from the queue if they're meant // for another application node final String appId = (String)info.event.getProperty(EventUtil.PROPERTY_APPLICATION); if ( info.event.getProperty(EventUtil.PROPERTY_JOB_RUN_LOCAL) != null && appId != null && !this.applicationId.equals(appId) ) { - logger.debug("Discarding job {} : local job for a different application node.", info.event); + if ( logger.isDebugEnabled() ) { + logger.debug("Discarding job {} : local job for a different application node.", EventUtil.toString(info.event)); + } info = null; } @@ -584,7 +590,9 @@ if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) { final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME); synchronized ( this.jobQueues ) { - logger.debug("Queuing job {} into queue {}.", info.event, queueName); + if ( logger.isDebugEnabled() ) { + 
logger.debug("Queuing job {} into queue {}.", EventUtil.toString(info.event), queueName); + } BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName); if ( jobQueue == null ) { final boolean orderedQueue = info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null; @@ -699,7 +707,9 @@ boolean putback = false; boolean wait = false; synchronized (this.backgroundLock) { - logger.debug("Executing job {}.", info.event); + if ( logger.isDebugEnabled() ) { + logger.debug("Executing job {}.", EventUtil.toString(info.event)); + } try { this.backgroundSession.refresh(false); // check if the node still exists @@ -738,7 +748,9 @@ } // check number of parallel jobs for main queue if ( process && jobQueue == null && this.parallelJobCount >= this.maximumParallelJobs ) { - logger.debug("Rescheduling job {} - maximum parallel job count of {} reached!", info.event, this.maximumParallelJobs); + if ( logger.isDebugEnabled() ) { + logger.debug("Rescheduling job {} - maximum parallel job count of {} reached!", EventUtil.toString(info.event), this.maximumParallelJobs); + } process = false; wait = true; } @@ -826,12 +838,16 @@ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event) */ public void handleEvent(final Event event) { - logger.debug("Receiving event {}", event); + if ( logger.isDebugEnabled() ) { + logger.debug("Receiving event {}", EventUtil.toString(event)); + } // we ignore remote job events if ( EventUtil.isLocal(event) ) { // check for bundle event if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) { - logger.debug("Handling local job {}", event); + if ( logger.isDebugEnabled() ) { + logger.debug("Handling local job {}", EventUtil.toString(event)); + } // job event final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); @@ -956,7 +972,9 @@ final ParallelInfo parInfo = ParallelInfo.getParallelInfo(event); final boolean parallelProcessing = parInfo.processParallel; final String jobTopic = 
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC); - logger.debug("Starting job {}", event); + if ( logger.isDebugEnabled() ) { + logger.debug("Starting job {}", EventUtil.toString(event)); + } boolean unlock = true; try { if ( isMainQueue ) { @@ -1231,6 +1249,9 @@ * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean) */ public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(job), shouldReschedule); + } // let's remove the event from our processing list // this is just a sanity check, as usually the job should have been // removed during sendAcknowledge. @@ -1259,11 +1280,20 @@ newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount); newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries); job = new Event(job.getTopic(), newProperties); - this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job); - } else { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Failed job {}", EventUtil.toString(job)); + } this.sendNotification(EventUtil.TOPIC_JOB_FAILED, job); + } else { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Cancelled job {}", EventUtil.toString(job)); + } + this.sendNotification(EventUtil.TOPIC_JOB_CANCELLED, job); } } else { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Finished job {}", EventUtil.toString(job)); + } this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job); } final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job); @@ -1401,7 +1431,9 @@ } private void putBackIntoMainQueue(final EventInfo info, final boolean useSleepTime) { - logger.debug("Putting job {} back into the queue.", info.event); + if ( logger.isDebugEnabled() ) { + logger.debug("Putting job {} back into the queue.", EventUtil.toString(info.event)); + } final Date fireDate = new Date(); if ( 
useSleepTime ) { fireDate.setTime(System.currentTimeMillis() + this.sleepTime * 1000);