Hi all,

After performing stress testing on Tomcat 3.2.2 I found that one area of
trouble was the Thread Pool. Long running servlets may eat up all resources
and
prevent tomcat from accepting any new connections on the socket. Also for
some still unknown reason performance would decrease over time.

I have patched the code to do perform two safety checks:

1. If a thread has been processing a request for too long (as specified in a
parameter) the thread is stoped.
2. After a thread has already processed X requests or more (X
TcpWorkerThreads have been attached), it is terminated and a new thread is
created.

I also changed the code and moved most synchronization from ThreadPool to
ObjectHASH (a fully synchronized hash table)

I tested the patched code with 60 threads from 3 different machines with
very
good results in terms of reliability and performance.

This patch involves changes to:

util/ThreadPool.java
util/ThreadPoolRunnable.java
service/PoolTcpEndPoint.java

Regards
Hector Gonzalez
[EMAIL PROTECTED]
Index: ThreadPool.java
===================================================================
RCS file: 
/home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/ThreadPool.java,v
retrieving revision 1.9.2.2
diff -u -r1.9.2.2 ThreadPool.java
--- ThreadPool.java     2001/04/23 02:16:03     1.9.2.2
+++ ThreadPool.java     2001/06/01 20:10:21
@@ -1,7 +1,7 @@
 /*
- * $Header: 
/home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/ThreadPool.java,v
 1.9.2.2 2001/04/23 02:16:03 marcsaeg Exp $
- * $Revision: 1.9.2.2 $
- * $Date: 2001/04/23 02:16:03 $
+ * $Header: 
+/home/cvs/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/ThreadPool.java,v 
+1.9.2.1 2000/07/06 22:20:17 alex Exp $
+ * $Revision: 1.9.2.1 $
+ * $Date: 2000/07/06 22:20:17 $
  *
  * ====================================================================
  *
@@ -71,12 +71,17 @@
 import org.apache.tomcat.logging.*; 
 
 /**
- * A thread pool that is trying to copy the apache process management.
+ * A thread pool that creates a number of threads and
+ * assigns them to runnable objects as requested.
+ * The pool provides maintenance on the individual
+ * threads by cheking running time and number of runs.
+ * Tried to keep synchronization to a minimum, most of the
+ * synchronization happens at the idleWorkers (ObjectHASH)
+ * level.
  *
- * @author Gal Shachor
+ * @author Hector Gonzalez
  */
