This patch stops the CacheEventQueue from spawning unnecessary threads, and
keeps its processor thread alive for a short time after the queue empties in
case new events come in.
Pay close attention to the (javadoc'd) THREAD_TIMEOUT constant, currently
60*1000 ms (60 seconds). It determines how long the processor thread stays
alive, waiting for more events to come in, after it has emptied the queue. It
may be desirable at some point in the future to make this a configuration
option.
I did my very best not to let formatting inconsistencies find their way
in, but there is:
A) no way to tell Eclipse to put throws declarations on the next line, and
B) no standard for throws declarations in the Apache coding standards (that I
could find).
Also, this file doesn't conform to the Apache whitespace standards, so my
patch fixes that as well.
-Travis Savo
Index: CacheEventQueue.java
===================================================================
RCS file: /home/cvspublic/jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java,v
retrieving revision 1.7
diff -u -r1.7 CacheEventQueue.java
--- CacheEventQueue.java 17 Apr 2004 14:00:11 -0000 1.7
+++ CacheEventQueue.java 14 May 2004 00:56:33 -0000
@@ -1,6 +1,5 @@
package org.apache.jcs.engine;
-
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
@@ -16,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
import java.io.IOException;
import java.io.Serializable;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.jcs.engine.behavior.ICacheElement;
@@ -29,46 +25,45 @@
/**
* An event queue is used to propagate ordered cache events to one and only
one
- * target listener.
- *
- * <pre>
+ * target listener.
+ * <pre> *
* Changes:<br>
* 17 April 2004 Hanson Char
* <ol><li>Bug fix: add missing synchronization to method
addRemoveEvent();</li>
- * <li>Use the light weight new int[0] for creating the object monitor
queueLock,
- * instead of new Object();</li>
+ * <li>Use the light weight new int[0] for creating the object monitor
queueLock, instead of new Object();</li>
* <li>Explicitely qualify member variables of CacheEventQueue in inner
classes.
* Hopefully this will help identify any potential concurrency issue.</li>
* </ol>
+ * 13 May 2004 Travis Savo<br>
+ * Changed to not spawn a thread when the queue isn't in use, and not kill
said thread until THREAD_TIMEOUT
+ * has passed without a new event.
* </pre>
*/
public class CacheEventQueue implements ICacheEventQueue
{
- private final static Log log = LogFactory.getLog( CacheEventQueue.class
);
+ private final static Log log = LogFactory.getLog( CacheEventQueue.class
);
+ /**
+ * Number of milliseconds after emptying out a queue to wait until
killing the processor thread.
+ */
+ private static final long THREAD_TIMEOUT = 60*1000;
+
private static int processorInstanceCount = 0;
-
// private LinkedQueue queue = new LinkedQueue();
-
private ICacheListener listener;
private byte listenerId;
private String cacheName;
-
private int failureCount;
private int maxFailure;
-
// in milliseconds
private int waitBeforeRetry;
-
private boolean destroyed;
- private Thread t;
-
+ private boolean working;
+
+ private Thread processorThread;
// Internal queue implementation
-
private Object queueLock = new int[0];
-
// Dummy node
-
private Node head = new Node();
private Node tail = head;
@@ -79,9 +74,7 @@
* @param listenerId
* @param cacheName
*/
- public CacheEventQueue( ICacheListener listener,
- byte listenerId,
- String cacheName )
+ public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName)
{
this( listener, listenerId, cacheName, 10, 500 );
}
@@ -95,26 +88,19 @@
* @param maxFailure
* @param waitBeforeRetry
*/
- public CacheEventQueue( ICacheListener listener,
- byte listenerId,
- String cacheName,
- int maxFailure,
- int waitBeforeRetry )
+ public CacheEventQueue( ICacheListener listener, byte listenerId,
String cacheName, int maxFailure, int waitBeforeRetry)
{
if ( listener == null )
{
throw new IllegalArgumentException( "listener must not be null"
);
}
-
this.listener = listener;
this.listenerId = listenerId;
this.cacheName = cacheName;
this.maxFailure = maxFailure <= 0 ? 10 : maxFailure;
this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 :
waitBeforeRetry;
-
- this.t = new QProcessor();
- this.t.start();
-
+ this.processorThread = new QProcessor();
+ this.processorThread.start();
if ( log.isDebugEnabled() )
{
log.debug( "Constructed: " + this );
@@ -129,21 +115,26 @@
if ( !this.destroyed )
{
this.destroyed = true;
-
// sychronize on queue so the thread will not wait forever,
// and then interrupt the QueueProcessor
-
synchronized ( this.queueLock )
{
- this.t.interrupt();
+ this.processorThread.interrupt();
}
-
- this.t = null;
-
+ this.processorThread = null;
log.info( "Cache event queue destroyed: " + this );
}
}
-
+
+ /**
+ * Event Q is empty.
+ */
+ public synchronized void stopProcessing()
+ {
+ working = false;
+ processorThread = null;
+ }
+
/**
* @return
*/
@@ -160,6 +151,11 @@
return ( !this.destroyed );
}
+ private boolean isWorking()
+ {
+ return ( this.working );
+ }
+
/**
* @return The {3} value
*/
@@ -172,8 +168,7 @@
* @param ce The feature to be added to the PutEvent attribute
* @exception IOException
*/
- public synchronized void addPutEvent( ICacheElement ce )
- throws IOException
+ public synchronized void addPutEvent( ICacheElement ce ) throws
IOException
{
if ( !this.destroyed )
{
@@ -185,8 +180,7 @@
* @param key The feature to be added to the RemoveEvent attribute
* @exception IOException
*/
- public synchronized void addRemoveEvent( Serializable key )
- throws IOException
+ public synchronized void addRemoveEvent( Serializable key ) throws
IOException
{
if ( !this.destroyed )
{
@@ -197,8 +191,7 @@
/**
* @exception IOException
*/
- public synchronized void addRemoveAllEvent()
- throws IOException
+ public synchronized void addRemoveAllEvent() throws IOException
{
if ( !this.destroyed )
{
@@ -209,8 +202,7 @@
/**
* @exception IOException
*/
- public synchronized void addDisposeEvent()
- throws IOException
+ public synchronized void addDisposeEvent() throws IOException
{
if ( !this.destroyed )
{
@@ -226,59 +218,55 @@
private void put( AbstractCacheEvent event )
{
Node newNode = new Node();
-
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Event entering Queue for " + cacheName + ": " +
event );
+ }
newNode.event = event;
-
- synchronized ( this.queueLock )
+ synchronized ( queueLock )
{
this.tail.next = newNode;
this.tail = newNode;
-
- this.queueLock.notify();
+ if ( isAlive() )
+ if ( !isWorking() )
+ {
+ this.working = true;
+ processorThread = new QProcessor();
+ processorThread.start();
+ } else
+ {
+ queueLock.notify();
+ }
}
}
- private AbstractCacheEvent take() throws InterruptedException
+ private AbstractCacheEvent take()
{
- synchronized ( this.queueLock )
+ synchronized ( queueLock )
{
// wait until there is something to read
-
- while ( this.head == this.tail )
+ if ( head == tail )
{
- this.queueLock.wait();
+ return null;
}
-
- // we have the lock, and the list is not empty
-
- Node node = this.head.next;
-
- // This is an awful bug. This will always return null.
- // This make the event Q and event destroyer.
- //AbstractCacheEvent value = head.event;
-
- // corrected
+ Node node = head.next;
AbstractCacheEvent value = node.event;
-
if ( log.isDebugEnabled() )
{
- log.debug( "head.event = " + this.head.event );
- log.debug( "node.event = " + node.event );
+ log.debug( "head.event = " + head.event );
+ log.debug( "node.event = " + node.event );
}
-
// Node becomes the new head (head is always empty)
-
node.event = null;
- this.head = node;
-
+ head = node;
return value;
}
}
///////////////////////////// Inner classes
/////////////////////////////
-
private static class Node
{
+
Node next = null;
AbstractCacheEvent event = null;
}
@@ -287,70 +275,80 @@
*/
private class QProcessor extends Thread
{
+
/**
* Constructor for the QProcessor object
*/
QProcessor()
{
- super( "CacheEventQueue.QProcessor-" + (
++CacheEventQueue.this.processorInstanceCount ) );
-
+ super( "CacheEventQueue.QProcessor-" + (
++processorInstanceCount ) );
setDaemon( true );
}
+
/**
* Main processing method for the QProcessor object
*/
public void run()
{
AbstractCacheEvent r = null;
-
- while ( !CacheEventQueue.this.destroyed )
+ while ( isAlive() )
{
- try
+ r = take();
+ if ( log.isDebugEnabled() )
{
- r = take();
-
- if ( log.isDebugEnabled() )
- {
- log.debug( "r from take() = " + r );
- }
-
+ log.debug( "Event from queue = " + r );
}
- catch ( InterruptedException e )
+ if ( r == null )
{
- // We were interrupted, just continue -- the while loop
- // will exit if we have been properly destroyed.
+ synchronized ( queueLock )
+ {
+ try
+ {
+ queueLock.wait( THREAD_TIMEOUT );
+ } catch ( InterruptedException e )
+ {
+ log.warn( "Interrupted while waiting for
another event to come in before we die." );
+ return;
+ }
+ r = take();
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Event from queue after sleep = " +
r );
+ }
+ if ( r == null )
+ {
+ stopProcessing();
+ }
+ }
}
-
- if ( !CacheEventQueue.this.destroyed && r != null )
+ if ( isAlive() && r != null )
{
r.run();
}
}
- // declare failure as listener is permanently unreachable.
- // queue = null;
- CacheEventQueue.this.listener = null;
- // The listener failure logging more the problem of the user
- // of the q.
- log.info( "QProcessor exiting for " + CacheEventQueue.this );
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "QProcessor exiting for " + this );
+ }
}
}
+
/**
* Retries before declaring failure.
*
*/
private abstract class AbstractCacheEvent implements Runnable
{
+
/**
* Main processing method for the AbstractCacheEvent object
*/
public void run()
{
IOException ex = null;
-
- while ( !CacheEventQueue.this.destroyed
- && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+ while ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
{
try
{
@@ -359,32 +357,28 @@
CacheEventQueue.this.failureCount = 0;
return;
// happy and done.
- }
- catch ( IOException e )
+ } catch ( IOException e )
{
CacheEventQueue.this.failureCount++;
ex = e;
}
// Let's get idle for a while before retry.
- if ( !CacheEventQueue.this.destroyed
- && CacheEventQueue.this.failureCount <=
CacheEventQueue.this.maxFailure )
+ if ( !CacheEventQueue.this.destroyed &&
CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
{
try
{
log.warn( "...retrying propagation " +
CacheEventQueue.this + "..." + CacheEventQueue.this.failureCount );
- Thread.currentThread().sleep(
CacheEventQueue.this.waitBeforeRetry );
- }
- catch ( InterruptedException ie )
+ Thread.sleep( CacheEventQueue.this.waitBeforeRetry
);
+ } catch ( InterruptedException ie )
{
// ignore;
}
}
}
- // Too bad. The remote host is unreachable, so we give up.
+ // Too bad. The remote host is unreachable, so we give up.
if ( ex != null )
{
log.warn( "Giving up propagation " + CacheEventQueue.this,
ex );
-
destroy();
}
return;
@@ -395,8 +389,7 @@
*
* @exception IOException
*/
- protected abstract void doRun()
- throws IOException;
+ protected abstract void doRun() throws IOException;
}
/**
@@ -412,14 +405,11 @@
* @param ice
* @exception IOException
*/
- PutEvent( ICacheElement ice )
- throws IOException
+ PutEvent( ICacheElement ice) throws IOException
{
this.ice = ice;
/*
- * this.key = key;
- * this.obj = CacheUtils.dup(obj);
- * this.attr = attr;
+ * this.key = key; this.obj = CacheUtils.dup(obj); this.attr =
attr;
* this.groupName = groupName;
*/
}
@@ -429,13 +419,11 @@
*
* @exception IOException
*/
- protected void doRun()
- throws IOException
+ protected void doRun() throws IOException
{
/*
* CacheElement ce = new CacheElement(cacheName, key, obj);
- * ce.setElementAttributes( attr );
- * ce.setGroupName( groupName );
+ * ce.setElementAttributes( attr ); ce.setGroupName( groupName
);
*/
CacheEventQueue.this.listener.handlePut( ice );
}
@@ -447,6 +435,7 @@
*/
private class RemoveEvent extends AbstractCacheEvent
{
+
private Serializable key;
/**
@@ -455,8 +444,7 @@
* @param key
* @exception IOException
*/
- RemoveEvent( Serializable key )
- throws IOException
+ RemoveEvent( Serializable key) throws IOException
{
this.key = key;
}
@@ -466,8 +454,7 @@
*
* @exception IOException
*/
- protected void doRun()
- throws IOException
+ protected void doRun() throws IOException
{
CacheEventQueue.this.listener.handleRemove(
CacheEventQueue.this.cacheName, key );
}
@@ -479,13 +466,13 @@
*/
private class RemoveAllEvent extends AbstractCacheEvent
{
+
/**
* Description of the Method
*
* @exception IOException
*/
- protected void doRun()
- throws IOException
+ protected void doRun() throws IOException
{
CacheEventQueue.this.listener.handleRemoveAll(
CacheEventQueue.this.cacheName );
}
@@ -497,16 +484,15 @@
*/
private class DisposeEvent extends AbstractCacheEvent
{
+
/**
* Description of the Method
*
* @exception IOException
*/
- protected void doRun()
- throws IOException
+ protected void doRun() throws IOException
{
CacheEventQueue.this.listener.handleDispose(
CacheEventQueue.this.cacheName );
}
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]