User: simone  
  Date: 00/10/16 16:17:27

  Added:       src/main/org/jboss/util WorkerQueue.java
  Log:
  Queue that executes Executable jobs in a worker thread
  
  Revision  Changes    Path
  1.1                  jboss/src/main/org/jboss/util/WorkerQueue.java
  
  Index: WorkerQueue.java
  ===================================================================
  /*
   * jBoss, the OpenSource EJB server
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.jboss.util;
  
  /**
   * Class that queues {@link Executable} jobs that are executed sequentially 
   * by a single thread.
   *
   * @see Executable
   * @author Simone Bordet ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $
   */
  public class WorkerQueue
  {
        // Constants -----------------------------------------------------
  
        // Attributes ----------------------------------------------------
        /* The thread that runs the Executable jobs */
        private Thread m_queueThread;
        /* The job that will be executed by the worker thread */
        private JobItem m_currentJob;
  
        // Static --------------------------------------------------------
  
        // Constructors --------------------------------------------------
        /**
         * Creates a new worker queue with default thread name of "Worker Thread"
         */
        public WorkerQueue() 
        {
                this("Worker Thread");
        }
        /**
         * Creates a new worker queue with the specified thread name
         */
        public WorkerQueue(String threadName) 
        {
                m_queueThread = new Thread(createQueueLoop(), threadName);
  //            m_queueThread.setPriority(Thread.MIN_PRIORITY);
        }
  
        // Public --------------------------------------------------------
        /**
         * Starts the worker queue.
         * @see #stop
         */
        public void start() 
        {
                if (m_queueThread != null) {m_queueThread.start();}
        }
        /**
         * Stops nicely the worker queue. <br> 
         * After this call trying to put a new job will result in a 
         * InterruptedException to be thrown. The jobs queued before and not 
         * yet processed are processed until the queue is empty, then this 
         * worker queue is cleared.
         * @see #clear
         * @see #start
         * @see #isInterrupted
         */
        public synchronized void stop() 
        {
                if (m_queueThread != null) {m_queueThread.interrupt();}
        }
        /**
         * Called by a thread that is not the WorkerQueue thread, this method 
         * queues the job and, if necessary, wakes up this worker queue that is 
         * waiting in {@link #getJob}.
         */
        public synchronized void putJob(Executable job)
        {
                // Preconditions
                if (m_queueThread == null || !m_queueThread.isAlive()) {throw new 
IllegalStateException("Can't put job, thread is not alive or not present");}
                if (isInterrupted()) {throw new IllegalStateException("Can't put job, 
thread was interrupted");}
                
                putJobImpl(job);
        }
        /**
         * Returns whether the worker thread has been interrupted. <br>
         * When this method returns true, it is not possible to put new jobs in the
         * queue and the already present jobs are executed and removed from the 
         * queue, then the thread exits.
         * @see #stop
         */
        protected boolean isInterrupted() 
        {
                return m_queueThread.isInterrupted();
        }
  
        // Z implementation ----------------------------------------------
  
        // Y overrides ---------------------------------------------------
  
        // Package protected ---------------------------------------------
  
        // Protected -----------------------------------------------------
        /**
         * Called by this class, this method checks if the queue is empty; 
         * if it is, then waits, else returns the current job.
         * @see #putJob
         */
        protected synchronized Executable getJob() throws InterruptedException
        {
                // Preconditions
                if (m_queueThread == null || !m_queueThread.isAlive()) {throw new 
IllegalStateException();}
                
                return getJobImpl();
        }
        /**
         * Never call this method, only override in subclasses to perform
         * job getting in a specific way, normally tied to the data structure 
         * holding the jobs.
         */
        protected Executable getJobImpl() throws InterruptedException
        {
                // While the queue is empty, wait();
                // when notified take an event from the queue and return it.
                while (m_currentJob == null) {wait();}
                // This one is the job to return
                JobItem item = m_currentJob;
                // Go on to the next object for the next call. 
                m_currentJob = m_currentJob.m_next;
                return item.m_job;
        }
        /**
         * Never call this method, only override in subclasses to perform
         * job adding in a specific way, normally tied to the data structure
         * holding the jobs.
         */
        protected void putJobImpl(Executable job) 
        {
                JobItem posted = new JobItem(job);              
                if (m_currentJob == null) 
                {
                        // The queue is empty, set the current job to process and
                        // wake up the thread waiting in method getJob
                        m_currentJob = posted;
                        notifyAll();
                }
                else 
                {
                        JobItem item = m_currentJob;
                        // The queue is not empty, find the end of the queue ad add the
                        // posted job at the end
                        while (item.m_next != null) {item = item.m_next;}
                        item.m_next = posted;                   
                }
        }
  
        /**
         * Clears the running thread after the queue has been stopped. <br> 
         * After this call, this worker queue is unusable and can be garbaged.
         */
        protected void clear() 
        {
  //            System.out.println(m_queueThread.getName() + " has ended.");
                m_queueThread = null;
                m_currentJob = null;
        }
        /**
         * Creates the loop that will get the next job and process it. <br>
         * Override in subclasses to create a custom loop.
         */
        protected Runnable createQueueLoop() {return new QueueLoop();}
  
        // Private -------------------------------------------------------
  
        // Inner classes -------------------------------------------------
        /**
         * Class that loops getting the next job to be executed and then 
         * executing it, in the worker thread.
         */
        protected class QueueLoop implements Runnable 
        {
                public void run() 
                {
                        try
                        {
                                while (true) 
                                {
                                        try 
                                        {
                                                if (isInterrupted()) 
                                                {
                                                        flush();
                                                        break;
                                                }
                                                else 
                                                {
                                                        getJob().execute();
                                                }
                                        }
                                        catch (InterruptedException ex) 
                                        {
                                                try {flush();}
                                                catch (Exception ignored) {}
                                                break;
                                        }
                                        catch (Exception x) {x.printStackTrace();}
                                }
                        }
                        finally {clear();}
                }
                protected void flush() throws Exception
                {
                        // Empty the queue of the posted jobs and exit
                        while (m_currentJob != null) 
                        {
                                m_currentJob.m_job.execute();
                                m_currentJob = m_currentJob.m_next;
                        }
                }
        }
        /**
         * Simple linked cell, that has only a reference to the next job.
         */
        private class JobItem 
        {
                private Executable m_job;
                private JobItem m_next;
                private JobItem(Executable job) {m_job = job;}
        }
  }
  
  
  

Reply via email to