Author: cziegeler
Date: Mon Jan 5 04:50:12 2009
New Revision: 731544
URL: http://svn.apache.org/viewvc?rev=731544&view=rev
Log:
SLING-635: Remove a thread after two cleanup cycles: the first cycle marks the
thread for removal, and the second cycle removes it, provided the thread has
not been used in the meantime.
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
Modified:
incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java
URL:
http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=731544&r1=731543&r2=731544&view=diff
==============================================================================
--- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original)
+++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Jan 5 04:50:12 2009
@@ -283,6 +283,31 @@
}
}
}
+ // check for idle threads
+ synchronized ( this.jobQueues ) {
+ final Iterator<Map.Entry<String, JobBlockingQueue>> i =
this.jobQueues.entrySet().iterator();
+ while ( i.hasNext() ) {
+ final Map.Entry<String, JobBlockingQueue> current =
i.next();
+ final JobBlockingQueue jbq = current.getValue();
+ if ( jbq.size() == 0 ) {
+ if ( jbq.isMarkedForCleanUp() ) {
+ // set to finished
+ jbq.setFinished(true);
+ // wake up
+ try {
+ jbq.put(new EventInfo());
+ } catch (InterruptedException e) {
+ this.ignoreException(e);
+ }
+ // remove
+ i.remove();
+ } else {
+ // mark to be removed during next cycle
+ jbq.markForCleanUp();
+ }
+ }
+ }
+ }
}
}
@@ -481,7 +506,7 @@
*/
private void runJobQueue(final String queueName, final JobBlockingQueue
jobQueue) {
EventInfo info = null;
- while ( this.running ) {
+ while ( this.running && !jobQueue.isFinished() ) {
if ( info == null ) {
// so let's wait/get the next job from the queue
try {
@@ -492,7 +517,7 @@
}
}
- if ( info != null && this.running ) {
+ if ( info != null && this.running && !jobQueue.isFinished() ) {
synchronized ( jobQueue.getLock()) {
final EventInfo processInfo = info;
info = null;
@@ -1394,8 +1419,13 @@
private boolean isWaiting = false;
+ private boolean markForCleanUp = false;
+
+ private boolean finished = false;
+
public EventInfo waitForFinish() throws InterruptedException {
this.isWaiting = true;
+ this.markForCleanUp = false;
this.lock.wait();
this.isWaiting = false;
final EventInfo object = this.eventInfo;
@@ -1403,6 +1433,16 @@
return object;
}
+ public void markForCleanUp() {
+ if ( !this.isWaiting ) {
+ this.markForCleanUp = true;
+ }
+ }
+
+ public boolean isMarkedForCleanUp() {
+ return !this.isWaiting && this.markForCleanUp;
+ }
+
public void notifyFinish(EventInfo i) {
this.eventInfo = i;
this.lock.notify();
@@ -1415,6 +1455,14 @@
public boolean isWaiting() {
return this.isWaiting;
}
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public void setFinished(boolean flag) {
+ this.finished = flag;
+ }
}
private static final class StartedJobInfo {