User: simone
Date: 00/10/16 16:17:27
Added: src/main/org/jboss/util WorkerQueue.java
Log:
Queue that executes Executable jobs in a worker thread
Revision Changes Path
1.1 jboss/src/main/org/jboss/util/WorkerQueue.java
Index: WorkerQueue.java
===================================================================
/*
* jBoss, the OpenSource EJB server
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.jboss.util;
/**
* Class that queues {@link Executable} jobs that are executed sequentially
* by a single thread.
*
* @see Executable
* @author Simone Bordet ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class WorkerQueue
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
/* The thread that runs the Executable jobs */
private Thread m_queueThread;
/* The job that will be executed by the worker thread */
private JobItem m_currentJob;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* Creates a new worker queue with default thread name of "Worker Thread"
*/
public WorkerQueue()
{
this("Worker Thread");
}
/**
* Creates a new worker queue with the specified thread name
*/
public WorkerQueue(String threadName)
{
m_queueThread = new Thread(createQueueLoop(), threadName);
// m_queueThread.setPriority(Thread.MIN_PRIORITY);
}
// Public --------------------------------------------------------
/**
* Starts the worker queue.
* @see #stop
*/
public void start()
{
if (m_queueThread != null) {m_queueThread.start();}
}
/**
* Stops nicely the worker queue. <br>
* After this call trying to put a new job will result in a
* InterruptedException to be thrown. The jobs queued before and not
* yet processed are processed until the queue is empty, then this
* worker queue is cleared.
* @see #clear
* @see #start
* @see #isInterrupted
*/
public synchronized void stop()
{
if (m_queueThread != null) {m_queueThread.interrupt();}
}
/**
* Called by a thread that is not the WorkerQueue thread, this method
* queues the job and, if necessary, wakes up this worker queue that is
* waiting in {@link #getJob}.
*/
public synchronized void putJob(Executable job)
{
// Preconditions
if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException("Can't put job, thread is not alive or not present");}
if (isInterrupted()) {throw new IllegalStateException("Can't put job,
thread was interrupted");}
putJobImpl(job);
}
/**
* Returns whether the worker thread has been interrupted. <br>
* When this method returns true, it is not possible to put new jobs in the
* queue and the already present jobs are executed and removed from the
* queue, then the thread exits.
* @see #stop
*/
protected boolean isInterrupted()
{
return m_queueThread.isInterrupted();
}
// Z implementation ----------------------------------------------
// Y overrides ---------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
/**
* Called by this class, this method checks if the queue is empty;
* if it is, then waits, else returns the current job.
* @see #putJob
*/
protected synchronized Executable getJob() throws InterruptedException
{
// Preconditions
if (m_queueThread == null || !m_queueThread.isAlive()) {throw new
IllegalStateException();}
return getJobImpl();
}
/**
* Never call this method, only override in subclasses to perform
* job getting in a specific way, normally tied to the data structure
* holding the jobs.
*/
protected Executable getJobImpl() throws InterruptedException
{
// While the queue is empty, wait();
// when notified take an event from the queue and return it.
while (m_currentJob == null) {wait();}
// This one is the job to return
JobItem item = m_currentJob;
// Go on to the next object for the next call.
m_currentJob = m_currentJob.m_next;
return item.m_job;
}
/**
* Never call this method, only override in subclasses to perform
* job adding in a specific way, normally tied to the data structure
* holding the jobs.
*/
protected void putJobImpl(Executable job)
{
JobItem posted = new JobItem(job);
if (m_currentJob == null)
{
// The queue is empty, set the current job to process and
// wake up the thread waiting in method getJob
m_currentJob = posted;
notifyAll();
}
else
{
JobItem item = m_currentJob;
// The queue is not empty, find the end of the queue ad add the
// posted job at the end
while (item.m_next != null) {item = item.m_next;}
item.m_next = posted;
}
}
/**
* Clears the running thread after the queue has been stopped. <br>
* After this call, this worker queue is unusable and can be garbaged.
*/
protected void clear()
{
// System.out.println(m_queueThread.getName() + " has ended.");
m_queueThread = null;
m_currentJob = null;
}
/**
* Creates the loop that will get the next job and process it. <br>
* Override in subclasses to create a custom loop.
*/
protected Runnable createQueueLoop() {return new QueueLoop();}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
/**
* Class that loops getting the next job to be executed and then
* executing it, in the worker thread.
*/
protected class QueueLoop implements Runnable
{
public void run()
{
try
{
while (true)
{
try
{
if (isInterrupted())
{
flush();
break;
}
else
{
getJob().execute();
}
}
catch (InterruptedException ex)
{
try {flush();}
catch (Exception ignored) {}
break;
}
catch (Exception x) {x.printStackTrace();}
}
}
finally {clear();}
}
protected void flush() throws Exception
{
// Empty the queue of the posted jobs and exit
while (m_currentJob != null)
{
m_currentJob.m_job.execute();
m_currentJob = m_currentJob.m_next;
}
}
}
/**
* Simple linked cell, that has only a reference to the next job.
*/
private class JobItem
{
private Executable m_job;
private JobItem m_next;
private JobItem(Executable job) {m_job = job;}
}
}