Author: cziegeler
Date: Mon Oct 27 03:58:40 2008
New Revision: 708132
URL: http://svn.apache.org/viewvc?rev=708132&view=rev
Log:
SLING-339 Implementing a job acknowledge mechanism: All started jobs are
queried after a configured time, if someone started to process them. If not,
they're requeued. If more than one job processor wants to process a job, only
the first one is used to notify the job event handler of success/failure.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Mon Oct 27 03:58:40 2008
@@ -190,39 +190,50 @@
public static boolean isJobEvent(Event event) {
return event.getProperty(PROPERTY_JOB_TOPIC) != null;
}
+
/**
- * Notify a finished job.
+ * Check if this is a job event and return the notifier context.
+ * @throws IllegalArgumentException If the event is a job event but does
not have a notifier context.
*/
- public static void finishedJob(Event job) {
+ private static JobStatusNotifier.NotifierContext getNotifierContext(final
Event job) {
// check if this is a job event
if ( !isJobEvent(job) ) {
- return;
+ return null;
}
final JobStatusNotifier.NotifierContext ctx = (NotifierContext)
job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
if ( ctx == null ) {
- throw new NullPointerException("JobStatusNotifier context is not
available in event properties.");
+ throw new IllegalArgumentException("JobStatusNotifier context is
not available in event properties.");
+ }
+ return ctx;
+ }
+
+ /**
+ * Notify a finished job.
+ * @throws IllegalArgumentException If the event is a job event but does
not have a notifier context.
+ */
+ public static void finishedJob(Event job) {
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
}
- ctx.notifier.finishedJob(job, ctx.eventNodePath, false);
}
/**
* Notify a failed job.
* @return <code>true</code> if the job has been rescheduled,
<code>false</code> otherwise.
+ * @throws IllegalArgumentException If the event is a job event but does
not have a notifier context.
*/
public static boolean rescheduleJob(Event job) {
- // check if this is a job event
- if ( !isJobEvent(job) ) {
- return false;
+ final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
+ if ( ctx != null ) {
+ return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
}
- final JobStatusNotifier.NotifierContext ctx = (NotifierContext)
job.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
- if ( ctx == null ) {
- throw new NullPointerException("JobStatusNotifier context is not
available in event properties.");
- }
- return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+ return false;
}
/**
* Process a job in the background and notify its success.
+ * This method also sends an acknowledge message to the job event handler.
*/
public static void processJob(final Event job, final JobProcessor
processor) {
final Runnable task = new Runnable() {
@@ -232,17 +243,31 @@
*/
public void run() {
boolean result = false;
+ boolean notifyResult = true;
try {
+ // first check for a notifier context to send an
acknowledge
+ final JobStatusNotifier.NotifierContext ctx =
getNotifierContext(job);
+ if ( ctx != null ) {
+ if ( !ctx.notifier.sendAcknowledge(job,
ctx.eventNodePath) ) {
+ // if we don't get an ack, someone else is already
processing this job.
+ // we process but do not notify the job event
handler.
+
LoggerFactory.getLogger(EventUtil.class).info("Someone else is already
processing job {}.", job);
+ notifyResult = false;
+ }
+ }
+
result = processor.process(job);
} catch (Throwable t) {
- LoggerFactory.getLogger(EventUtil.class).error("Unhandled
error occured in job processor " + t.getMessage(), t);
+ LoggerFactory.getLogger(EventUtil.class).error("Unhandled
error occured in job processor " + t.getMessage() + " while processing job " +
job, t);
// we don't reschedule if an exception occurs
result = true;
} finally {
- if ( result ) {
- EventUtil.finishedJob(job);
- } else {
- EventUtil.rescheduleJob(job);
+ if ( notifyResult ) {
+ if ( result ) {
+ EventUtil.finishedJob(job);
+ } else {
+ EventUtil.rescheduleJob(job);
+ }
}
}
}
@@ -277,6 +302,15 @@
}
/**
+ * Send an acknowledge message that someone is processing the job.
+ * @param job The job.
+ * @param eventNodePath The storage node in the repository.
+ * @return <code>true</code> if the ack is ok, <code>false</code>
otherwise (e.g. if
+ * someone else already sent an ack for this job).
+ */
+ boolean sendAcknowledge(Event job, String eventNodePath);
+
+ /**
* Notify that the job is finished.
* If the job is not rescheduled, a return value of <code>false</code>
indicates an error
* during the processing. If the job should be rescheduled,
<code>true</code> indicates
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Mon Oct 27 03:58:40 2008
@@ -51,6 +51,7 @@
import org.apache.sling.event.EventPropertiesMap;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
+import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentConstants;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
@@ -70,7 +71,7 @@
* We schedule this event handler to run in the background and clean up
* obsolete events.
* @scr.service interface="java.lang.Runnable"
- * @scr.property name="scheduler.period" value="600" type="Long"
+ * @scr.property name="scheduler.period" value="300" type="Long"
label="%jobscheduler.period.name" description="%jobscheduler.period.description"
* @scr.property name="scheduler.concurrent" value="false" type="Boolean"
private="true"
*/
public class JobEventHandler
@@ -95,12 +96,21 @@
/** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
private static final String CONFIG_PROPERTY_MAX_JOB_RETRIES =
"max.job.retries";
+ /** Default number of seconds to wait for an ack. */
+ private static final long DEFAULT_WAIT_FOR_ACK = 90; // by default we wait
90 secs
+
+ /** @scr.property valueRef="DEFAULT_WAIT_FOR_ACK" */
+ private static final String CONFIG_PROPERTY_WAIT_FOR_ACK = "wait.for.ack";
+
/** We check every 30 secs by default. */
private long sleepTime;
/** How often should a job be retried by default. */
private int maxJobRetries;
+ /** How long do we wait for an ack (in ms) */
+ private long waitForAckMs;
+
/** Background session. */
private Session backgroundSession;
@@ -110,10 +120,10 @@
/** List of deleted jobs. */
private Set<String>deletedJobs = new HashSet<String>();
- /** Default clean up time is 10 minutes. */
- private static final int DEFAULT_CLEANUP_PERIOD = 10;
+ /** Default clean up time is 5 minutes. */
+ private static final int DEFAULT_CLEANUP_PERIOD = 5;
- /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
+ /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer"
label="%jobcleanup.period.name" description="%jobcleanup.period.description" */
private static final String CONFIG_PROPERTY_CLEANUP_PERIOD =
"cleanup.period";
/** We remove everything which is older than 5 min by default. */
@@ -125,6 +135,9 @@
/** Our component context. */
private ComponentContext componentContext;
+ /** The map of events we're currently processing. */
+ private final Map<String, StartedJobInfo> processingEventsList = new
HashMap<String, StartedJobInfo>();
+
public static ThreadPool JOB_THREAD_POOL;
/**
@@ -139,6 +152,7 @@
this.cleanupPeriod =
OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD),
DEFAULT_CLEANUP_PERIOD);
this.sleepTime =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
this.maxJobRetries =
OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES),
DEFAULT_MAX_JOB_RETRIES);
+ this.waitForAckMs =
OsgiUtil.toLong(props.get(CONFIG_PROPERTY_WAIT_FOR_ACK), DEFAULT_WAIT_FOR_ACK)
* 1000;
this.componentContext = context;
super.activate(context);
JOB_THREAD_POOL = this.threadPool;
@@ -199,37 +213,63 @@
* @see java.lang.Runnable#run()
*/
public void run() {
- if ( this.cleanupPeriod > 0 && this.running ) {
- this.logger.debug("Cleaning up repository, removing all finished
jobs older than {} minutes.", this.cleanupPeriod);
-
- final String queryString = this.getCleanUpQueryString();
- // we create an own session for concurrency issues
- Session s = null;
- try {
- s = this.createSession();
- final Node parentNode = (Node)s.getItem(this.repositoryPath);
- logger.debug("Executing query {}", queryString);
- final Query q =
s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
- final NodeIterator iter = q.execute().getNodes();
- int count = 0;
- while ( iter.hasNext() ) {
- final Node eventNode = iter.nextNode();
- eventNode.remove();
- count++;
+ if ( this.running ) {
+ // check for jobs that were started but never got an acknowledgement
+ final long tooOld = System.currentTimeMillis() - this.waitForAckMs;
+ // to keep the synchronized block as fast as possible we just
store the
+ // jobs to be removed in a new list and process this list
afterwards
+ final List<StartedJobInfo> restartJobs = new
ArrayList<StartedJobInfo>();
+ synchronized ( this.processingEventsList ) {
+ final Iterator<Map.Entry<String, StartedJobInfo>> i =
this.processingEventsList.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, StartedJobInfo> entry = i.next();
+ if ( entry.getValue().started <= tooOld ) {
+ restartJobs.add(entry.getValue());
+ i.remove();
+ }
}
- parentNode.save();
- logger.debug("Removed {} entries from the repository.", count);
+ }
+ final Iterator<StartedJobInfo> jobIter = restartJobs.iterator();
+ while ( jobIter.hasNext() ) {
+ final StartedJobInfo info = jobIter.next();
+ this.logger.info("No acknowledge received for job {} stored at
{}. Requeueing job.", info.event, info.nodePath);
+ this.finishedJob(info.event, info.nodePath, true);
+ }
+
+ // remove obsolete jobs from the repository
+ if ( this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up repository, removing all
finished jobs older than {} minutes.", this.cleanupPeriod);
+
+ final String queryString = this.getCleanUpQueryString();
+ // we create an own session for concurrency issues
+ Session s = null;
+ try {
+ s = this.createSession();
+ final Node parentNode =
(Node)s.getItem(this.repositoryPath);
+ logger.debug("Executing query {}", queryString);
+ final Query q =
s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
+ final NodeIterator iter = q.execute().getNodes();
+ int count = 0;
+ while ( iter.hasNext() ) {
+ final Node eventNode = iter.nextNode();
+ eventNode.remove();
+ count++;
+ }
+ parentNode.save();
+ logger.debug("Removed {} entries from the repository.",
count);
- } catch (RepositoryException e) {
- // in the case of an error, we just log this as a warning
- this.logger.warn("Exception during repository cleanup.", e);
- } finally {
- if ( s != null ) {
- s.logout();
+ } catch (RepositoryException e) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during repository cleanup.",
e);
+ } finally {
+ if ( s != null ) {
+ s.logout();
+ }
}
}
}
}
+
/**
* @see
org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
*/
@@ -343,10 +383,10 @@
if ( this.running ) {
this.loadJobs();
} else {
- logger.info("Deactivating component due to errors.");
- // deactivate
final ComponentContext ctx = this.componentContext;
+ // deactivate
if ( ctx != null ) {
+ logger.info("Deactivating component {} due to errors during
startup.", ctx.getProperties().get(Constants.SERVICE_ID));
final String name = (String)
componentContext.getProperties().get(
ComponentConstants.COMPONENT_NAME);
ctx.disableComponent(name);
@@ -736,11 +776,18 @@
final String jobTopic =
(String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
boolean unlock = true;
try {
- final Event jobEvent = this.getJobEvent(event,
eventNode.getPath());
+ final String nodePath = eventNode.getPath();
+ final Event jobEvent = this.getJobEvent(event, nodePath);
eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR,
this.applicationId);
eventNode.save();
final EventAdmin localEA = this.eventAdmin;
if ( localEA != null ) {
+ final StartedJobInfo jobInfo = new StartedJobInfo(jobEvent,
nodePath, System.currentTimeMillis());
+ // let's add the event to our processing list
+ synchronized ( this.processingEventsList ) {
+ this.processingEventsList.put(nodePath, jobInfo);
+ }
+
// we need async delivery, otherwise we might create a deadlock
// as this method runs inside a synchronized block and the
finishedJob
// method as well!
@@ -931,11 +978,30 @@
}
/**
+ * @see
org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
+ */
+ public boolean sendAcknowledge(Event job, String eventNodePath) {
+ synchronized ( this.processingEventsList ) {
+ // if the event is still in the processing list, we confirm the ack
+ final Object ack = this.processingEventsList.remove(eventNodePath);
+ return ack != null;
+ }
+
+ }
+
+ /**
* This is a notification from the component which processed the job.
*
* @see
org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
*/
public boolean finishedJob(Event job, String eventNodePath, boolean
shouldReschedule) {
+ // let's remove the event from our processing list
+ // this is just a sanity check, as usually the job should have been
+ // removed during sendAcknowledge.
+ synchronized ( this.processingEventsList ) {
+ this.processingEventsList.remove(eventNodePath);
+ }
+
boolean reschedule = shouldReschedule;
if ( shouldReschedule ) {
// check if we exceeded the number of retries
@@ -1311,4 +1377,16 @@
return lock;
}
}
+
+ private static final class StartedJobInfo {
+ public final Event event;
+ public final String nodePath;
+ public final long started;
+
+ public StartedJobInfo(final Event e, final String path, final long
started) {
+ this.event = e;
+ this.nodePath = path;
+ this.started = started;
+ }
+ }
}
Modified:
incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=708132&r1=708131&r2=708132&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
(original)
+++
incubator/sling/trunk/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Mon Oct 27 03:58:40 2008
@@ -17,11 +17,10 @@
# under the License.
#
-
#
# This file contains localization strings for configuration labels and
# descriptions as used in the metatype.xml descriptor generated by the
-# the Sling SCR plugin
+# the SCR plugin
#
# Distribution Event Handler
@@ -36,23 +35,41 @@
# Job Event Handler
job.events.name = Job Event Handler
job.events.description = Manages job scheduling on a single system as well \
- as a cluster to enable running jobs on a single node only or on all nodes. \
+ as on a cluster. A Job runs only on a single cluster node. \
The respective scheduling is persisted in the repository and distributed \
- amongst the cluster nodes through Repository Events and locally in the nodes \
- through the OSGi Event Admin.
+ amongst the cluster nodes through repository events. The jobs are started \
+ locally on a single cluster node through the OSGi Event Admin.
sleep.time.name = Retry Interval
sleep.time.description = The number of milliseconds to sleep between two \
- consecutive retries of a Job which failed and was set to be retried. The \
- default value is 20 seconds. This value is only relevant if there is a single
\
+ consecutive retries of a job which failed and was set to be retried. The \
+ default value is 30 seconds. This value is only relevant if there is a single
\
failed job in the queue. If there are multiple failed jobs, each job is \
- retried in turn without and intervening delay.
+ retried in turn without an intervening delay.
+
max.job.retries.name = Maximum Retries
max.job.retries.description = The maximum number of times a failed job slated \
for retries is actually retried. If a job has been retried this number of \
times and still fails, it is not rescheduled and assumed to have failed. The \
default value is 10.
+jobscheduler.period.name = Event Cleanup Interval
+jobscheduler.period.description = Interval in seconds in which jobs older than
\
+ a specific age (see Event Cleanup Age) are purged from the repository. \
+ The default value is 5 minutes (300 seconds).
+
+jobcleanup.period.name = Event Cleanup Age
+jobcleanup.period.description = The maximum age in minutes of persisted jobs to
\
+ be purged from the repository during the cleanup run. The default is 5 \
+ minutes. Note that this setting defines the minimum time an event remains \
+ in the repository.
+
+wait.for.ack.name = Acknowledge Waiting Time
+wait.for.ack.description = If a service is processing a job, it acknowledges
this \
+ by sending a message to the Job Event Handler. If the Job Event Handler does
not \
+ receive such a message in the configured time, it reschedules the job. The
configured \
+ time is in seconds (default is 90 secs).
+
#
# Shared labels
scheduler.period.name = Event Cleanup Internal