Author: cziegeler Date: Thu Oct 16 06:52:39 2014 New Revision: 1632217 URL: http://svn.apache.org/r1632217 Log: SLING-4048 : Avoid keeping jobs in memory. Refactor job traversal and implement different queue strategies (WiP)
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java - copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java - copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java - copied, changed from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java Removed: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java (from r1632213, 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java) URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java&r1=1632213&r2=1632217&rev=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobTopicTraverser.java Thu Oct 16 06:52:39 2014 @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.event.impl.jobs.topics; +package org.apache.sling.event.impl.jobs; import java.util.ArrayList; import java.util.Collections; @@ -24,20 +24,68 @@ import java.util.Iterator; import java.util.List; import org.apache.sling.api.resource.Resource; -import org.apache.sling.event.impl.jobs.JobImpl; -import org.apache.sling.event.impl.jobs.Utility; import org.slf4j.Logger; +/** + * The job topic traverser is a utility class to traverse all jobs + * of a specific topic in order of creation. + */ public class JobTopicTraverser { - public interface Handler { + /** + * Callback called for each found job. + */ + public interface JobCallback { + + /** + * Callback handle for a job + * @param job The job to handle + * @return <code>true</code> If processing should continue, <code>false</code> otherwise. + */ boolean handle(final JobImpl job); } + /** + * Callback called for each found resource.
+ */ + public interface ResourceCallback { + + /** + * Callback handle for a resource + * @param rsrc The resource to handle + * @return <code>true</code> If processing should continue, <code>false</code> otherwise. + */ + boolean handle(final Resource rsrc); + } + + /** + * Traverse the topic and call the callback for each found job. + * + * Once the callback notifies to stop traversing by returning false, the current minute + * will be processed completely (to ensure correct ordering of jobs) and then the + * traversal stops. + * + * @param logger The logger to use for debug logging + * @param topicResource The topic resource + * @param handler The callback + */ + public static void traverse(final Logger logger, + final Resource topicResource, + final JobCallback handler) { + traverse(logger, topicResource, handler, null); + } + public static void traverse(final Logger logger, final Resource topicResource, - final Handler handler) { - logger.debug("Processing topic {}", topicResource.getName()); + final ResourceCallback handler) { + traverse(logger, topicResource, null, handler); + } + + private static void traverse(final Logger logger, + final Resource topicResource, + final JobCallback jobHandler, + final ResourceCallback resourceHandler) { + logger.debug("Processing topic {}", topicResource.getName().replace('.', '/')); // now years for(final Resource yearResource: Utility.getSortedChildren(logger, "year", topicResource)) { final int year = Integer.valueOf(yearResource.getName()); @@ -69,23 +117,31 @@ public class JobTopicTraverser { while ( jobIter.hasNext() ) { final Resource jobResource = jobIter.next(); - final JobImpl job = Utility.readJob(logger, jobResource); - if ( job != null ) { - logger.debug("Found job {}", jobResource.getName()); - jobs.add(job); + if ( resourceHandler != null ) { + if ( !resourceHandler.handle(jobResource) ) { + return; + } + } else { + final JobImpl job = Utility.readJob(logger, jobResource); + if ( job != null ) { + 
logger.debug("Found job {}", jobResource.getName()); + jobs.add(job); + } } } - Collections.sort(jobs); + if ( jobHandler != null ) { + Collections.sort(jobs); - boolean stop = false; - for(final JobImpl job : jobs) { - if ( !handler.handle(job) ) { - stop = true; + boolean stop = false; + for(final JobImpl job : jobs) { + if ( !jobHandler.handle(job) ) { + stop = true; + } + } + if ( stop ) { + return; } - } - if ( stop ) { - return; } } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 06:52:39 2014 @@ -677,9 +677,12 @@ public abstract class AbstractJobQueue // we keep cancelled jobs and succeeded jobs if the queue is configured like this. 
final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs(); handler.finished(resultState, keepJobs, rescheduleInfo.processingTime); + } else { + this.services.topicManager.reschedule(handler.getJob()); } this.notifyFinished(rescheduleInfo.reschedule); + return rescheduleInfo.reschedule; } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java Thu Oct 16 06:52:39 2014 @@ -28,11 +28,9 @@ import org.apache.felix.scr.annotations. 
import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Service; -import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; -import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.api.resource.ResourceUtil; import org.apache.sling.api.resource.ValueMap; import org.apache.sling.event.impl.jobs.JobImpl; @@ -70,9 +68,6 @@ public class HistoryCleanUpTask implemen private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Reference - private ResourceResolverFactory resourceResolverFactory; - - @Reference private JobManagerConfiguration configuration; @Override @@ -100,10 +95,8 @@ public class HistoryCleanUpTask implemen } else { stateList = null; } - ResourceResolver resolver = null; + final ResourceResolver resolver = this.configuration.createResourceResolver(); try { - resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null); - if ( stateList == null || stateList.contains(Job.JobState.SUCCEEDED.name()) ) { this.cleanup(removeDate, resolver, context, configuration.getStoredSuccessfulJobsPath(), topics, null); } @@ -117,12 +110,8 @@ public class HistoryCleanUpTask implemen } catch (final PersistenceException pe) { // in the case of an error, we just log this as a warning this.logger.warn("Exception during job resource tree cleanup.", pe); - } catch (final LoginException ignore) { - this.ignoreException(ignore); } finally { - if ( resolver != null ) { - resolver.close(); - } + resolver.close(); } return context.result().succeeded(); } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 06:52:39 2014 @@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -29,13 +30,18 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.event.impl.jobs.JobImpl; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.JobTopicTraverser; import org.apache.sling.event.impl.jobs.TestLogger; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; +import org.apache.sling.event.jobs.QueueConfiguration.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * TODO - note last scan time and not all new observation events to avoid unnecessary rescan + * The queue job cache caches jobs per queue based on the topics the queue is actively + * processing. + * + * TODO cache needs to be synchronized! 
*/ public class QueueJobCache { @@ -49,7 +55,9 @@ public class QueueJobCache { private final Set<String> topics; - private final Map<String, List<JobImpl>> cache = new HashMap<String, List<JobImpl>>(); + private final Set<String> topicsWithNewJobs = new HashSet<String>(); + + private final List<JobImpl> cache = new ArrayList<JobImpl>(); private final QueueInfo info; @@ -59,99 +67,145 @@ public class QueueJobCache { this.configuration = configuration; this.info = info; this.topics = topics; - for(final String topic : topics) { - this.cache.put(topic, new ArrayList<JobImpl>()); - } + this.topicsWithNewJobs.addAll(topics); } + /** + * Return the queue info for this queue. + * @return The queue info + */ public QueueInfo getQueueInfo() { return this.info; } + /** + * All topics of this queue. + * @return The topics. + */ public Set<String> getTopics() { return this.topics; } /** * Get the next job - this method is not called concurrently - * TODO This is very expensive atm */ public JobImpl getNextJob() { JobImpl result = null; - // check state of cache - this.loadJobs(); + if ( this.cache.isEmpty() ) { + final Set<String> checkingTopics = new HashSet<String>(); + synchronized ( this.topicsWithNewJobs ) { + checkingTopics.addAll(this.topicsWithNewJobs); + this.topicsWithNewJobs.clear(); + } + if ( !checkingTopics.isEmpty() ) { + this.loadJobs(checkingTopics); + } + } - final List<JobImpl> allJobs = new ArrayList<JobImpl>(); - for(final Map.Entry<String, List<JobImpl>> entry : this.cache.entrySet()) { - allJobs.addAll(entry.getValue()); - } - Collections.sort(allJobs); - if ( allJobs.size() > 0 ) { - result = allJobs.get(0); + if ( !this.cache.isEmpty() ) { + result = this.cache.remove(0); } + return result; } /** * Load the next N x numberOf(topics) jobs */ - private void loadJobs() { + private void loadJobs( final Set<String> checkingTopics) { logger.debug("Starting jobs loading..."); - ResourceResolver resolver = null; + final Map<String, List<JobImpl>> topicCache = 
new HashMap<String, List<JobImpl>>(); + + final ResourceResolver resolver = this.configuration.createResourceResolver(); try { - for(final String topic : this.topics) { - final List<JobImpl> list = this.cache.get(topic); - if ( list.size() < this.maxPreloadLimit ) { - list.clear(); - if ( resolver == null ) { - resolver = this.configuration.createResourceResolver(); - } + for(final String topic : checkingTopics) { + final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath()); - final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath()); + final List<JobImpl> list = new ArrayList<JobImpl>(); + topicCache.put(topic, list); - // sanity check - should never be null - if ( baseResource != null ) { - final Resource topicResource = baseResource.getChild(topic.replace('/', '.')); - if ( topicResource != null ) { - loadJobs(topic, topicResource); - } + // sanity check - should never be null + if ( baseResource != null ) { + final Resource topicResource = baseResource.getChild(topic.replace('/', '.')); + if ( topicResource != null ) { + loadJobs(topic, topicResource, list); } } } } finally { - if ( resolver != null ) { - resolver.close(); + resolver.close(); + } + orderTopics(topicCache); + + logger.debug("Finished jobs loading {}", this.cache.size()); + } + + /** + * Order the topics based on the queue type and put them in the cache. 
+ * @param topicCache The topic based cache + */ + private void orderTopics(final Map<String, List<JobImpl>> topicCache) { + if ( this.info.queueConfiguration.getType() == Type.ORDERED + || this.info.queueConfiguration.getType() == Type.UNORDERED) { + for(final List<JobImpl> list : topicCache.values()) { + this.cache.addAll(list); } + Collections.sort(this.cache); + } else { + // topic round robin + boolean done = true; + do { + for(final Map.Entry<String, List<JobImpl>> entry : topicCache.entrySet()) { + if ( !entry.getValue().isEmpty() ) { + this.cache.add(entry.getValue().remove(0)); + if ( !entry.getValue().isEmpty() ) { + done = false; + } + } + } + } while ( !done ) ; } - logger.debug("Finished jobs loading"); } /** * Load the next N x numberOf(topics) jobs */ - private void loadJobs(final String topic, final Resource topicResource) { + private void loadJobs(final String topic, final Resource topicResource, final List<JobImpl> list) { logger.debug("Loading jobs from topic {}", topic); - final List<JobImpl> result = this.cache.get(topic); - JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() { + JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() { @Override public boolean handle(final JobImpl job) { if ( job.getProcessingStarted() == null && !job.hasReadErrors() ) { - result.add(job); + list.add(job); } else { logger.debug("Discarding job because {} or {}", job.getProcessingStarted(), job.hasReadErrors()); } - return result.size() < maxPreloadLimit; + return list.size() < maxPreloadLimit; } }); - logger.debug("Caching {} jobs for topic {}", result.size(), topic); + logger.debug("Caching {} jobs for topic {}", list.size(), topic); } + /** + * Mark the topic to contain new jobs. 
+ * @param topic The topic + */ public void handleNewJob(final String topic) { - // TODO Auto-generated method stub + logger.debug("Update cache to handle new event for topic {}", topic); + synchronized ( this.topicsWithNewJobs ) { + this.topicsWithNewJobs.add(topic); + } + } + public void reschedule(final JobImpl job) { + if ( this.info.queueConfiguration.getType() == Type.ORDERED ) { + this.cache.add(0, job); + } else { + this.cache.add(job); + } } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 06:52:39 2014 @@ -41,6 +41,7 @@ import org.apache.sling.event.impl.jobs. 
import org.apache.sling.event.impl.jobs.JobImpl; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; import org.apache.sling.event.impl.jobs.JobManagerImpl; +import org.apache.sling.event.impl.jobs.JobTopicTraverser; import org.apache.sling.event.impl.jobs.TestLogger; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; @@ -182,10 +183,7 @@ public class TopicManager implements Eve } final QueueInfo info = this.queueConfigMgr.getQueueInfo(topic); if ( changed ) { - logger.debug("Adding new topic {}", topic); topicsChanged.set(true); - logger.info("Starting queue {}", info.queueName); - this.queueManager.start(this, info); } else { final QueueJobCache cache = this.queueJobCaches.get(info.queueName); @@ -240,6 +238,7 @@ public class TopicManager implements Eve private final Map<String, Object> queueLocks = new ConcurrentHashMap<String, Object>(); public JobHandler take(final String queueName) { + logger.debug("Taking new job for {}", queueName); Object lock = new Object(); this.queueLocks.put(queueName, lock); JobImpl result = null; @@ -249,7 +248,9 @@ public class TopicManager implements Eve final Map<String, QueueJobCache> mapping = this.updateConfiguration(); final QueueJobCache cache = mapping.get(queueName); if ( cache != null ) { + logger.debug("Getting new job from cache..."); result = cache.getNextJob(); + logger.debug("Job from cache={}", result); if ( result != null ) { isWaiting = false; } @@ -273,6 +274,7 @@ public class TopicManager implements Eve } finally { this.queueLocks.remove(queueName); } + logger.debug("Took new job for {} : {}", queueName, result); return (result != null ? 
new JobHandler( result, (JobManagerImpl)this.jobManager) : null); } @@ -309,7 +311,7 @@ public class TopicManager implements Eve for(final String t : topics) { final Resource topicResource = baseResource.getChild(t.replace('/', '.')); if ( topicResource != null ) { - JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() { + JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() { @Override public boolean handle(final JobImpl job) { @@ -346,11 +348,22 @@ public class TopicManager implements Eve if ( this.isActive.get() ) { this.initialScan(); for(final Map.Entry<String, QueueJobCache> entry : this.updateConfiguration().entrySet()) { - logger.info("Starting queue {}", entry.getKey()); - this.queueManager.start(this, entry.getValue().getQueueInfo()); } } } + public void reschedule(final JobImpl job) { + final QueueInfo info = this.queueConfigMgr.getQueueInfo(job.getTopic()); + final QueueJobCache cache = this.queueJobCaches.get(info.queueName); + if ( cache != null ) { + cache.reschedule(job); + final Object lock = this.queueLocks.get(info.queueName); + if ( lock != null ) { + synchronized ( lock ) { + lock.notify(); + } + } + } + } } Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java (from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java) URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff ============================================================================== --- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/MaintenanceTask.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/CheckTopologyTask.java Thu Oct 16 06:52:39 2014 @@ -30,20 +30,22 @@ import org.apache.sling.api.resource.Val import org.apache.sling.discovery.InstanceDescription; import org.apache.sling.event.impl.jobs.JobImpl; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; -import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.JobTopicTraverser; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; import org.apache.sling.event.impl.support.ResourceHelper; import org.apache.sling.event.jobs.Job; -import org.apache.sling.event.jobs.QueueConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Maintenance task... - * - * In the default configuration, this task runs every minute + * The check topology task checks for changes in the topology and queue configuration + * and reassigns jobs. + * If the leader instance finds a dead instance, it reassigns its jobs to live instances. + * The leader instance also checks for unassigned jobs and tries to assign them. + * If an instance detects jobs which it doesn't process anymore, it reassigns them as + * well. */ -public class MaintenanceTask { +public class CheckTopologyTask { /** Logger.
*/ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -54,13 +56,12 @@ public class MaintenanceTask { /** * Constructor */ - public MaintenanceTask(final JobManagerConfiguration config) { + public CheckTopologyTask(final JobManagerConfiguration config) { this.configuration = config; } - private void reassignJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager) { - if ( caps != null && caps.isLeader() ) { + private void reassignJobsFromStoppedInstances(final TopologyCapabilities caps) { + if ( caps != null && caps.isLeader() && caps.isActive() ) { this.logger.debug("Checking for stopped instances..."); final ResourceResolver resolver = this.configuration.createResourceResolver(); try { @@ -76,7 +77,7 @@ public class MaintenanceTask { final String instanceId = instanceResource.getName(); if ( !caps.isActive(instanceId) ) { logger.debug("Found stopped instance {}", instanceId); - assignJobs(caps, queueManager, instanceResource, true); + assignJobs(caps, instanceResource, true); } } } @@ -92,8 +93,7 @@ public class MaintenanceTask { * - topology * - capabilities */ - private void assignUnassignedJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager) { + private void assignUnassignedJobs(final TopologyCapabilities caps) { if ( caps != null && caps.isLeader() ) { logger.debug("Checking unassigned jobs..."); final ResourceResolver resolver = this.configuration.createResourceResolver(); @@ -103,7 +103,7 @@ public class MaintenanceTask { // this resource should exist, but we check anyway if ( unassignedRoot != null ) { - assignJobs(caps, queueManager, unassignedRoot, false); + assignJobs(caps, unassignedRoot, false); } } finally { resolver.close(); @@ -114,9 +114,11 @@ public class MaintenanceTask { /** * Try to assign all jobs from the jobs root. 
* The jobs are stored by topic + * @param caps The topology capabilities + * @param jobsRoot The root of the jobs + * @param unassign Whether to unassign the job if no instance is found. */ private void assignJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager, final Resource jobsRoot, final boolean unassign) { final ResourceResolver resolver = jobsRoot.getResourceResolver(); @@ -138,107 +140,76 @@ public class MaintenanceTask { // first check if there is an instance for these topics final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null); if ( potentialTargets != null && potentialTargets.size() > 0 ) { - final QueueInfo info = queueManager.getQueueInfo(topicName); + final QueueInfo info = caps.getQueueInfo(topicName); logger.debug("Found queue {} for {}", info.queueConfiguration, topicName); - // if queue is configured to drop, we drop - if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) { - final Iterator<Resource> i = topicResource.listChildren(); - while ( caps.isActive() && i.hasNext() ) { - final Resource rsrc = i.next(); + JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() { + + @Override + public boolean handle(final Resource rsrc) { try { - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } - } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) { - // if the queue is not configured to ignore, we can reschedule - for(final Resource yearResource : topicResource.getChildren() ) { - for(final Resource monthResource : yearResource.getChildren() ) { - for(final Resource dayResource : monthResource.getChildren() ) { - for(final Resource hourResource : dayResource.getChildren() ) { - for(final Resource minuteResource : hourResource.getChildren() ) { - for(final Resource rsrc : minuteResource.getChildren() 
) { - - if ( !caps.isActive() ) { - return; - } - - try { - final ValueMap vm = ResourceHelper.getValueMap(rsrc); - final String targetId = caps.detectTarget(topicName, vm, info); - - if ( targetId != null ) { - final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - final Map<String, Object> props = new HashMap<String, Object>(vm); - props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); - props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); - props.remove(Job.PROPERTY_JOB_STARTED_TIME); - try { - ResourceHelper.getOrCreateResource(resolver, newPath, props); - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } - } catch (final InstantiationException ie) { - // something happened with the resource in the meantime - this.ignoreException(ie); - resolver.refresh(); - } - } - } + final ValueMap vm = ResourceHelper.getValueMap(rsrc); + final String targetId = caps.detectTarget(topicName, vm, info); + + if ( targetId != null ) { + final String newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + final Map<String, Object> props = new HashMap<String, Object>(vm); + props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); + props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); + props.remove(Job.PROPERTY_JOB_STARTED_TIME); + try { + ResourceHelper.getOrCreateResource(resolver, newPath, props); + resolver.delete(rsrc); + resolver.commit(); + } catch ( final PersistenceException pe ) { + ignoreException(pe); + resolver.refresh(); + resolver.revert(); } } + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + ignoreException(ie); + resolver.refresh(); + resolver.revert(); } + return caps.isActive(); } - } 
+ }); } + // now unassign if there are still jobs if ( caps.isActive() && unassign ) { // we have to move everything to the unassigned area - for(final Resource yearResource : topicResource.getChildren() ) { - for(final Resource monthResource : yearResource.getChildren() ) { - for(final Resource dayResource : monthResource.getChildren() ) { - for(final Resource hourResource : dayResource.getChildren() ) { - for(final Resource minuteResource : hourResource.getChildren() ) { - for(final Resource rsrc : minuteResource.getChildren() ) { - - if ( !caps.isActive() ) { - return; - } - - try { - final ValueMap vm = ResourceHelper.getValueMap(rsrc); - final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - final Map<String, Object> props = new HashMap<String, Object>(vm); - props.remove(Job.PROPERTY_JOB_QUEUE_NAME); - props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); - props.remove(Job.PROPERTY_JOB_STARTED_TIME); - - try { - ResourceHelper.getOrCreateResource(resolver, newPath, props); - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } catch (final InstantiationException ie) { - // something happened with the resource in the meantime - this.ignoreException(ie); - resolver.refresh(); - } - } - } + JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() { + + @Override + public boolean handle(final Resource rsrc) { + try { + final ValueMap vm = ResourceHelper.getValueMap(rsrc); + final String newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + final Map<String, Object> props = new HashMap<String, Object>(vm); + props.remove(Job.PROPERTY_JOB_QUEUE_NAME); + props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); + 
props.remove(Job.PROPERTY_JOB_STARTED_TIME); + + try { + ResourceHelper.getOrCreateResource(resolver, newPath, props); + resolver.delete(rsrc); + resolver.commit(); + } catch ( final PersistenceException pe ) { + ignoreException(pe); + resolver.refresh(); + resolver.revert(); } + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + ignoreException(ie); + resolver.refresh(); + resolver.revert(); } + return caps.isActive(); } - } + }); } } } @@ -247,16 +218,15 @@ public class MaintenanceTask { * One maintenance run */ public void run(final TopologyCapabilities topologyCapabilities, - final QueueConfigurationManager queueManager, final boolean topologyChanged, final boolean configChanged) { // if topology changed, reschedule assigned jobs for stopped instances if ( topologyChanged ) { - this.reassignJobs(topologyCapabilities, queueManager); + this.reassignJobsFromStoppedInstances(topologyCapabilities); } // try to assign unassigned jobs if ( topologyChanged || configChanged ) { - this.assignUnassignedJobs(topologyCapabilities, queueManager); + this.assignUnassignedJobs(topologyCapabilities); } } Copied: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java (from r1632213, sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java) URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java?p2=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java&p1=sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java&r1=1632213&r2=1632217&rev=1632217&view=diff ============================================================================== --- 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/RestartTask.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/FindUnfinishedJobsTask.java Thu Oct 16 06:52:39 2014 @@ -26,12 +26,16 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.event.impl.jobs.JobImpl; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; -import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser; +import org.apache.sling.event.impl.jobs.JobTopicTraverser; import org.apache.sling.event.jobs.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RestartTask { +/** + * This task is executed when the job handling starts. + * It checks for unfinished jobs from a previous start and corrects their state. + */ +public class FindUnfinishedJobsTask { /** Logger. */ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -73,7 +77,7 @@ public class RestartTask { private void initTopic(final Resource topicResource) { logger.debug("Initializing topic {}...", topicResource.getName()); - JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() { + JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() { @Override public boolean handle(final JobImpl job) { Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java (original) +++ 
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyHandler.java Thu Oct 16 06:52:39 2014 @@ -108,12 +108,12 @@ public class TopologyHandler final UpgradeTask task = new UpgradeTask(); task.run(this.configuration, this.topologyCapabilities, queueManager); - final RestartTask rt = new RestartTask(); + final FindUnfinishedJobsTask rt = new FindUnfinishedJobsTask(); rt.run(this.configuration); } - final MaintenanceTask mt = new MaintenanceTask(this.configuration); - mt.run(topologyCapabilities, queueManager, !isConfigChange, isConfigChange); + final CheckTopologyTask mt = new CheckTopologyTask(this.configuration); + mt.run(topologyCapabilities, !isConfigChange, isConfigChange); if ( !isConfigChange ) { // start listeners @@ -160,6 +160,10 @@ public class TopologyHandler } } + /** + * Add a topology aware listener + * @param service Listener to notify about changes. + */ public void addListener(final TopologyAware service) { synchronized ( this.listeners ) { this.listeners.add(service); @@ -167,6 +171,10 @@ public class TopologyHandler } } + /** + * Remove a topology aware listener + * @param service Listener to notify about changes. 
+ */ public void removeListener(final TopologyAware service) { synchronized ( this.listeners ) { this.listeners.remove(service); Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1632217&r1=1632216&r2=1632217&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (original) +++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Thu Oct 16 06:52:39 2014 @@ -103,7 +103,7 @@ public class OrderedQueueTest extends Ab final int counter = job.getProperty("counter", -10); assertNotEquals("Counter property is missing", -10, counter); assertTrue("Counter should only increment by max of 1 " + counter + " - " + lastCounter, - counter == lastCounter || counter == lastCounter +1); + counter == lastCounter || counter == lastCounter +1); lastCounter = counter; if ("sling/orderedtest/start".equals(job.getTopic()) ) { cb.block();