Author: cziegeler
Date: Thu Oct 16 06:52:39 2014
New Revision: 1632217

URL: http://svn.apache.org/r1632217
Log:
SLING-4048 : Avoid keeping jobs in memory. Refactor job traversal and implement 
different queue strategies (WiP)

Added:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java
      - copied, changed from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
      - copied, changed from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
      - copied, changed from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java
Removed:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java
Modified:
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
    
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
    
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java

Copied: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java
 (from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java)
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java
 Thu Oct 16 06:52:39 2014
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.event.impl.jobs.topics;
+package org.apache.sling.event.impl.jobs;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -24,20 +24,68 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.sling.api.resource.Resource;
-import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.Utility;
 import org.slf4j.Logger;
 
+/**
+ * The job topic traverser is a utility class to traverse all jobs
+ * of a specific topic in order of creation.
+ */
 public class JobTopicTraverser {
 
-    public interface Handler {
+    /**
+     * Callback called for each found job.
+     */
+    public interface JobCallback {
+
+        /**
+         * Callback handle for a job
+         * @param job The job to handle
+         * @return <code>true</code> If processing should continue, 
<code>false</code> otherwise.
+         */
         boolean handle(final JobImpl job);
     }
 
+    /**
+     * Callback called for each found resource.
+     */
+    public interface ResourceCallback {
+
+        /**
+         * Callback handle for a resource
+         * @param rsrc The resource to handle
+         * @return <code>true</code> If processing should continue, 
<code>false</code> otherwise.
+         */
+        boolean handle(final Resource rsrc);
+    }
+
+    /**
+     * Traverse the topic and call the callback for each found job.
+     *
+     * Once the callback notifies to stop traversing by returning false, the 
current minute
+     * will be processed completely (to ensure correct ordering of jobs) and 
then the
+     * traversal stops.
+     *
+     * @param logger        The logger to use for debug logging
+     * @param topicResource The topic resource
+     * @param handler       The callback
+     */
+    public static void traverse(final Logger logger,
+            final Resource topicResource,
+            final JobCallback handler) {
+        traverse(logger, topicResource, handler, null);
+    }
+
     public static void traverse(final Logger logger,
             final Resource topicResource,
-            final Handler handler) {
-        logger.debug("Processing topic {}", topicResource.getName());
+            final ResourceCallback handler) {
+        traverse(logger, topicResource, null, handler);
+    }
+
+    private static void traverse(final Logger logger,
+            final Resource topicResource,
+            final JobCallback jobHandler,
+            final ResourceCallback resourceHandler) {
+        logger.debug("Processing topic {}", 
topicResource.getName().replace('.', '/'));
         // now years
         for(final Resource yearResource: Utility.getSortedChildren(logger, 
"year", topicResource)) {
             final int year = Integer.valueOf(yearResource.getName());
@@ -69,23 +117,31 @@ public class JobTopicTraverser {
                             while ( jobIter.hasNext() ) {
                                 final Resource jobResource = jobIter.next();
 
-                                final JobImpl job = Utility.readJob(logger, 
jobResource);
-                                if ( job != null ) {
-                                    logger.debug("Found job {}", 
jobResource.getName());
-                                    jobs.add(job);
+                                if ( resourceHandler != null ) {
+                                    if ( !resourceHandler.handle(jobResource) 
) {
+                                        return;
+                                    }
+                                } else {
+                                    final JobImpl job = 
Utility.readJob(logger, jobResource);
+                                    if ( job != null ) {
+                                        logger.debug("Found job {}", 
jobResource.getName());
+                                        jobs.add(job);
+                                    }
                                 }
                             }
 
-                            Collections.sort(jobs);
+                            if ( jobHandler != null ) {
+                                Collections.sort(jobs);
 
-                            boolean stop = false;
-                            for(final JobImpl job : jobs) {
-                                if ( !handler.handle(job) ) {
-                                    stop = true;
+                                boolean stop = false;
+                                for(final JobImpl job : jobs) {
+                                    if ( !jobHandler.handle(job) ) {
+                                        stop = true;
+                                    }
+                                }
+                                if ( stop ) {
+                                    return;
                                 }
-                            }
-                            if ( stop ) {
-                                return;
                             }
                         }
                     }

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
 Thu Oct 16 06:52:39 2014
@@ -677,9 +677,12 @@ public abstract class AbstractJobQueue
             // we keep cancelled jobs and succeeded jobs if the queue is 
configured like this.
             final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || 
this.configuration.isKeepJobs();
             handler.finished(resultState, keepJobs, 
rescheduleInfo.processingTime);
+        } else {
+            this.services.topicManager.reschedule(handler.getJob());
         }
         this.notifyFinished(rescheduleInfo.reschedule);
 
+
         return rescheduleInfo.reschedule;
     }
 

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
 Thu Oct 16 06:52:39 2014
@@ -28,11 +28,9 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.impl.jobs.JobImpl;
@@ -70,9 +68,6 @@ public class HistoryCleanUpTask implemen
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
-    private ResourceResolverFactory resourceResolverFactory;
-
-    @Reference
     private JobManagerConfiguration configuration;
 
     @Override
@@ -100,10 +95,8 @@ public class HistoryCleanUpTask implemen
         } else {
             stateList = null;
         }
-        ResourceResolver resolver = null;
+        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
         try {
-            resolver = 
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
-
             if ( stateList == null || 
stateList.contains(Job.JobState.SUCCEEDED.name()) ) {
                 this.cleanup(removeDate, resolver, context, 
configuration.getStoredSuccessfulJobsPath(), topics, null);
             }
@@ -117,12 +110,8 @@ public class HistoryCleanUpTask implemen
         } catch (final PersistenceException pe) {
             // in the case of an error, we just log this as a warning
             this.logger.warn("Exception during job resource tree cleanup.", 
pe);
-        } catch (final LoginException ignore) {
-            this.ignoreException(ignore);
         } finally {
-            if ( resolver != null ) {
-                resolver.close();
-            }
+            resolver.close();
         }
         return context.result().succeeded();
     }

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
 Thu Oct 16 06:52:39 2014
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,13 +30,18 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.TestLogger;
 import 
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TODO - note last scan time and not all new observation events to avoid 
unnecessary rescan
+ * The queue job cache caches jobs per queue based on the topics the queue is 
actively
+ * processing.
+ *
+ * TODO cache needs to be synchronized!
  */
 public class QueueJobCache {
 
@@ -49,7 +55,9 @@ public class QueueJobCache {
 
     private final Set<String> topics;
 
-    private final Map<String, List<JobImpl>> cache = new HashMap<String, 
List<JobImpl>>();
+    private final Set<String> topicsWithNewJobs = new HashSet<String>();
+
+    private final List<JobImpl> cache = new ArrayList<JobImpl>();
 
     private final QueueInfo info;
 
@@ -59,99 +67,145 @@ public class QueueJobCache {
         this.configuration = configuration;
         this.info = info;
         this.topics = topics;
-        for(final String topic : topics) {
-            this.cache.put(topic, new ArrayList<JobImpl>());
-        }
+        this.topicsWithNewJobs.addAll(topics);
     }
 
+    /**
+     * Return the queue info for this queue.
+     * @return The queue info
+     */
     public QueueInfo getQueueInfo() {
         return this.info;
     }
 
+    /**
+     * All topics of this queue.
+     * @return The topics.
+     */
     public Set<String> getTopics() {
         return this.topics;
     }
 
     /**
      * Get the next job - this method is not called concurrently
-     * TODO This is very expensive atm
      */
     public JobImpl getNextJob() {
         JobImpl result = null;
 
-        // check state of cache
-        this.loadJobs();
+        if ( this.cache.isEmpty() ) {
+            final Set<String> checkingTopics = new HashSet<String>();
+            synchronized ( this.topicsWithNewJobs ) {
+                checkingTopics.addAll(this.topicsWithNewJobs);
+                this.topicsWithNewJobs.clear();
+            }
+            if ( !checkingTopics.isEmpty() ) {
+                this.loadJobs(checkingTopics);
+            }
+        }
 
-        final List<JobImpl> allJobs = new ArrayList<JobImpl>();
-        for(final Map.Entry<String, List<JobImpl>> entry : 
this.cache.entrySet()) {
-            allJobs.addAll(entry.getValue());
-        }
-        Collections.sort(allJobs);
-        if ( allJobs.size() > 0 ) {
-            result = allJobs.get(0);
+        if ( !this.cache.isEmpty() ) {
+            result = this.cache.remove(0);
         }
+
         return result;
     }
 
     /**
      * Load the next N x numberOf(topics) jobs
      */
-    private void loadJobs() {
+    private void loadJobs( final Set<String> checkingTopics) {
         logger.debug("Starting jobs loading...");
 
-        ResourceResolver resolver = null;
+        final Map<String, List<JobImpl>> topicCache = new HashMap<String, 
List<JobImpl>>();
+
+        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
         try {
-            for(final String topic : this.topics) {
-                final List<JobImpl> list = this.cache.get(topic);
-                if ( list.size() < this.maxPreloadLimit ) {
-                    list.clear();
-                    if ( resolver == null ) {
-                        resolver = this.configuration.createResourceResolver();
-                    }
+            for(final String topic : checkingTopics) {
+                final Resource baseResource = 
resolver.getResource(this.configuration.getLocalJobsPath());
 
-                    final Resource baseResource = 
resolver.getResource(this.configuration.getLocalJobsPath());
+                final List<JobImpl> list = new ArrayList<JobImpl>();
+                topicCache.put(topic, list);
 
-                    // sanity check - should never be null
-                    if ( baseResource != null ) {
-                        final Resource topicResource = 
baseResource.getChild(topic.replace('/', '.'));
-                        if ( topicResource != null ) {
-                            loadJobs(topic, topicResource);
-                        }
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final Resource topicResource = 
baseResource.getChild(topic.replace('/', '.'));
+                    if ( topicResource != null ) {
+                        loadJobs(topic, topicResource, list);
                     }
                 }
             }
         } finally {
-            if ( resolver != null ) {
-                resolver.close();
+            resolver.close();
+        }
+        orderTopics(topicCache);
+
+        logger.debug("Finished jobs loading {}", this.cache.size());
+    }
+
+    /**
+     * Merge the per topic job lists into the cache: sorted for ORDERED/UNORDERED queues, topic round robin otherwise.
+     * @param topicCache The topic based cache - its lists are drained in the round robin case
+     */
+    private void orderTopics(final Map<String, List<JobImpl>> topicCache) {
+        if ( this.info.queueConfiguration.getType() == Type.ORDERED
+             || this.info.queueConfiguration.getType() == Type.UNORDERED) {
+            for(final List<JobImpl> list : topicCache.values()) {
+                this.cache.addAll(list);
             }
+            Collections.sort(this.cache);
+        } else {
+            // topic round robin - done must be reset on every pass, a sticky false would loop forever
+            boolean done;
+            do {
+                done = true;
+                for(final Map.Entry<String, List<JobImpl>> entry : 
topicCache.entrySet()) {
+                    final List<JobImpl> jobs = entry.getValue();
+                    if ( !jobs.isEmpty() ) {
+                        this.cache.add(jobs.remove(0));
+                        done = done && jobs.isEmpty();
+                    }
+                }
+            } while ( !done ) ;
         }
-        logger.debug("Finished jobs loading");
     }
 
     /**
      * Load the next N x numberOf(topics) jobs
      */
-    private void loadJobs(final String topic, final Resource topicResource) {
+    private void loadJobs(final String topic, final Resource topicResource, 
final List<JobImpl> list) {
         logger.debug("Loading jobs from topic {}", topic);
-        final List<JobImpl> result = this.cache.get(topic);
 
-        JobTopicTraverser.traverse(logger, topicResource, new 
JobTopicTraverser.Handler() {
+        JobTopicTraverser.traverse(logger, topicResource, new 
JobTopicTraverser.JobCallback() {
 
             @Override
             public boolean handle(final JobImpl job) {
                 if ( job.getProcessingStarted() == null && 
!job.hasReadErrors() ) {
-                    result.add(job);
+                    list.add(job);
                 } else {
                     logger.debug("Discarding job because {} or {}", 
job.getProcessingStarted(), job.hasReadErrors());
                 }
-                return result.size() < maxPreloadLimit;
+                return list.size() < maxPreloadLimit;
             }
         });
-        logger.debug("Caching {} jobs for topic {}", result.size(), topic);
+        logger.debug("Caching {} jobs for topic {}", list.size(), topic);
     }
 
+    /**
+     * Mark the topic to contain new jobs.
+     * @param topic The topic
+     */
     public void handleNewJob(final String topic) {
-        // TODO Auto-generated method stub
+        logger.debug("Update cache to handle new event for topic {}", topic);
+        synchronized ( this.topicsWithNewJobs ) {
+            this.topicsWithNewJobs.add(topic);
+        }
+    }
 
+    public void reschedule(final JobImpl job) {
+        if ( this.info.queueConfiguration.getType() == Type.ORDERED ) {
+            this.cache.add(0, job);
+        } else {
+            this.cache.add(job);
+        }
     }
 }

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
 Thu Oct 16 06:52:39 2014
@@ -41,6 +41,7 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import 
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -182,10 +183,7 @@ public class TopicManager implements Eve
             }
             final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic);
             if ( changed ) {
-                logger.debug("Adding new topic {}", topic);
                 topicsChanged.set(true);
-                logger.info("Starting queue {}", info.queueName);
-
                 this.queueManager.start(this, info);
             } else {
                 final QueueJobCache cache = 
this.queueJobCaches.get(info.queueName);
@@ -240,6 +238,7 @@ public class TopicManager implements Eve
     private final Map<String, Object> queueLocks = new 
ConcurrentHashMap<String, Object>();
 
     public JobHandler take(final String queueName) {
+        logger.debug("Taking new job for {}", queueName);
         Object lock = new Object();
         this.queueLocks.put(queueName, lock);
         JobImpl result = null;
@@ -249,7 +248,9 @@ public class TopicManager implements Eve
                 final Map<String, QueueJobCache> mapping = 
this.updateConfiguration();
                 final QueueJobCache cache = mapping.get(queueName);
                 if ( cache != null ) {
+                    logger.debug("Getting new job from cache...");
                     result = cache.getNextJob();
+                    logger.debug("Job from cache={}", result);
                     if ( result != null ) {
                         isWaiting = false;
                     }
@@ -273,6 +274,7 @@ public class TopicManager implements Eve
         } finally {
             this.queueLocks.remove(queueName);
         }
+        logger.debug("Took new job for {} : {}", queueName, result);
         return (result != null ? new JobHandler( result, 
(JobManagerImpl)this.jobManager) : null);
     }
 
@@ -309,7 +311,7 @@ public class TopicManager implements Eve
                         for(final String t : topics) {
                             final Resource topicResource = 
baseResource.getChild(t.replace('/', '.'));
                             if ( topicResource != null ) {
-                                JobTopicTraverser.traverse(logger, 
topicResource, new JobTopicTraverser.Handler() {
+                                JobTopicTraverser.traverse(logger, 
topicResource, new JobTopicTraverser.JobCallback() {
 
                                     @Override
                                     public boolean handle(final JobImpl job) {
@@ -346,11 +348,22 @@ public class TopicManager implements Eve
         if ( this.isActive.get() ) {
             this.initialScan();
             for(final Map.Entry<String, QueueJobCache> entry : 
this.updateConfiguration().entrySet()) {
-                logger.info("Starting queue {}", entry.getKey());
-
                 this.queueManager.start(this, entry.getValue().getQueueInfo());
             }
         }
     }
 
+    public void reschedule(final JobImpl job) {
+        final QueueInfo info = 
this.queueConfigMgr.getQueueInfo(job.getTopic());
+        final QueueJobCache cache = this.queueJobCaches.get(info.queueName);
+        if ( cache != null ) {
+            cache.reschedule(job);
+            final Object lock = this.queueLocks.get(info.queueName);
+            if ( lock != null ) {
+                synchronized ( lock ) {
+                    lock.notify();
+                }
+            }
+        }
+    }
 }

Copied: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
 (from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java)
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java
 Thu Oct 16 06:52:39 2014
@@ -30,20 +30,22 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import 
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.QueueConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Maintenance task...
- *
- * In the default configuration, this task runs every minute
+ * The check topology task checks for changes in the topology and queue 
configuration
+ * and reassigns jobs.
+ * If the leader instance finds a dead instance it reassigns its jobs to live 
instances.
+ * The leader instance also checks for unassigned jobs and tries to assign 
them.
+ * If an instance detects jobs which it doesn't process anymore it reassigns 
them as
+ * well.
  */
-public class MaintenanceTask {
+public class CheckTopologyTask {
 
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -54,13 +56,12 @@ public class MaintenanceTask {
     /**
      * Constructor
      */
-    public MaintenanceTask(final JobManagerConfiguration config) {
+    public CheckTopologyTask(final JobManagerConfiguration config) {
         this.configuration = config;
     }
 
-    private void reassignJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
-        if ( caps != null && caps.isLeader() ) {
+    private void reassignJobsFromStoppedInstances(final TopologyCapabilities 
caps) {
+        if ( caps != null && caps.isLeader() && caps.isActive() ) {
             this.logger.debug("Checking for stopped instances...");
             final ResourceResolver resolver = 
this.configuration.createResourceResolver();
             try {
@@ -76,7 +77,7 @@ public class MaintenanceTask {
                         final String instanceId = instanceResource.getName();
                         if ( !caps.isActive(instanceId) ) {
                             logger.debug("Found stopped instance {}", 
instanceId);
-                            assignJobs(caps, queueManager, instanceResource, 
true);
+                            assignJobs(caps, instanceResource, true);
                         }
                     }
                 }
@@ -92,8 +93,7 @@ public class MaintenanceTask {
      * - topology
      * - capabilities
      */
-    private void assignUnassignedJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager) {
+    private void assignUnassignedJobs(final TopologyCapabilities caps) {
         if ( caps != null && caps.isLeader() ) {
             logger.debug("Checking unassigned jobs...");
             final ResourceResolver resolver = 
this.configuration.createResourceResolver();
@@ -103,7 +103,7 @@ public class MaintenanceTask {
 
                 // this resource should exist, but we check anyway
                 if ( unassignedRoot != null ) {
-                    assignJobs(caps, queueManager, unassignedRoot, false);
+                    assignJobs(caps, unassignedRoot, false);
                 }
             } finally {
                 resolver.close();
@@ -114,9 +114,11 @@ public class MaintenanceTask {
     /**
      * Try to assign all jobs from the jobs root.
      * The jobs are stored by topic
+     * @param caps The topology capabilities
+     * @param jobsRoot The root of the jobs
+     * @param unassign Whether to unassign the job if no instance is found.
      */
     private void assignJobs(final TopologyCapabilities caps,
-            final QueueConfigurationManager queueManager,
             final Resource jobsRoot,
             final boolean unassign) {
         final ResourceResolver resolver = jobsRoot.getResourceResolver();
@@ -138,107 +140,76 @@ public class MaintenanceTask {
             // first check if there is an instance for these topics
             final List<InstanceDescription> potentialTargets = 
caps.getPotentialTargets(checkTopic, null);
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
-                final QueueInfo info = queueManager.getQueueInfo(topicName);
+                final QueueInfo info = caps.getQueueInfo(topicName);
                 logger.debug("Found queue {} for {}", info.queueConfiguration, 
topicName);
 
-                // if queue is configured to drop, we drop
-                if ( info.queueConfiguration.getType() ==  
QueueConfiguration.Type.DROP) {
-                    final Iterator<Resource> i = topicResource.listChildren();
-                    while ( caps.isActive() && i.hasNext() ) {
-                        final Resource rsrc = i.next();
+                JobTopicTraverser.traverse(this.logger, topicResource, new 
JobTopicTraverser.ResourceCallback() {
+
+                    @Override
+                    public boolean handle(final Resource rsrc) {
                         try {
-                            resolver.delete(rsrc);
-                            resolver.commit();
-                        } catch ( final PersistenceException pe ) {
-                            this.ignoreException(pe);
-                            resolver.refresh();
-                        }
-                    }
-                } else if ( info.queueConfiguration.getType() != 
QueueConfiguration.Type.IGNORE ) {
-                    // if the queue is not configured to ignore, we can 
reschedule
-                    for(final Resource yearResource : 
topicResource.getChildren() ) {
-                        for(final Resource monthResource : 
yearResource.getChildren() ) {
-                            for(final Resource dayResource : 
monthResource.getChildren() ) {
-                                for(final Resource hourResource : 
dayResource.getChildren() ) {
-                                    for(final Resource minuteResource : 
hourResource.getChildren() ) {
-                                        for(final Resource rsrc : 
minuteResource.getChildren() ) {
-
-                                            if ( !caps.isActive() ) {
-                                                return;
-                                            }
-
-                                            try {
-                                                final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
-                                                final String targetId = 
caps.detectTarget(topicName, vm, info);
-
-                                                if ( targetId != null ) {
-                                                    final String newPath = 
this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + 
topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
-                                                    final Map<String, Object> 
props = new HashMap<String, Object>(vm);
-                                                    
props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
-                                                    
props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                                                    
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-                                                    try {
-                                                        
ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                        resolver.delete(rsrc);
-                                                        resolver.commit();
-                                                    } catch ( final 
PersistenceException pe ) {
-                                                        
this.ignoreException(pe);
-                                                        resolver.refresh();
-                                                    }
-                                                }
-                                            } catch (final 
InstantiationException ie) {
-                                                // something happened with the 
resource in the meantime
-                                                this.ignoreException(ie);
-                                                resolver.refresh();
-                                            }
-                                        }
-                                    }
+                            final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
+                            final String targetId = 
caps.detectTarget(topicName, vm, info);
+
+                            if ( targetId != null ) {
+                                final String newPath = 
configuration.getAssginedJobsPath() + '/' + targetId + '/' + 
topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
+                                final Map<String, Object> props = new 
HashMap<String, Object>(vm);
+                                props.put(Job.PROPERTY_JOB_QUEUE_NAME, 
info.queueName);
+                                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, 
targetId);
+                                props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                try {
+                                    
ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                    resolver.delete(rsrc);
+                                    resolver.commit();
+                                } catch ( final PersistenceException pe ) {
+                                    ignoreException(pe);
+                                    resolver.refresh();
+                                    resolver.revert();
                                 }
                             }
+                        } catch (final InstantiationException ie) {
+                            // something happened with the resource in the 
meantime
+                            ignoreException(ie);
+                            resolver.refresh();
+                            resolver.revert();
                         }
+                        return caps.isActive();
                     }
-                }
+                });
             }
+            // now unassign if there are still jobs
             if ( caps.isActive() && unassign ) {
                 // we have to move everything to the unassigned area
-                for(final Resource yearResource : topicResource.getChildren() 
) {
-                    for(final Resource monthResource : 
yearResource.getChildren() ) {
-                        for(final Resource dayResource : 
monthResource.getChildren() ) {
-                            for(final Resource hourResource : 
dayResource.getChildren() ) {
-                                for(final Resource minuteResource : 
hourResource.getChildren() ) {
-                                    for(final Resource rsrc : 
minuteResource.getChildren() ) {
-
-                                        if ( !caps.isActive() ) {
-                                            return;
-                                        }
-
-                                        try {
-                                            final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
-                                            final String newPath = 
this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
-                                            final Map<String, Object> props = 
new HashMap<String, Object>(vm);
-                                            
props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                                            
props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                                            
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                                            try {
-                                                
ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                resolver.delete(rsrc);
-                                                resolver.commit();
-                                            } catch ( final 
PersistenceException pe ) {
-                                                this.ignoreException(pe);
-                                                resolver.refresh();
-                                            }
-                                        } catch (final InstantiationException 
ie) {
-                                            // something happened with the 
resource in the meantime
-                                            this.ignoreException(ie);
-                                            resolver.refresh();
-                                        }
-                                    }
-                                }
+                JobTopicTraverser.traverse(this.logger, topicResource, new 
JobTopicTraverser.ResourceCallback() {
+
+                    @Override
+                    public boolean handle(final Resource rsrc) {
+                        try {
+                            final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
+                            final String newPath = 
configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
+                            final Map<String, Object> props = new 
HashMap<String, Object>(vm);
+                            props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                            props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                            try {
+                                ResourceHelper.getOrCreateResource(resolver, 
newPath, props);
+                                resolver.delete(rsrc);
+                                resolver.commit();
+                            } catch ( final PersistenceException pe ) {
+                                ignoreException(pe);
+                                resolver.refresh();
+                                resolver.revert();
                             }
+                        } catch (final InstantiationException ie) {
+                            // something happened with the resource in the 
meantime
+                            ignoreException(ie);
+                            resolver.refresh();
+                            resolver.revert();
                         }
+                        return caps.isActive();
                     }
-                }
+                });
             }
         }
     }
@@ -247,16 +218,15 @@ public class MaintenanceTask {
      * One maintenance run
      */
     public void run(final TopologyCapabilities topologyCapabilities,
-            final QueueConfigurationManager queueManager,
             final boolean topologyChanged,
             final boolean configChanged) {
         // if topology changed, reschedule assigned jobs for stopped instances
         if ( topologyChanged ) {
-            this.reassignJobs(topologyCapabilities, queueManager);
+            this.reassignJobsFromStoppedInstances(topologyCapabilities);
         }
         // try to assign unassigned jobs
         if ( topologyChanged || configChanged ) {
-            this.assignUnassignedJobs(topologyCapabilities, queueManager);
+            this.assignUnassignedJobs(topologyCapabilities);
         }
     }
 

Copied: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
 (from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java)
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java
 Thu Oct 16 06:52:39 2014
@@ -26,12 +26,16 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser;
+import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.jobs.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RestartTask {
+/**
+ * This task is executed when the job handling starts.
+ * It checks for unfinished jobs from a previous start and corrects their 
state.
+ */
+public class FindUnfinishedJobsTask {
 
     /** Logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -73,7 +77,7 @@ public class RestartTask {
     private void initTopic(final Resource topicResource) {
         logger.debug("Initializing topic {}...", topicResource.getName());
 
-        JobTopicTraverser.traverse(logger, topicResource, new 
JobTopicTraverser.Handler() {
+        JobTopicTraverser.traverse(logger, topicResource, new 
JobTopicTraverser.JobCallback() {
 
             @Override
             public boolean handle(final JobImpl job) {

Modified: 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java
 Thu Oct 16 06:52:39 2014
@@ -108,12 +108,12 @@ public class TopologyHandler
             final UpgradeTask task = new UpgradeTask();
             task.run(this.configuration, this.topologyCapabilities, 
queueManager);
 
-            final RestartTask rt = new RestartTask();
+            final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask();
             rt.run(this.configuration);
         }
 
-        final MaintenanceTask mt = new MaintenanceTask(this.configuration);
-        mt.run(topologyCapabilities, queueManager, !isConfigChange, 
isConfigChange);
+        final CheckTopologyTask mt = new CheckTopologyTask(this.configuration);
+        mt.run(topologyCapabilities, !isConfigChange, isConfigChange);
 
         if ( !isConfigChange ) {
             // start listeners
@@ -160,6 +160,10 @@ public class TopologyHandler
         }
     }
 
+    /**
+     * Add a topology aware listener
+     * @param service Listener to notify about changes.
+     */
     public void addListener(final TopologyAware service) {
         synchronized ( this.listeners ) {
             this.listeners.add(service);
@@ -167,6 +171,10 @@ public class TopologyHandler
         }
     }
 
+    /**
+     * Remove a topology aware listener
+     * @param service Listener to remove; it is no longer notified about changes.
+     */
     public void removeListener(final TopologyAware service) {
         synchronized ( this.listeners )  {
             this.listeners.remove(service);

Modified: 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1632217&r1=1632216&r2=1632217&view=diff
==============================================================================
--- 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
 (original)
+++ 
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
 Thu Oct 16 06:52:39 2014
@@ -103,7 +103,7 @@ public class OrderedQueueTest extends Ab
                         final int counter = job.getProperty("counter", -10);
                         assertNotEquals("Counter property is missing", -10, 
counter);
                         assertTrue("Counter should only increment by max of 1 
" + counter + " - " + lastCounter,
-                                   counter == lastCounter || counter == 
lastCounter +1);
+                                counter == lastCounter || counter == 
lastCounter +1);
                         lastCounter = counter;
                         if ("sling/orderedtest/start".equals(job.getTopic()) ) 
{
                             cb.block();


Reply via email to