Author: cziegeler Date: Wed Oct 15 11:30:09 2014 New Revision: 1631994 URL: http://svn.apache.org/r1631994 Log: SLING-4048 : Avoid keeping jobs in memory. Move topology handling to own handler service
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java (with props) sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java (with props) Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1631994&r1=1631993&r2=1631994&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 15 11:30:09 2014 @@ -117,10 +117,6 @@ public class JobManagerImpl @Reference private EventAdmin eventAdmin; - /** The configuration manager. */ - @Reference - private QueueConfigurationManager queueConfigManager; - @Reference private Scheduler scheduler; @@ -137,6 +133,8 @@ public class JobManagerImpl @Reference private JobManagerConfiguration configuration; + @Reference + private QueueConfigurationManager queueManager; private volatile TopologyCapabilities topologyCapabilities; @@ -248,7 +246,7 @@ public class JobManagerImpl // invoke maintenance task final MaintenanceTask task = this.maintenanceTask; if ( task != null ) { - task.run(this.topologyCapabilities, this.queueConfigManager, this.schedulerRuns - 1); + task.run(this.topologyCapabilities, this.schedulerRuns - 1); } logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns); } @@ -270,7 +268,11 @@ public class JobManagerImpl } // get the queue configuration - final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic()); + final TopologyCapabilities caps = this.topologyCapabilities; + final QueueInfo queueInfo = caps != null ? caps.getQueueInfo(job.getTopic()) : null; + if ( queueInfo == null ) { + return; // TODO + } final InternalQueueConfiguration config = queueInfo.queueConfiguration; // Sanity check if queue configuration has changed @@ -288,7 +290,6 @@ public class JobManagerImpl } else { if ( reassign ) { - final TopologyCapabilities caps = this.topologyCapabilities; reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo)); } else { @@ -1167,7 +1168,7 @@ public class JobManagerImpl final String jobName, final Map<String, Object> jobProperties, final List<String> errors) { - final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic); + final QueueInfo info = this.queueManager.getQueueInfo(jobTopic); if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) { if ( logger.isDebugEnabled() ) { logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties)); @@ -1280,7 +1281,7 @@ public class JobManagerImpl } public void reassign(final JobImpl job) { - final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic()); + final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic()); final InternalQueueConfiguration config = queueInfo.queueConfiguration; // Sanity check if queue configuration has changed @@ -1351,7 +1352,7 @@ public class JobManagerImpl final JobImpl job = (JobImpl)this.getJobById(jobId); if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) { // get the queue configuration - final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic()); + final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic()); final AbstractJobQueue queue; synchronized ( queuesLock ) { queue = this.queues.get(queueInfo.queueName); @@ -1361,6 +1362,7 @@ public class JobManagerImpl stopped = queue.stopJob(job); } if ( forward && !stopped ) { + // TODO why not remove the resource? // send remote event final Map<String, Object> props = new HashMap<String, Object>(); props.put(Utility.PROPERTY_ID, jobId); Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1631994&r1=1631993&r2=1631994&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Wed Oct 15 11:30:09 2014 @@ -18,8 +18,6 @@ */ package org.apache.sling.event.impl.jobs; -import java.io.IOException; -import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; @@ -32,15 +30,10 @@ import org.apache.sling.api.resource.Res import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceUtil; import org.apache.sling.api.resource.ValueMap; -import org.apache.sling.discovery.InstanceDescription; -import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; -import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; import org.apache.sling.event.impl.support.BatchResourceRemover; -import org.apache.sling.event.impl.support.Environment; import org.apache.sling.event.impl.support.ResourceHelper; import org.apache.sling.event.impl.topology.TopologyCapabilities; import org.apache.sling.event.jobs.Job; -import org.apache.sling.event.jobs.QueueConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,14 +50,6 @@ public class MaintenanceTask { /** Job manager configuration. */ private final JobManagerConfiguration configuration; - /** Change count for queue configurations .*/ - private volatile long queueConfigChangeCount = -1; - - /** Change count for topology changes .*/ - private volatile long topologyChangeCount = -1; - - private boolean checkedForPreviousVersion = false; - /** * Constructor */ @@ -72,241 +57,11 @@ public class MaintenanceTask { this.configuration = config; } - private void reassignJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager) { - if ( caps != null && caps.isLeader() ) { - this.logger.debug("Checking for stopped instances..."); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { - final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath()); - this.logger.debug("Got jobs root {}", jobsRoot); - - // this resource should exist, but we check anyway - if ( jobsRoot != null ) { - final Iterator<Resource> instanceIter = jobsRoot.listChildren(); - while ( caps.isActive() && instanceIter.hasNext() ) { - final Resource instanceResource = instanceIter.next(); - - final String instanceId = instanceResource.getName(); - if ( !caps.isActive(instanceId) ) { - logger.debug("Found stopped instance {}", instanceId); - assignJobs(caps, queueManager, instanceResource, true); - } - } - } - } finally { - resolver.close(); - } - } - } - - /** - * Try to assign unassigned jobs as there might be changes in: - * - queue configurations - * - topology - * - capabilities - */ - private void assignUnassignedJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager) { - if ( caps != null && caps.isLeader() ) { - logger.debug("Checking unassigned jobs..."); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { - final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath()); - logger.debug("Got unassigned root {}", unassignedRoot); - - // this resource should exist, but we check anyway - if ( unassignedRoot != null ) { - assignJobs(caps, queueManager, unassignedRoot, false); - } - } finally { - resolver.close(); - } - } - } - - /** - * Try to assign all jobs from the jobs root. - * The jobs are stored by topic - */ - private void assignJobs(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager, - final Resource jobsRoot, - final boolean unassign) { - final ResourceResolver resolver = jobsRoot.getResourceResolver(); - - final Iterator<Resource> topicIter = jobsRoot.listChildren(); - while ( caps.isActive() && topicIter.hasNext() ) { - final Resource topicResource = topicIter.next(); - - final String topicName = topicResource.getName().replace('.', '/'); - logger.debug("Found topic {}", topicName); - - final String checkTopic; - if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) { - checkTopic = "/"; - } else { - checkTopic = topicName; - } - - // first check if there is an instance for these topics - final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null); - if ( potentialTargets != null && potentialTargets.size() > 0 ) { - final QueueInfo info = queueManager.getQueueInfo(topicName); - logger.debug("Found queue {} for {}", info.queueConfiguration, topicName); - - // if queue is configured to drop, we drop - if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) { - final Iterator<Resource> i = topicResource.listChildren(); - while ( caps.isActive() && i.hasNext() ) { - final Resource rsrc = i.next(); - try { - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } - } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) { - // if the queue is not configured to ignore, we can reschedule - for(final Resource yearResource : topicResource.getChildren() ) { - for(final Resource monthResource : yearResource.getChildren() ) { - for(final Resource dayResource : monthResource.getChildren() ) { - for(final Resource hourResource : dayResource.getChildren() ) { - for(final Resource minuteResource : hourResource.getChildren() ) { - for(final Resource rsrc : minuteResource.getChildren() ) { - - if ( !caps.isActive() ) { - return; - } - - try { - final ValueMap vm = ResourceHelper.getValueMap(rsrc); - final String targetId = caps.detectTarget(topicName, vm, info); - - if ( targetId != null ) { - final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - final Map<String, Object> props = new HashMap<String, Object>(vm); - props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); - props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); - props.remove(Job.PROPERTY_JOB_STARTED_TIME); - try { - ResourceHelper.getOrCreateResource(resolver, newPath, props); - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } - } catch (final InstantiationException ie) { - // something happened with the resource in the meantime - this.ignoreException(ie); - resolver.refresh(); - } - } - } - } - } - } - } - } - } - if ( caps.isActive() && unassign ) { - // we have to move everything to the unassigned area - for(final Resource yearResource : topicResource.getChildren() ) { - for(final Resource monthResource : yearResource.getChildren() ) { - for(final Resource dayResource : monthResource.getChildren() ) { - for(final Resource hourResource : dayResource.getChildren() ) { - for(final Resource minuteResource : hourResource.getChildren() ) { - for(final Resource rsrc : minuteResource.getChildren() ) { - - if ( !caps.isActive() ) { - return; - } - - try { - final ValueMap vm = ResourceHelper.getValueMap(rsrc); - final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - final Map<String, Object> props = new HashMap<String, Object>(vm); - props.remove(Job.PROPERTY_JOB_QUEUE_NAME); - props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); - props.remove(Job.PROPERTY_JOB_STARTED_TIME); - - try { - ResourceHelper.getOrCreateResource(resolver, newPath, props); - resolver.delete(rsrc); - resolver.commit(); - } catch ( final PersistenceException pe ) { - this.ignoreException(pe); - resolver.refresh(); - } - } catch (final InstantiationException ie) { - // something happened with the resource in the meantime - this.ignoreException(ie); - resolver.refresh(); - } - } - } - } - } - } - } - } - } - } - - /** - * Check if the topology has changed. - */ - private boolean topologyHasChanged(final TopologyCapabilities topologyCapabilities) { - boolean topologyChanged = false; - if ( topologyCapabilities != null ) { - if ( this.topologyChangeCount != topologyCapabilities.getChangeCount() ) { - this.topologyChangeCount = topologyCapabilities.getChangeCount(); - topologyChanged = true; - } - } - return topologyChanged; - } - - private boolean queueConfigurationHasChanged(final TopologyCapabilities topologyCapabilities, - final QueueConfigurationManager queueManager) { - boolean configChanged = false; - if ( topologyCapabilities != null ) { - final int queueChangeCount = queueManager.getChangeCount(); - if ( this.queueConfigChangeCount < queueChangeCount ) { - configChanged = true; - this.queueConfigChangeCount = queueChangeCount; - } - } - return configChanged; - } - /** * One maintenance run */ public void run(final TopologyCapabilities topologyCapabilities, - final QueueConfigurationManager queueManager, final long cleanUpCounter) { - // check topology and config change during each invocation - final boolean topologyChanged = this.topologyHasChanged(topologyCapabilities); - final boolean configChanged = this.queueConfigurationHasChanged(topologyCapabilities, queueManager); - - // if topology changed, reschedule assigned jobs for stopped instances - if ( topologyChanged ) { - this.reassignJobs(topologyCapabilities, queueManager); - } - // try to assign unassigned jobs - if ( topologyChanged || configChanged ) { - this.assignUnassignedJobs(topologyCapabilities, queueManager); - } - - if ( topologyChanged && !this.checkedForPreviousVersion && topologyCapabilities != null && topologyCapabilities.isLeader() ) { - this.processJobsFromPreviousVersions(topologyCapabilities, queueManager); - } - if ( topologyCapabilities != null ) { // Clean up final String cleanUpAssignedPath;; @@ -641,138 +396,4 @@ public class MaintenanceTask { this.logger.debug("Ignored exception " + e.getMessage(), e); } } - - /** - * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area - */ - private void processJobsFromPreviousVersions(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager) { - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { - this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionAnonPath())); - this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionIdentifiedPath())); - this.checkedForPreviousVersion = true; - } catch ( final PersistenceException pe ) { - this.logger.warn("Problems moving jobs from previous version.", pe); - } finally { - resolver.close(); - } - } - - /** - * Recursively find jobs and move them - */ - private void processJobsFromPreviousVersions(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager, - final Resource rsrc) throws PersistenceException { - if ( rsrc != null && caps.isActive() ) { - if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) { - this.moveJobFromPreviousVersion(caps, queueManager, rsrc); - } else { - for(final Resource child : rsrc.getChildren()) { - this.processJobsFromPreviousVersions(caps, queueManager, child); - } - if ( caps.isActive() ) { - rsrc.getResourceResolver().delete(rsrc); - rsrc.getResourceResolver().commit(); - rsrc.getResourceResolver().refresh(); - } - } - } - } - - /** - * Move a single job - */ - private void moveJobFromPreviousVersion(final TopologyCapabilities caps, - final QueueConfigurationManager queueManager, - final Resource jobResource) - throws PersistenceException { - final ResourceResolver resolver = jobResource.getResourceResolver(); - - try { - final ValueMap vm = ResourceHelper.getValueMap(jobResource); - // check for binary properties - Map<String, Object> binaryProperties = new HashMap<String, Object>(); - final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class); - if ( ois != null ) { - try { - int length = ois.readInt(); - for(int i=0;i<length;i++) { - final String key = (String)ois.readObject(); - final Object value = ois.readObject(); - binaryProperties.put(key, value); - } - } catch (final ClassNotFoundException cnfe) { - throw new PersistenceException("Class not found.", cnfe); - } catch (final java.io.InvalidClassException ice) { - throw new PersistenceException("Invalid class.", ice); - } catch (final IOException ioe) { - throw new PersistenceException("Unable to deserialize job properties.", ioe); - } finally { - try { - ois.close(); - } catch (final IOException ioe) { - this.ignoreException(ioe); - } - } - } - - final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm); - - properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true); - final String topic = (String)properties.remove("slingevent:topic"); - properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic); - - properties.remove(Job.PROPERTY_JOB_QUEUE_NAME); - properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); - // and binary properties - properties.putAll(binaryProperties); - properties.remove("slingevent:properties"); - - if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) { - properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue - } - if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) { - properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0); - } - - final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null); - String targetId = null; - if ( potentialTargets != null && potentialTargets.size() > 0 ) { - final QueueInfo info = queueManager.getQueueInfo(topic); - logger.debug("Found queue {} for {}", info.queueConfiguration, topic); - // if queue is configured to drop, we drop - if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) { - resolver.delete(jobResource); - resolver.commit(); - return; - } - if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) { - targetId = caps.detectTarget(topic, vm, info); - if ( targetId != null ) { - properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); - properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); - properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries()); - } - } - } - - properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID); - properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB); - - final String jobId = this.configuration.getUniqueId(topic); - properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId); - properties.remove(Job.PROPERTY_JOB_STARTED_TIME); - - final String newPath = this.configuration.getUniquePath(targetId, topic, jobId, vm); - this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath); - - ResourceHelper.getOrCreateResource(resolver, newPath, properties); - resolver.delete(jobResource); - resolver.commit(); - } catch (final InstantiationException ie) { - throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie.getCause()); - } - } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1631994&r1=1631993&r2=1631994&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Wed Oct 15 11:30:09 2014 @@ -31,7 +31,9 @@ import org.apache.sling.api.resource.Log import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.event.impl.support.ResourceHelper; import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; import org.osgi.util.tracker.ServiceTracker; +import org.osgi.util.tracker.ServiceTrackerCustomizer; /** @@ -41,6 +43,10 @@ import org.osgi.util.tracker.ServiceTrac @Service(value=QueueConfigurationManager.class) public class QueueConfigurationManager { + public interface QueueConfigurationChangeListener { + void configChanged(); + } + /** Configurations - ordered by service ranking. */ private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0]; @@ -53,6 +59,9 @@ public class QueueConfigurationManager { @Reference private MainQueueConfiguration mainQueueConfiguration; + /** Listeners. */ + private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>(); + /** * Activate this component. * Create the service tracker and start it. @@ -61,7 +70,24 @@ public class QueueConfigurationManager { protected void activate(final BundleContext bundleContext) throws LoginException, PersistenceException { this.configTracker = new ServiceTracker(bundleContext, - InternalQueueConfiguration.class.getName(), null); + InternalQueueConfiguration.class.getName(), new ServiceTrackerCustomizer() { + + @Override + public void removedService(final ServiceReference reference, final Object service) { + bundleContext.ungetService(reference); + updateListeners(); + } + + @Override + public void modifiedService(ServiceReference reference, Object service) { + // nothing to do + } + + @Override + public Object addingService(final ServiceReference reference) { + return bundleContext.getService(reference); + } + }); this.configTracker.open(); } @@ -145,4 +171,24 @@ public class QueueConfigurationManager { public int getChangeCount() { return this.configTracker.getTrackingCount(); } + + public void addListener(final QueueConfigurationChangeListener listener) { + synchronized ( this.listeners ) { + this.listeners.add(listener); + } + } + + public void removeListener(final QueueConfigurationChangeListener listener) { + synchronized ( this.listeners ) { + this.listeners.remove(listener); + } + } + + private void updateListeners() { + synchronized ( listeners ) { + for(final QueueConfigurationChangeListener l : listeners) { + l.configChanged(); + } + } + } } Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java?rev=1631994&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java Wed Oct 15 11:30:09 2014 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.jobs.topics; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.sling.api.resource.Resource; +import org.apache.sling.event.impl.jobs.JobImpl; +import org.apache.sling.event.impl.jobs.Utility; +import org.slf4j.Logger; + +public class JobTopicTraverser { + + public interface Handler { + boolean handle(final JobImpl job); + } + + public static void traverse(final Logger logger, + final Resource topicResource, + final Handler handler) { + logger.debug("Processing topic {}", topicResource.getName()); + // now years + for(final Resource yearResource: Utility.getSortedChildren(logger, "year", topicResource)) { + final int year = Integer.valueOf(yearResource.getName()); + logger.debug("Processing year {}", year); + + // now months + for(final Resource monthResource: Utility.getSortedChildren(logger, "month", yearResource)) { + final int month = Integer.valueOf(monthResource.getName()); + logger.debug("Processing month {}", month); + + // now days + for(final Resource dayResource: Utility.getSortedChildren(logger, "day", monthResource)) { + final int day = Integer.valueOf(dayResource.getName()); + logger.debug("Processing day {}", day); + + // now hours + for(final Resource hourResource: Utility.getSortedChildren(logger, "hour", dayResource)) { + final int hour = Integer.valueOf(hourResource.getName()); + logger.debug("Processing hour {}", hour); + + // now minutes + for(final Resource minuteResource: Utility.getSortedChildren(logger, "minute", hourResource)) { + final int minute = Integer.valueOf(minuteResource.getName()); + logger.debug("Processing minute {}", minute); + + // now jobs + final List<JobImpl> jobs = new ArrayList<JobImpl>(); + final Iterator<Resource> jobIter = minuteResource.listChildren(); + while ( jobIter.hasNext() ) { + final Resource jobResource = jobIter.next(); + + final JobImpl job = Utility.readJob(logger, jobResource); + if ( job != null ) { + logger.debug("Found job {}", jobResource.getName()); + jobs.add(job); + } + } + + Collections.sort(jobs); + + for(final JobImpl job : jobs) { + if ( !handler.handle(job) ) { + return; + } + } + } + } + } + } + } + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java?rev=1631994&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java Wed Oct 15 11:30:09 2014 @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.topology; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ValueMap; +import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.event.impl.jobs.JobImpl; +import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; +import org.apache.sling.event.impl.support.ResourceHelper; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintenance task... + * + * In the default configuration, this task runs every minute + */ +public class MaintenanceTask { + + /** Logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** Job manager configuration. */ + private final JobManagerConfiguration configuration; + + /** + * Constructor + */ + public MaintenanceTask(final JobManagerConfiguration config) { + this.configuration = config; + } + + private void reassignJobs(final TopologyCapabilities caps, + final QueueConfigurationManager queueManager) { + if ( caps != null && caps.isLeader() ) { + this.logger.debug("Checking for stopped instances..."); + final ResourceResolver resolver = this.configuration.createResourceResolver(); + try { + final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath()); + this.logger.debug("Got jobs root {}", jobsRoot); + + // this resource should exist, but we check anyway + if ( jobsRoot != null ) { + final Iterator<Resource> instanceIter = jobsRoot.listChildren(); + while ( caps.isActive() && instanceIter.hasNext() ) { + final Resource instanceResource = instanceIter.next(); + + final String instanceId = instanceResource.getName(); + if ( !caps.isActive(instanceId) ) { + logger.debug("Found stopped instance {}", instanceId); + assignJobs(caps, queueManager, instanceResource, true); + } + } + } + } finally { + resolver.close(); + } + } + } + + /** + * Try to assign unassigned jobs as there might be changes in: + * - queue configurations + * - topology + * - capabilities + */ + private void assignUnassignedJobs(final TopologyCapabilities caps, + final QueueConfigurationManager queueManager) { + if ( caps != null && caps.isLeader() ) { + logger.debug("Checking unassigned jobs..."); + final ResourceResolver resolver = this.configuration.createResourceResolver(); + try { + final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath()); + logger.debug("Got unassigned root {}", unassignedRoot); + + // this resource should exist, but we check anyway + if ( unassignedRoot != null ) { + assignJobs(caps, queueManager, unassignedRoot, false); + } + } finally { + resolver.close(); + } + } + } + + /** + * Try to assign all jobs from the jobs root. + * The jobs are stored by topic + */ + private void assignJobs(final TopologyCapabilities caps, + final QueueConfigurationManager queueManager, + final Resource jobsRoot, + final boolean unassign) { + final ResourceResolver resolver = jobsRoot.getResourceResolver(); + + final Iterator<Resource> topicIter = jobsRoot.listChildren(); + while ( caps.isActive() && topicIter.hasNext() ) { + final Resource topicResource = topicIter.next(); + + final String topicName = topicResource.getName().replace('.', '/'); + logger.debug("Found topic {}", topicName); + + final String checkTopic; + if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) { + checkTopic = "/"; + } else { + checkTopic = topicName; + } + + // first check if there is an instance for these topics + final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null); + if ( potentialTargets != null && potentialTargets.size() > 0 ) { + final QueueInfo info = queueManager.getQueueInfo(topicName); + logger.debug("Found queue {} for {}", info.queueConfiguration, topicName); + + // if queue is configured to drop, we drop + if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) { + final Iterator<Resource> i = topicResource.listChildren(); + while ( caps.isActive() && i.hasNext() ) { + final Resource rsrc = i.next(); + try { + resolver.delete(rsrc); + resolver.commit(); + } catch ( final PersistenceException pe ) { + this.ignoreException(pe); + resolver.refresh(); + } + } + } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) { + // if the queue is not configured to ignore, we can reschedule + for(final Resource yearResource : topicResource.getChildren() ) { + for(final Resource monthResource : yearResource.getChildren() ) { + for(final Resource dayResource : monthResource.getChildren() ) { + for(final Resource hourResource : dayResource.getChildren() ) { + for(final Resource minuteResource : hourResource.getChildren() ) { + for(final Resource rsrc : minuteResource.getChildren() ) { + + if ( !caps.isActive() ) { + return; + } + + try { + final ValueMap vm = ResourceHelper.getValueMap(rsrc); + final String targetId = caps.detectTarget(topicName, vm, info); + + if ( targetId != null ) { + final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + final Map<String, Object> props = new HashMap<String, Object>(vm); + props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); + props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); + props.remove(Job.PROPERTY_JOB_STARTED_TIME); + try { + ResourceHelper.getOrCreateResource(resolver, newPath, props); + resolver.delete(rsrc); + resolver.commit(); + } catch ( final PersistenceException pe ) { + this.ignoreException(pe); + resolver.refresh(); + } + } + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + this.ignoreException(ie); + resolver.refresh(); + } + } + } + } + } + } + } + } + } + if ( caps.isActive() && unassign ) { + // we have to move everything to the unassigned area + for(final Resource yearResource : topicResource.getChildren() ) { + for(final Resource monthResource : yearResource.getChildren() ) { + for(final Resource dayResource : monthResource.getChildren() ) { + for(final Resource hourResource : dayResource.getChildren() ) { + for(final Resource minuteResource : hourResource.getChildren() ) { + for(final Resource rsrc : minuteResource.getChildren() ) { + + if ( !caps.isActive() ) { + return; + } + + try { + final ValueMap vm = ResourceHelper.getValueMap(rsrc); + final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + final Map<String, Object> props = new HashMap<String, Object>(vm); + props.remove(Job.PROPERTY_JOB_QUEUE_NAME); + props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); + props.remove(Job.PROPERTY_JOB_STARTED_TIME); + + try { + ResourceHelper.getOrCreateResource(resolver, newPath, props); + resolver.delete(rsrc); + resolver.commit(); + } catch ( final PersistenceException pe ) { + this.ignoreException(pe); + resolver.refresh(); + } + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + this.ignoreException(ie); + resolver.refresh(); + } + } + } + } + } + } + } + } + } + } + + /** + * One maintenance run + */ + public void run(final TopologyCapabilities topologyCapabilities, + final QueueConfigurationManager queueManager, + final boolean topologyChanged, + final boolean configChanged) { + // if topology changed, reschedule assigned jobs for stopped instances + if ( topologyChanged ) { + this.reassignJobs(topologyCapabilities, queueManager); + } + // try to assign unassigned jobs + if ( topologyChanged || configChanged ) { + this.assignUnassignedJobs(topologyCapabilities, queueManager); + } + } + + /** + * Helper method which just logs the exception in debug mode. + * @param e + */ + private void ignoreException(final Exception e) { + if ( this.logger.isDebugEnabled() ) { + this.logger.debug("Ignored exception " + e.getMessage(), e); + } + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java?rev=1631994&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java Wed Oct 15 11:30:09 2014 @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.topology; + +import java.util.Iterator; + +import org.apache.sling.api.resource.ModifiableValueMap; +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.event.impl.jobs.JobImpl; +import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser; +import org.apache.sling.event.jobs.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RestartTask { + + /** Logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + public void run(final JobManagerConfiguration configuration) { + this.initialScan(configuration); + } + + /** + * Scan the resource tree for unfinished jobs from previous runs + */ + private void initialScan(final JobManagerConfiguration configuration) { + logger.debug("Scanning repository for unfinished jobs..."); + final ResourceResolver resolver = configuration.createResourceResolver(); + try { + final Resource baseResource = resolver.getResource(configuration.getLocalJobsPath()); + + // sanity check - should never be null + if ( baseResource != null ) { + final Iterator<Resource> topicIter = baseResource.listChildren(); + while ( topicIter.hasNext() ) { + final Resource topicResource = topicIter.next(); + logger.debug("Found topic {}", topicResource.getName()); + + // init topic + initTopic(topicResource); + } + } + } finally { + resolver.close(); + } + } + + /** + * Initialize a topic and update all jobs from that topic. + * Reset started time and increase retry count of unfinished jobs + * @param topicResource The topic resource + */ + private void initTopic(final Resource topicResource) { + logger.debug("Initializing topic {}...", topicResource.getName()); + + JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() { + + @Override + public boolean handle(final JobImpl job) { + if ( job.getProcessingStarted() != null ) { + job.retry(); + try { + final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath()); + // sanity check + if ( jobResource != null ) { + final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class); + mvm.remove(Job.PROPERTY_JOB_STARTED_TIME); + mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getRetryCount()); + jobResource.getResourceResolver().commit(); + } + } catch ( final PersistenceException ignore) { + logger.error("Unable to update unfinished job " + job, ignore); + } + } + return true; + } + }); + logger.debug("Topic {} initialized", topicResource.getName()); + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java?rev=1631994&r1=1631993&r2=1631994&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java Wed Oct 15 11:30:09 2014 @@ -30,6 +30,8 @@ import org.apache.sling.discovery.Instan import org.apache.sling.discovery.TopologyView; import org.apache.sling.event.impl.jobs.JobImpl; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; import org.apache.sling.event.impl.support.Environment; import org.apache.sling.event.jobs.QueueConfiguration; @@ -73,6 +75,9 @@ public class TopologyCapabilities { /** JobManagerConfiguration. */ private final JobManagerConfiguration jobManagerConfiguration; + /** Queue config manager. */ + private final QueueConfigurationManager queueManager; + public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> { private final String localClusterId; @@ -120,8 +125,11 @@ public class TopologyCapabilities { return allInstances; } - public TopologyCapabilities(final TopologyView view, final JobManagerConfiguration config) { + public TopologyCapabilities(final TopologyView view, + final QueueConfigurationManager queueManager, + final JobManagerConfiguration config) { this.jobManagerConfiguration = config; + this.queueManager = queueManager; this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId()); this.isLeader = view.getLocalInstance().isLeader(); this.allInstances = getAllInstancesMap(view); @@ -156,11 +164,11 @@ public class TopologyCapabilities { public boolean isActive() { return this.active; } - +/* public long getChangeCount() { return this.changeCount; } - +*/ public boolean isActive(final String instanceId) { return this.allInstances.containsKey(instanceId); } @@ -280,4 +288,17 @@ public class TopologyCapabilities { return this.instanceCapabilities; } + public QueueInfo getQueueInfo(final String topic) { + if ( this.active ) { + return this.queueManager.getQueueInfo(topic); + } + return null; + } + + public InternalQueueConfiguration[] getQueueConfigurations() { + if ( this.active ) { + return this.queueManager.getConfigurations(); + } + return null; + } } Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java?rev=1631994&r1=1631993&r2=1631994&view=diff ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java (original) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java Wed Oct 15 11:30:09 2014 @@ -22,25 +22,30 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.Service; import org.apache.sling.discovery.TopologyEvent; import org.apache.sling.discovery.TopologyEvent.Type; import org.apache.sling.discovery.TopologyEventListener; -import org.apache.sling.discovery.TopologyView; import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * The topology handler listens for topology events + * The topology handler listens for topology events. + * + * TODO - config changes should actually do a real stop/start */ @Component(immediate=true) @Service(value={TopologyHandler.class, TopologyEventListener.class}) public class TopologyHandler - implements TopologyEventListener { + implements TopologyEventListener, QueueConfigurationChangeListener { /** Logger. */ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -51,20 +56,75 @@ public class TopologyHandler @Reference private JobManagerConfiguration configuration; + @Reference + private QueueConfigurationManager queueManager; + /** The topology capabilities. */ private volatile TopologyCapabilities topologyCapabilities; - private void stopProcessing() { + @Activate + protected void activate() { + this.queueManager.addListener(this); + } + + @Deactivate + protected void dectivate() { + this.queueManager.removeListener(this); + } + + + @Override + public void configChanged() { + final TopologyCapabilities caps = this.topologyCapabilities; + if ( caps != null ) { + synchronized ( this.listeners ) { + // this.stopProcessing(false); + + this.startProcessing(Type.PROPERTIES_CHANGED, caps, true); + } + } + } + + private void stopProcessing(final boolean deactivate) { + boolean notify = this.topologyCapabilities != null; // deactivate old capabilities - this stops all background processes - if ( this.topologyCapabilities != null ) { + if ( deactivate && this.topologyCapabilities != null ) { this.topologyCapabilities.deactivate(); } this.topologyCapabilities = null; + + if ( notify ) { + // stop all listeners + this.notifiyListeners(); + } } - private void startProcessing(final TopologyView view) { + private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) { // create new capabilities and update view - this.topologyCapabilities = new TopologyCapabilities(view, this.configuration); + this.topologyCapabilities = newCaps; + + // before we propagate the new topology we do some maintenance + if ( eventType == Type.TOPOLOGY_INIT ) { + final UpgradeTask task = new UpgradeTask(); + task.run(this.configuration, this.topologyCapabilities, queueManager); + + final RestartTask rt = new RestartTask(); + rt.run(this.configuration); + } + + final MaintenanceTask mt = new MaintenanceTask(this.configuration); + mt.run(topologyCapabilities, queueManager, !isConfigChange, isConfigChange); + + if ( !isConfigChange ) { + // start listeners + this.notifiyListeners(); + } + } + + private void notifiyListeners() { + for(final TopologyAware l : this.listeners) { + l.topologyChanged(this.topologyCapabilities); + } } /** @@ -86,22 +146,15 @@ public class TopologyHandler synchronized ( this.listeners ) { if ( event.getType() == Type.TOPOLOGY_CHANGING ) { - this.stopProcessing(); + this.stopProcessing(true); - for(final TopologyAware l : this.listeners) { - l.topologyChanged(this.topologyCapabilities); - } } else if ( event.getType() == Type.TOPOLOGY_INIT || event.getType() == Type.TOPOLOGY_CHANGED || event.getType() == Type.PROPERTIES_CHANGED ) { - this.stopProcessing(); - - this.startProcessing(event.getNewView()); + this.stopProcessing(true); - for(final TopologyAware l : this.listeners) { - l.topologyChanged(this.topologyCapabilities); - } + this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this.queueManager, this.configuration), false); } } Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java?rev=1631994&view=auto ============================================================================== --- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java (added) +++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java Wed Oct 15 11:30:09 2014 @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.event.impl.topology; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.api.resource.ValueMap; +import org.apache.sling.discovery.InstanceDescription; +import org.apache.sling.event.impl.jobs.JobImpl; +import org.apache.sling.event.impl.jobs.JobManagerConfiguration; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager; +import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo; +import org.apache.sling.event.impl.support.Environment; +import org.apache.sling.event.impl.support.ResourceHelper; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Upgrade task + * + * Upgrade jobs from earlier versions to the new format. + */ +public class UpgradeTask { + + /** Logger. */ + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + /** + * Upgrade + */ + public void run(final JobManagerConfiguration configuration, + final TopologyCapabilities topologyCapabilities, + final QueueConfigurationManager queueManager) { + if ( topologyCapabilities.isLeader() ) { + this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager); + } + } + + /** + * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area + */ + private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration, + final TopologyCapabilities caps, + final QueueConfigurationManager queueManager) { + final ResourceResolver resolver = configuration.createResourceResolver(); + try { + this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionAnonPath())); + this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionIdentifiedPath())); + } catch ( final PersistenceException pe ) { + this.logger.warn("Problems moving jobs from previous version.", pe); + } finally { + resolver.close(); + } + } + + /** + * Recursively find jobs and move them + */ + private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration, + final TopologyCapabilities caps, + final QueueConfigurationManager queueManager, + final Resource rsrc) throws PersistenceException { + if ( rsrc != null && caps.isActive() ) { + if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) { + this.moveJobFromPreviousVersion(configuration, caps, queueManager, rsrc); + } else { + for(final Resource child : rsrc.getChildren()) { + this.processJobsFromPreviousVersions(configuration, caps, queueManager, child); + } + if ( caps.isActive() ) { + rsrc.getResourceResolver().delete(rsrc); + rsrc.getResourceResolver().commit(); + rsrc.getResourceResolver().refresh(); + } + } + } + } + + /** + * Move a single job + */ + private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration, + final TopologyCapabilities caps, + final QueueConfigurationManager queueManager, + final Resource jobResource) + throws PersistenceException { + final ResourceResolver resolver = jobResource.getResourceResolver(); + + try { + final ValueMap vm = ResourceHelper.getValueMap(jobResource); + // check for binary properties + Map<String, Object> binaryProperties = new HashMap<String, Object>(); + final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class); + if ( ois != null ) { + try { + int length = ois.readInt(); + for(int i=0;i<length;i++) { + final String key = (String)ois.readObject(); + final Object value = ois.readObject(); + binaryProperties.put(key, value); + } + } catch (final ClassNotFoundException cnfe) { + throw new PersistenceException("Class not found.", cnfe); + } catch (final java.io.InvalidClassException ice) { + throw new PersistenceException("Invalid class.", ice); + } catch (final IOException ioe) { + throw new PersistenceException("Unable to deserialize job properties.", ioe); + } finally { + try { + ois.close(); + } catch (final IOException ioe) { + throw new PersistenceException("Unable to deserialize job properties.", ioe); + } + } + } + + final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm); + + properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true); + final String topic = (String)properties.remove("slingevent:topic"); + properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic); + + properties.remove(Job.PROPERTY_JOB_QUEUE_NAME); + properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); + // and binary properties + properties.putAll(binaryProperties); + properties.remove("slingevent:properties"); + + if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) { + properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue + } + if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) { + properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0); + } + + final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null); + String targetId = null; + if ( potentialTargets != null && potentialTargets.size() > 0 ) { + final QueueInfo info = queueManager.getQueueInfo(topic); + logger.debug("Found queue {} for {}", info.queueConfiguration, topic); + // if queue is configured to drop, we drop + if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) { + resolver.delete(jobResource); + resolver.commit(); + return; + } + if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) { + targetId = caps.detectTarget(topic, vm, info); + if ( targetId != null ) { + properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); + properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); + properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries()); + } + } + } + + properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID); + properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB); + + final String jobId = configuration.getUniqueId(topic); + properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId); + properties.remove(Job.PROPERTY_JOB_STARTED_TIME); + + final String newPath = configuration.getUniquePath(targetId, topic, jobId, vm); + this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath); + + ResourceHelper.getOrCreateResource(resolver, newPath, properties); + resolver.delete(jobResource); + resolver.commit(); + } catch (final InstantiationException ie) { + throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie.getCause()); + } + } +} Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java ------------------------------------------------------------------------------ svn:keywords = author date id revision rev url Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java ------------------------------------------------------------------------------ svn:mime-type = text/plain