Update of /cvsroot/freenet/freenet/src/freenet/thread
In directory sc8-pr-cvs1:/tmp/cvs-serv3267

Added Files:
        YThreadFactory.java 
Log Message:
Yet another thread factory


--- NEW FILE: YThreadFactory.java ---
/* -*- Mode: java; c-basic-indent: 4; tab-width: 4 -*- */
package freenet.thread;

import freenet.Core;
import freenet.support.Irreversible;
import freenet.support.Logger;

/**
 * A derivative of QThreadFactory that queues jobs when it runs
 * out of threads.  All jobs go onto the queue, and are removed
 * by idle threads.
 *
 * Unnecessary creation of threads is avoided in two ways.
 * First, the thread deletion rate is limited.  This avoids
 * the overhead of deleting the thread and then creating
 * it again right away.  Second, thread creation is delayed
 * so long as the oldest job on the queue hasn't been there
 * more than a specified time.
 *
 * @author ejhuff
 */
public final class YThreadFactory implements ThreadFactory {
        
        /*
         * Thread creation parameters.
         *
         * When the number of available (idle) threads falls below either
         * minimum below, a worker is told to create one new thread.  Only
         * one thread creates threads at a time, and that thread first
         * waits until the oldest job on the queue is old enough.
         */

    /** Minimum available (total - active) threads; fewer than this allows thread creation. */
    private static final int threadCreationThreshold = 9;

    /** Minimum available/active ratio; a lower ratio allows thread creation. */
    private static final double threadCreationRatio = 0.1;

        /**
         * Minimum age, in millis, of the oldest job on the queue before a
         * new thread is actually created.  Also used as the wait time when
         * the queue is empty.
         */
        private static final long tolerableQueueDelay = 100;
        

        /*
         * Thread deletion parameters.
         *
         * Both maximums below must be exceeded, and the job queue must be
         * empty, before an idle thread is told to exit.
         */

    /** Maximum available (total - active) threads before deletion is considered. */
    private static final int threadDeletionThreshold = 27;

    /** Maximum available/active ratio before deletion is considered. */
    private static final double threadDeletionRatio = 1;

        /** Minimum delay, in millis, between two thread deletions. */
        private static final long threadDeletionDelay = 1000; // 1 per second
        
    /** Thread group that every pooled thread is created in. */
    private final ThreadGroup tg;
        
    /** Pending jobs plus the thread counters; shared by all pooled threads. */
    private final JobQueue jobQueue = new JobQueue();
        
        /** Whether DEBUG logging was enabled when this factory was constructed. */
        private final boolean logDEBUG;
                
        /**
         * Value reported by maximumThreads().  In this file it is never
         * modified after construction and is not used to limit thread
         * creation.
         */
        private final int targetMaxThreads;

        /** Prefix of each pooled thread's name; a sequence number is appended. */
        private final String prefix;

    /**
     * Creates the factory and immediately starts its first pooled thread.
     *
     * @param tg     ThreadGroup all created threads will belong to
     * @param targetMaxThreads the value later reported by
     *               maximumThreads(); the code in this file does not
     *               itself refuse to create threads beyond this number
     * @param prefix prefix for the names of the pooled threads; each
     *               thread appends its own sequence number
     */
    public YThreadFactory(ThreadGroup tg, int targetMaxThreads, String prefix) {
        this.tg = tg;
        this.targetMaxThreads = targetMaxThreads;
                this.prefix = prefix;
                // Sampled once; later changes to the log level are not seen.
                this.logDEBUG = Core.logger.shouldLog(Logger.DEBUG, this);
                // Must stay last: the new thread starts running at once and
                // reads the fields assigned above.
                new YThread();
    }
        
    /**
     * @return  the target maximum executing jobs, exactly as passed
     *          to the constructor.  Caller may use this, together
     *          with activeThreads(), to determine load.  The value
     *          is held in a final field and never changes.
     */
    public final int maximumThreads() {
                return targetMaxThreads;
    }
        
    /**
     * Reads the job queue's "active" counter under the queue's lock.
     * Besides threads running a job, the counter also includes a thread
     * that is starting up or is busy creating another thread.
     *
     * @return  the number of threads currently counted as active
     */
    public final int activeThreads() {
        final JobQueueSnapshot counts = new JobQueueSnapshot();
        jobQueue.snap(counts);
        return counts.active;
    }
        
    /**
     * Reads the job queue's "available" counter under the queue's lock.
     * The value can be stale by the time the caller looks at it.
     *
     * @return  the number of idle threads at the moment of the call
     */
    public final int availableThreads() {
        final JobQueueSnapshot counts = new JobQueueSnapshot();
        jobQueue.snap(counts);
        return counts.available;
    }
        
