deweese 01/08/06 12:38:55 Modified: sources/org/apache/batik/util RunnableQueue.java test-sources/org/apache/batik/util RunnableQueueTest.java Log: 1) Cleaned up notion of 'suspendExecution'. Required addition of transitory 'Suspending' state. 2) Augmented the Test code to test random suspension and resumption by two threads. :) 3) Split synchronization mutexes to allow for adding Runnables while runHandler methods are running (wasn't comfortable assuming these were 'quick'). Revision Changes Path 1.4 +160 -45 xml-batik/sources/org/apache/batik/util/RunnableQueue.java Index: RunnableQueue.java =================================================================== RCS file: /home/cvs/xml-batik/sources/org/apache/batik/util/RunnableQueue.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- RunnableQueue.java 2001/08/06 15:35:13 1.3 +++ RunnableQueue.java 2001/08/06 19:38:55 1.4 @@ -16,17 +16,57 @@ * invocation in a single thread. * * @author <a href="mailto:[EMAIL PROTECTED]">Stephane Hillion</a> - * @version $Id: RunnableQueue.java,v 1.3 2001/08/06 15:35:13 deweese Exp $ + * @version $Id: RunnableQueue.java,v 1.4 2001/08/06 19:38:55 deweese Exp $ */ public class RunnableQueue implements Runnable { /** - * whether this thread is suspended. + * Type-safe enumeration of queue states. */ - protected boolean suspended; + public static class RunnableQueueState extends Object { + final String value; + private RunnableQueueState(String value) { + this.value = value.intern(); } + public String getValue() { return value; } + public String toString() { + return "[RunnableQueueState: " + value + "]"; } + } + + /** + * The queue is in the process of running tasks. + */ + public static final RunnableQueueState RUNNING + = new RunnableQueueState("Running"); + + /** + * The queue may still be running tasks but as soon as possible + * will go to SUSPENDED state. 
+ */ + public static final RunnableQueueState SUSPENDING + = new RunnableQueueState("Suspending"); + + /** + * The queue is no longer running any tasks and will not + * run any tasks until resumeExecution is called. + */ + public static final RunnableQueueState SUSPENDED + = new RunnableQueueState("Suspended"); + + /** + * The Suspension state of this thread. + */ + protected RunnableQueueState state; + + /** + * Object to synchronize/wait/notify for suspension + * issues. + */ + protected Object stateLock = new Object(); + /** - * The Runnable objects list. + * The Runnable objects list, also used as synchronization point + * for pushing/popping runnables. */ protected DoublyLinkedList list = new DoublyLinkedList(); @@ -65,39 +105,50 @@ public void run() { synchronized (this) { runnableQueueThread = Thread.currentThread(); + // Wake the create method so it knows we are in + // our run and ready to go. notify(); } + Link l; Runnable rable; try { while (!Thread.currentThread().isInterrupted()) { - synchronized (this) { - if (suspended) { - if (runHandler != null) { - runHandler.executionSuspended(this); + + // Mutex for suspension work. + synchronized (stateLock) { + if (state != RUNNING) { + state = SUSPENDED; + + // notify suspendExecution in case it is + // waiting until we shut down. + stateLock.notifyAll(); + + executionSuspended(); + + while (state != RUNNING) { + state = SUSPENDED; + // Wait until resumeExecution called. + stateLock.wait(); } - while (suspended) { - wait(); - } - if (runHandler != null) { - runHandler.executionResumed(this); - } + + executionResumed(); } + } + synchronized (list) { l = (Link)list.pop(); if (l == null) { - wait(); + // No item to run, wait till there is one. + list.wait(); continue; // start loop over again... 
} + rable = l.runnable; } rable.run(); l.unlock(); - synchronized (this) { - if (runHandler != null) { - runHandler.runnableInvoked(this, rable); - } - } + runnableInvoked(rable); } } catch (InterruptedException e) { } finally { @@ -121,13 +172,15 @@ * An exception is thrown if the RunnableQueue was not started. * @throws IllegalStateException if getThread() is null. */ - public synchronized void invokeLater(Runnable r) { + public void invokeLater(Runnable r) { if (runnableQueueThread == null) { throw new IllegalStateException ("RunnableQueue not started or has exited"); } - list.push(new Link(r)); - notify(); + synchronized (list) { + list.push(new Link(r)); + list.notify(); + } } /** @@ -149,41 +202,71 @@ } LockableLink l = new LockableLink(r); - synchronized (this) { + synchronized (list) { list.push(l); - notify(); + list.notify(); } l.lock(); } - public synchronized boolean isSuspended() { return suspended; } + public RunnableQueueState getQueueState() { + synchronized (stateLock) { + return state; + } + } /** - * Suspends the execution of this queue. - * @throws IllegalStateException if getThread() is null. - */ - public synchronized void suspendExecution() { + * Suspends the execution of this queue after the current runnable + * completes. + * @param waitTillSuspended if true this method will not return + * until the queue has suspended (no runnable in progress + * or about to be in progress). If resumeExecution is + * called while waiting will simply return (this really + * indicates a race condition in your code). This may + * return before an associated RunHandler is notified. + * @throws IllegalStateException if getThread() is null. */ + public void suspendExecution(boolean waitTillSuspended) { if (runnableQueueThread == null) { throw new IllegalStateException ("RunnableQueue not started or has exited"); } - - suspended = true; + synchronized (stateLock) { + if (state == SUSPENDED) + // already suspended... 
+ return; + + if (state == RUNNING) { + state = SUSPENDING; + synchronized (list) { + // Wake up run thread if it is waiting for jobs, + // so we go into the suspended case (notifying + // run-handler etc...) + list.notify(); + } + } + + if (waitTillSuspended) + try { + stateLock.wait(); + } catch(InterruptedException ie) { } + } } /** * Resumes the execution of this queue. * @throws IllegalStateException if getThread() is null. */ - public synchronized void resumeExecution() { + public void resumeExecution() { if (runnableQueueThread == null) { throw new IllegalStateException ("RunnableQueue not started or has exited"); } - if (suspended) { - suspended = false; - notify(); + synchronized (stateLock) { + if (state != RUNNING) { + state = RUNNING; + stateLock.notifyAll(); // wake it up. + } } } @@ -194,21 +277,22 @@ * to suspend (with <tt>suspendExecution()</tt>) the queue. * @throws IllegalStateException if getThread() is null. */ - public synchronized List getRunnableList() { + public List getRunnableList() { if (runnableQueueThread == null) { throw new IllegalStateException ("RunnableQueue not started or has exited"); } List result = new LinkedList(); - Link l, h; - l = h = (Link)list.getHead(); - if (h==null) return result; - do { - result.add(l.runnable); - l = (Link)l.getNext(); - } while (l != h); - + synchronized (list) { + Link l, h; + l = h = (Link)list.getHead(); + if (h==null) return result; + do { + result.add(l.runnable); + l = (Link)l.getNext(); + } while (l != h); + } return result; } @@ -224,6 +308,37 @@ */ public synchronized RunHandler getRunHandler() { return runHandler; + } + + /** + * Called when execution is being suspended. + * Currently just notifies runHandler + */ + protected synchronized void executionSuspended() { + if (runHandler != null) { + runHandler.executionSuspended(this); + } + } + + /** + * Called when execution is being resumed. 
+ * Currently just notifies runHandler + */ + protected synchronized void executionResumed() { + if (runHandler != null) { + runHandler.executionResumed(this); + } + } + + /** + * Called when a Runnable completes. + * Currently just notifies runHandler + * @param rable The runnable that just completed. + */ + protected synchronized void runnableInvoked(Runnable rable ) { + if (runHandler != null) { + runHandler.runnableInvoked(this, rable); + } } /** 1.2 +42 -11 xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java Index: RunnableQueueTest.java =================================================================== RCS file: /home/cvs/xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- RunnableQueueTest.java 2001/08/06 15:35:13 1.1 +++ RunnableQueueTest.java 2001/08/06 19:38:55 1.2 @@ -20,6 +20,8 @@ public int nThreads; public int activeThreads; + public Random rand; + public RunnableQueue rq; /** * Constructor @@ -43,32 +45,61 @@ * of the test's internal operation fails. */ public TestReport runImpl() throws Exception { - RunnableQueue rq = RunnableQueue.createRunnableQueue(); + rq = RunnableQueue.createRunnableQueue(); List l = new ArrayList(nThreads); - Random rand = new Random(2345); + rand = new Random(2345); + + // Two switch flickers to make things interesting... 
+ l.add(new SwitchFlicker()); + l.add(new SwitchFlicker()); + for (int i=0; i<nThreads; i++) { - Runnable rqRable = new RQRable(i, rand.nextInt(50)); + Runnable rqRable = new RQRable(i, rand.nextInt(50)+1); l.add(new TPRable(rq, i, rand.nextBoolean(), - rand.nextInt(1000), 20, rqRable)); + rand.nextInt(500)+1, 20, rqRable)); } + synchronized (this) { ThreadPounder tp = new ThreadPounder(l); tp.start(); activeThreads = nThreads; while (activeThreads != 0) { - rq.suspendExecution(); - System.out.println("Suspended"); - wait(rand.nextInt(100)); - if (activeThreads == 0) break; - System.out.println("Resuming"); - rq.resumeExecution(); - wait(rand.nextInt(500)); + wait(); } } System.exit(0); return null; + } + + public class SwitchFlicker implements Runnable { + public void run() { + boolean suspendp, waitp; + int time; + while (true) { + try { + synchronized (rand) { + suspendp = rand.nextBoolean(); + waitp = rand.nextBoolean(); + time = rand.nextInt(500); + } + if (suspendp) { + // 1/2 of the time suspend, 1/2 time wait, 1/2 the + // time don't + rq.suspendExecution(waitp); + System.out.println("Suspended - " + + (waitp?"Wait":"Later")); + Thread.sleep(time/10); + } else { + // 1/2 the time resume + rq.resumeExecution(); + System.out.println("Resumed"); + Thread.sleep(time); + } + } catch(InterruptedException ie) { } + } + } } public class TPRable implements Runnable { --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]