leosimons 2003/08/23 02:40:05
Modified: scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler
BinaryHeap.java DefaultTimeScheduler.java
PriorityQueue.java SynchronizedPriorityQueue.java
TimeScheduledEntry.java
Log:
based on mailing list discussions with
Stefan Seifert from 12-8 -> 23-8, refactor
this package a little to expose its
implementation for customization.
Revision Changes Path
1.3 +2 -2
avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/BinaryHeap.java
Index: BinaryHeap.java
===================================================================
RCS file:
/home/cvs/avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/BinaryHeap.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- BinaryHeap.java 19 Jun 2003 20:31:29 -0000 1.2
+++ BinaryHeap.java 23 Aug 2003 09:40:05 -0000 1.3
@@ -62,7 +62,7 @@
* @version CVS $Revision$ $Date$
* @since 4.0
*/
-final class BinaryHeap
+public final class BinaryHeap
implements PriorityQueue
{
private static final class MinComparator
1.3 +201 -139
avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/DefaultTimeScheduler.java
Index: DefaultTimeScheduler.java
===================================================================
RCS file:
/home/cvs/avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/DefaultTimeScheduler.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DefaultTimeScheduler.java 19 Jun 2003 20:31:29 -0000 1.2
+++ DefaultTimeScheduler.java 23 Aug 2003 09:40:05 -0000 1.3
@@ -56,6 +56,9 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Vector;
+import java.util.List;
+import java.util.Map;
+
import org.apache.avalon.cornerstone.services.scheduler.Target;
import org.apache.avalon.cornerstone.services.scheduler.TimeScheduler;
import org.apache.avalon.cornerstone.services.scheduler.TimeTrigger;
@@ -77,6 +80,9 @@
extends AbstractLogEnabled
implements TimeScheduler, Serviceable, Startable, Disposable, Runnable,
MonitorableTimeSchedulerMBean
{
+ // ----------------------------------------------------------------------
+ // Properties
+ // ----------------------------------------------------------------------
private final Hashtable m_entries = new Hashtable();
private final PriorityQueue m_priorityQueue =
new SynchronizedPriorityQueue( new BinaryHeap() );
@@ -84,6 +90,49 @@
private boolean m_running;
private ArrayList m_triggerFailureListeners = new ArrayList();
+ // ----------------------------------------------------------------------
+ // Getter/Setter methods
+ // ----------------------------------------------------------------------
+ //
+ // LSD: these have been added in to allow subclasses of the
+ // DefaultScheduler to override implementation behaviour.
+ // You should *not* make these public in subclasses (hence
+ // they are final); they're here for convenience implementation
+ // only.
+
+ protected final ThreadManager getThreadManager()
+ {
+ return m_threadManager;
+ }
+
+ protected final boolean isRunning()
+ {
+ return m_running;
+ }
+
+ protected final void setRunning( boolean running )
+ {
+ m_running = running;
+ }
+
+ protected final List getTriggerFailureListeners()
+ {
+ return m_triggerFailureListeners;
+ }
+
+ protected final Map getEntryMap()
+ {
+ return m_entries;
+ }
+
+ protected final PriorityQueue getPriorityQueue()
+ {
+ return m_priorityQueue;
+ }
+
+ // ----------------------------------------------------------------------
+ // Avalon Lifecycle
+ // ----------------------------------------------------------------------
public void service( final ServiceManager serviceManager )
throws ServiceException
{
@@ -100,6 +149,114 @@
m_priorityQueue.clear();
}
+ public void start()
+ throws Exception
+ {
+ //this should suck threads from a named pool
+ getThreadManager().getDefaultThreadPool().execute( this );
+ }
+
+ public void stop()
+ {
+ m_running = false;
+ synchronized( this )
+ {
+ notifyAll();
+ }
+ }
+
+ // ----------------------------------------------------------------------
+ // Work Interface: Runnable
+ // ----------------------------------------------------------------------
+ /**
+ * Entry point for thread that monitors entrys and triggers
+ * entrys when necessary.
+ */
+ public void run()
+ {
+ m_running = true;
+
+ while( m_running )
+ {
+ long duration = 0;
+
+ if( !getPriorityQueue().isEmpty() )
+ {
+ TimeScheduledEntry entry = null;
+ synchronized( this )
+ {
+ entry = getNextEntry();
+ if( null == entry ) continue;
+
+ duration = entry.getNextTime() - System.currentTimeMillis();
+
+ if( duration < 0 )
+ {
+ //time to run job so remove it from priority queue
+ //and run it
+ getPriorityQueue().pop();
+
+ //Note that we need the pop to occur in a
+ //synchronized section while the runEntry
+ //does not need to be synchronized
+ //hence why there is to if statements
+ //structured in this ugly way
+ }
+ }
+
+ if( duration < 0 )
+ {
+ runEntry( entry );
+ rescheduleEntry( entry, false );
+ continue;
+ }
+ else if( 0 == duration )
+ {
+ //give a short duration that will sleep
+ // so that next loop will definetly be below 0.
+ //Can not act on zero else multiple runs could go through
+ //at once
+ duration = 1;
+ }
+ }
+
+ //wait/sleep until monitor is signalled which occurs when
+ //next jobs is likely to occur or when a new job gets added to
+ //top of heap
+ try
+ {
+ synchronized( this )
+ {
+ wait( duration );
+ }
+ }
+ catch( final InterruptedException ie )
+ {
+ }
+ }
+ }
+
+ // ----------------------------------------------------------------------
+ // Work Interface: Time Scheduler
+ // ----------------------------------------------------------------------
+ /**
+ * Add a trigger failure listener
+ * @param listener The listener
+ */
+ public void addTriggerFailureListener( TriggerFailureListener listener )
+ {
+ getTriggerFailureListeners().add( listener );
+ }
+
+ /**
+ * Remove a trigger failure listener
+ * @param listener The listener
+ */
+ public void removeTriggerFailureListener( TriggerFailureListener listener )
+ {
+ getTriggerFailureListeners().remove( listener );
+ }
+
/**
* Schedule a time based trigger.
* Note that if a TimeTrigger already has same name then it is removed.
@@ -121,14 +278,14 @@
}
final TimeScheduledEntry entry = new TimeScheduledEntry( name, trigger,
target );
- m_entries.put( name, entry );
+ getEntryMap().put( name, entry );
final boolean added = rescheduleEntry( entry, false );
if( !added ) return;
try
{
- if( entry == m_priorityQueue.peek() )
+ if( entry == getPriorityQueue().peek() )
{
notifyAll();
}
@@ -154,7 +311,7 @@
//use the kill-o-matic against any entry with same name
final TimeScheduledEntry entry = getEntry( name );
entry.invalidate();
- m_entries.remove( name );
+ getEntryMap().remove( name );
}
/**
@@ -171,6 +328,30 @@
rescheduleEntry( entry, true );
}
+ // ----------------------------------------------------------------------
+ // Work Interface: MonitorableTimeSchedulerMBean
+ // ----------------------------------------------------------------------
+
+ /**
+ * Return a collection of the triggerable names.
+ * @return
+ */
+ public synchronized Collection getEntries()
+ {
+ Collection coll = getEntryMap().keySet();
+ Vector retval = new Vector();
+ for( Iterator iterator = coll.iterator(); iterator.hasNext(); )
+ {
+ TimeScheduledEntry tse = (TimeScheduledEntry)getEntryMap().get(
iterator.next() );
+ retval.add( tse.toString() );
+ }
+ return retval;
+ }
+
+ // ----------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------
+
/**
* Reschedule an entry.
* if clone is true then invalidate old version and create a new entry to
@@ -180,7 +361,7 @@
* @param clone true if new entry is to be created
* @return true if added to queue, false if not added
*/
- private synchronized boolean rescheduleEntry( final TimeScheduledEntry
timeEntry,
+ protected synchronized boolean rescheduleEntry( final TimeScheduledEntry
timeEntry,
final boolean clone )
{
TimeScheduledEntry entry = timeEntry;
@@ -194,8 +375,8 @@
// remove old refernce to the entry..so that next time
// somebody calls getEntry( name ), we will get the new valid entry.
- m_entries.remove( timeEntry.getName() );
- m_entries.put( timeEntry.getName(), entry );
+ getEntryMap().remove( timeEntry.getName() );
+ getEntryMap().put( timeEntry.getName(), entry );
}
//reschedule if appropriate
@@ -204,9 +385,9 @@
if( 0 < next )
{
entry.setNextTime( next );
- m_priorityQueue.insert( entry );
+ getPriorityQueue().insert( entry );
- if( entry == m_priorityQueue.peek() )
+ if( entry == getPriorityQueue().peek() )
{
notify();
}
@@ -226,11 +407,11 @@
* @return the entry
* @exception NoSuchElementException if no entry is found with that name
*/
- private TimeScheduledEntry getEntry( final String name )
+ protected TimeScheduledEntry getEntry( final String name )
throws NoSuchElementException
{
//use the kill-o-matic against any entry with same name
- final TimeScheduledEntry entry = (TimeScheduledEntry)m_entries.get( name );
+ final TimeScheduledEntry entry = (TimeScheduledEntry)getEntryMap().get(
name );
if( null != entry )
{
return entry;
@@ -246,7 +427,7 @@
*
* @param entry the entry to run
*/
- private void runEntry( final TimeScheduledEntry entry )
+ protected void runEntry( final TimeScheduledEntry entry )
{
final Runnable runnable = new Runnable()
{
@@ -259,7 +440,7 @@
//this should suck threads from a named pool
try
{
- m_threadManager.getDefaultThreadPool().execute( runnable );
+ getThreadManager().getDefaultThreadPool().execute( runnable );
}
catch( final Exception e )
{
@@ -273,7 +454,7 @@
*
* @param entry the entry to run
*/
- private void doRunEntry( final TimeScheduledEntry entry )
+ protected void doRunEntry( final TimeScheduledEntry entry )
{
try
{
@@ -294,90 +475,6 @@
}
}
- public void start()
- throws Exception
- {
- //this should suck threads from a named pool
- m_threadManager.getDefaultThreadPool().execute( this );
- }
-
- public void stop()
- {
- m_running = false;
- synchronized( this )
- {
- notifyAll();
- }
- }
-
- /**
- * Entry point for thread that monitors entrys and triggers
- * entrys when necessary.
- */
- public void run()
- {
- m_running = true;
-
- while( m_running )
- {
- long duration = 0;
-
- if( !m_priorityQueue.isEmpty() )
- {
- TimeScheduledEntry entry = null;
- synchronized( this )
- {
- entry = getNextEntry();
- if( null == entry ) continue;
-
- duration = entry.getNextTime() - System.currentTimeMillis();
-
- if( duration < 0 )
- {
- //time to run job so remove it from priority queue
- //and run it
- m_priorityQueue.pop();
-
- //Note that we need the pop to occur in a
- //synchronized section while the runEntry
- //does not need to be synchronized
- //hence why there is to if statements
- //structured in this ugly way
- }
- }
-
- if( duration < 0 )
- {
- runEntry( entry );
- rescheduleEntry( entry, false );
- continue;
- }
- else if( 0 == duration )
- {
- //give a short duration that will sleep
- // so that next loop will definetly be below 0.
- //Can not act on zero else multiple runs could go through
- //at once
- duration = 1;
- }
- }
-
- //wait/sleep until monitor is signalled which occurs when
- //next jobs is likely to occur or when a new job gets added to
- //top of heap
- try
- {
- synchronized( this )
- {
- wait( duration );
- }
- }
- catch( final InterruptedException ie )
- {
- }
- }
- }
-
/**
* Retrieve next valid entry. It will pop off any
* invalid entrys until the heap is empty or a valid entry
@@ -385,70 +482,35 @@
*
* @return the next valid entry or null if none
*/
- private synchronized TimeScheduledEntry getNextEntry()
+ protected synchronized TimeScheduledEntry getNextEntry()
{
TimeScheduledEntry entry =
- (TimeScheduledEntry)m_priorityQueue.peek();
+ (TimeScheduledEntry)getPriorityQueue().peek();
//if job has been invalidated then remove it and continue
while( !entry.isValid() )
{
- m_priorityQueue.pop();
+ getPriorityQueue().pop();
- if( m_priorityQueue.isEmpty() )
+ if( getPriorityQueue().isEmpty() )
{
return null;
}
- entry = (TimeScheduledEntry)m_priorityQueue.peek();
+ entry = (TimeScheduledEntry)getPriorityQueue().peek();
}
return entry;
}
- /**
- * Add a trigger failure listener
- * @param listener The listener
- */
- public void addTriggerFailureListener( TriggerFailureListener listener )
- {
- m_triggerFailureListeners.add( listener );
- }
-
- /**
- * Remove a trigger failure listener
- * @param listener The listener
- */
- public void removeTriggerFailureListener( TriggerFailureListener listener )
- {
- m_triggerFailureListeners.remove( listener );
- }
-
- private void notifyFailedTriggers( Throwable t )
+ protected void notifyFailedTriggers( Throwable t )
{
- for( int i = 0; i < m_triggerFailureListeners.size(); i++ )
+ for( int i = 0; i < getTriggerFailureListeners().size(); i++ )
{
TriggerFailureListener triggerFailureListener =
(TriggerFailureListener)m_triggerFailureListeners.get( i );
triggerFailureListener.triggerFailure( t );
}
}
-
- /**
- * Return a collection of the triggerable names.
- * @return
- */
- public synchronized Collection getEntries()
- {
- Collection coll = m_entries.keySet();
- Vector retval = new Vector();
- for( Iterator iterator = coll.iterator(); iterator.hasNext(); )
- {
- TimeScheduledEntry tse = (TimeScheduledEntry)m_entries.get(
iterator.next() );
- retval.add( tse.toString() );
- }
- return retval;
- }
-
}
1.3 +2 -2
avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/PriorityQueue.java
Index: PriorityQueue.java
===================================================================
RCS file:
/home/cvs/avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/PriorityQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- PriorityQueue.java 19 Jun 2003 20:31:29 -0000 1.2
+++ PriorityQueue.java 23 Aug 2003 09:40:05 -0000 1.3
@@ -60,7 +60,7 @@
* @version CVS $Revision$ $Date$
* @since 4.0
*/
-interface PriorityQueue
+public interface PriorityQueue
{
/**
* Clear all elements from queue.
1.3 +2 -2
avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/SynchronizedPriorityQueue.java
Index: SynchronizedPriorityQueue.java
===================================================================
RCS file:
/home/cvs/avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/SynchronizedPriorityQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SynchronizedPriorityQueue.java 19 Jun 2003 20:31:29 -0000 1.2
+++ SynchronizedPriorityQueue.java 23 Aug 2003 09:40:05 -0000 1.3
@@ -61,7 +61,7 @@
* @version CVS $Revision$ $Date$
* @since 4.0
*/
-final class SynchronizedPriorityQueue
+public final class SynchronizedPriorityQueue
implements PriorityQueue
{
private final PriorityQueue m_priorityQueue;
1.3 +1 -1
avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/TimeScheduledEntry.java
Index: TimeScheduledEntry.java
===================================================================
RCS file:
/home/cvs/avalon-components/scheduler-impl/src/java/org/apache/avalon/cornerstone/blocks/scheduler/TimeScheduledEntry.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- TimeScheduledEntry.java 19 Jun 2003 20:31:29 -0000 1.2
+++ TimeScheduledEntry.java 23 Aug 2003 09:40:05 -0000 1.3
@@ -60,7 +60,7 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Avalon Development Team</a>
*/
-final class TimeScheduledEntry
+public final class TimeScheduledEntry
implements Comparable
{
private static final SimpleDateFormat DATEFORMAT = new SimpleDateFormat();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]