    /**
     * Queues a job for execution by one of the pooled threads.  The job
     * is not bound to a particular thread here, so no Thread object can
     * be handed back.
     *
     * @param job  The job to be executed.  A null job is logged and
     *             dropped instead of being queued.
     * @return null, always
     */
    public final Thread getThread(Runnable job) {
        if (job == null) {
            // A worker interprets "dequeued job == null, exit == false" as
            // the instruction to create another thread, so a queued null
            // would spawn a spurious thread instead of doing nothing.
            Core.logger.log(this, "Ignoring null job passed to getThread()",
                            Core.logger.ERROR);
            return null;
        }
        jobQueue.enqueue(job);
        return null;
    }
        
    private final class YThread extends Thread implements PooledThread {
                
        private int jobsDone = 0;
                private long maxQueueDelay = 0;
                private long sumQueueDelay = 0;
                private Runnable job = null;

        public YThread() {
            super(tg, "YThread-unnamed-as-yet");
            super.start();
        }

                public Runnable job() {
                        return job;
                }
                
        public final void run() {    
                        jobQueue.newThreadStarting();
                        JobQueueResult result = new JobQueueResult();
                        while (true) {
                                jobQueue.dequeue(result);
                                if (result.job == null) {
                                        if (result.exit) {
                                                // An unneeded thread exits.
                                                double avgQueueDelay = 
(double)sumQueueDelay;
                                                Core.diagnostics.occurrenceContinuous
                                                        ("jobsPerYThread", jobsDone);
                                                if (jobsDone > 0) {
                                                        avgQueueDelay /= jobsDone;
                                                        
Core.diagnostics.occurrenceContinuous
                                                                
("maxQueueDelayThisYThread", maxQueueDelay);
                                                        
Core.diagnostics.occurrenceContinuous
                                                                
("avgQueueDelayThisYThread", avgQueueDelay); //same
                                                }
                                                if (logDEBUG)
                                                        Core.logger.log(this, 
Thread.currentThread().getName() + " ended. " + 
                                                                                       
 result.available + " threads available. " +
                                                                                       
 result.active + " threads active. " +
                                                                                       
 jobsDone + " jobs done. " +
                                                                                       
 maxQueueDelay + " max queue delay. " +
                                                                                       
 avgQueueDelay + " avg queue delay.",
                                                                                       
 Core.logger.DEBUG);
                                                return;
                                        }
                                        // (result.job == null && ! result.exit) means 
we
                                        // should create and start one new thread, but
                                        // only when the deadline has arrived.
                                        while (true) {
                                                long timeBeforeDeadline = 
jobQueue.timeBeforeDeadline();
                                                if (timeBeforeDeadline <= 0) break;
                                                try {
                                                        
Thread.sleep(timeBeforeDeadline);
                                                } catch (InterruptedException e) {}
                                        }
                                        new YThread();
                                } else { // (result.job != null) means we have a job 
to do.
                                        job = result.job;
                                        sumQueueDelay += result.queueDelay;
                                        if (maxQueueDelay < result.queueDelay) {
                                                maxQueueDelay = result.queueDelay;
                                        }
                                        try {
                                                
Core.diagnostics.occurrenceContinuous("jobQueueDelayAllYThreads", 
                                                                                       
                                   result.queueDelay);
                                                // 
Core.diagnostics.occurrenceCounting("jobsExecuted", 1);
                                                job.run();
                                        } catch (Throwable e) {
                                                Core.logger.log(this, "Unhandled 
exception " + e + " in job " + job,
                                                                                e, 
Core.logger.ERROR);
                                                
freenet.node.Main.dumpInterestingObjects();
                                        } finally {
                                                jobsDone++;
                                                job = null;
                                        }
                                }
                        }
                }

        }
        
        /**
         * Out-parameter for JobQueue.snap(): a copy of the two thread
         * counters taken while the queue's lock was held.
         */
        private final class JobQueueSnapshot {
                /** Copy of the queue's active-thread counter. */
                int active;
                /** Copy of the queue's available (idle) thread counter. */
                int available;
        }

        /**
         * Out-parameter for JobQueue.dequeue().  A non-null job means
         * "run this"; a null job means "exit" when exit is true and
         * "create one new thread" when exit is false.
         */
        private final class JobQueueResult {
                /** The job to run, or null for an exit/create instruction. */
                Runnable job;
                /** Only meaningful when job is null: true tells the thread to exit. */
                boolean exit;
                /** Active-thread counter at the moment dequeue() returned. */
                int active;
                /** Available-thread counter at the moment dequeue() returned. */
                int available;
                /** Millis the returned job spent on the queue; 0 when job is null. */
                long queueDelay;
        }
        
