Author: cziegeler
Date: Fri Aug 29 05:31:34 2008
New Revision: 690208
URL: http://svn.apache.org/viewvc?rev=690208&view=rev
Log:
Fix synchronization for own queues.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=690208&r1=690207&r2=690208&view=diff
==============================================================================
---
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
(original)
+++
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Fri Aug 29 05:31:34 2008
@@ -328,7 +328,7 @@
true);
// give the system some time to start
try {
- Thread.sleep(1000 * 60 * 2); // 2min
+ Thread.sleep(1000 * 60 * 1); // 1min
} catch (InterruptedException e) {
this.ignoreException(e);
}
@@ -414,66 +414,68 @@
}
if ( info != null && this.running ) {
- final EventInfo processInfo = info;
- info = null;
- if ( this.executeJob(processInfo, jobQueue) ) {
- EventInfo newInfo = null;
- try {
- newInfo = jobQueue.waitForFinish();
- } catch (InterruptedException e) {
- this.ignoreException(e);
- }
- // if we have an info, this is a reschedule
- if ( newInfo != null ) {
- final EventInfo newEventInfo = newInfo;
- final Event job = newInfo.event;
-
- // is this an ordered queue?
- final boolean orderedQueue =
job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
-
- if ( orderedQueue ) {
- // we just sleep for the delay time - if none, we
continue and retry
- // this job again
- if (
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- final long delay =
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- this.ignoreException(e);
- }
- }
- info = newInfo;
- } else {
- // delay rescheduling?
- if (
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
- final long delay =
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
- final Date fireDate = new Date();
- fireDate.setTime(System.currentTimeMillis() +
delay);
-
- final Runnable t = new Runnable() {
- public void run() {
- try {
- jobQueue.put(newEventInfo);
- } catch (InterruptedException e) {
- // this should never happen
- ignoreException(e);
- }
+ synchronized ( jobQueue.getLock()) {
+ final EventInfo processInfo = info;
+ info = null;
+ if ( this.executeJob(processInfo, jobQueue) ) {
+ EventInfo newInfo = null;
+ try {
+ newInfo = jobQueue.waitForFinish();
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // if we have an info, this is a reschedule
+ if ( newInfo != null ) {
+ final EventInfo newEventInfo = newInfo;
+ final Event job = newInfo.event;
+
+ // is this an ordered queue?
+ final boolean orderedQueue =
job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_ORDERED) != null;
+
+ if ( orderedQueue ) {
+ // we just sleep for the delay time - if none,
we continue and retry
+ // this job again
+ if (
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay =
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
}
- };
- try {
- this.scheduler.fireJobAt(null, t, null,
fireDate);
- } catch (Exception e) {
- // we ignore the exception and just put
back the job in the queue
- ignoreException(e);
- t.run();
}
+ info = newInfo;
} else {
- // put directly into queue
- try {
- jobQueue.put(newInfo);
- } catch (InterruptedException e) {
- // this should never happen
- this.ignoreException(e);
+ // delay rescheduling?
+ if (
job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
+ final long delay =
(Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
+ final Date fireDate = new Date();
+
fireDate.setTime(System.currentTimeMillis() + delay);
+
+ final Runnable t = new Runnable() {
+ public void run() {
+ try {
+ jobQueue.put(newEventInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ ignoreException(e);
+ }
+ }
+ };
+ try {
+ this.scheduler.fireJobAt(null, t,
null, fireDate);
+ } catch (Exception e) {
+ // we ignore the exception and just
put back the job in the queue
+ ignoreException(e);
+ t.run();
+ }
+ } else {
+ // put directly into queue
+ try {
+ jobQueue.put(newInfo);
+ } catch (InterruptedException e) {
+ // this should never happen
+ this.ignoreException(e);
+ }
}
}
}
@@ -1052,7 +1054,9 @@
synchronized ( this.jobQueues ) {
jobQueue =
this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
}
- jobQueue.notifyFinish(info);
+ synchronized ( jobQueue.getLock()) {
+ jobQueue.notifyFinish(info);
+ }
} else {
// delay rescheduling?
@@ -1097,7 +1101,9 @@
synchronized ( this.jobQueues ) {
jobQueue =
this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME));
}
- jobQueue.notifyFinish(null);
+ synchronized ( jobQueue.getLock()) {
+ jobQueue.notifyFinish(null);
+ }
}
}
if ( !shouldReschedule ) {
@@ -1295,9 +1301,7 @@
private final Object lock = new Object();
public EventInfo waitForFinish() throws InterruptedException {
- synchronized ( this.lock ) {
- this.lock.wait();
- }
+ this.lock.wait();
final EventInfo object = this.eventInfo;
this.eventInfo = null;
return object;
@@ -1305,9 +1309,11 @@
public void notifyFinish(EventInfo i) {
this.eventInfo = i;
- synchronized ( this.lock ) {
- this.lock.notify();
- }
+ this.lock.notify();
+ }
+
+ public Object getLock() {
+ return lock;
}
}
}