Author: cziegeler
Date: Fri Mar 5 14:40:45 2010
New Revision: 919431
URL: http://svn.apache.org/viewvc?rev=919431&view=rev
Log:
SLING-1426 : Spurious wakeups are not handled correctly
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
Fri Mar 5 14:40:45 2010
@@ -280,7 +280,7 @@
public static boolean rescheduleJob(Event job) {
final JobStatusNotifier.NotifierContext ctx = getNotifierContext(job);
if ( ctx != null ) {
- return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
+ return ctx.notifier.finishedJob(job, ctx.eventNodePath, true);
}
return false;
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Fri Mar 5 14:40:45 2010
@@ -383,7 +383,6 @@
Session s = null;
try {
s = this.createSession();
- final Node parentNode =
(Node)s.getItem(this.repositoryPath);
logger.debug("Executing query {}", queryString);
final Query q =
s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
final NodeIterator iter = q.execute().getNodes();
@@ -393,7 +392,7 @@
eventNode.remove();
count++;
}
- parentNode.save();
+ s.save();
logger.debug("Removed {} entries from the repository.",
count);
} catch (RepositoryException e) {
@@ -816,6 +815,8 @@
EventUtil.toString(info.event),
this.maximumParallelJobs);
}
try {
+ // we don't check in a loop here; if this is a
spurious wakeup,
+ // we just process anyway
this.backgroundLock.wait();
} catch (InterruptedException e) {
this.ignoreException(e);
@@ -1223,7 +1224,7 @@
}
/**
- * @see
org.apache.sling.event.EventUtil.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
+ * @see
org.apache.sling.event.impl.job.JobStatusNotifier#sendAcknowledge(org.osgi.service.event.Event,
java.lang.String)
*/
public boolean sendAcknowledge(Event job, String eventNodePath) {
synchronized ( this.processingEventsList ) {
@@ -1237,23 +1238,15 @@
}
- /**
- * This is a notification from the component which processed the job.
- *
- * @see
org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
- */
- public boolean finishedJob(Event job, String eventNodePath, boolean
shouldReschedule) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Received finish for job {},
shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
- }
- // let's remove the event from our processing list
- // this is just a sanity check, as usually the job should have been
- // removed during sendAcknowledge.
- synchronized ( this.processingEventsList ) {
- this.processingEventsList.remove(eventNodePath);
- }
+ public static final class RescheduleInfo {
+ public Event job;
+ public boolean reschedule;
+ }
- boolean reschedule = shouldReschedule;
+ private RescheduleInfo handleReschedule(final Event job, final boolean
shouldReschedule) {
+ final RescheduleInfo info = new RescheduleInfo();
+ info.job = job;
+ info.reschedule = shouldReschedule;
if ( shouldReschedule ) {
// check if we exceeded the number of retries
int retries = this.maxJobRetries;
@@ -1266,14 +1259,14 @@
}
retryCount++;
if ( retries != -1 && retryCount > retries ) {
- reschedule = false;
+ info.reschedule = false;
}
- if ( reschedule ) {
+ if ( info.reschedule ) {
// update event with retry count and retries
final Dictionary<String, Object> newProperties = new
EventPropertiesMap(job);
newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT,
retryCount);
newProperties.put(EventUtil.PROPERTY_JOB_RETRIES, retries);
- job = new Event(job.getTopic(), newProperties);
+ info.job = new Event(job.getTopic(), newProperties);
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Failed job {}",
EventUtil.toString(job));
}
@@ -1290,115 +1283,127 @@
}
this.sendNotification(EventUtil.TOPIC_JOB_FINISHED, job);
}
- final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
- EventInfo putback = null;
- // we have to use the same session for unlocking that we used for
locking!
+ return info;
+ }
+
+ /**
+ * This is a notification from the component which processed the job.
+ *
+ * @see
org.apache.sling.event.impl.job.JobStatusNotifier#finishedJob(org.osgi.service.event.Event,
String, boolean)
+ */
+ public boolean finishedJob(Event job, final String eventNodePath, final
boolean shouldReschedule) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Received finish for job {},
shouldReschedule={}", EventUtil.toString(job), shouldReschedule);
+ }
+ // let's remove the event from our processing list
+ // this is just a sanity check, as usually the job should have been
+ // removed during sendAcknowledge.
+ synchronized ( this.processingEventsList ) {
+ this.processingEventsList.remove(eventNodePath);
+ }
+
+ // handle the reschedule, a new job might be returned with updated
reschedule info!
+ final RescheduleInfo rescheduleInfo = this.handleReschedule(job,
shouldReschedule);
+ job = rescheduleInfo.job;
+ final boolean reschedule = rescheduleInfo.reschedule;
+
+ // if this is set after the synchronized block we have an error
+ Boolean errorOccured = null;
synchronized ( this.backgroundLock ) {
- // we might get here asnyc while this service has already been
shutdown!
- if ( this.backgroundSession == null ) {
- checkForNotify(job, null);
- // we can only return false here
- return false;
+ // get the parallel info and unlock
+ final ParallelInfo parInfo = ParallelInfo.getParallelInfo(job);
+ final String jobTopic =
(String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
+ this.unlockState(parInfo, jobTopic);
+
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) == null ) {
+ this.parallelJobCount--;
+ this.backgroundLock.notify();
}
+ // we have to use the same session for unlocking that we used for
locking!
try {
- this.backgroundSession.refresh(false);
- // check if the job has been cancelled
- if ( !this.backgroundSession.itemExists(eventNodePath) ) {
- checkForNotify(job, null);
- return true;
- }
- final Node eventNode = (Node)
this.backgroundSession.getItem(eventNodePath);
- boolean unlock = true;
- try {
- if ( !reschedule ) {
- synchronized ( this.deletedJobs ) {
- this.deletedJobs.add(eventNodePath);
- }
- // unlock node
- try {
- eventNode.unlock();
- } catch (RepositoryException e) {
- // if unlock fails, we silently ignore this
- this.ignoreException(e);
- }
- unlock = false;
- final String jobId =
(String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
- if ( jobId == null ) {
- // remove node from repository if no job is set
- final Node parentNode = eventNode.getParent();
- eventNode.remove();
- parentNode.save();
- } else {
-
eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED,
Calendar.getInstance());
- eventNode.save();
- }
- }
- } catch (RepositoryException re) {
- // if an exception occurs, we just log
- this.logger.error("Exception during job finishing.", re);
- } finally {
- final String jobTopic =
(String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
- this.unlockState(parInfo, jobTopic);
-
- if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) ==
null ) {
- this.parallelJobCount--;
- this.backgroundLock.notify();
- }
-
- if ( unlock ) {
- synchronized ( this.deletedJobs ) {
- this.deletedJobs.add(eventNodePath);
- }
- // unlock node
- try {
- eventNode.unlock();
- } catch (RepositoryException e) {
- // if unlock fails, we silently ignore this
- this.ignoreException(e);
- }
+ // we might get here asynchronously while this service has already been
shut down!
+ if ( this.backgroundSession == null ) {
+ // we can only return false here
+ errorOccured = false;
+ } else {
+ this.backgroundSession.refresh(false);
+ // check if the job has been cancelled
+ if ( !this.backgroundSession.itemExists(eventNodePath) ) {
+ errorOccured = true;
}
}
- if ( reschedule ) {
- // update retry count and retries in the repository
- try {
- eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES,
(Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
-
eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT,
(Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
- eventNode.save();
- } catch (RepositoryException re) {
- // if an exception occurs, we just log
- this.logger.error("Exception during job updating job
rescheduling information.", re);
+ if ( errorOccured == null ) {
+ synchronized ( this.deletedJobs ) {
+ this.deletedJobs.add(eventNodePath);
}
- final EventInfo info = new EventInfo();
+ final Node eventNode = (Node)
this.backgroundSession.getItem(eventNodePath);
+ // unlock node
try {
- info.event = job;
- info.nodePath = eventNode.getPath();
+ eventNode.unlock();
} catch (RepositoryException e) {
- // this should never happen
+ // if unlock fails, we silently ignore this
this.ignoreException(e);
}
- // if this is an own job queue, we simply signal the queue
to continue
- // it will pick up the event and either reschedule or wait
- if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) !=
null ) {
- checkForNotify(job, info);
+ // update status in repository
+ if ( !reschedule ) {
+ try {
+ final String jobId =
(String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
+ if ( jobId == null ) {
+ // remove node from repository if no job id is
set
+ eventNode.remove();
+ } else {
+ // set finished date - if job id is set
+
eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED,
Calendar.getInstance());
+ }
+ this.backgroundSession.save();
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during finished job
update.", re);
+ }
} else {
- putback = info;
+ // update retry count and retries in the repository
+ try {
+
eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRIES,
(Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRIES));
+
eventNode.setProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT,
(Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT));
+ this.backgroundSession.save();
+ } catch (RepositoryException re) {
+ // if an exception occurs, we just log
+ this.logger.error("Exception during job updating
job rescheduling information.", re);
+ }
}
- } else {
- // if this is an own job queue, we simply signal the queue
to continue
- // it will pick up the event and continue with the next
event
- checkForNotify(job, null);
}
} catch (RepositoryException re) {
- this.logger.error("Unable to create new session.", re);
- return false;
+ this.logger.error("Unable to access repository to check job
node.", re);
+ errorOccured = false;
}
}
- if ( putback != null ) {
- this.putBackIntoMainQueue(putback, false);
+ // check for error
+ if ( errorOccured != null ) {
+ checkForNotify(job, null);
+ return errorOccured;
+ }
+ // reschedule
+ if ( reschedule ) {
+ final EventInfo putback = new EventInfo();
+ putback.event = job;
+ putback.nodePath = eventNodePath;
+ // if this is an own job queue, we simply signal the queue to
continue
+ // it will pick up the event and either reschedule or wait
+ if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) {
+ checkForNotify(job, putback);
+ } else {
+ this.putBackIntoMainQueue(putback, false);
+ }
+ } else {
+ // if this is an own job queue, we simply signal the queue to
continue
+ // it will pick up the event and continue with the next event
+ checkForNotify(job, null);
}
+ // if we shouldn't reschedule - we always return true as everything
went fine
if ( !shouldReschedule ) {
return true;
}
+ // if we should reschedule, we return the reschedule status
return reschedule;
}
@@ -1641,9 +1646,8 @@
try {
if ( this.writerSession.itemExists(jobId) ) {
final Item item = this.writerSession.getItem(jobId);
- final Node parentNode = item.getParent();
item.remove();
- parentNode.save();
+ this.writerSession.save();
}
} catch (RepositoryException e) {
this.logger.error("Error during cancelling job at " +
jobId, e);
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java?rev=919431&r1=919430&r2=919431&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/job/JobBlockingQueue.java
Fri Mar 5 14:40:45 2010
@@ -75,9 +75,10 @@
this.isWaiting = true;
this.markForCleanUp = false;
this.logger.debug("Job queue {} is waiting for finish.",
this.queueName);
- this.lock.wait();
+ while ( this.isWaiting ) {
+ this.lock.wait();
+ }
this.logger.debug("Job queue {} is continuing.", this.queueName);
- this.isWaiting = false;
final EventInfo object = this.eventInfo;
this.eventInfo = null;
return object;
@@ -109,9 +110,10 @@
this.isWaiting = true;
this.markForCleanUp = false;
this.logger.debug("Job queue {} is processing {} job - waiting for
a free slot.", this.queueName, jobCount);
- this.lock.wait();
+ while ( this.isWaiting ) {
+ this.lock.wait();
+ }
this.logger.debug("Job queue {} is continuing.", this.queueName);
- this.isWaiting = false;
}
jobCount++;
}
@@ -123,6 +125,7 @@
jobCount--;
if ( this.isWaiting ) {
this.logger.debug("Notifying job queue {} to continue
processing.", this.queueName);
+ this.isWaiting = false;
this.lock.notify();
}
}
@@ -140,6 +143,7 @@
public void notifyFinish(EventInfo i) {
this.eventInfo = i;
this.logger.debug("Notifying job queue {} to continue processing.",
this.queueName);
+ this.isWaiting = false;
this.lock.notify();
}