deweese     01/08/06 12:38:55

  Modified:    sources/org/apache/batik/util RunnableQueue.java
               test-sources/org/apache/batik/util RunnableQueueTest.java
  Log:
  1) Cleaned up notion of 'suspendExecution'.
     Required addition of transitory 'Suspending' state.
  2) Augmented the Test code to test random suspension and resumption by
     two threads. :)
  3) Split synchronization mutexes to allow for adding Runnables while
     runHandler methods are running (wasn't comfortable assuming these
     were 'quick').
  
  Revision  Changes    Path
  1.4       +160 -45   xml-batik/sources/org/apache/batik/util/RunnableQueue.java
  
  Index: RunnableQueue.java
  ===================================================================
  RCS file: /home/cvs/xml-batik/sources/org/apache/batik/util/RunnableQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- RunnableQueue.java        2001/08/06 15:35:13     1.3
  +++ RunnableQueue.java        2001/08/06 19:38:55     1.4
  @@ -16,17 +16,57 @@
    * invocation in a single thread.
    *
    * @author <a href="mailto:[EMAIL PROTECTED]">Stephane Hillion</a>
  - * @version $Id: RunnableQueue.java,v 1.3 2001/08/06 15:35:13 deweese Exp $
  + * @version $Id: RunnableQueue.java,v 1.4 2001/08/06 19:38:55 deweese Exp $
    */
   public class RunnableQueue implements Runnable {
   
       /**
  -     * whether this thread is suspended.
  +     * Type-safe enumeration of queue states.
        */
  -    protected boolean suspended;
  +    public static class RunnableQueueState extends Object {
  +        final String value;
  +        private RunnableQueueState(String value) {
  +            this.value = value.intern(); }
  +        public String getValue() { return value; }
  +        public String toString() { 
  +            return "[RunnableQueueState: " + value + "]"; }
  +    }
  +
  +    /**
  +     * The queue is in the process of running tasks.
  +     */
  +    public static final RunnableQueueState RUNNING 
  +        = new RunnableQueueState("Running");
  +
  +    /**
  +     * The queue may still be running tasks but as soon as possible
  +     * will go to SUSPENDED state.
  +     */
  +    public static final RunnableQueueState SUSPENDING
  +        = new RunnableQueueState("Suspending");
  +
  +    /**
  +     * The queue is no longer running any tasks and will not
  +     * run any tasks until resumeExecution is called.
  +     */
  +    public static final RunnableQueueState SUSPENDED
  +        = new RunnableQueueState("Suspended");
  +
  +    /**
  +     * The Suspension state of this thread.
  +     */
  +    protected RunnableQueueState state;
  +
  +    /**
  +     * Object to synchronize/wait/notify for suspension
  +     * issues.
  +     */
  +    protected Object stateLock = new Object();
   
  +
       /**
  -     * The Runnable objects list.
  +     * The Runnable objects list, also used as synchronization point
  +     * for pushing/popping runnables.
        */
       protected DoublyLinkedList list = new DoublyLinkedList();
   
  @@ -65,39 +105,50 @@
       public void run() {
           synchronized (this) {
               runnableQueueThread = Thread.currentThread();
  +            // Wake the create method so it knows we are in
  +            // our run and ready to go.
               notify();
           }
  +
           Link l;
           Runnable rable;
           try {
               while (!Thread.currentThread().isInterrupted()) {
  -                synchronized (this) {
  -                    if (suspended) {
  -                        if (runHandler != null) {
  -                            runHandler.executionSuspended(this);
  +                
  +                // Mutex for suspension work.
  +                synchronized (stateLock) {
  +                    if (state != RUNNING) {
  +                        state = SUSPENDED;
  +
  +                        // notify suspendExecution in case it is
  +                        // waiting til we shut down.
  +                        stateLock.notifyAll();
  +
  +                        executionSuspended();
  +
  +                        while (state != RUNNING) {
  +                            state = SUSPENDED;
  +                            // Wait until resumeExecution called.
  +                            stateLock.wait();
                           }
  -                        while (suspended) {
  -                            wait();
  -                        }
  -                        if (runHandler != null) {
  -                            runHandler.executionResumed(this);
  -                        }
  +
  +                        executionResumed();
                       }
  +                }
   
  +                synchronized (list) {
                       l = (Link)list.pop();
                       if (l == null) {
  -                        wait();
  +                        // No item to run, wait till there is one.
  +                        list.wait();
                           continue; // start loop over again...
                       }
  +
                       rable = l.runnable;
                   }
                   rable.run();
                   l.unlock();
  -                synchronized (this) {
  -                    if (runHandler != null) {
  -                        runHandler.runnableInvoked(this, rable);
  -                    }
  -                }
  +                runnableInvoked(rable);
               }
           } catch (InterruptedException e) {
           } finally {
  @@ -121,13 +172,15 @@
        * An exception is thrown if the RunnableQueue was not started.
        * @throws IllegalStateException if getThread() is null.
        */
  -    public synchronized void invokeLater(Runnable r) {
  +    public void invokeLater(Runnable r) {
           if (runnableQueueThread == null) {
               throw new IllegalStateException
                   ("RunnableQueue not started or has exited");
           }
  -        list.push(new Link(r));
  -        notify();
  +        synchronized (list) {
  +            list.push(new Link(r));
  +            list.notify();
  +        }
       }
   
       /**
  @@ -149,41 +202,71 @@
           }
   
           LockableLink l = new LockableLink(r);
  -        synchronized (this) {
  +        synchronized (list) {
               list.push(l);
  -            notify();
  +            list.notify();
           }
           l.lock();
       }
   
  -    public synchronized boolean isSuspended() { return suspended; }
  +    public RunnableQueueState getQueueState() { 
  +        synchronized (stateLock) {
  +            return state; 
  +        }
  +    }
   
       /**
  -     * Suspends the execution of this queue.
  -     * @throws IllegalStateException if getThread() is null.
  -     */
  -    public synchronized void suspendExecution() {
  +     * Suspends the execution of this queue after the current runnable
  +     * completes.
  +     * @param waitTillSuspended if true this method will not return
  +     *        until the queue has suspended (no runnable in progress
  +     *        or about to be in progress). If resumeExecution is
  +     *        called while waiting, this method simply returns (this really
  +     *        indicates a race condition in your code).  This may
  +     *        return before an associated RunHandler is notified.
  +     * @throws IllegalStateException if getThread() is null.  */
  +    public void suspendExecution(boolean waitTillSuspended) {
           if (runnableQueueThread == null) {
               throw new IllegalStateException
                   ("RunnableQueue not started or has exited");
           }
  -        
  -        suspended = true;
  +        synchronized (stateLock) {
  +            if (state == SUSPENDED) 
  +                // already suspended...
  +                return;
  +
  +            if (state == RUNNING) {
  +                state = SUSPENDING;
  +                synchronized (list) {
  +                    // Wake up run thread if it is waiting for jobs,
  +                    // so we go into the suspended case (notifying
  +                    // run-handler etc...)
  +                    list.notify();
  +                }
  +            }
  +
  +            if (waitTillSuspended)
  +                try {
  +                    stateLock.wait();
  +                } catch(InterruptedException ie) { }
  +        }
       }
   
       /**
        * Resumes the execution of this queue.
        * @throws IllegalStateException if getThread() is null.
        */
  -    public synchronized void resumeExecution() {
  +    public void resumeExecution() {
           if (runnableQueueThread == null) {
               throw new IllegalStateException
                   ("RunnableQueue not started or has exited");
           }
   
  -        if (suspended) {
  -            suspended = false;
  -            notify();
  +        synchronized (stateLock) {
  +            if (state != RUNNING) {
  +                state = RUNNING;
  +                stateLock.notifyAll(); // wake it up.
  +            }
           }
       }
   
  @@ -194,21 +277,22 @@
        * to suspend (with <tt>suspendExecution()</tt>) the queue.
        * @throws IllegalStateException if getThread() is null.
        */
  -    public synchronized List getRunnableList() {
  +    public List getRunnableList() {
           if (runnableQueueThread == null) {
               throw new IllegalStateException
                   ("RunnableQueue not started or has exited");
           }
   
           List result = new LinkedList();
  -        Link l, h;
  -        l = h = (Link)list.getHead();
  -        if (h==null) return result;
  -        do {
  -            result.add(l.runnable);
  -            l = (Link)l.getNext();
  -        } while (l != h);
  -
  +        synchronized (list) {
  +            Link l, h;
  +            l = h = (Link)list.getHead();
  +            if (h==null) return result;
  +            do {
  +                result.add(l.runnable);
  +                l = (Link)l.getNext();
  +            } while (l != h);
  +        }
           return result;
       }
   
  @@ -224,6 +308,37 @@
        */
       public synchronized RunHandler getRunHandler() {
           return runHandler;
  +    }
  +
  +    /**
  +     * Called when execution is being suspended.
  +     * Currently just notifies runHandler
  +     */
  +    protected synchronized void executionSuspended() {
  +        if (runHandler != null) {
  +            runHandler.executionSuspended(this);
  +        }
  +    }
  +
  +    /**
  +     * Called when execution is being resumed.
  +     * Currently just notifies runHandler
  +     */
  +    protected synchronized void executionResumed() {
  +        if (runHandler != null) {
  +            runHandler.executionResumed(this);
  +        }
  +    }
  +        
  +    /**
  +     * Called when a Runnable completes.
  +     * Currently just notifies runHandler
  +     * @param rable The runnable that just completed.
  +     */
  +    protected synchronized void runnableInvoked(Runnable rable ) {
  +        if (runHandler != null) {
  +            runHandler.runnableInvoked(this, rable);
  +        }
       }
   
       /**
  
  
  
  1.2       +42 -11    xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java
  
  Index: RunnableQueueTest.java
  ===================================================================
  RCS file: /home/cvs/xml-batik/test-sources/org/apache/batik/util/RunnableQueueTest.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- RunnableQueueTest.java    2001/08/06 15:35:13     1.1
  +++ RunnableQueueTest.java    2001/08/06 19:38:55     1.2
  @@ -20,6 +20,8 @@
   
       public int nThreads;
       public int activeThreads;
  +    public Random rand;
  +    public RunnableQueue rq;
   
       /**
        * Constructor
  @@ -43,32 +45,61 @@
        * of the test's internal operation fails.
        */
       public TestReport runImpl() throws Exception {
  -        RunnableQueue rq = RunnableQueue.createRunnableQueue();
  +        rq = RunnableQueue.createRunnableQueue();
   
           List l = new ArrayList(nThreads);
  -        Random rand = new Random(2345);
  +        rand = new Random(2345);
  +
  +        // Two switch flickers to make things interesting...
  +        l.add(new SwitchFlicker());
  +        l.add(new SwitchFlicker());
  +
           for (int i=0; i<nThreads; i++) {
  -            Runnable rqRable = new RQRable(i, rand.nextInt(50));
  +            Runnable rqRable = new RQRable(i, rand.nextInt(50)+1);
               l.add(new TPRable(rq, i, rand.nextBoolean(),
  -                              rand.nextInt(1000), 20, rqRable));
  +                              rand.nextInt(500)+1, 20, rqRable));
           }
  +
           synchronized (this) {
               ThreadPounder tp = new ThreadPounder(l);
               tp.start();
               activeThreads = nThreads;
               while (activeThreads != 0) {
  -                rq.suspendExecution();
  -                System.out.println("Suspended");
  -                wait(rand.nextInt(100));
  -                if (activeThreads == 0) break;
  -                System.out.println("Resuming");
  -                rq.resumeExecution();
  -                wait(rand.nextInt(500));
  +                wait();
               }
           }
   
           System.exit(0);
           return null;
  +    }
  +
  +    public class SwitchFlicker implements Runnable {
  +        public void run() {
  +            boolean suspendp, waitp;
  +            int time;
  +            while (true) {
  +                try {
  +                    synchronized (rand) {
  +                        suspendp = rand.nextBoolean();
  +                        waitp = rand.nextBoolean();
  +                        time  = rand.nextInt(500);
  +                    }
  +                    if (suspendp) {
  +                        // 1/2 of the time suspend; of those, 1/2 the time
  +                        // wait until suspended and 1/2 the time don't
  +                        rq.suspendExecution(waitp);
  +                        System.out.println("Suspended - " + 
  +                                           (waitp?"Wait":"Later"));
  +                        Thread.sleep(time/10);
  +                    } else {
  +                        // 1/2 the time resume
  +                        rq.resumeExecution();
  +                        System.out.println("Resumed");
  +                        Thread.sleep(time);
  +                    }
  +                } catch(InterruptedException ie) { }
  +            }
  +        }
       }
   
       public class TPRable implements Runnable {
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to