Author: cziegeler Date: Wed Jul 31 09:18:26 2013 New Revision: 1508781 URL: http://svn.apache.org/r1508781 Log: SLING-2979 : Add support for running scheduled task only on the leader
Modified: sling/trunk/bundles/commons/scheduler/pom.xml sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Modified: sling/trunk/bundles/commons/scheduler/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/pom.xml (original) +++ sling/trunk/bundles/commons/scheduler/pom.xml Wed Jul 31 09:18:26 2013 @@ -58,7 +58,6 @@ org.apache.sling.commons.scheduler.impl </Private-Package> <DynamicImport-Package> - org.apache.sling.discovery;version="[1.0,2)", commonj.work, com.mchange.v2.c3p0, javax.ejb, Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java Wed Jul 31 09:18:26 2013 @@ -52,8 +52,29 @@ public interface ScheduleOptions { * Flag indicating whether the job should only be run on the leader. * This defaults to false. 
* If no topology information is available (= no Apache Sling Discovery Implementation active) - * this flag is ignored and the job is run on all instances. + * this flag is ignored and the job is run on all instances! + * If {@link #onSingleInstanceOnly(boolean)} or {@link #onInstancesOnly(String[])} has been called before, + * that option is reset and overwritten by the value of this method. * @param flag Whether this job should only be run on the leader */ ScheduleOptions onLeaderOnly(final boolean flag); + + /** + * Flag indicating whether the job should only be run on a single instance in a cluster + * This defaults to false. + * If no topology information is available (= no Apache Sling Discovery Implementation active) + * this flag is ignored and the job is run on all instances! + * If {@link #onLeaderOnly(boolean)} or {@link #onInstancesOnly(String[])} has been called before, + * that option is reset and overwritten by the value of this method. + * @param flag Whether this job should only be run on a single instance. + */ + ScheduleOptions onSingleInstanceOnly(final boolean flag); + + /** + * List of Sling IDs this job should be run on. + * If {@link #onLeaderOnly(boolean)} or {@link #onSingleInstanceOnly(boolean)} has been called before, + * that option is reset and overwritten by the value of this method. + * @param slingIds The Sling IDs of the instances this job should be run on. 
+ */ + ScheduleOptions onInstancesOnly(String[] slingIds); } Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java Wed Jul 31 09:18:26 2013 @@ -61,13 +61,31 @@ public interface Scheduler { /** Name of the configuration property to define the job name. */ String PROPERTY_SCHEDULER_NAME = "scheduler.name"; - /** Name of the configuration property to define if the job should only be run on the leader. - * Default is to start the job on all instances. This property needs to be of type Boolean. + /** + * Name of the configuration property to define the instances this job should run on. + * By default a job is run on all instances. This property can be configured with: + * - a list of Sling IDs : in that case the job is only run on instances in this set. + * - constant {@link #VALUE_RUN_ON_LEADER} : the job is only run on the leader + * - constant {@link #VALUE_RUN_ON_SINGLE} : the job is only run on a single instance in a cluster. This is + * basically the same as {@link #VALUE_RUN_ON_LEADER} but it's not further specified which + * single instance is used. + * Default is to start the job on all instances. This property needs to be of type String + * or String[]. * If no topology information is available (= no Apache Sling Discovery Implementation active) - * this flag is ignored and the job is run on all instances. + * the values {@link #VALUE_RUN_ON_LEADER} and {@link #VALUE_RUN_ON_SINGLE} are ignored, and the job is run on all instances. 
+ * @since 2.3.0 + */ + String PROPERTY_SCHEDULER_RUN_ON = "scheduler.runOn"; + + /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on the leader only. + * @since 2.3.0 + */ + String VALUE_RUN_ON_LEADER = "LEADER"; + + /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on a single instance only. * @since 2.3.0 */ - String PROPERTY_SCHEDULER_LEADER_ONLY = "scheduler.leaderonly"; + String VALUE_RUN_ON_SINGLE = "SINGLE"; /** * Schedule a job based on the options. Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java Wed Jul 31 09:18:26 2013 @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Map; import org.apache.sling.commons.scheduler.ScheduleOptions; +import org.apache.sling.commons.scheduler.Scheduler; import org.quartz.Trigger; import org.quartz.TriggerBuilder; @@ -33,14 +34,14 @@ public class InternalScheduleOptions imp public boolean canRunConcurrently = false; - public boolean onLeaderOnly = false; - public Map<String, Serializable> configuration; public final TriggerBuilder<? extends Trigger> trigger; public final IllegalArgumentException argumentException; + public String[] runOn; + public InternalScheduleOptions(final TriggerBuilder<? 
extends Trigger> trigger) { this.trigger = trigger; this.argumentException = null; @@ -79,7 +80,31 @@ public class InternalScheduleOptions imp * @see org.apache.sling.commons.scheduler.ScheduleOptions#onLeaderOnly(boolean) */ public ScheduleOptions onLeaderOnly(final boolean flag) { - this.onLeaderOnly = flag; + if ( flag ) { + this.runOn = new String[] {Scheduler.VALUE_RUN_ON_LEADER}; + } else { + this.runOn = null; + } + return this; + } + + /** + * @see org.apache.sling.commons.scheduler.ScheduleOptions#onSingleInstanceOnly(boolean) + */ + public ScheduleOptions onSingleInstanceOnly(final boolean flag) { + if ( flag ) { + this.runOn = new String[] {Scheduler.VALUE_RUN_ON_SINGLE}; + } else { + this.runOn = null; + } + return this; + } + + /** + * @see org.apache.sling.commons.scheduler.ScheduleOptions#onInstancesOnly(java.lang.String[]) + */ + public ScheduleOptions onInstancesOnly(final String[] slingIds) { + this.runOn = slingIds; return this; } } Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java Wed Jul 31 09:18:26 2013 @@ -44,9 +44,9 @@ public class QuartzJobExecutor implement final JobDataMap data = context.getJobDetail().getJobDataMap(); - // check leader - final boolean onLeaderOnly = data.getBooleanValue(QuartzScheduler.DATA_MAP_ON_LEADER_ONLY); - if (onLeaderOnly && !IS_LEADER.get()) { + // check leader/single + final String[] runOn = 
(String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON); + if (runOn != null && !IS_LEADER.get()) { return; } Modified: sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java URL: http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff ============================================================================== --- sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java (original) +++ sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java Wed Jul 31 09:18:26 2013 @@ -37,6 +37,7 @@ import org.apache.sling.commons.schedule import org.apache.sling.commons.scheduler.Scheduler; import org.apache.sling.commons.threads.ThreadPool; import org.apache.sling.commons.threads.ThreadPoolManager; +import org.apache.sling.discovery.DiscoveryService; import org.osgi.framework.Constants; import org.osgi.framework.ServiceReference; import org.osgi.framework.ServiceRegistration; @@ -87,8 +88,8 @@ public class QuartzScheduler implements /** Map key for the logger. */ static final String DATA_MAP_LOGGER = "QuartzJobScheduler.Logger"; - /** Map key for the isLeader information (Boolean). */ - static final String DATA_MAP_ON_LEADER_ONLY = "QuartzJobScheduler.OnLeaderOnly"; + /** Map key for the runOn information (String[]). */ + static final String DATA_MAP_RUN_ON = "QuartzJobScheduler.runOn"; /** The quartz scheduler. */ private volatile org.quartz.Scheduler scheduler; @@ -110,6 +111,9 @@ public class QuartzScheduler implements @Property private static final String PROPERTY_POOL_NAME = "poolName"; + @Reference + private DiscoveryService discoveryService; + /** * Activate this component. * Start the scheduler. 
@@ -247,7 +251,9 @@ public class QuartzScheduler implements if ( options.configuration != null ) { jobDataMap.put(DATA_MAP_CONFIGURATION, options.configuration); } - jobDataMap.put(DATA_MAP_ON_LEADER_ONLY, options.onLeaderOnly); + if ( options.runOn != null) { + jobDataMap.put(DATA_MAP_RUN_ON, options.runOn); + } return jobDataMap; } @@ -426,13 +432,21 @@ public class QuartzScheduler implements try { final String name = getServiceIdentifier(ref); final Boolean concurrent = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT); - final Boolean onLeaderOnly = (Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_LEADER_ONLY); + final Object runOn = ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON); + String[] runOnOpts = null; + if ( runOn instanceof String ) { + runOnOpts = new String[] {runOn.toString()}; + } else if ( runOn instanceof String[] ) { + runOnOpts = (String[])runOn; + } else { + this.logger.warn("Property {} ignored for scheduler {}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref); + } final String expression = (String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION); if ( expression != null ) { this.scheduleJob(job, this.EXPR(expression) .name(name) .canRunConcurrently((concurrent != null ? concurrent : true)) - .onLeaderOnly(onLeaderOnly != null ? onLeaderOnly : false)); + .onInstancesOnly(runOnOpts)); } else { final Long period = (Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD); if ( period != null ) { @@ -446,7 +460,7 @@ public class QuartzScheduler implements this.scheduleJob(job, this.PERIODIC(period, immediate) .name(name) .canRunConcurrently((concurrent != null ? concurrent : true)) - .onLeaderOnly(onLeaderOnly != null ? 
onLeaderOnly : false)); + .onInstancesOnly(runOnOpts)); } } else { this.logger.debug("Ignoring servce {} : no scheduling property found.", ref); @@ -792,6 +806,30 @@ public class QuartzScheduler implements } else { name = job.getClass().getName() + ':' + UUID.randomUUID(); } + + // check run on + if ( opts.runOn != null ) { + boolean schedule = false; + if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_LEADER.equals(opts.runOn[0])) { + schedule = true; + } else if ( opts.runOn.length == 1 && Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0])) { + schedule = true; + } else { // sling IDs + final String myId = this.discoveryService.getTopology().getLocalInstance().getSlingId(); + for(final String id : opts.runOn ) { + if ( myId.equals(id) ) { + schedule = true; + break; + } + } + opts.runOn = null; + } + if ( !schedule ) { + this.logger.warn("Not scheduling job {} with name {} - not in required Sling ID set", job, name); + return; + } + } + final Trigger trigger = opts.trigger.withIdentity(name).build(); // create the data map