-public class ThreadPool  {
-
+public class ThreadPool{
     /*
      * Default values ...
      */
@@ -84,470 +89,675 @@
     public static final int MAX_SPARE_THREADS = 50;
     public static final int MIN_SPARE_THREADS = 10;
     public static final int WORK_WAIT_TIMEOUT = 60*1000;
+    public static final int MAX_THREAD_RUNS = 100;
+    public static final int MAX_THREAD_ITERATIONS = 10;
 
-    /*
-     * Where the threads are held.
-     */
-    protected Vector pool;
+    static int debug=0;
+    LogHelper loghelper = new LogHelper("tc_log", "ThreadPool");
 
     /*
-     * A monitor thread that monitors the pool for idel threads.
+     * Max number of threads that you can open in the pool.
      */
-    protected MonitorRunnable monitor;
-
+    protected int maxThreads;
 
     /*
-     * Max number of threads that you can open in the pool.
+     * Min number of idle threads that you can leave in the pool.
      */
-    protected int maxThreads;
+    protected int maxSpareThreads;
 
     /*
-     * Min number of idel threads that you can leave in the pool.
+     * Max number of idle threads that you can leave in the pool.
      */
     protected int minSpareThreads;
 
     /*
-     * Max number of idel threads that you can leave in the pool.
+     * Number of runs before thread is replaced
      */
-    protected int maxSpareThreads;
+    protected int maxThreadRuns;
 
-    /*
-     * Number of threads in the pool.
+    /* 
+     * Number of times the monitor thread will run before
+     * stopping a thread that has been continuously busy
+     * effective time before stopping = WORK_WAIT_TIMEOUT * MAX_THREAD_ITERATIONS
      */
-    protected int currentThreadCount;
+    protected int maxThreadIterations;
 
     /*
-     * Number of busy threads in the pool.
+     * Keep list of idle threads
      */
-    protected int currentThreadsBusy;
+    ObjectHASH idleWorkers;
 
     /*
-     * Flag that the pool should terminate all the threads and stop.
+     * List of all the created threads, idle and busy
      */
-    protected boolean stopThePool;
-
-    static int debug=0;
+    Hashtable workerList;
 
-    /**
-     * Helper object for logging
-     **/
-    LogHelper loghelper = new LogHelper("tc_log", "ThreadPool");
-    
-    public ThreadPool() {
-        maxThreads      = MAX_THREADS;
-        maxSpareThreads = MAX_SPARE_THREADS;
-        minSpareThreads = MIN_SPARE_THREADS;
-        currentThreadCount  = 0;
-        currentThreadsBusy  = 0;
-        stopThePool = false;
-    }
-
-    public synchronized void start() {
-        adjustLimits();
-
-        openThreads(minSpareThreads);
-        monitor = new MonitorRunnable(this);
-    }
+    /*
+     * Thread that monitors the pool
+     */
+    protected MonitorRunnable monitor;
 
-    public void setMaxThreads(int maxThreads) {
+    public void setMaxThreads(int maxThreads){
         this.maxThreads = maxThreads;
     }
 
-    public int getMaxThreads() {
+    public int getMaxThreads(){
         return maxThreads;
+    }    
+
+    public void setMaxSpareThreads(int maxSpareThreads){
+        this.maxSpareThreads = maxSpareThreads;
     }
 
-    public void setMinSpareThreads(int minSpareThreads) {
+    public int getMaxSpareThreads(){
+        return maxSpareThreads;
+    }    
+
+    public void setMinSpareThreads(int minSpareThreads){
         this.minSpareThreads = minSpareThreads;
     }
 
-    public int getMinSpareThreads() {
+    public int getMinSpareThreads(){
         return minSpareThreads;
-    }
+    }    
 
-    public void setMaxSpareThreads(int maxSpareThreads) {
-        this.maxSpareThreads = maxSpareThreads;
+    public void setMaxThreadRuns(int maxThreadRuns){
+        this.maxThreadRuns = maxThreadRuns;
     }
 
-    public int getMaxSpareThreads() {
-        return maxSpareThreads;
+    public int getMaxThreadRuns(){
+        return maxThreadRuns;
     }
-
-    //
-    // You may wonder what you see here ... basically I am trying
-    // to maintain a stack of threads. This way locality in time
-    // is kept and there is a better chance to find residues of the
-    // thread in memory next time it runs.
-    //
 
-    /**
-     * Executes a given Runnable on a thread in the pool, block if needed.
-     */
-    public void runIt(ThreadPoolRunnable r) {
+    public void setMaxThreadIterations(int maxThreadIterations){
+        this.maxThreadIterations = maxThreadIterations;
+    }
 
-        if(null == r) {
-            throw new NullPointerException();
-        }
+    public int getMaxThreadIterations(){
+        return maxThreadIterations;
+    }    
 
-        if(0 == currentThreadCount || stopThePool) {
-            throw new IllegalStateException();
-        }
+    public ThreadPool() {
+        maxThreads = MAX_THREADS;
+        maxSpareThreads= MAX_SPARE_THREADS;
+        minSpareThreads=MIN_SPARE_THREADS;
+        maxThreadRuns=MAX_THREAD_RUNS;
+        maxThreadIterations=MAX_THREAD_ITERATIONS;
+    }
 
-        ControlRunnable c = null;
+    public void start() {
+        adjustLimits();
 
-        // Obtain a free thread from the pool.
-        synchronized(this) {
-            if(currentThreadsBusy == currentThreadCount) {
-                 // All threads are busy
-                if(currentThreadCount < maxThreads) {
-                    // Not all threads were open,
-                    // Open new threads up to the max number of idel threads
-                    int toOpen = currentThreadCount + minSpareThreads;
-                    openThreads(toOpen);
-                } else {
-                    // XXX There really should be a way to log which pool is exhuasted
-                    loghelper.log("Pool exhausted with " + currentThreadCount + " 
threads.");
-
-                    // Wait for a thread to become idel.
-                    while(currentThreadsBusy == currentThreadCount) {
-                        try {
-                            this.wait();
-                        }
-                       // was just catch Throwable -- but no other
-                       // exceptions can be thrown by wait, right?
-                       // So we catch and ignore this one, since
-                       // it'll never actually happen, since nowhere
-                       // do we say pool.interrupt().
-                       catch(InterruptedException e) {
-                           loghelper.log("Unexpected exception", e);
-                        }
-
-                        // Pool was stopped. Get away of the pool.
-                        if(0 == currentThreadCount || stopThePool) {
-                            throw new IllegalStateException();
-                        }
-                    }
+        if(debug > 0){
+            loghelper.log("ThreadPool: "+this);
+            loghelper.log("ThreadPool: maxThreads: "+maxThreads);
+            loghelper.log("ThreadPool: maxSpareThreads: "+maxSpareThreads);
+            loghelper.log("ThreadPool: minSpareThreads: "+minSpareThreads);
+            loghelper.log("ThreadPool: maxThreadRuns: "+maxThreadRuns);
+            loghelper.log("ThreadPool: maxThreadIterations: "+maxThreadIterations);
+        }    
+
+        idleWorkers = new ObjectHASH(maxThreads);
+        workerList = new Hashtable();
+
+        openThreads();
+        monitor = new MonitorRunnable(this,WORK_WAIT_TIMEOUT);
+    }
+
+    public void openThreads(){
+        try{
+            //Needs to be synchronized so that we never open more than maxThreads
+            synchronized(idleWorkers){
+                int currentThreadCount = workerList.size();
+
+                int toOpen = currentThreadCount+minSpareThreads;
+                if(toOpen > maxThreads)
+                    toOpen = maxThreads;
+
+                for(int i=currentThreadCount;i<toOpen;i++){
+                    ControlRunnable c = new 
+ControlRunnable(idleWorkers,workerList,maxThreadRuns);
+                    if(debug > 0){
+                        loghelper.log("ThreadPool: open thread "+c.getId());
+                    }    
                 }
-            }
-
-            // If we are here it means that there is a free thred. Take it.
-            c = (ControlRunnable)pool.lastElement();
-            pool.removeElement(c);
-            currentThreadsBusy++;
+            }        
+        }catch(Exception e){
         }
-        c.runIt(r);
     }
 
-    /**
-     * Stop the thread pool
-     */
-    public synchronized void shutdown() {
-        if(!stopThePool) {
-            stopThePool = true;
-            monitor.terminate();
-            monitor = null;
-            for(int i = 0 ; i < (currentThreadCount - currentThreadsBusy) ; i++) {
-                try {
-                    ((ControlRunnable)(pool.elementAt(i))).terminate();
-                } catch(Throwable t) {
-                    /* 
-                                        * Do nothing... The show must go on, we are 
shutting 
-                                        * down the pool and nothing should stop that.
-                                        */
-                   loghelper.log("Ignored exception while shutting down thread pool", 
t, Logger.ERROR);
+    public void closeThreads(){
+        try{
+            //Needs to be synchronized so that we do not open more threads than 
+minSpareThreads
+            synchronized(idleWorkers){
+                int currentThreadCountBusy = idleWorkers.getSize();
+                int currentThreadCount = workerList.size();
+
+                if((currentThreadCount - currentThreadCountBusy) > maxSpareThreads) {
+                    int toFree = currentThreadCount -
+                                 currentThreadCountBusy -
+                                 maxSpareThreads;
+
+                    for(int i = 0 ; i < toFree ; i++) {
+                        ControlRunnable c = (ControlRunnable)idleWorkers.remove();
+                        workerList.remove(c.getId());
+                        c.stopRequest();
+                    }    
                 }
-            }
-            currentThreadsBusy = currentThreadCount = 0;
-            pool = null;
-            notifyAll();
+            }    
+        }catch(Exception e){
         }
-    }
-
-    /**
-     * Called by the monitor thread to harvest idel threads.
-     */
-    protected synchronized void checkSpareControllers() {
+    }    
 
-        if(stopThePool) {
-            return;
-        }
-        if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
-            int toFree = currentThreadCount -
-                         currentThreadsBusy -
-                         maxSpareThreads;
-
-            for(int i = 0 ; i < toFree ; i++) {
-                ControlRunnable c = (ControlRunnable)pool.firstElement();
-                pool.removeElement(c);
-                c.terminate();
-                currentThreadCount --;
+    public ControlRunnable getControlRunnable(){
+        ControlRunnable c = null;
+        try{
+            //could remove synchronized to improve performance
+            //workerList may not be up to date but that is fine as long as you
+            //double check idleWorkers before using workerList (checkWorkers does 
+this)
+
+            synchronized(idleWorkers){
+                c = (ControlRunnable)idleWorkers.remove();
+                WorkerStatus ws = (WorkerStatus)workerList.get(c.getId());
+                if(ws != null){
+                    ws.t = 1;
+                    ws.runs++;
+                }    
             }
+        }catch(Exception e){
         }
-    }
+        return c;
+    }    
 
-    /**
-     * Returns the thread to the pool.
-     * Called by threads as they are becoming idel.
+    /*
+     * Method called by the worker thread to indicate that it
+     * has begun working. It has moved past socket.accept
      */
-    protected synchronized void returnController(ControlRunnable c) {
-
-        if(0 == currentThreadCount || stopThePool) {
-            c.terminate();
-            return;
-        }
-
-        currentThreadsBusy--;
-        pool.addElement(c);
-        notify();
+    public void setWorkerTimer(String id,int t){
+        WorkerStatus ws = (WorkerStatus)workerList.get(id);
+        if(ws != null) ws.t = t;
     }
 
-    /**
-     * Inform the pool that the specific thread finish.
-     *
-     * Called by the ControlRunnable.run() when the runnable 
-     * throws an exception.
-     */
-    protected synchronized void notifyThreadEnd(ControlRunnable c) {
-        currentThreadsBusy--;
-        currentThreadCount --;
-        notify();
-    }
-    
-
-    /*
-     * Checks for problematic configuration and fix it.
-     * The fix provides reasonable settings for a single CPU
-     * with medium load.
-     */
-    protected void adjustLimits() {
-        if(maxThreads <= 0) {
+    public void adjustLimits() {
+        if(maxThreads <= 0){
             maxThreads = MAX_THREADS;
         }
 
-        if(maxSpareThreads >= maxThreads) {
+        if(maxSpareThreads >= maxThreads){
             maxSpareThreads = maxThreads;
         }
 
-               if(maxSpareThreads <= 0) {
-            if(1 == maxThreads) {
+        if(maxSpareThreads <= 0){
+            if(maxThreads == 1) {
                 maxSpareThreads = 1;
-            } else {
+            }else{
                 maxSpareThreads = maxThreads/2;
             }
-        }
+         }
 
-        if(minSpareThreads >  maxSpareThreads) {
-            minSpareThreads =  maxSpareThreads;
-               }
+         if(minSpareThreads > maxSpareThreads){
+            minSpareThreads = maxSpareThreads;
+         }
 
-               if(minSpareThreads <= 0) {
-            if(1 == maxSpareThreads) {
+         if(minSpareThreads <= 0){
+            if(maxSpareThreads == 1){
                 minSpareThreads = 1;
-            } else {
+            }else{
                 minSpareThreads = maxSpareThreads/2;
             }
+         }
+
+
+         if(maxThreadRuns <= 0){
+            maxThreadRuns = MAX_THREAD_RUNS;
+         }
+
+         if(maxThreadIterations <= 0){
+            maxThreadIterations = MAX_THREAD_ITERATIONS;
+         }   
+    }   
+         
+
+    public void runIt(ThreadPoolRunnable r){
+        try{
+            ControlRunnable c = getControlRunnable();
+            c.process(r);
+        }catch(InterruptedException e){
+        }
+    }
+
+    public void stopRequestIdleWorkers() {
+        try{
+            Object[] idle = idleWorkers.removeAll();
+            for(int i=0;i<idle.length;i++){
+                ((ControlRunnable)idle[i]).stopRequest();
+            }
+        }catch(InterruptedException x){
+            Thread.currentThread().interrupt();
+        }    
+    }
+
+    public void shutdown() {
+        synchronized(idleWorkers){
+            monitor.stopRequest();
+
+            stopRequestIdleWorkers();
+            try{ 
+                Thread.sleep(250);
+            }catch(InterruptedException x){
+            }
+
+            Enumeration keys = workerList.keys();
+            while(keys.hasMoreElements()){
+                String key = (String)keys.nextElement();
+                WorkerStatus ws = (WorkerStatus)workerList.get(key);
+                if(ws.c.isAlive()){
+                    ws.c.stopRequest();
+                }
+
+                workerList.remove(key);
+            } 
+        }        
+    }
+
+    /*
+     * Called by the monitor thread to check for:
+     * 1. Threads that have been running for too long
+     * 2. Threads that have ran too many times
+     * 3. Excessive number of free threads
+     * 4. Too few free threads
+     */
+    protected void checkWorkers() {
+        try{
+
+            //Could eliminate synchronized but keep in mind that
+            //it could be possible to get a worker that seems idle
+            //just to find out that it has been claimed by 
+            //another thread in the background
+            //just need to double check agains idleWorkers
+            //before updating workerList
+
+            synchronized(idleWorkers){
+
+            if(debug > 0){
+                loghelper.log("ThreadPool: checkWorkers run");
+                loghelper.log("ThreadPool: checkWorkers workerList.size() 
+"+workerList.size());
+                loghelper.log("ThreadPool: checkWorkers idleWorkers.getSize() 
+"+idleWorkers.getSize());
+            }    
+
+            Vector all_keys = new Vector();
+            Enumeration keys = workerList.keys();
+            int count=0;
+            while(keys.hasMoreElements()){
+                count++;
+                all_keys.add(keys.nextElement());
+            } 
+
+            for(int i=0;i<count;i++){
+                String key = (String)all_keys.elementAt(i);
+                WorkerStatus ws = (WorkerStatus)workerList.get(key);
+
+                if(ws != null){
+
+                    if(debug>0){
+                        loghelper.log("ThreadPool: checkWorkers: "+key+" runs: 
+"+ws.runs+" t: "+ws.t);
+                    }
+
+                    if(ws.t > 1){
+                        if(ws.t > maxThreadIterations){
+                            ControlRunnable c = 
+(ControlRunnable)idleWorkers.remove(key);
+                            if(c != null){
+                                if(debug>0){
+                                    loghelper.log("ThreadPool: OOPS, SOMEONE FREED 
+THE THREAD");
+                                }    
+                            }else{    
+                                workerList.remove(key);
+                                ws.c.stopRequest();
+                                if(debug>0){
+                                    loghelper.log("ThreadPool: TERMINATED LONG 
+RUNNING THREAD: "+key);
+                                }    
+                            }
+                        }else{
+                            ws.t++; 
+                        }    
+                    }
+                    else if(ws.t == 0){
+                        if(ws.runs > maxThreadRuns){
+                            Object c = idleWorkers.remove(key);
+                            if(c == null){
+                                if(debug>0){
+                                    loghelper.log("ThreadPool: OOPS, SOMEONE REMOVED 
+THE IDLEWORKER FOR THE TIRED THREAD "+key);
+                                }    
+                            }else{    
+                                workerList.remove(key);
+                                ws.c.stopRequest();
+                                if(debug>0){
+                                    loghelper.log("ThreadPool: TERMINATED TIRED 
+THREAD: "+key);
+                                }    
+                            }    
+                        }    
+                    }
+        
+                }
+            } 
+
+            //Adjust pool size
+            if(idleWorkers.getSize() == 0){
+                openThreads();
+            }else{
+                closeThreads();
+            }    
+
+
+            if(debug>0){
+                loghelper.log("ThreadPool: idleWorkers.getSize(): 
+"+idleWorkers.getSize());
+                loghelper.log("ThreadPool: workerList.size(): "+workerList.size());
+            }    
+
+            }
+
+
+        }catch(Exception e){
         }
     }
+
+    /**
+     * The thread to be ran on the thread pool
+     */
+    private class ControlRunnable extends Object {
+        /*
+         *List of idle workers, received from ThreadPool
+         */
+        private ObjectHASH idleWorkers;
+
+        /*
+         *List of all workers (idle and busy) received from ThreadPool
+         */
+        private Hashtable workerList;
+
+        /*
+         *Used to get the object to run
+         */
+        private ObjectHASH handoffBox;
+
+        private Thread internalThread;
+        private volatile boolean noStopRequested;
+        private boolean noThData;
+        private Object thData[]=null;
+        private long maxThreadRuns;
+
+        public ControlRunnable(ObjectHASH idleWorkers,Hashtable workerList,long 
+maxThreadRuns) {
+            this.idleWorkers = idleWorkers;
+            this.workerList = workerList;
+            this.maxThreadRuns = maxThreadRuns;
+
+            handoffBox = new ObjectHASH(1);
+            noStopRequested = true;
+            noThData = true;
+
+            workerList.put(this.toString(),new WorkerStatus(this));
+
+            Runnable r = new Runnable(){
+                public void run(){
+                   try{
+                       runWork();
+                   } catch(Exception x){
+                   }
+                }    
+            };
+
+            internalThread = new Thread(r);
+            internalThread.start();
+        }    
+
+        public String getId(){
+            return this.toString();
+        }    
+
+        public void process(ThreadPoolRunnable r) throws InterruptedException {
+            handoffBox.add(r);
+        }
+
+        private void runWork() {
+            while(noStopRequested){
+                try{
+                    //Could elminate synchronized but need to
+                    //keep in mind that workerList may not get updated in time
+                    synchronized(idleWorkers){
+                         idleWorkers.add(this);
+                         WorkerStatus ws = (WorkerStatus)workerList.get(getId());
+                         if(ws != null){
+                             ws.t = 0;
+                         }
+                    }
 
-    protected void openThreads(int toOpen) {
+                    /*
+                    //If you threads need to stop after exactly maxThreadRuns
+                    //comment above code, and uncomment this code
+                    WorkerStatus ws = (WorkerStatus)workerList.get(getId());
+                    if(ws != null){
+                        if(ws.runs > maxThreadRuns){
+                            workerList.remove(getId());
+                            //Create new thread to replace this one
+                            System.out.println("ThreadPool: TERMINATED THREAD: 
+"+getId()+" runs: "+ws.runs);
+                            ControlRunnable c = new 
+ControlRunnable(idleWorkers,workerList,maxThreadRuns);
+                            break;
+                        }else{
+                            ws.t = 0;
+                            idleWorkers.add(this);
+                        }    
+                    }else{
+                        //Should never get here
+                        break;
+                    }    
+                    */
 
-        if(toOpen > maxThreads) {
-            toOpen = maxThreads;
+                    //Blocks until there is an object to run
+                    ThreadPoolRunnable r = (ThreadPoolRunnable) handoffBox.remove();
+                    runIt(r);
+                } catch(Exception e){
+                    Thread.currentThread().interrupt();
+                }
+            }    
         }
 
-        if(0 == currentThreadCount) {
-            pool = new Vector(toOpen);
+        private void runIt(ThreadPoolRunnable toRun) {
+            try{
+                if(toRun != null){
+                    if(noThData) {
+                        thData=toRun.getInitData();
+                        noThData = false;
+                    }   
+                    toRun.runIt(thData,getId());
+                }    
+            }catch(Exception runex){
+            }finally{
+                Thread.interrupted();
+            }
         }
 
-        for(int i = currentThreadCount ; i < toOpen ; i++) {
-            pool.addElement(new ControlRunnable(this));
+        public void stopRequest() {
+            noStopRequested = false;
+            internalThread.interrupt();
         }
 
-        currentThreadCount = toOpen;
+        public boolean isAlive() {
+            return internalThread.isAlive();
+        }    
     }
 
-    void log( String s ) {
-       loghelper.log(s);
-    }
-    
-    /** 
-     * Periodically execute an action - cleanup in this case
+    /**
+     * Class used to monitor the thread pool
      */
-    class MonitorRunnable implements Runnable {
+    private class MonitorRunnable implements Runnable {
         ThreadPool p;
         Thread     t;
-        boolean    shouldTerminate;
+        boolean    noStopRequested;
+        int        timeout;
 
-        MonitorRunnable(ThreadPool p) {
-            shouldTerminate = false;
+        MonitorRunnable(ThreadPool p,int timeout) {
+            noStopRequested = true;
+            this.timeout = timeout;
             this.p = p;
             t = new Thread(this);
             t.start();
         }
 
         public void run() {
-            while(true) {
+            while(noStopRequested) {
                 try {
-                    // Sleep for a while.
-                    synchronized(this) {
-                        this.wait(WORK_WAIT_TIMEOUT);
-                    }
-
-                    // Check if should terminate.
-                    // termination happens when the pool is shutting down.
-                    if(shouldTerminate) {
-                        break;
-                    }
-
-                    // Harvest idle threads.
-                    p.checkSpareControllers();
-
-                } catch(Throwable t) {
-                   loghelper.log("Unexpected exception", t, Logger.ERROR);
+                    t.sleep(timeout);
+                    p.checkWorkers();
+                } catch(InterruptedException x){
+                    Thread.currentThread().interrupt();
                 }
             }
         }
 
-       /** Stop the monitor
-        */
-        public synchronized void terminate() {
-            shouldTerminate = true;
-            this.notify();
+        public void stopRequest() {
+            noStopRequested = false;
+            t.interrupt();
         }
-    }
+    }    
 
     /**
-     * A Thread object that executes various actions ( ThreadPoolRunnable )
-     *  under control of ThreadPool
+     * Class to keep track of the status of a worker thread
+     * in the thread pool
      */
-    class ControlRunnable implements Runnable {
+    private class WorkerStatus{
+        /*
+         * Number of times that the monitor thread has found
+         * this thread to be busy. Busy meas the thread is
+         * handling a request, not that the thread is blocked
+         * in a socket.accept call.
+         * Possible values:
+         * 0 - the thread is waiting for an object to run
+         * 1 - the thread has been claimed and is in socket.accept
+         * 2 and greater - the thread is handling a request
+         */
+        public long t; 
+
+        /*
+         * Number of times the thread has been ran
+         */
+        public long runs;
+
+        /*
+         * Pointer to the thread itself
+         */
+        public ControlRunnable c; 
+
+        public WorkerStatus(ControlRunnable c){
+            this.c = c;
+            this.t = 0;
+            this.runs = 0;
+        }    
+    } 
 
-       /**
-        * ThreadPool where this thread will be returned
-        */
-        ThreadPool p;
+    /**
+     * thread safe hashtable
+     * used by thread pool to store available threads
+     * and by ControlRunnable to get object to run
+     */
+    private class ObjectHASH extends Object {
+        /*
+         * Hashtable to contain the data
+         */
+        private Hashtable index;
 
-       /**
-        * The thread that executes the actions
-        */
-        Thread     t;
+        private int capacity;
+        private int size;
 
-       /**
-        * The method that is executed in this thread
-        */
-        ThreadPoolRunnable   toRun;
-
-       /**
-        * Stop this thread
-        */
-       boolean    shouldTerminate;
-
-       /**
-        * Activate the execution of the action
-        */
-        boolean    shouldRun;
-
-       /**
-        * Per thread data - can be used only if all actions are
-        *  of the same type.
-        *  A better mechanism is possible ( that would allow association of
-        *  thread data with action type ), but right now it's enough.
-        */
-       boolean noThData;
-       Object thData[]=null;
-
-       /**
-        * Start a new thread, with no method in it
-        */
-        ControlRunnable(ThreadPool p) {
-            toRun = null;
-            shouldTerminate = false;
-            shouldRun = false;
-            this.p = p;
-            t = new Thread(this);
-            t.start();
-           noThData=true;
-           thData=null;
+        public ObjectHASH(int cap){
+            capacity = (cap > 0) ? cap : 1;
+
+            index = new Hashtable();
+            size = 0;
         }
 
-        public void run() {
-            
-            while(true) {
-                try {                     
-                           /* Wait for work. */
-                    synchronized(this) {
-                        if(!shouldRun && !shouldTerminate) {
-                            this.wait();
-                        }
-                    }
-                           if(toRun == null ) {
-                                   if( p.debug>0) p.log( "No toRun ???");
-                           }
-
-                           if( shouldTerminate ) {
-                                   if( p.debug>0) p.log( "Terminate");
-                                   break;
-                           }
-
-                    /* Check if should execute a runnable.  */
-                    try {
-                                   if(noThData) {
-                                       if(p.debug>0) p.log( "Getting new thread 
data");
-                                       thData=toRun.getInitData();
-                                       noThData = false;
-                                   }
-
-                        if(shouldRun) {
-                            toRun.runIt(thData);
-                        }
-                    } catch(Throwable t) {
-                       loghelper.log("Caught exception executing " + toRun.toString() 
+ ", terminating thread", t);
-                        /*
-                        * The runnable throw an exception (can be even a ThreadDeath),
-                        * signalling that the thread die.
-                        *
-                               * The meaning is that we should release the thread from
-                               * the pool.
-                               */
-                        shouldTerminate = true;
-                        shouldRun = false;
-                        p.notifyThreadEnd(this);
-                    } finally {
-                        if(shouldRun) {
-                            shouldRun = false;
-                            /*
-                                       * Notify the pool that the thread is now idle.
-                            */
-                            p.returnController(this);
-                        }
-                    }
+        public int getCapacity(){
+            return capacity;
+        }
 
-                    /*
-                           * Check if should terminate.
-                    * termination happens when the pool is shutting down.
-                    */
-                    if(shouldTerminate) {
-                        break;
-                    }
-                } catch(InterruptedException ie) { /* for the wait operation */
-                   // can never happen, since we don't call interrupt
-                   loghelper.log("Unexpected exception", ie);
-                }
-            }
+        public synchronized int getSize(){
+            return size;
         }
 
-        public synchronized void runIt(ThreadPoolRunnable toRun) {
-           if( toRun == null ) {
-               throw new NullPointerException("No Runnable");
-           }
-            this.toRun = toRun;
-            shouldRun = true;
-            this.notify();
+        public synchronized boolean isEmpty(){
+            return (size == 0);
         }
 
-        public synchronized void terminate() {
-            shouldTerminate = true;
-            this.notify();
+        public synchronized boolean isFull(){
+            return (size == capacity);
         }
-    }
+
+        public synchronized void add(Object obj) throws InterruptedException{
+            waitWhileFull();
+
+            index.put(obj.toString(),obj);
+
+            size++;
+            notifyAll();
+        }
+
+        public synchronized Object remove() throws InterruptedException{
+            waitWhileEmpty();
+
+            //XXX: 
+            //Need to find a more efficient way to get the
+            //first element of a hash
+
+            String key = null;
+
+            Enumeration keys = index.keys();
+            if(keys.hasMoreElements()){
+                key = (String)keys.nextElement();
+            }    
+
+            Object obj = index.remove(key);
+            size --;
+
+            notifyAll();
+
+            return obj;
+        }
+
+        /*
+         * Method to get a particular object from the hash
+         * does not block, if the object does not exist it returns null
+         */
+        public synchronized Object remove(String key) throws InterruptedException{
+            Object o = index.remove(key);
+            if(o != null){
+                size--;
+                notifyAll();
+            }    
+            return o;
+        }    
+
+        public synchronized Object[] removeAll() throws InterruptedException{
+            Object[] list = new Object[size];
+
+            Enumeration keys = index.keys();
+            int i = 0;
+            while(keys.hasMoreElements()){
+                String key = (String)keys.nextElement();
+                list[i++] = remove(key);
+            }    
+
+            return list;
+        }     
+
+
+        public synchronized void waitWhileFull() throws InterruptedException{
+            while(isFull()){
+                wait();
+            }
+        }
+
+        public synchronized void waitWhileEmpty() throws InterruptedException{
+            while(isEmpty()){
+                wait();
+            }
+        }
+    }    
 }
Index: ThreadPoolRunnable.java
===================================================================
RCS file: 
/home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/util/Attic/ThreadPoolRunnable.java,v
retrieving revision 1.1
diff -u -r1.1 ThreadPoolRunnable.java
--- ThreadPoolRunnable.java     2000/05/26 23:06:39     1.1
+++ ThreadPoolRunnable.java     2001/06/01 20:10:29
@@ -64,6 +64,8 @@
 import java.util.*;
 import java.io.*;
 
+
+
 /** Implemented if you want to run a piece of code inside a thread pool.
  */
 public interface ThreadPoolRunnable {
@@ -77,9 +79,17 @@
      */
     public Object[] getInitData();
 
+
     /** This method will be executed in one of the pool's threads. The
      *  thread will be returned to the pool.
      */
     public void runIt(Object thData[]);
 
+    /* 
+     *  Same as above, but a thread id is passed,
+     *  so that the working thread can indicate when it has begun
+     *  to perform real work as oppossed to just waiting on a socket
+     *  useful to stop long running threads
+     */
+    public void runIt(Object thData[],String id);
 }
Index: PoolTcpEndpoint.java
===================================================================
RCS file: 
/home/cvspublic/jakarta-tomcat/src/share/org/apache/tomcat/service/Attic/PoolTcpEndpoint.java,v
retrieving revision 1.8.2.5
diff -u -r1.8.2.5 PoolTcpEndpoint.java
--- PoolTcpEndpoint.java        2001/03/21 17:11:29     1.8.2.5
+++ PoolTcpEndpoint.java        2001/06/01 20:10:48
@@ -388,8 +388,12 @@
            return obj;
        }
     }
+
+    public void runIt(Object perThrData[]){
+       runIt(perThrData,null);
+    }
     
-    public void runIt(Object perThrData[]) {
+    public void runIt(Object perThrData[],String id) {
        TcpConnection con=null;
        if( ! usePool ) {
            // extract the original.
@@ -403,6 +407,10 @@
            if(null != s) {
                // Continue accepting on another thread...
                endpoint.tp.runIt(this);
+               //Tell the thread pool that we are about to process a request
+               if(id != null){
+                   endpoint.tp.setWorkerTimer(id,2);
+               }    
                
                try {
                    if( usePool ) {

Reply via email to