Author: cziegeler
Date: Wed Oct 15 17:55:52 2014
New Revision: 1632141

URL: http://svn.apache.org/r1632141
Log:
SLING-4048 : Avoid keeping jobs in memory. Rewrite statistics, queue and topic 
handling (WiP)

Added:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
   (with props)
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobsImplTest.java
   (with props)
Removed:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
   (contents, props changed)
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Wed Oct 15 17:55:52 2014
@@ -116,7 +116,7 @@
                         -Xmx2048m -XX:MaxPermSize=512m
                     </argLine>
                     <includes>
-                        <include>**/it/*</include>
+                        <include>**/it/OrderedQueueTest*</include>
                     </includes>
                 </configuration>
             </plugin>

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
 Wed Oct 15 17:55:52 2014
@@ -65,6 +65,8 @@ public class JobHandler {
      * @return <code>true</code> if rescheduling was successful, 
<code>false</code> otherwise.
      */
     public boolean reschedule() {
+        // update event with retry count and retries
+        this.job.retry();
         return this.jobManager.reschedule(this.job);
     }
 

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
 Wed Oct 15 17:55:52 2014
@@ -68,6 +68,8 @@ public class JobImpl implements Job, Com
 
     private final List<Exception> readErrorList;
 
+    private final long counter;
+
     /**
      * Create a new job instance
      *
@@ -90,6 +92,8 @@ public class JobImpl implements Job, Com
 
         this.properties = new ValueMapDecorator(properties);
         
this.properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
+        final int lastPos = jobId.lastIndexOf('_');
+        this.counter = Long.valueOf(jobId.substring(lastPos + 1));
     }
 
     /**
@@ -382,7 +386,13 @@ public class JobImpl implements Job, Com
     public int compareTo(final JobImpl o) {
         int result = this.getCreated().compareTo(o.getCreated());
         if ( result == 0 ) {
-            result = this.getTopic().compareTo(o.getTopic());
+            if ( this.counter < o.counter ) {
+                result = -1;
+            } else if ( this.counter > o.counter ) {
+                result = 1;
+            } else {
+                result = this.jobId.compareTo(o.jobId);
+            }
         }
         return result;
     }

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
 Wed Oct 15 17:55:52 2014
@@ -23,13 +23,9 @@ import java.util.Calendar;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -53,14 +49,9 @@ import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import 
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
-import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
 import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
-import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
-import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
-import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
-import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
-import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
+import org.apache.sling.event.impl.jobs.queues.QueueManager;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -77,7 +68,6 @@ import org.apache.sling.event.jobs.Queue
 import org.apache.sling.event.jobs.ScheduledJobInfo;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
-import org.apache.sling.event.jobs.consumer.JobExecutor;
 import org.apache.sling.event.jobs.jmx.QueuesMBean;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -93,23 +83,21 @@ import org.slf4j.LoggerFactory;
 @Component(immediate=true)
 @Service(value={JobManager.class, EventHandler.class, Runnable.class})
 @Properties({
-    @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
-    @Property(name="scheduler.concurrent", boolValue=false, 
propertyPrivate=true),
+    @Property(name="scheduler.period", longValue=60),
+    @Property(name="scheduler.concurrent", boolValue=false),
     @Property(name=EventConstants.EVENT_TOPIC,
               value={SlingConstants.TOPIC_RESOURCE_ADDED,
                      SlingConstants.TOPIC_RESOURCE_CHANGED,
                      SlingConstants.TOPIC_RESOURCE_REMOVED,
-                     "org/apache/sling/event/notification/job/*",
                      Utility.TOPIC_STOP,
                      ResourceHelper.BUNDLE_EVENT_STARTED,
-                     ResourceHelper.BUNDLE_EVENT_UPDATED}, 
propertyPrivate=true)
+                     ResourceHelper.BUNDLE_EVENT_UPDATED})
 })
 public class JobManagerImpl
-    extends StatisticsImpl
     implements JobManager, EventHandler, Runnable, TopologyAware {
 
     /** Default logger. */
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final Logger logger = new 
TestLogger(LoggerFactory.getLogger(this.getClass()));
 
     @Reference
     private TopologyHandler topologyHandler;
@@ -136,30 +124,18 @@ public class JobManagerImpl
     @Reference
     private QueueConfigurationManager queueManager;
 
-    private volatile TopologyCapabilities topologyCapabilities;
-
-    private MaintenanceTask maintenanceTask;
+    @Reference
+    private StatisticsManager statisticsManager;
 
-    private BackgroundLoader backgroundLoader;
+    @Reference QueueManager qManager;
 
-    /** Lock object for the queues map - we don't want to sync directly on the 
concurrent map. */
-    private final Object queuesLock = new Object();
+    private volatile TopologyCapabilities topologyCapabilities;
 
-    /** All active queues. */
-    private final Map<String, AbstractJobQueue> queues = new 
ConcurrentHashMap<String, AbstractJobQueue>();
+    private MaintenanceTask maintenanceTask;
 
     /** We count the scheduler runs. */
     private volatile long schedulerRuns;
 
-    /** Current statistics. */
-    private final StatisticsImpl baseStatistics = new StatisticsImpl();
-
-    /** Statistics per topic. */
-    private final ConcurrentMap<String, TopicStatistics> topicStatistics = new 
ConcurrentHashMap<String, TopicStatistics>();
-
-    /** Set of paths directly added as jobs - these will be ignored during 
observation handling. */
-    private final Set<String> directlyAddedPaths = new HashSet<String>();
-
     /** Job Scheduler. */
     private JobSchedulerImpl jobScheduler;
 
@@ -171,7 +147,6 @@ public class JobManagerImpl
     protected void activate(final Map<String, Object> props) throws 
LoginException {
         this.jobScheduler = new JobSchedulerImpl(this.configuration, 
this.scheduler, this);
         this.maintenanceTask = new MaintenanceTask(this.configuration);
-        this.backgroundLoader = new BackgroundLoader(this, this.configuration);
 
         this.topologyHandler.addListener(this);
         logger.info("Apache Sling Job Manager started on instance {}", 
Environment.APPLICATION_ID);
@@ -187,18 +162,7 @@ public class JobManagerImpl
 
         this.jobScheduler.deactivate();
 
-        this.backgroundLoader.deactivate();
-        this.backgroundLoader = null;
-
         this.maintenanceTask = null;
-        final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
-        while ( i.hasNext() ) {
-            final AbstractJobQueue jbq = i.next();
-            jbq.close();
-            // update mbeans
-            ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(null, jbq));
-        }
-        this.queues.clear();
         logger.info("Apache Sling Job Manager stopped on instance {}", 
Environment.APPLICATION_ID);
     }
 
