Author: cziegeler Date: Tue Oct 19 06:31:15 2010 New Revision: 1024129 URL: http://svn.apache.org/viewvc?rev=1024129&view=rev Log: Lock on special lock object and add a dropping queue
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java?rev=1024129&r1=1024128&r2=1024129&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java Tue Oct 19 06:31:15 2010 @@ -55,6 +55,7 @@ import org.apache.sling.event.jobs.JobMa import org.apache.sling.event.jobs.JobUtil; import org.apache.sling.event.jobs.JobsIterator; import org.apache.sling.event.jobs.Queue; +import org.apache.sling.event.jobs.QueueConfiguration; import org.apache.sling.event.jobs.Statistics; import org.apache.sling.event.jobs.TopicStatistics; import org.osgi.service.event.Event; @@ -116,6 +117,9 @@ public class DefaultJobManager @Reference private Scheduler scheduler; + /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */ + private final Object queuesLock = new Object(); + /** All active queues. */ private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>(); @@ -195,7 +199,7 @@ public class DefaultJobManager public void cleanup() { // check for idle queue // we synchronize to avoid creating a queue which is about to be removed during cleanup - synchronized ( this ) { + synchronized ( queuesLock ) { final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator(); while ( i.hasNext() ) { final Map.Entry<String, AbstractJobQueue> current = i.next(); @@ -229,7 +233,7 @@ public class DefaultJobManager if ( config == null ) { final String customQueueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME); if ( customQueueName != null ) { - synchronized ( this ) { + synchronized ( queuesLock ) { final AbstractJobQueue queue = this.queues.get(customQueueName); if ( queue != null ) { config = queue.getConfiguration(); @@ -249,15 +253,25 @@ public class DefaultJobManager if ( config.isSkipped(event) ) { if ( logger.isDebugEnabled() ) { - logger.debug("Ignoring event due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event)); + logger.debug("Ignoring job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event)); + } + return; + } + + // drop? + if ( config.getType() == QueueConfiguration.Type.DROP ) { + if ( logger.isDebugEnabled() ) { + logger.debug("Dropping job due to configuration of queue {} : {}", queueName, EventUtil.toString(event.event)); } + Utility.sendNotification(this.environment, JobUtil.TOPIC_JOB_CANCELLED, event.event, null); + event.finished(); return; } // get or create queue AbstractJobQueue queue = null; // we synchronize to avoid creating a queue which is about to be removed during cleanup - synchronized ( this ) { + synchronized ( queuesLock ) { queue = this.queues.get(queueName); // check for reconfiguration, we really do an identity check here(!) if ( queue != null && queue.getConfiguration() != config ) { @@ -280,11 +294,11 @@ public class DefaultJobManager queue = null; } if ( queue == null ) { - if ( config.getType() == InternalQueueConfiguration.Type.ORDERED ) { + if ( config.getType() == QueueConfiguration.Type.ORDERED ) { queue = new OrderedJobQueue(queueName, config, this.environment); - } else if ( config.getType() == InternalQueueConfiguration.Type.UNORDERED ) { + } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) { queue = new ParallelJobQueue(queueName, config, this.environment, this.scheduler); - } else if ( config.getType() == InternalQueueConfiguration.Type.TOPIC_ROUND_ROBIN ) { + } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) { queue = new TopicRoundRobinJobQueue(queueName, config, this.environment, this.scheduler); } if ( queue == null ) { @@ -680,7 +694,7 @@ public class DefaultJobManager */ public void restart() { // let's rename/close all queues first - synchronized ( this ) { + synchronized ( queuesLock ) { final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values()); for(final AbstractJobQueue queue : queues ) { // remove the queue with the old name Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java?rev=1024129&r1=1024128&r2=1024129&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/PersistenceHandler.java Tue Oct 19 06:31:15 2010 @@ -975,7 +975,7 @@ public class PersistenceHandler implemen } this.backgroundSession.save(); // and unlock - if ( jobId != null ) { + if ( jobId != null && eventNode.isLocked() ) { this.backgroundSession.getWorkspace().getLockManager().unlock(path); } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1024129&r1=1024128&r2=1024129&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Tue Oct 19 06:31:15 2010 @@ -27,10 +27,11 @@ public interface QueueConfiguration { /** The queue type. */ static enum Type { - UNORDERED, - ORDERED, - TOPIC_ROUND_ROBIN, - IGNORE + UNORDERED, // unordered, parallel prpcessing + ORDERED, // ordered, fifo + TOPIC_ROUND_ROBIN, // unordered, parallel processing, executed based on topic + IGNORE, // ignore job, but do not remove + DROP // drop job without processing! } /**