Author: cziegeler
Date: Wed Jul 31 09:18:26 2013
New Revision: 1508781

URL: http://svn.apache.org/r1508781
Log:
SLING-2979 : Add support for running scheduled task only on the leader

Modified:
    sling/trunk/bundles/commons/scheduler/pom.xml
    
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
    
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
    
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
    
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
    
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java

Modified: sling/trunk/bundles/commons/scheduler/pom.xml
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/pom.xml?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- sling/trunk/bundles/commons/scheduler/pom.xml (original)
+++ sling/trunk/bundles/commons/scheduler/pom.xml Wed Jul 31 09:18:26 2013
@@ -58,7 +58,6 @@
                             org.apache.sling.commons.scheduler.impl
                         </Private-Package>
                         <DynamicImport-Package>
-                            org.apache.sling.discovery;version="[1.0,2)",
                             commonj.work,
                             com.mchange.v2.c3p0,
                             javax.ejb,

Modified: 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
 (original)
+++ 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/ScheduleOptions.java
 Wed Jul 31 09:18:26 2013
@@ -52,8 +52,29 @@ public interface ScheduleOptions {
      * Flag indicating whether the job should only be run on the leader.
      * This defaults to false.
      * If no topology information is available (= no Apache Sling Discovery 
Implementation active)
-     * this flag is ignored and the job is run on all instances.
+     * this flag is ignored and the job is run on all instances!
+     * If {@link #onSingleInstanceOnly(boolean)} or {@link 
#onInstancesOnly(String[])} has been called before,
+     * that option is reset and overwritten by the value of this method.
      * @param flag Whether this job should only be run on the leader
      */
     ScheduleOptions onLeaderOnly(final boolean flag);
+
+    /**
+     * Flag indicating whether the job should only be run on a single instance 
in a cluster.
+     * This defaults to false.
+     * If no topology information is available (= no Apache Sling Discovery 
Implementation active)
+     * this flag is ignored and the job is run on all instances!
+     * If {@link #onLeaderOnly(boolean)} or {@link #onInstancesOnly(String[])} 
has been called before,
+     * that option is reset and overwritten by the value of this method.
+     * @param flag Whether this job should only be run on a single instance.
+     */
+    ScheduleOptions onSingleInstanceOnly(final boolean flag);
+
+    /**
+     * List of Sling IDs this job should be run on.
+     * If {@link #onLeaderOnly(boolean)} or {@link 
#onSingleInstanceOnly(boolean)} has been called before,
+     * that option is reset and overwritten by the value of this method.
+     * @param slingIds Array of Sling IDs of the instances this job should be run on.
+     */
+    ScheduleOptions onInstancesOnly(String[] slingIds);
 }

Modified: 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
 (original)
+++ 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/Scheduler.java
 Wed Jul 31 09:18:26 2013
@@ -61,13 +61,31 @@ public interface Scheduler {
     /** Name of the configuration property to define the job name. */
     String PROPERTY_SCHEDULER_NAME = "scheduler.name";
 
-    /** Name of the configuration property to define if the job should only be 
run on the leader.
-     * Default is to start the job on all instances. This property needs to be 
of type Boolean.
+    /**
+     * Name of the configuration property to define the instances this job 
should run on.
+     * By default a job is run on all instances. This property can be 
configured with:
+     * - a list of Sling IDs : in that case the job is only run on instances 
in this set.
+     * - constant {@link #VALUE_RUN_ON_LEADER} : the job is only run on the 
leader
+     * - constant {@link #VALUE_RUN_ON_SINGLE} : the job is only run on a 
single instance in a cluster. This is
+     *                     basically the same as {@link #VALUE_RUN_ON_LEADER} 
but it's not further specified which
+     *                     single instance is used.
+     * Default is to start the job on all instances. This property needs to be 
of type String
+     * or String[].
      * If no topology information is available (= no Apache Sling Discovery 
Implementation active)
-     * this flag is ignored and the job is run on all instances.
+     * the values {@link #VALUE_RUN_ON_LEADER} and {@link 
#VALUE_RUN_ON_SINGLE} are ignored, and the job is run on all instances.
+     * @since 2.3.0
+     */
+    String PROPERTY_SCHEDULER_RUN_ON = "scheduler.runOn";
+
+    /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on the 
leader only.
+     * @since 2.3.0
+     */
+    String VALUE_RUN_ON_LEADER = "LEADER";
+
+    /** Value for {@link #PROPERTY_SCHEDULER_RUN_ON} to run the job on a 
single instance only.
      * @since 2.3.0
      */
-    String PROPERTY_SCHEDULER_LEADER_ONLY = "scheduler.leaderonly";
+    String VALUE_RUN_ON_SINGLE = "SINGLE";
 
     /**
      * Schedule a job based on the options.

Modified: 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
 (original)
+++ 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/InternalScheduleOptions.java
 Wed Jul 31 09:18:26 2013
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
 
@@ -33,14 +34,14 @@ public class InternalScheduleOptions imp
 
     public boolean canRunConcurrently = false;
 
-    public boolean onLeaderOnly = false;
-
     public Map<String, Serializable> configuration;
 
     public final TriggerBuilder<? extends Trigger> trigger;
 
     public final IllegalArgumentException argumentException;
 
+    public String[] runOn;
+
     public InternalScheduleOptions(final TriggerBuilder<? extends Trigger> 
trigger) {
         this.trigger = trigger;
         this.argumentException = null;
@@ -79,7 +80,31 @@ public class InternalScheduleOptions imp
      * @see 
org.apache.sling.commons.scheduler.ScheduleOptions#onLeaderOnly(boolean)
      */
     public ScheduleOptions onLeaderOnly(final boolean flag) {
-        this.onLeaderOnly = flag;
+        if ( flag ) {
+            this.runOn = new String[] {Scheduler.VALUE_RUN_ON_LEADER};
+        } else {
+            this.runOn = null;
+        }
+        return this;
+    }
+
+    /**
+     * @see 
org.apache.sling.commons.scheduler.ScheduleOptions#onSingleInstanceOnly(boolean)
+     */
+    public ScheduleOptions onSingleInstanceOnly(final boolean flag) {
+        if ( flag ) {
+            this.runOn = new String[] {Scheduler.VALUE_RUN_ON_SINGLE};
+        } else {
+            this.runOn = null;
+        }
+        return this;
+    }
+
+    /**
+     * @see 
org.apache.sling.commons.scheduler.ScheduleOptions#onInstancesOnly(java.lang.String[])
+     */
+    public ScheduleOptions onInstancesOnly(final String[] slingIds) {
+        this.runOn = slingIds;
         return this;
     }
 }

Modified: 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
 (original)
+++ 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzJobExecutor.java
 Wed Jul 31 09:18:26 2013
@@ -44,9 +44,9 @@ public class QuartzJobExecutor implement
 
         final JobDataMap data = context.getJobDetail().getJobDataMap();
 
-        // check leader
-        final boolean onLeaderOnly = 
data.getBooleanValue(QuartzScheduler.DATA_MAP_ON_LEADER_ONLY);
-        if (onLeaderOnly && !IS_LEADER.get()) {
+        // check leader/single
+        final String[] runOn = 
(String[])data.get(QuartzScheduler.DATA_MAP_RUN_ON);
+        if (runOn != null && !IS_LEADER.get()) {
             return;
         }
 

Modified: 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java?rev=1508781&r1=1508780&r2=1508781&view=diff
==============================================================================
--- 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
 (original)
+++ 
sling/trunk/bundles/commons/scheduler/src/main/java/org/apache/sling/commons/scheduler/impl/QuartzScheduler.java
 Wed Jul 31 09:18:26 2013
@@ -37,6 +37,7 @@ import org.apache.sling.commons.schedule
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.discovery.DiscoveryService;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
@@ -87,8 +88,8 @@ public class QuartzScheduler implements 
     /** Map key for the logger. */
     static final String DATA_MAP_LOGGER = "QuartzJobScheduler.Logger";
 
-    /** Map key for the isLeader information (Boolean). */
-    static final String DATA_MAP_ON_LEADER_ONLY = 
"QuartzJobScheduler.OnLeaderOnly";
+    /** Map key for the runOn information (String[]). */
+    static final String DATA_MAP_RUN_ON = "QuartzJobScheduler.runOn";
 
     /** The quartz scheduler. */
     private volatile org.quartz.Scheduler scheduler;
@@ -110,6 +111,9 @@ public class QuartzScheduler implements 
     @Property
     private static final String PROPERTY_POOL_NAME = "poolName";
 
+    @Reference
+    private DiscoveryService discoveryService;
+
     /**
      * Activate this component.
      * Start the scheduler.
@@ -247,7 +251,9 @@ public class QuartzScheduler implements 
         if ( options.configuration != null ) {
             jobDataMap.put(DATA_MAP_CONFIGURATION, options.configuration);
         }
-        jobDataMap.put(DATA_MAP_ON_LEADER_ONLY, options.onLeaderOnly);
+        if ( options.runOn != null) {
+            jobDataMap.put(DATA_MAP_RUN_ON, options.runOn);
+        }
 
         return jobDataMap;
     }
@@ -426,13 +432,21 @@ public class QuartzScheduler implements 
                 try {
                     final String name = getServiceIdentifier(ref);
                     final Boolean concurrent = 
(Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_CONCURRENT);
-                    final Boolean onLeaderOnly = 
(Boolean)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_LEADER_ONLY);
+                    final Object runOn = 
ref.getProperty(Scheduler.PROPERTY_SCHEDULER_RUN_ON);
+                    String[] runOnOpts = null;
+                    if ( runOn instanceof String ) {
+                        runOnOpts = new String[] {runOn.toString()};
+                    } else if ( runOn instanceof String[] ) {
+                        runOnOpts = (String[])runOn;
+                    } else {
+                        this.logger.warn("Property {} ignored for scheduler 
{}", Scheduler.PROPERTY_SCHEDULER_RUN_ON, ref);
+                    }
                     final String expression = 
(String)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_EXPRESSION);
                     if ( expression != null ) {
                         this.scheduleJob(job, this.EXPR(expression)
                                 .name(name)
                                 .canRunConcurrently((concurrent != null ? 
concurrent : true))
-                                .onLeaderOnly(onLeaderOnly != null ? 
onLeaderOnly : false));
+                                .onInstancesOnly(runOnOpts));
                     } else {
                         final Long period = 
(Long)ref.getProperty(Scheduler.PROPERTY_SCHEDULER_PERIOD);
                         if ( period != null ) {
@@ -446,7 +460,7 @@ public class QuartzScheduler implements 
                                 this.scheduleJob(job, this.PERIODIC(period, 
immediate)
                                         .name(name)
                                         .canRunConcurrently((concurrent != 
null ? concurrent : true))
-                                        .onLeaderOnly(onLeaderOnly != null ? 
onLeaderOnly : false));
+                                        .onInstancesOnly(runOnOpts));
                             }
                         } else {
                             this.logger.debug("Ignoring servce {} : no 
scheduling property found.", ref);
@@ -792,6 +806,30 @@ public class QuartzScheduler implements 
         } else {
             name = job.getClass().getName() + ':' + UUID.randomUUID();
         }
+
+        // check run on
+        if ( opts.runOn != null ) {
+            boolean schedule = false;
+            if ( opts.runOn.length == 1 && 
Scheduler.VALUE_RUN_ON_LEADER.equals(opts.runOn[0])) {
+                schedule = true;
+            } else if ( opts.runOn.length == 1 && 
Scheduler.VALUE_RUN_ON_SINGLE.equals(opts.runOn[0])) {
+                schedule = true;
+            } else { // sling IDs
+                final String myId = 
this.discoveryService.getTopology().getLocalInstance().getSlingId();
+                for(final String id : opts.runOn ) {
+                    if ( myId.equals(id) ) {
+                        schedule = true;
+                        break;
+                    }
+                }
+                opts.runOn = null;
+            }
+            if ( !schedule ) {
+                this.logger.warn("Not scheduling job {} with name {} - not in 
required Sling ID set", job, name);
+                return;
+            }
+        }
+
         final Trigger trigger = opts.trigger.withIdentity(name).build();
 
         // create the data map


Reply via email to