Author: cziegeler
Date: Mon Dec 8 14:16:59 2008
New Revision: 724512
URL: http://svn.apache.org/viewvc?rev=724512&view=rev
Log:
Improve synchronization and thread handling.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=724512&r1=724511&r2=724512&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Mon Dec 8 14:16:59 2008
@@ -172,22 +172,32 @@
synchronized ( this.jobQueues ) {
final Iterator<JobBlockingQueue> i =
this.jobQueues.values().iterator();
while ( i.hasNext() ) {
+ final JobBlockingQueue jbq = i.next();
+ // wake up qeue
+ if ( jbq.isWaiting() ) {
+ synchronized ( jbq.getLock()) {
+ jbq.notifyFinish(null);
+ }
+ }
+ // continue queue processing
try {
- i.next().put(new EventInfo());
+ jbq.put(new EventInfo());
} catch (InterruptedException e) {
this.ignoreException(e);
}
}
}
if ( this.backgroundSession != null ) {
- try {
-
this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
- } catch (RepositoryException e) {
- // we just ignore it
- this.logger.warn("Unable to remove event listener.", e);
+ synchronized ( this.backgroundLock ) {
+ try {
+
this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
+ } catch (RepositoryException e) {
+ // we just ignore it
+ this.logger.warn("Unable to remove event listener.", e);
+ }
+ this.backgroundSession.logout();
+ this.backgroundSession = null;
}
- this.backgroundSession.logout();
- this.backgroundSession = null;
}
this.componentContext = null;
JOB_THREAD_POOL = null;
@@ -433,7 +443,13 @@
* @see java.lang.Runnable#run()
*/
public void run() {
- runJobQueue(queueName, jq);
+ while ( running ) {
+ try {
+ runJobQueue(queueName, jq);
+ } catch (Throwable t) {
+ logger.error("Job queue stopped
with exception: " + t.getMessage() + ". Restarting.", t);
+ }
+ }
}
});
@@ -1036,6 +1052,11 @@
EventInfo putback = null;
// we have to use the same session for unlocking that we used for
locking!
synchronized ( this.backgroundLock ) {
+ // we might get here asnyc while this service has already been
shutdown!
+ if ( this.backgroundSession == null ) {
+ // we can only return false here
+ return false;
+ }
try {
this.backgroundSession.refresh(false);
// check if the job has been cancelled
@@ -1137,7 +1158,7 @@
}
} else {
// if this is an own job queue, we simply signal the queue
to continue
- // it will pick up the event continue with the next event
+ // it will pick up the event and continue with the next
event
if ( job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) !=
null ) {
// we know the queue exists
final JobBlockingQueue jobQueue;
@@ -1367,8 +1388,12 @@
private final Object lock = new Object();
+ private boolean isWaiting = false;
+
public EventInfo waitForFinish() throws InterruptedException {
+ this.isWaiting = true;
this.lock.wait();
+ this.isWaiting = false;
final EventInfo object = this.eventInfo;
this.eventInfo = null;
return object;
@@ -1382,6 +1407,10 @@
public Object getLock() {
return lock;
}
+
+ public boolean isWaiting() {
+ return this.isWaiting;
+ }
}
private static final class StartedJobInfo {