    private final class JobQueue {
                // information about threads
                private int total = 0;
                private int active = 0;
                private int available = 0; // total == active + available
                private int threadNumber = 0;
                private long nextThreadDeletionTime = 0;
                private boolean notCreatingThreads = true;

                // information about the job queue.
                private final JobQueueItem head = new JobQueueItem();
                private JobQueueItem tail = head;
                private JobQueueItem free = null;
                private int freeListLength = 0;

                private final class JobQueueItem {
                        JobQueueItem next = null;
                        Runnable job = null;
                        long enqueueTime = 0;
                }

                // A new thread has started.  Account for it.
                private synchronized void newThreadStarting() {
                        total++;
                        active++; // active + available == total
                        Thread.currentThread().setName(prefix + (threadNumber++));
                        notCreatingThreads = true;
                }

                private synchronized void snap(JobQueueSnapshot snap) {
                        snap.active = active;
                        snap.available = available;
                }
                
                // called only from getThread()
                private synchronized void enqueue(Runnable job) {
                        if (free == null) {
                                free = new JobQueueItem();
                                freeListLength++;
                        }
                        JobQueueItem mine = free;
                        free = free.next;
                        freeListLength--;
                        mine.next = null;
                        mine.job = job;
                        mine.enqueueTime = System.currentTimeMillis();
                        tail.next = mine;
                        tail = mine;
                        this.notify(); // wake a thread
                }

                // called only from YThread.run()
                private synchronized long timeBeforeDeadline() {
                        if (head.next == null) return tolerableQueueDelay;
                        return tolerableQueueDelay + head.next.enqueueTime -
                                System.currentTimeMillis();
                }

                // called only from YThread.run()
                private synchronized void dequeue(JobQueueResult result) {
            active--;
            available++; // active + available == total
                        while (true) {
                                long now = System.currentTimeMillis();
                                long deleteDelay = 0;
                                // if we need more threads
                                if ( ( available < threadCreationThreshold ) ||
                                         ( available < active * threadCreationRatio )) 
{
                                        // if not already creating a thread
                                        if (notCreatingThreads) {
                                                // then make one new thread.
                                                notCreatingThreads = false;
                                                active++;
                                                available--; // active + available == 
total
                                                result.active = active;
                                                result.available = available;
                                                result.exit = false;
                                                result.job = null;
                                                result.queueDelay = 0;
                                                this.notify(); // wake another thread 
in case there is work
                                                return;
                                        }
                                }
                                if (head.next != null) {
                                        active++;
                                        available--; // active + available == total
                                        JobQueueItem mine = head.next;
                                        head.next = mine.next;
                                        if (head.next == null) tail = head;
                                        result.job = mine.job;
                                        result.queueDelay = now - mine.enqueueTime;
                                        mine.job = null;
                                        mine.enqueueTime = 0;
                                        mine.next = free;
                                        free = mine;
                                        freeListLength++;
                                        while (freeListLength > 10000) {
                                                mine = free;
                                                free = free.next;
                                                freeListLength--;
                                                mine.next = null;
                                                // mine will be garbage collected later
                                        }
                                        result.active = active;
                                        result.available = available;
                                        result.exit = false;
                                        this.notify(); // wake the next thread
                                        return;
                                }
                                // No jobs left on queue to run.
                                // If thread deletion is allowed at this time
                                // and if there are too many threads, delete one.
                                if (available > threadDeletionThreshold) {
                                        if (available > active * threadDeletionRatio) {
                                                deleteDelay = nextThreadDeletionTime - 
now;
                                                if (deleteDelay <= 0) {
                                                        total--;
                                                        available--; // active + 
available == total
                                                        nextThreadDeletionTime = now + 
threadDeletionDelay;
                                                        result.active = active;
                                                        result.available = available;
                                                        result.exit = true;
                                                        result.job = null;
                                                        result.queueDelay = 0;
                                                        this.notify();
                                                        return;
                                                }
                                        }
                                }
                                try {
                                        // idle threads wait here
                                        if (deleteDelay > 0) {
                                                // wait until it is time to consider 
deleting
                                                // threads or until more work comes in.
                                                this.wait(deleteDelay);
                                        } else {
                                                this.wait(); // no thread deletion 
presently planned
                                        }
                                } catch ( InterruptedException ie ) {}
                        }
                }

        }
}

_______________________________________________
cvs mailing list
[EMAIL PROTECTED]
http://dodo.freenetproject.org/cgi-bin/mailman/listinfo/cvs

Reply via email to