@@ -212,37 +176,6 @@ public class JobManagerImpl
         this.schedulerRuns++;
         logger.debug("Job manager maintenance: Starting #{}", 
this.schedulerRuns);
 
-        // check for unprocessed jobs first
-        logger.debug("Checking for unprocessed jobs...");
-        for(final AbstractJobQueue jbq : this.queues.values() ) {
-            jbq.checkForUnprocessedJobs();
-        }
-
-        // we only do a full clean up on every fifth run
-        final boolean doFullCleanUp = (schedulerRuns % 5 == 0);
-
-        if ( doFullCleanUp ) {
-            // check for idle queue
-            logger.debug("Checking for idle queues...");
-
-           // we synchronize to avoid creating a queue which is about to be 
removed during cleanup
-            synchronized ( queuesLock ) {
-                final Iterator<Map.Entry<String, AbstractJobQueue>> i = 
this.queues.entrySet().iterator();
-                while ( i.hasNext() ) {
-                    final Map.Entry<String, AbstractJobQueue> current = 
i.next();
-                    final AbstractJobQueue jbq = current.getValue();
-                    if ( jbq.tryToClose() ) {
-                        logger.debug("Removing idle job queue {}", jbq);
-                        // copy statistics
-                        this.baseStatistics.add(jbq);
-                        // remove
-                        i.remove();
-                        // update mbeans
-                        ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(null, jbq));
-                    }
-                }
-            }
-        }
         // invoke maintenance task
         final MaintenanceTask task = this.maintenanceTask;
         if ( task != null ) {
@@ -252,93 +185,6 @@ public class JobManagerImpl
     }
 
     /**
-     * Process a new job
-     * This method first searches the corresponding queue - if such a queue
-     * does not exist yet, it is created and started.
-     *
-     * @param job The job
-     */
-    void process(final JobImpl job) {
-        // check if we still are able to process this job
-        final JobExecutor consumer = 
this.jobConsumerManager.getExecutor(job.getTopic());
-        boolean reassign = false;
-        String reassignTargetId = null;
-        if ( consumer == null && (!job.isBridgedEvent() || 
!this.jobConsumerManager.supportsBridgedEvents())) {
-            reassign = true;
-        }
-
-        // get the queue configuration
-        final TopologyCapabilities caps = this.topologyCapabilities;
-        final QueueInfo queueInfo = caps != null ? 
caps.getQueueInfo(job.getTopic()) : null;
-        if ( queueInfo == null ) {
-            return; // TODO
-        }
-        final InternalQueueConfiguration config = queueInfo.queueConfiguration;
-
-        // Sanity check if queue configuration has changed
-        if ( config.getType() == QueueConfiguration.Type.DROP ) {
-            if ( logger.isDebugEnabled() ) {
-                logger.debug("Dropping job due to configuration of queue {} : 
{}", queueInfo.queueName, Utility.toString(job));
-            }
-            this.finishJob(job, Job.JobState.DROPPED, false, -1);
-        } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
-            if ( !reassign ) {
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug("Ignoring job due to configuration of queue 
{} : {}", queueInfo.queueName, Utility.toString(job));
-                }
-            }
-        } else {
-
-            if ( reassign ) {
-                reassignTargetId = (caps == null ? null : 
caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
-            } else {
-                // get or create queue
-                AbstractJobQueue queue = null;
-                // we synchronize to avoid creating a queue which is about to 
be removed during cleanup
-                synchronized ( queuesLock ) {
-                    queue = this.queues.get(queueInfo.queueName);
-                    // check for reconfiguration, we really do an identity 
check here(!)
-                    if ( queue != null && queue.getConfiguration() != config ) 
{
-                        this.outdateQueue(queue);
-                        // we use a new queue with the configuration
-                        queue = null;
-                    }
-                    if ( queue == null ) {
-                        if ( config.getType() == 
QueueConfiguration.Type.ORDERED ) {
-                            queue = new OrderedJobQueue(queueInfo.queueName, 
config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin);
-                        } else if ( config.getType() == 
QueueConfiguration.Type.UNORDERED ) {
-                            queue = new ParallelJobQueue(queueInfo.queueName, 
config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, 
this.scheduler);
-                        } else if ( config.getType() == 
QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
-                            queue = new 
TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, 
this.threadPoolManager, this.eventAdmin, this.scheduler);
-                        }
-                        if ( queue == null ) {
-                            // this is just a sanity check, actually we can 
never get here
-                            logger.warn("Ignoring event due to unknown queue 
type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
-                            this.finishJob(job, Job.JobState.DROPPED, false, 
-1);
-                        } else {
-                            queues.put(queueInfo.queueName, queue);
-                            ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(queue, null));
-                            queue.start();
-                        }
-                    }
-                }
-
-                // and put job
-                if ( queue != null ) {
-                    job.updateQueueInfo(queue);
-                    final JobHandler handler = new JobHandler(job, this);
-
-                    queue.process(handler);
-                }
-            }
-        }
-        if ( reassign ) {
-            this.maintenanceTask.reassignJob(job, reassignTargetId);
-        }
-    }
-
-    /**
      * This method is invoked periodically by the scheduler.
      * In the default configuration every minute
      * @see java.lang.Runnable#run()
@@ -358,61 +204,13 @@ public class JobManagerImpl
         }
     }
 
-    private void outdateQueue(final AbstractJobQueue queue) {
-        // remove the queue with the old name
-        // check for main queue
-        final String oldName = ResourceHelper.filterQueueName(queue.getName());
-        this.queues.remove(oldName);
-        // check if we can close or have to rename
-        if ( queue.tryToClose() ) {
-            // copy statistics
-            this.baseStatistics.add(queue);
-            // update mbeans
-            ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(null, queue));
-        } else {
-            queue.outdate();
-            // readd with new name
-            String newName = ResourceHelper.filterName(queue.getName());
-            int index = 0;
-            while ( this.queues.containsKey(newName) ) {
-                newName = ResourceHelper.filterName(queue.getName()) + '$' + 
String.valueOf(index++);
-            }
-            this.queues.put(newName, queue);
-            // update mbeans
-            ((QueuesMBeanImpl)queuesMBean).sendEvent(new 
QueueStatusEvent(queue, queue));
-        }
-    }
-
-    /**
-     * @see org.apache.sling.event.impl.jobs.stats.StatisticsImpl#reset()
-     * Reset this statistics and all queues.
-     */
-    @Override
-    public synchronized void reset() {
-        this.baseStatistics.reset();
-        for(final AbstractJobQueue jq : this.queues.values() ) {
-            jq.reset();
-        }
-        this.topicStatistics.clear();
-    }
-
     /**
      * @see org.apache.sling.event.jobs.JobManager#restart()
      */
     @Override
     public void restart() {
-        // let's rename/close all queues and clear them
-        synchronized ( queuesLock ) {
-            final List<AbstractJobQueue> queues = new 
ArrayList<AbstractJobQueue>(this.queues.values());
-            for(final AbstractJobQueue queue : queues ) {
-                queue.clear();
-                this.outdateQueue(queue);
-            }
-        }
-        // reset statistics
-        this.reset();
-        // and now load again
-        this.backgroundLoader.restart();
+        // TODO reset statistics
+        // TODO reload queues?
     }
 
     /**
@@ -429,17 +227,6 @@ public class JobManagerImpl
     @Override
     public void handleEvent(final Event event) {
         if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(event.getTopic()) ) {
-            final String path = (String) 
event.getProperty(SlingConstants.PROPERTY_PATH);
-            final String rt = (String) 
event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
-            if ( (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt)) &&
-                 this.configuration.isLocalJob(path) ) {
-                synchronized ( this.directlyAddedPaths ) {
-                    if ( directlyAddedPaths.remove(path) ) {
-                        return;
-                    }
-                }
-                this.backgroundLoader.loadJob(path);
-            }
             this.jobScheduler.handleEvent(event);
         } else if ( Utility.TOPIC_STOP.equals(event.getTopic()) ) {
             if ( !EventUtil.isLocal(event) ) {
@@ -448,57 +235,20 @@ public class JobManagerImpl
             }
         } else if ( 
ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
                  || 
ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
-            this.backgroundLoader.tryToReloadUnloadedJobs();
             this.jobScheduler.handleEvent(event);
         } else if ( 
SlingConstants.TOPIC_RESOURCE_CHANGED.equals(event.getTopic())
                  || 
SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
             this.jobScheduler.handleEvent(event);
-        } else {
-            if ( EventUtil.isLocal(event) ) {
-                // job notifications
-                final String topic = 
(String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
-                if ( topic != null ) { // this is just a sanity check
-                    TopicStatisticsImpl ts = 
(TopicStatisticsImpl)this.topicStatistics.get(topic);
-                    if ( ts == null ) {
-                        this.topicStatistics.putIfAbsent(topic, new 
TopicStatisticsImpl(topic));
-                        ts = 
(TopicStatisticsImpl)this.topicStatistics.get(topic);
-                    }
-                    if ( 
event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
-                        ts.addCancelled();
-                    } else if ( 
event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
-                        ts.addFailed();
-                    } else if ( 
event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
-                        final Long time = 
(Long)event.getProperty(Utility.PROPERTY_TIME);
-                        ts.addFinished(time == null ? -1 : time);
-                    } else if ( 
event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
-                        final Long time = 
(Long)event.getProperty(Utility.PROPERTY_TIME);
-                        ts.addActivated(time == null ? -1 : time);
-                    }
-                }
-            }
         }
     }
 
     private void stopProcessing() {
-        this.backgroundLoader.stop();
-
-        // let's rename/close all queues and clear them
-        synchronized ( queuesLock ) {
-            final List<AbstractJobQueue> queues = new 
ArrayList<AbstractJobQueue>(this.queues.values());
-            for(final AbstractJobQueue queue : queues ) {
-                queue.clear();
-                this.outdateQueue(queue);
-            }
-        }
-
         this.topologyCapabilities = null;
     }
 
     private void startProcessing(final TopologyCapabilities caps) {
         // create new capabilities and update view
         this.topologyCapabilities = caps;
-
-        this.backgroundLoader.start();
     }
 
     @Override
@@ -518,12 +268,7 @@ public class JobManagerImpl
      */
     @Override
     public synchronized Statistics getStatistics() {
-        this.copyFrom(this.baseStatistics);
-        for(final AbstractJobQueue jq : this.queues.values() ) {
-            this.add(jq);
-        }
-
-        return this;
+        return this.statisticsManager.getOverallStatistics();
     }
 
     /**
@@ -531,7 +276,7 @@ public class JobManagerImpl
      */
     @Override
     public Iterable<TopicStatistics> getTopicStatistics() {
-        return topicStatistics.values();
+        return this.statisticsManager.getTopicStatistics().values();
     }
 
     /**
@@ -539,7 +284,7 @@ public class JobManagerImpl
      */
     @Override
     public Queue getQueue(final String name) {
-        return this.queues.get(ResourceHelper.filterQueueName(name));
+        return qManager.getQueue(ResourceHelper.filterQueueName(name));
     }
 
     /**
@@ -547,30 +292,7 @@ public class JobManagerImpl
      */
     @Override
     public Iterable<Queue> getQueues() {
-        final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
-        return new Iterable<Queue>() {
-
-            @Override
-            public Iterator<Queue> iterator() {
-                return new Iterator<Queue>() {
-
-                    @Override
-                    public boolean hasNext() {
-                        return jqI.hasNext();
-                    }
-
-                    @Override
-                    public Queue next() {
-                        return jqI.next();
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
+        return qManager.getQueues();
     }
 
     @Override
@@ -1198,12 +920,7 @@ public class JobManagerImpl
                             jobName,
                             jobProperties,
                             info);
-                    if ( job != null ) {
-                        if ( configuration.isLocalJob(job.getResourcePath()) ) 
{
-                            this.backgroundLoader.addJob(job);
-                        }
-                        return job;
-                    }
+                    return job;
                 } catch (final PersistenceException re ) {
                     // something went wrong, so let's log it
                     this.logger.error("Exception during persisting new job '" 
+ Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
@@ -1268,9 +985,6 @@ public class JobManagerImpl
         if ( logger.isDebugEnabled() ) {
             logger.debug("Storing new job {} at {}", properties, path);
         }
-        synchronized ( this.directlyAddedPaths ) {
-            this.directlyAddedPaths.add(path);
-        }
         ResourceHelper.getOrCreateResource(resolver,
                 path,
                 properties);
@@ -1331,6 +1045,8 @@ public class JobManagerImpl
                 resolver.commit();
 
                 return true;
+            } else {
+                logger.debug("No job resource found at {}", 
job.getResourcePath());
             }
         } catch ( final PersistenceException ignore ) {
             this.ignoreException(ignore);
@@ -1353,10 +1069,8 @@ public class JobManagerImpl
         if ( job != null && 
!this.configuration.isStoragePath(job.getResourcePath()) ) {
             // get the queue configuration
             final QueueInfo queueInfo = 
this.queueManager.getQueueInfo(job.getTopic());
-            final AbstractJobQueue queue;
-            synchronized ( queuesLock ) {
-                queue = this.queues.get(queueInfo.queueName);
-            }
+            final AbstractJobQueue queue = 
(AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
+
             boolean stopped = false;
             if ( queue != null ) {
                 stopped = queue.stopJob(job);

Added: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632141&view=auto
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
 (added)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
 Wed Oct 15 17:55:52 2014
@@ -0,0 +1,320 @@
+package org.apache.sling.event.impl.jobs;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+public class TestLogger implements Logger {
+
+    private final boolean DEBUG = false;
+
+    private final Logger logger;
+
+    @Override
+    public String getName() {
+        return logger.getName();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return logger.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(String msg) {
+        logger.trace(msg);
+    }
+
+    @Override
+    public void trace(String format, Object arg) {
+        logger.trace(format, arg);
+    }
+
+    @Override
+    public void trace(String format, Object arg1, Object arg2) {
+        logger.trace(format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(String format, Object[] argArray) {
+        logger.trace(format, argArray);
+    }
+
+    @Override
+    public void trace(String msg, Throwable t) {
+        logger.trace(msg, t);
+    }
+
+    @Override
+    public boolean isTraceEnabled(Marker marker) {
+        return logger.isTraceEnabled(marker);
+    }
+
+    @Override
+    public void trace(Marker marker, String msg) {
+        logger.trace(marker, msg);
+    }
+
+    @Override
+    public void trace(Marker marker, String format, Object arg) {
+        logger.trace(marker, format, arg);
+    }
+
+    @Override
+    public void trace(Marker marker, String format, Object arg1, Object arg2) {
+        logger.trace(marker, format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(Marker marker, String format, Object[] argArray) {
+        logger.trace(marker, format, argArray);
+    }
+
+    @Override
+    public void trace(Marker marker, String msg, Throwable t) {
+        logger.trace(marker, msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return ( DEBUG ? logger.isInfoEnabled() : logger.isDebugEnabled());
+    }
+
+    @Override
+    public void debug(String msg) {
+        if ( DEBUG) logger.info(msg); else logger.debug(msg);
+    }
+
+    @Override
+    public void debug(String format, Object arg) {
+        if ( DEBUG) logger.info(format, arg);else logger.debug(format, arg);
+    }
+
+    @Override
+    public void debug(String format, Object arg1, Object arg2) {
+        if ( DEBUG) logger.info(format, arg1, arg2);else logger.debug(format, 
arg1, arg2);
+    }
+
+    @Override
+    public void debug(String format, Object[] argArray) {
+        if ( DEBUG) logger.info(format, argArray);else 
logger.debug(format,argArray);
+    }
+
+    @Override
+    public void debug(String msg, Throwable t) {
+        if ( DEBUG) logger.info(msg, t);else logger.debug(msg,t);
+    }
+
+    @Override
+    public boolean isDebugEnabled(Marker marker) {
+        return (DEBUG ? logger.isInfoEnabled(marker) : 
logger.isDebugEnabled(marker));
+    }
+
+    @Override
+    public void debug(Marker marker, String msg) {
+        if ( DEBUG) logger.info(marker, msg);else logger.debug(marker,msg);
+    }
+
+    @Override
+    public void debug(Marker marker, String format, Object arg) {
+        if ( DEBUG) logger.info(marker, format, arg);else 
logger.debug(marker,format,arg);
+    }
+
+    @Override
+    public void debug(Marker marker, String format, Object arg1, Object arg2) {
+        if ( DEBUG) logger.info(marker, format, arg1, arg2);else 
logger.debug(marker, format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(Marker marker, String format, Object[] argArray) {
+        if ( DEBUG) logger.info(marker, format, argArray); else 
logger.debug(marker, format, argArray);
+    }
+
+    @Override
+    public void debug(Marker marker, String msg, Throwable t) {
+        if ( DEBUG) logger.info(marker, msg, t); else logger.debug(marker, 
msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return logger.isInfoEnabled();
+    }
+
+    @Override
+    public void info(String msg) {
+        logger.info(msg);
+    }
+
+    @Override
+    public void info(String format, Object arg) {
+        logger.info(format, arg);
+    }
+
+    @Override
+    public void info(String format, Object arg1, Object arg2) {
+        logger.info(format, arg1, arg2);
+    }
+
+    @Override
+    public void info(String format, Object[] argArray) {
+        logger.info(format, argArray);
+    }
+
+    @Override
+    public void info(String msg, Throwable t) {
+        logger.info(msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled(Marker marker) {
+        return logger.isInfoEnabled(marker);
+    }
+
+    @Override
+    public void info(Marker marker, String msg) {
+        logger.info(marker, msg);
+    }
+
+    @Override
+    public void info(Marker marker, String format, Object arg) {
+        logger.info(marker, format, arg);
+    }
+
+    @Override
+    public void info(Marker marker, String format, Object arg1, Object arg2) {
+        logger.info(marker, format, arg1, arg2);
+    }
+
+    @Override
+    public void info(Marker marker, String format, Object[] argArray) {
+        logger.info(marker, format, argArray);
+    }
+
+    @Override
+    public void info(Marker marker, String msg, Throwable t) {
+        logger.info(marker, msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return logger.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(String msg) {
+        logger.warn(msg);
+    }
+
+    @Override
+    public void warn(String format, Object arg) {
+        logger.warn(format, arg);
+    }
+
+    @Override
+    public void warn(String format, Object[] argArray) {
+        logger.warn(format, argArray);
+    }
+
+    @Override
+    public void warn(String format, Object arg1, Object arg2) {
+        logger.warn(format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(String msg, Throwable t) {
+        logger.warn(msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled(Marker marker) {
+        return logger.isWarnEnabled(marker);
+    }
+
+    @Override
+    public void warn(Marker marker, String msg) {
+        logger.warn(marker, msg);
+    }
+
+    @Override
+    public void warn(Marker marker, String format, Object arg) {
+        logger.warn(marker, format, arg);
+    }
+
+    @Override
+    public void warn(Marker marker, String format, Object arg1, Object arg2) {
+        logger.warn(marker, format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(Marker marker, String format, Object[] argArray) {
+        logger.warn(marker, format, argArray);
+    }
+
+    @Override
+    public void warn(Marker marker, String msg, Throwable t) {
+        logger.warn(marker, msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return logger.isErrorEnabled();
+    }
+
+    @Override
+    public void error(String msg) {
+        logger.error(msg);
+    }
+
+    @Override
+    public void error(String format, Object arg) {
+        logger.error(format, arg);
+    }
+
+    @Override
+    public void error(String format, Object arg1, Object arg2) {
+        logger.error(format, arg1, arg2);
+    }
+
+    @Override
+    public void error(String format, Object[] argArray) {
+        logger.error(format, argArray);
+    }
+
+    @Override
+    public void error(String msg, Throwable t) {
+        logger.error(msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled(Marker marker) {
+        return logger.isErrorEnabled(marker);
+    }
+
+    @Override
+    public void error(Marker marker, String msg) {
+        logger.error(marker, msg);
+    }
+
+    @Override
+    public void error(Marker marker, String format, Object arg) {
+        logger.error(marker, format, arg);
+    }
+
+    @Override
+    public void error(Marker marker, String format, Object arg1, Object arg2) {
+        logger.error(marker, format, arg1, arg2);
+    }
+
+    @Override
+    public void error(Marker marker, String format, Object[] argArray) {
+        logger.error(marker, format, argArray);
+    }
+
+    @Override
+    public void error(Marker marker, String msg, Throwable t) {
+        logger.error(marker, msg, t);
+    }
+
+    public TestLogger(final Logger l) {
+        this.logger = l;
+    }
+}

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
 Wed Oct 15 17:55:52 2014
@@ -46,9 +46,7 @@ import org.osgi.framework.Constants;
             value=ConfigurationConstants.DEFAULT_TYPE,
             options={@PropertyOption(name="UNORDERED",value="Parallel"),
                      @PropertyOption(name="ORDERED",value="Ordered"),
-                     @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic 
Round Robin"),
-                     @PropertyOption(name="IGNORE",value="Ignore"),
-                     @PropertyOption(name="DROP",value="Drop")}),
+                     @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic 
Round Robin")}),
     @Property(name=ConfigurationConstants.PROP_TOPICS,
             unbounded=PropertyUnbounded.ARRAY),
     @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
@@ -183,6 +181,9 @@ public class InternalQueueConfiguration
                 return false;
             }
         }
+        if ( type == Type.IGNORE || type == Type.DROP ) {
+            return false;
+        }
         return true;
     }
 


Reply via email to