User: oleg
Date: 00/11/08 08:43:29
Modified: src/main/org/jboss/util WorkerQueue.java
Log:
Minor improvement to Exception logging: EJBException is unwrapped
Revision Changes Path
1.3 +225 -208 jboss/src/main/org/jboss/util/WorkerQueue.java
Index: WorkerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/jboss/src/main/org/jboss/util/WorkerQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- WorkerQueue.java 2000/10/19 21:51:59 1.2
+++ WorkerQueue.java 2000/11/08 16:43:29 1.3
@@ -6,222 +6,239 @@
*/
package org.jboss.util;
+
+import javax.ejb.EJBException;
+
+import org.jboss.logging.Logger;
+
+
+
/**
* Class that queues {@link Executable} jobs that are executed sequentially
* by a single thread.
*
* @see Executable
* @author Simone Bordet ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class WorkerQueue
{
- // Constants -----------------------------------------------------
+ // Constants -----------------------------------------------------
- // Attributes ----------------------------------------------------
- /* The thread that runs the Executable jobs */
- protected Thread m_queueThread;
- /* The job that will be executed by the worker thread */
- private JobItem m_currentJob;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
- /**
- * Creates a new worker queue with default thread name of "Worker Thread"
- */
- public WorkerQueue()
- {
- this("Worker Thread");
- }
- /**
- * Creates a new worker queue with the specified thread name
- */
- public WorkerQueue(String threadName)
- {
- m_queueThread = new Thread(createQueueLoop(), threadName);
-// m_queueThread.setPriority(Thread.MIN_PRIORITY);
- }
-
- // Public --------------------------------------------------------
- /**
- * Starts the worker queue.
- * @see #stop
- */
- public void start()
- {
- if (m_queueThread != null) {m_queueThread.start();}
- }
- /**
- * Stops nicely the worker queue. <br>
- * After this call trying to put a new job will result in a
- * InterruptedException to be thrown. The jobs queued before and not
- * yet processed are processed until the queue is empty, then this
- * worker queue is cleared.
- * @see #clear
- * @see #start
- * @see #isInterrupted
- */
- public synchronized void stop()
- {
- if (m_queueThread != null) {m_queueThread.interrupt();}
- }
- /**
- * Called by a thread that is not the WorkerQueue thread, this method
- * queues the job and, if necessary, wakes up this worker queue that is
- * waiting in {@link #getJob}.
- */
- public synchronized void putJob(Executable job)
- {
- // Preconditions
- if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException("Can't put job, thread is not alive or not present");}
- if (isInterrupted()) {throw new IllegalStateException("Can't put job,
thread was interrupted");}
-
- putJobImpl(job);
- }
- /**
- * Returns whether the worker thread has been interrupted. <br>
- * When this method returns true, it is not possible to put new jobs in the
- * queue and the already present jobs are executed and removed from the
- * queue, then the thread exits.
- * @see #stop
- */
- protected boolean isInterrupted()
- {
- return m_queueThread.isInterrupted();
- }
-
- // Z implementation ----------------------------------------------
-
- // Y overrides ---------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- /**
- * Called by this class, this method checks if the queue is empty;
- * if it is, then waits, else returns the current job.
- * @see #putJob
- */
- protected synchronized Executable getJob() throws InterruptedException
- {
- // Preconditions
- if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException();}
-
- return getJobImpl();
- }
- /**
- * Never call this method, only override in subclasses to perform
- * job getting in a specific way, normally tied to the data structure
- * holding the jobs.
- */
- protected Executable getJobImpl() throws InterruptedException
- {
- // While the queue is empty, wait();
- // when notified take an event from the queue and return it.
- while (m_currentJob == null) {wait();}
- // This one is the job to return
- JobItem item = m_currentJob;
- // Go on to the next object for the next call.
- m_currentJob = m_currentJob.m_next;
- return item.m_job;
- }
- /**
- * Never call this method, only override in subclasses to perform
- * job adding in a specific way, normally tied to the data structure
- * holding the jobs.
- */
- protected void putJobImpl(Executable job)
- {
- JobItem posted = new JobItem(job);
- if (m_currentJob == null)
- {
- // The queue is empty, set the current job to process and
- // wake up the thread waiting in method getJob
- m_currentJob = posted;
- notifyAll();
- }
- else
- {
- JobItem item = m_currentJob;
- // The queue is not empty, find the end of the queue ad add the
- // posted job at the end
- while (item.m_next != null) {item = item.m_next;}
- item.m_next = posted;
- }
- }
-
- /**
- * Clears the running thread after the queue has been stopped. <br>
- * After this call, this worker queue is unusable and can be garbaged.
- */
- protected void clear()
- {
-// System.out.println(m_queueThread.getName() + " has ended.");
- m_queueThread = null;
- m_currentJob = null;
- }
- /**
- * Creates the loop that will get the next job and process it. <br>
- * Override in subclasses to create a custom loop.
- */
- protected Runnable createQueueLoop() {return new QueueLoop();}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
- /**
- * Class that loops getting the next job to be executed and then
- * executing it, in the worker thread.
- */
- protected class QueueLoop implements Runnable
- {
- public void run()
- {
- try
- {
- while (true)
- {
- try
- {
- if (isInterrupted())
- {
- flush();
- break;
- }
- else
- {
- getJob().execute();
- }
- }
- catch (InterruptedException ex)
- {
- try {flush();}
- catch (Exception ignored) {}
- break;
- }
- catch (Exception x) {x.printStackTrace();}
- }
- }
- finally {clear();}
- }
- protected void flush() throws Exception
- {
- // Empty the queue of the posted jobs and exit
- while (m_currentJob != null)
- {
- m_currentJob.m_job.execute();
- m_currentJob = m_currentJob.m_next;
- }
- }
- }
- /**
- * Simple linked cell, that has only a reference to the next job.
- */
- private class JobItem
- {
- private Executable m_job;
- private JobItem m_next;
- private JobItem(Executable job) {m_job = job;}
- }
+ // Attributes ----------------------------------------------------
+ /* The thread that runs the Executable jobs */
+ protected Thread m_queueThread;
+ /* The job that will be executed by the worker thread */
+ private JobItem m_currentJob;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+ /**
+ * Creates a new worker queue with default thread name of "Worker Thread"
+ */
+ public WorkerQueue()
+ {
+ this("Worker Thread");
+ }
+ /**
+ * Creates a new worker queue with the specified thread name
+ */
+ public WorkerQueue(String threadName)
+ {
+ m_queueThread = new Thread(createQueueLoop(), threadName);
+// m_queueThread.setPriority(Thread.MIN_PRIORITY);
+ }
+
+ // Public --------------------------------------------------------
+ /**
+ * Starts the worker queue.
+ * @see #stop
+ */
+ public void start()
+ {
+ if (m_queueThread != null) {m_queueThread.start();}
+ }
+ /**
+ * Stops nicely the worker queue. <br>
+ * After this call trying to put a new job will result in a
+ * InterruptedException to be thrown. The jobs queued before and not
+ * yet processed are processed until the queue is empty, then this
+ * worker queue is cleared.
+ * @see #clear
+ * @see #start
+ * @see #isInterrupted
+ */
+ public synchronized void stop()
+ {
+ if (m_queueThread != null) {m_queueThread.interrupt();}
+ }
+ /**
+ * Called by a thread that is not the WorkerQueue thread, this method
+ * queues the job and, if necessary, wakes up this worker queue that is
+ * waiting in {@link #getJob}.
+ */
+ public synchronized void putJob(Executable job)
+ {
+ // Preconditions
+ if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException("Can't put job, thread is not alive or not present");}
+ if (isInterrupted()) {throw new IllegalStateException("Can't put job,
thread was interrupted");}
+
+ putJobImpl(job);
+ }
+ /**
+ * Returns whether the worker thread has been interrupted. <br>
+ * When this method returns true, it is not possible to put new jobs in the
+ * queue and the already present jobs are executed and removed from the
+ * queue, then the thread exits.
+ * @see #stop
+ */
+ protected boolean isInterrupted()
+ {
+ return m_queueThread.isInterrupted();
+ }
+
+ // Z implementation ----------------------------------------------
+
+ // Y overrides ---------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ /**
+ * Called by this class, this method checks if the queue is empty;
+ * if it is, then waits, else returns the current job.
+ * @see #putJob
+ */
+ protected synchronized Executable getJob() throws InterruptedException
+ {
+ // Preconditions
+ if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException();}
+
+ return getJobImpl();
+ }
+ /**
+ * Never call this method, only override in subclasses to perform
+ * job getting in a specific way, normally tied to the data structure
+ * holding the jobs.
+ */
+ protected Executable getJobImpl() throws InterruptedException
+ {
+ // While the queue is empty, wait();
+ // when notified take an event from the queue and return it.
+ while (m_currentJob == null) {wait();}
+ // This one is the job to return
+ JobItem item = m_currentJob;
+ // Go on to the next object for the next call.
+ m_currentJob = m_currentJob.m_next;
+ return item.m_job;
+ }
+ /**
+ * Never call this method, only override in subclasses to perform
+ * job adding in a specific way, normally tied to the data structure
+ * holding the jobs.
+ */
+ protected void putJobImpl(Executable job)
+ {
+ JobItem posted = new JobItem(job);
+ if (m_currentJob == null)
+ {
+ // The queue is empty, set the current job to process and
+ // wake up the thread waiting in method getJob
+ m_currentJob = posted;
+ notifyAll();
+ }
+ else
+ {
+ JobItem item = m_currentJob;
+ // The queue is not empty, find the end of the queue ad add the
+ // posted job at the end
+ while (item.m_next != null) {item = item.m_next;}
+ item.m_next = posted;
+ }
+ }
+
+ /**
+ * Clears the running thread after the queue has been stopped. <br>
+ * After this call, this worker queue is unusable and can be garbaged.
+ */
+ protected void clear()
+ {
+// System.out.println(m_queueThread.getName() + " has ended.");
+ m_queueThread = null;
+ m_currentJob = null;
+ }
+ /**
+ * Creates the loop that will get the next job and process it. <br>
+ * Override in subclasses to create a custom loop.
+ */
+ protected Runnable createQueueLoop() {return new QueueLoop();}
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ /**
+ * Class that loops getting the next job to be executed and then
+ * executing it, in the worker thread.
+ */
+ protected class QueueLoop implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ while (true)
+ {
+ try
+ {
+ if (isInterrupted())
+ {
+ flush();
+ break;
+ }
+ else
+ {
+ getJob().execute();
+ }
+ }
+ catch (InterruptedException ex)
+ {
+ try {flush();}
+ catch (Exception ignored) {}
+ break;
+ }
+ catch (Exception x) {
+ // Log system exceptions
+ if (x instanceof EJBException)
+ {
+ Logger.error("BEAN EXCEPTION:"+x.getMessage());
+ if (((EJBException)x).getCausedByException() != null)
+
Logger.exception(((EJBException)x).getCausedByException());
+ } else {
+ Logger.exception(x);
+ }
+ }
+ }
+ }
+ finally {clear();}
+ }
+ protected void flush() throws Exception
+ {
+ // Empty the queue of the posted jobs and exit
+ while (m_currentJob != null)
+ {
+ m_currentJob.m_job.execute();
+ m_currentJob = m_currentJob.m_next;
+ }
+ }
+ }
+ /**
+ * Simple linked cell, that has only a reference to the next job.
+ */
+ private class JobItem
+ {
+ private Executable m_job;
+ private JobItem m_next;
+ private JobItem(Executable job) {m_job = job;}
+ }
}