Author: cziegeler Date: Wed Oct 15 17:55:52 2014 New Revision: 1632141 URL: http://svn.apache.org/r1632141 Log: SLING-4048 : Avoid keeping jobs in memory. Rewrite statistics, queue and topic handling (WiP)
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (with props) sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobsImplTest.java (with props) Removed: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java Modified: sling/trunk/bundles/extensions/event/pom.xml sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (contents, props changed) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Modified: sling/trunk/bundles/extensions/event/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/pom.xml (original) +++ sling/trunk/bundles/extensions/event/pom.xml Wed Oct 15 17:55:52 2014 @@ -116,7 +116,7 @@ -Xmx2048m -XX:MaxPermSize=512m </argLine> <includes> - <include>**/it/*</include> + <include>**/it/OrderedQueueTest*</include> </includes> </configuration> </plugin> Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Wed Oct 15 17:55:52 2014 @@ -65,6 +65,8 @@ public class JobHandler { * @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise. */ public boolean reschedule() { + // update event with retry count and retries + this.job.retry(); return this.jobManager.reschedule(this.job); } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Oct 15 17:55:52 2014 @@ -68,6 +68,8 @@ public class JobImpl implements Job, Com private final List<Exception> readErrorList; + private final long counter; + /** * Create a new job instance * @@ -90,6 +92,8 @@ public class JobImpl implements Job, Com this.properties = new ValueMapDecorator(properties); this.properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId); + final int lastPos = jobId.lastIndexOf('_'); + this.counter = Long.valueOf(jobId.substring(lastPos + 1)); } /** @@ -382,7 +386,13 @@ public class JobImpl implements Job, Com public int compareTo(final JobImpl o) { int result = this.getCreated().compareTo(o.getCreated()); if ( result == 0 ) { - result = this.getTopic().compareTo(o.getTopic()); + if ( this.counter < o.counter ) { + result = -1; + } else if ( this.counter > o.counter ) { + result = 1; + } else { + result = this.jobId.compareTo(o.jobId); + } } return result; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 15 17:55:52 2014 @@ -23,13 +23,9 @@ import java.util.Calendar; import java.util.Collection; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; @@ -53,14 +49,9 @@ import org.apache.sling.event.EventUtil; import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; -import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent; -import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl; import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue; -import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue; -import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue; -import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue; -import org.apache.sling.event.impl.jobs.stats.StatisticsImpl; -import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl; +import org.apache.sling.event.impl.jobs.queues.QueueManager; +import org.apache.sling.event.impl.jobs.stats.StatisticsManager; import org.apache.sling.event.impl.support.Environment; import org.apache.sling.event.impl.support.ResourceHelper; import org.apache.sling.event.impl.support.ScheduleInfoImpl; @@ -77,7 +68,6 @@ import org.apache.sling.event.jobs.Queue import org.apache.sling.event.jobs.ScheduledJobInfo; import org.apache.sling.event.jobs.Statistics; import org.apache.sling.event.jobs.TopicStatistics; -import org.apache.sling.event.jobs.consumer.JobExecutor; import org.apache.sling.event.jobs.jmx.QueuesMBean; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; @@ -93,23 +83,21 @@ import org.slf4j.LoggerFactory; @Component(immediate=true) @Service(value={JobManager.class, EventHandler.class, Runnable.class}) @Properties({ - @Property(name="scheduler.period", longValue=60, propertyPrivate=true), - @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true), + @Property(name="scheduler.period", longValue=60), + @Property(name="scheduler.concurrent", boolValue=false), @Property(name=EventConstants.EVENT_TOPIC, value={SlingConstants.TOPIC_RESOURCE_ADDED, SlingConstants.TOPIC_RESOURCE_CHANGED, SlingConstants.TOPIC_RESOURCE_REMOVED, - "org/apache/sling/event/notification/job/*", Utility.TOPIC_STOP, ResourceHelper.BUNDLE_EVENT_STARTED, - ResourceHelper.BUNDLE_EVENT_UPDATED}, propertyPrivate=true) + ResourceHelper.BUNDLE_EVENT_UPDATED}) }) public class JobManagerImpl - extends StatisticsImpl implements JobManager, EventHandler, Runnable, TopologyAware { /** Default logger. */ - private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass())); @Reference private TopologyHandler topologyHandler; @@ -136,30 +124,18 @@ public class JobManagerImpl @Reference private QueueConfigurationManager queueManager; - private volatile TopologyCapabilities topologyCapabilities; - - private MaintenanceTask maintenanceTask; + @Reference + private StatisticsManager statisticsManager; - private BackgroundLoader backgroundLoader; + @Reference QueueManager qManager; - /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */ - private final Object queuesLock = new Object(); + private volatile TopologyCapabilities topologyCapabilities; - /** All active queues. */ - private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>(); + private MaintenanceTask maintenanceTask; /** We count the scheduler runs. */ private volatile long schedulerRuns; - /** Current statistics. */ - private final StatisticsImpl baseStatistics = new StatisticsImpl(); - - /** Statistics per topic. */ - private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>(); - - /** Set of paths directly added as jobs - these will be ignored during observation handling. */ - private final Set<String> directlyAddedPaths = new HashSet<String>(); - /** Job Scheduler. */ private JobSchedulerImpl jobScheduler; @@ -171,7 +147,6 @@ public class JobManagerImpl protected void activate(final Map<String, Object> props) throws LoginException { this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this); this.maintenanceTask = new MaintenanceTask(this.configuration); - this.backgroundLoader = new BackgroundLoader(this, this.configuration); this.topologyHandler.addListener(this); logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID); @@ -187,18 +162,7 @@ public class JobManagerImpl this.jobScheduler.deactivate(); - this.backgroundLoader.deactivate(); - this.backgroundLoader = null; - this.maintenanceTask = null; - final Iterator<AbstractJobQueue> i = this.queues.values().iterator(); - while ( i.hasNext() ) { - final AbstractJobQueue jbq = i.next(); - jbq.close(); - // update mbeans - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); - } - this.queues.clear(); logger.info("Apache Sling Job Manager stopped on instance {}", Environment.APPLICATION_ID); } @@ -212,37 +176,6 @@ public class JobManagerImpl this.schedulerRuns++; logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns); - // check for unprocessed jobs first - logger.debug("Checking for unprocessed jobs..."); - for(final AbstractJobQueue jbq : this.queues.values() ) { - jbq.checkForUnprocessedJobs(); - } - - // we only do a full clean up on every fifth run - final boolean doFullCleanUp = (schedulerRuns % 5 == 0); - - if ( doFullCleanUp ) { - // check for idle queue - logger.debug("Checking for idle queues..."); - - // we synchronize to avoid creating a queue which is about to be removed during cleanup - synchronized ( queuesLock ) { - final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator(); - while ( i.hasNext() ) { - final Map.Entry<String, AbstractJobQueue> current = i.next(); - final AbstractJobQueue jbq = current.getValue(); - if ( jbq.tryToClose() ) { - logger.debug("Removing idle job queue {}", jbq); - // copy statistics - this.baseStatistics.add(jbq); - // remove - i.remove(); - // update mbeans - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq)); - } - } - } - } // invoke maintenance task final MaintenanceTask task = this.maintenanceTask; if ( task != null ) { @@ -252,93 +185,6 @@ public class JobManagerImpl } /** - * Process a new job - * This method first searches the corresponding queue - if such a queue - * does not exist yet, it is created and started. - * - * @param job The job - */ - void process(final JobImpl job) { - // check if we still are able to process this job - final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic()); - boolean reassign = false; - String reassignTargetId = null; - if ( consumer == null && (!job.isBridgedEvent() || !this.jobConsumerManager.supportsBridgedEvents())) { - reassign = true; - } - - // get the queue configuration - final TopologyCapabilities caps = this.topologyCapabilities; - final QueueInfo queueInfo = caps != null ? caps.getQueueInfo(job.getTopic()) : null; - if ( queueInfo == null ) { - return; // TODO - } - final InternalQueueConfiguration config = queueInfo.queueConfiguration; - - // Sanity check if queue configuration has changed - if ( config.getType() == QueueConfiguration.Type.DROP ) { - if ( logger.isDebugEnabled() ) { - logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job)); - } - this.finishJob(job, Job.JobState.DROPPED, false, -1); - } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) { - if ( !reassign ) { - if ( logger.isDebugEnabled() ) { - logger.debug("Ignoring job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job)); - } - } - } else { - - if ( reassign ) { - reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo)); - - } else { - // get or create queue - AbstractJobQueue queue = null; - // we synchronize to avoid creating a queue which is about to be removed during cleanup - synchronized ( queuesLock ) { - queue = this.queues.get(queueInfo.queueName); - // check for reconfiguration, we really do an identity check here(!) - if ( queue != null && queue.getConfiguration() != config ) { - this.outdateQueue(queue); - // we use a new queue with the configuration - queue = null; - } - if ( queue == null ) { - if ( config.getType() == QueueConfiguration.Type.ORDERED ) { - queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin); - } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) { - queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler); - } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) { - queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler); - } - if ( queue == null ) { - // this is just a sanity check, actually we can never get here - logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job)); - this.finishJob(job, Job.JobState.DROPPED, false, -1); - } else { - queues.put(queueInfo.queueName, queue); - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null)); - queue.start(); - } - } - } - - // and put job - if ( queue != null ) { - job.updateQueueInfo(queue); - final JobHandler handler = new JobHandler(job, this); - - queue.process(handler); - } - } - } - if ( reassign ) { - this.maintenanceTask.reassignJob(job, reassignTargetId); - } - } - - /** * This method is invoked periodically by the scheduler. * In the default configuration every minute * @see java.lang.Runnable#run() @@ -358,61 +204,13 @@ public class JobManagerImpl } } - private void outdateQueue(final AbstractJobQueue queue) { - // remove the queue with the old name - // check for main queue - final String oldName = ResourceHelper.filterQueueName(queue.getName()); - this.queues.remove(oldName); - // check if we can close or have to rename - if ( queue.tryToClose() ) { - // copy statistics - this.baseStatistics.add(queue); - // update mbeans - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, queue)); - } else { - queue.outdate(); - // readd with new name - String newName = ResourceHelper.filterName(queue.getName()); - int index = 0; - while ( this.queues.containsKey(newName) ) { - newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++); - } - this.queues.put(newName, queue); - // update mbeans - ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue)); - } - } - - /** - * @see org.apache.sling.event.impl.jobs.stats.StatisticsImpl#reset() - * Reset this statistics and all queues. - */ - @Override - public synchronized void reset() { - this.baseStatistics.reset(); - for(final AbstractJobQueue jq : this.queues.values() ) { - jq.reset(); - } - this.topicStatistics.clear(); - } - /** * @see org.apache.sling.event.jobs.JobManager#restart() */ @Override public void restart() { - // let's rename/close all queues and clear them - synchronized ( queuesLock ) { - final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values()); - for(final AbstractJobQueue queue : queues ) { - queue.clear(); - this.outdateQueue(queue); - } - } - // reset statistics - this.reset(); - // and now load again - this.backgroundLoader.restart(); + // TODO reset statistics + // TODO reload queues? } /** @@ -429,17 +227,6 @@ public class JobManagerImpl @Override public void handleEvent(final Event event) { if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(event.getTopic()) ) { - final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH); - final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE); - if ( (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt)) && - this.configuration.isLocalJob(path) ) { - synchronized ( this.directlyAddedPaths ) { - if ( directlyAddedPaths.remove(path) ) { - return; - } - } - this.backgroundLoader.loadJob(path); - } this.jobScheduler.handleEvent(event); } else if ( Utility.TOPIC_STOP.equals(event.getTopic()) ) { if ( !EventUtil.isLocal(event) ) { @@ -448,57 +235,20 @@ public class JobManagerImpl } } else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic()) || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) { - this.backgroundLoader.tryToReloadUnloadedJobs(); this.jobScheduler.handleEvent(event); } else if ( SlingConstants.TOPIC_RESOURCE_CHANGED.equals(event.getTopic()) || SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) { this.jobScheduler.handleEvent(event); - } else { - if ( EventUtil.isLocal(event) ) { - // job notifications - final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC); - if ( topic != null ) { // this is just a sanity check - TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic); - if ( ts == null ) { - this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic)); - ts = (TopicStatisticsImpl)this.topicStatistics.get(topic); - } - if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) { - ts.addCancelled(); - } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) { - ts.addFailed(); - } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) { - final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME); - ts.addFinished(time == null ? -1 : time); - } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) { - final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME); - ts.addActivated(time == null ? -1 : time); - } - } - } } } private void stopProcessing() { - this.backgroundLoader.stop(); - - // let's rename/close all queues and clear them - synchronized ( queuesLock ) { - final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values()); - for(final AbstractJobQueue queue : queues ) { - queue.clear(); - this.outdateQueue(queue); - } - } - this.topologyCapabilities = null; } private void startProcessing(final TopologyCapabilities caps) { // create new capabilities and update view this.topologyCapabilities = caps; - - this.backgroundLoader.start(); } @Override @@ -518,12 +268,7 @@ public class JobManagerImpl */ @Override public synchronized Statistics getStatistics() { - this.copyFrom(this.baseStatistics); - for(final AbstractJobQueue jq : this.queues.values() ) { - this.add(jq); - } - - return this; + return this.statisticsManager.getOverallStatistics(); } /** @@ -531,7 +276,7 @@ public class JobManagerImpl */ @Override public Iterable<TopicStatistics> getTopicStatistics() { - return topicStatistics.values(); + return this.statisticsManager.getTopicStatistics().values(); } /** @@ -539,7 +284,7 @@ public class JobManagerImpl */ @Override public Queue getQueue(final String name) { - return this.queues.get(ResourceHelper.filterQueueName(name)); + return qManager.getQueue(ResourceHelper.filterQueueName(name)); } /** @@ -547,30 +292,7 @@ public class JobManagerImpl */ @Override public Iterable<Queue> getQueues() { - final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator(); - return new Iterable<Queue>() { - - @Override - public Iterator<Queue> iterator() { - return new Iterator<Queue>() { - - @Override - public boolean hasNext() { - return jqI.hasNext(); - } - - @Override - public Queue next() { - return jqI.next(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; + return qManager.getQueues(); } @Override @@ -1198,12 +920,7 @@ public class JobManagerImpl jobName, jobProperties, info); - if ( job != null ) { - if ( configuration.isLocalJob(job.getResourcePath()) ) { - this.backgroundLoader.addJob(job); - } - return job; - } + return job; } catch (final PersistenceException re ) { // something went wrong, so let's log it this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", re); @@ -1268,9 +985,6 @@ public class JobManagerImpl if ( logger.isDebugEnabled() ) { logger.debug("Storing new job {} at {}", properties, path); } - synchronized ( this.directlyAddedPaths ) { - this.directlyAddedPaths.add(path); - } ResourceHelper.getOrCreateResource(resolver, path, properties); @@ -1331,6 +1045,8 @@ public class JobManagerImpl resolver.commit(); return true; + } else { + logger.debug("No job resource found at {}", job.getResourcePath()); } } catch ( final PersistenceException ignore ) { this.ignoreException(ignore); @@ -1353,10 +1069,8 @@ public class JobManagerImpl if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) { // get the queue configuration final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic()); - final AbstractJobQueue queue; - synchronized ( queuesLock ) { - queue = this.queues.get(queueInfo.queueName); - } + final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName); + boolean stopped = false; if ( queue != null ) { stopped = queue.stopJob(job); Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632141&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Wed Oct 15 17:55:52 2014 @@ -0,0 +1,320 @@ +package org.apache.sling.event.impl.jobs; + +import org.slf4j.Logger; +import org.slf4j.Marker; + +public class TestLogger implements Logger { + + private final boolean DEBUG = false; + + private final Logger logger; + + @Override + public String getName() { + return logger.getName(); + } + + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String format, Object arg) { + logger.trace(format, arg); + } + + @Override + public void trace(String format, Object arg1, Object arg2) { + logger.trace(format, arg1, arg2); + } + + @Override + public void trace(String format, Object[] argArray) { + logger.trace(format, argArray); + } + + @Override + public void trace(String msg, Throwable t) { + logger.trace(msg, t); + } + + @Override + public boolean isTraceEnabled(Marker marker) { + return logger.isTraceEnabled(marker); + } + + @Override + public void trace(Marker marker, String msg) { + logger.trace(marker, msg); + } + + @Override + public void trace(Marker marker, String format, Object arg) { + logger.trace(marker, format, arg); + } + + @Override + public void trace(Marker marker, String format, Object arg1, Object arg2) { + logger.trace(marker, format, arg1, arg2); + } + + @Override + public void trace(Marker marker, String format, Object[] argArray) { + logger.trace(marker, format, argArray); + } + + @Override + public void trace(Marker marker, String msg, Throwable t) { + logger.trace(marker, msg, t); + } + + @Override + public boolean isDebugEnabled() { + return ( DEBUG ? logger.isInfoEnabled() : logger.isDebugEnabled()); + } + + @Override + public void debug(String msg) { + if ( DEBUG) logger.info(msg); else logger.debug(msg); + } + + @Override + public void debug(String format, Object arg) { + if ( DEBUG) logger.info(format, arg);else logger.debug(format, arg); + } + + @Override + public void debug(String format, Object arg1, Object arg2) { + if ( DEBUG) logger.info(format, arg1, arg2);else logger.debug(format, arg1, arg2); + } + + @Override + public void debug(String format, Object[] argArray) { + if ( DEBUG) logger.info(format, argArray);else logger.debug(format,argArray); + } + + @Override + public void debug(String msg, Throwable t) { + if ( DEBUG) logger.info(msg, t);else logger.debug(msg,t); + } + + @Override + public boolean isDebugEnabled(Marker marker) { + return (DEBUG ? logger.isInfoEnabled(marker) : logger.isDebugEnabled(marker)); + } + + @Override + public void debug(Marker marker, String msg) { + if ( DEBUG) logger.info(marker, msg);else logger.debug(marker,msg); + } + + @Override + public void debug(Marker marker, String format, Object arg) { + if ( DEBUG) logger.info(marker, format, arg);else logger.debug(marker,format,arg); + } + + @Override + public void debug(Marker marker, String format, Object arg1, Object arg2) { + if ( DEBUG) logger.info(marker, format, arg1, arg2);else logger.debug(marker, format, arg1, arg2); + } + + @Override + public void debug(Marker marker, String format, Object[] argArray) { + if ( DEBUG) logger.info(marker, format, argArray); else logger.debug(marker, format, argArray); + } + + @Override + public void debug(Marker marker, String msg, Throwable t) { + if ( DEBUG) logger.info(marker, msg, t); else logger.debug(marker, msg, t); + } + + @Override + public boolean isInfoEnabled() { + return logger.isInfoEnabled(); + } + + @Override + public void info(String msg) { + logger.info(msg); + } + + @Override + public void info(String format, Object arg) { + logger.info(format, arg); + } + + @Override + public void info(String format, Object arg1, Object arg2) { + logger.info(format, arg1, arg2); + } + + @Override + public void info(String format, Object[] argArray) { + logger.info(format, argArray); + } + + @Override + public void info(String msg, Throwable t) { + logger.info(msg, t); + } + + @Override + public boolean isInfoEnabled(Marker marker) { + return logger.isInfoEnabled(marker); + } + + @Override + public void info(Marker marker, String msg) { + logger.info(marker, msg); + } + + @Override + public void info(Marker marker, String format, Object arg) { + logger.info(marker, format, arg); + } + + @Override + public void info(Marker marker, String format, Object arg1, Object arg2) { + logger.info(marker, format, arg1, arg2); + } + + @Override + public void info(Marker marker, String format, Object[] argArray) { + logger.info(marker, format, argArray); + } + + @Override + public void info(Marker marker, String msg, Throwable t) { + logger.info(marker, msg, t); + } + + @Override + public boolean isWarnEnabled() { + return logger.isWarnEnabled(); + } + + @Override + public void warn(String msg) { + logger.warn(msg); + } + + @Override + public void warn(String format, Object arg) { + logger.warn(format, arg); + } + + @Override + public void warn(String format, Object[] argArray) { + logger.warn(format, argArray); + } + + @Override + public void warn(String format, Object arg1, Object arg2) { + logger.warn(format, arg1, arg2); + } + + @Override + public void warn(String msg, Throwable t) { + logger.warn(msg, t); + } + + @Override + public boolean isWarnEnabled(Marker marker) { + return logger.isWarnEnabled(marker); + } + + @Override + public void warn(Marker marker, String msg) { + logger.warn(marker, msg); + } + + @Override + public void warn(Marker marker, String format, Object arg) { + logger.warn(marker, format, arg); + } + + @Override + public void warn(Marker marker, String format, Object arg1, Object arg2) { + logger.warn(marker, format, arg1, arg2); + } + + @Override + public void warn(Marker marker, String format, Object[] argArray) { + logger.warn(marker, format, argArray); + } + + @Override + public void warn(Marker marker, String msg, Throwable t) { + logger.warn(marker, msg, t); + } + + @Override + public boolean isErrorEnabled() { + return logger.isErrorEnabled(); + } + + @Override + public void error(String msg) { + logger.error(msg); + } + + @Override + public void error(String format, Object arg) { + logger.error(format, arg); + } + + @Override + public void error(String format, Object arg1, Object arg2) { + logger.error(format, arg1, arg2); + } + + @Override + public void error(String format, Object[] argArray) { + logger.error(format, argArray); + } + + @Override + public void error(String msg, Throwable t) { + logger.error(msg, t); + } + + @Override + public boolean isErrorEnabled(Marker marker) { + return logger.isErrorEnabled(marker); + } + + @Override + public void error(Marker marker, String msg) { + logger.error(marker, msg); + } + + @Override + public void error(Marker marker, String format, Object arg) { + logger.error(marker, format, arg); + } + + @Override + public void error(Marker marker, String format, Object arg1, Object arg2) { + logger.error(marker, format, arg1, arg2); + } + + @Override + public void error(Marker marker, String format, Object[] argArray) { + logger.error(marker, format, argArray); + } + + @Override + public void error(Marker marker, String msg, Throwable t) { + logger.error(marker, msg, t); + } + + public TestLogger(final Logger l) { + this.logger = l; + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1632141&r1=1632140&r2=1632141&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed Oct 15 17:55:52 2014 @@ -46,9 +46,7 @@ import org.osgi.framework.Constants; value=ConfigurationConstants.DEFAULT_TYPE, options={@PropertyOption(name="UNORDERED",value="Parallel"), @PropertyOption(name="ORDERED",value="Ordered"), - @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"), - @PropertyOption(name="IGNORE",value="Ignore"), - @PropertyOption(name="DROP",value="Drop")}), + @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")}), @Property(name=ConfigurationConstants.PROP_TOPICS, unbounded=PropertyUnbounded.ARRAY), @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL, @@ -183,6 +181,9 @@ public class InternalQueueConfiguration return false; } } + if ( type == Type.IGNORE || type == Type.DROP ) { + return false; + } return true; }