Author: elecharny Date: Sun Apr 5 21:57:44 2009 New Revision: 762167 URL: http://svn.apache.org/viewvc?rev=762167&view=rev Log: o Added Javadoc for all the constructors o Replaced the non-thread-safe circular queue with a ConcurrentLinkedQueue o Added some constant definitions for clarity's sake o Removed the inner SessionBuffer class o Renamed the BUFFER constant to TASKS_QUEUE o Added comments in the important parts of the code, for clarity o Renamed queueHandler to eventQueueHandler o Removed the maxPoolSize and corePoolSize fields, as they are already present in the parent class o Added @inheritDoc tags o Some other minor cleaning
Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java?rev=762167&r1=762166&r2=762167&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java Sun Apr 5 21:57:44 2009 @@ -25,6 +25,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -38,7 +39,6 @@ import org.apache.mina.core.session.DummySession; import org.apache.mina.core.session.IoEvent; import org.apache.mina.core.session.IoSession; -import org.apache.mina.util.CircularQueue; /** * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s. 
@@ -51,48 +51,117 @@ * @org.apache.xbean.XBean */ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { - + /** A default value for the initial pool size */ + private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0; + + /** A default value for the maximum pool size */ + private static final int DEFAULT_MAX_THREAD_POOL = 16; + + /** A default value for the KeepAlive delay */ + private static final int DEFAULT_KEEP_ALIVE = 30; + private static final IoSession EXIT_SIGNAL = new DummySession(); - private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer"); + /** A key stored into the session's attribute for the event tasks being queued */ + private final AttributeKey TASKS_QUEUE = new AttributeKey(getClass(), "tasksQueue"); + private final BlockingQueue<IoSession> waitingSessions = new LinkedBlockingQueue<IoSession>(); private final Set<Worker> workers = new HashSet<Worker>(); - private volatile int corePoolSize; - private volatile int maximumPoolSize; private volatile int largestPoolSize; private final AtomicInteger idleWorkers = new AtomicInteger(); private long completedTaskCount; private volatile boolean shutdown; - private final IoEventQueueHandler queueHandler; + private final IoEventQueueHandler eventQueueHandler; + /** + * Creates a default ThreadPool, with default values : + * - minimum pool size is 0 + * - maximum pool size is 16 + * - keepAlive set to 30 seconds + * - A default ThreadFactory + * - All events are accepted + */ public OrderedThreadPoolExecutor() { - this(16); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, + DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null); } + /** + * Creates a default ThreadPool, with default values : + * - minimum pool size is 0 + * - keepAlive set to 30 seconds + * - A default ThreadFactory + * - All events are accepted + * + * @param maximumPoolSize The maximum pool size + */ public OrderedThreadPoolExecutor(int maximumPoolSize) { - this(0, 
maximumPoolSize); + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null); } + /** + * Creates a default ThreadPool, with default values : + * - keepAlive set to 30 seconds + * - A default ThreadFactory + * - All events are accepted + * + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + */ public OrderedThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { - this(corePoolSize, maximumPoolSize, 30, TimeUnit.SECONDS); + this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, + Executors.defaultThreadFactory(), null); } + /** + * Creates a default ThreadPool, with default values : + * - A default ThreadFactory + * - All events are accepted + * + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + * @param keepAliveTime Default duration for a thread + * @param unit Time unit used for the keepAlive value + */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory()); + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, + Executors.defaultThreadFactory(), null); } + /** + * Creates a default ThreadPool, with default values : + * - A default ThreadFactory + * + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + * @param keepAliveTime Default duration for a thread + * @param unit Time unit used for the keepAlive value + * @param queueHandler The queue used to store events + */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventQueueHandler queueHandler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), queueHandler); + this(corePoolSize, maximumPoolSize, 
keepAliveTime, unit, + Executors.defaultThreadFactory(), queueHandler); } + /** + * Creates a default ThreadPool, with default values : + * - A default ThreadFactory + * + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + * @param keepAliveTime Default duration for a thread + * @param unit Time unit used for the keepAlive value + * @param threadFactory The factory used to create threads + */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, @@ -100,46 +169,76 @@ this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null); } + /** + * Creates a new instance of a OrderedThreadPoolExecutor. + * + * @param corePoolSize The initial pool sizePoolSize + * @param maximumPoolSize The maximum pool size + * @param keepAliveTime Default duration for a thread + * @param unit Time unit used for the keepAlive value + * @param threadFactory The factory used to create threads + * @param queueHandler The queue used to store events + */ public OrderedThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) { - super(0, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy()); - if (corePoolSize < 0) { + // We have to initialize the pool with default values (0 and 1) in order to + // handle the exception in a better way. We can't add a try {} catch() {} + // around the super() call. 
+ super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, + new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy()); + + if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { throw new IllegalArgumentException("corePoolSize: " + corePoolSize); } - if (maximumPoolSize == 0 || maximumPoolSize < corePoolSize) { + if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) { throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); } - if (queueHandler == null) { - queueHandler = IoEventQueueHandler.NOOP; - } - - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.queueHandler = queueHandler; - } - + // Now, we can setup the pool sizes + super.setCorePoolSize( corePoolSize ); + super.setMaximumPoolSize( maximumPoolSize ); + + // The queueHandler might be null. + this.eventQueueHandler = queueHandler; + } + + + /** + * @return The associated queue handler. + */ public IoEventQueueHandler getQueueHandler() { - return queueHandler; + return eventQueueHandler; } + /** + * {@inheritDoc} + */ @Override public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { // Ignore the request. It must always be AbortPolicy. } + /** + * Add a new thread to execute a task, if needed and possible. + * It depends on the current pool size. If it's full, we do nothing. + */ private void addWorker() { synchronized (workers) { - if (workers.size() >= maximumPoolSize) { + if (workers.size() >= super.getMaximumPoolSize()) { return; } + // Create a new worker, and add it to the thread pool Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); + + // Now, we can start it. thread.start(); workers.add(worker); @@ -149,10 +248,13 @@ } } + /** + * Add a new Worker only if there are no idle worker. 
+ */ private void addWorkerIfNecessary() { if (idleWorkers.get() == 0) { synchronized (workers) { - if (workers.isEmpty() || idleWorkers.get() == 0) { + if (workers.isEmpty() || (idleWorkers.get() == 0)) { addWorker(); } } @@ -161,27 +263,33 @@ private void removeWorker() { synchronized (workers) { - if (workers.size() <= corePoolSize) { + if (workers.size() <= super.getCorePoolSize()) { return; } waitingSessions.offer(EXIT_SIGNAL); } } + /** + * {@inheritDoc} + */ @Override public int getMaximumPoolSize() { - return maximumPoolSize; + return super.getMaximumPoolSize(); } + /** + * {@inheritDoc} + */ @Override public void setMaximumPoolSize(int maximumPoolSize) { - if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) { + if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); } synchronized (workers) { - this.maximumPoolSize = maximumPoolSize; + super.setMaximumPoolSize( maximumPoolSize ); int difference = workers.size() - maximumPoolSize; while (difference > 0) { removeWorker(); @@ -190,6 +298,9 @@ } } + /** + * {@inheritDoc} + */ @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { @@ -209,11 +320,17 @@ return isTerminated(); } + /** + * {@inheritDoc} + */ @Override public boolean isShutdown() { return shutdown; } + /** + * {@inheritDoc} + */ @Override public boolean isTerminated() { if (!shutdown) { @@ -225,6 +342,9 @@ } } + /** + * {@inheritDoc} + */ @Override public void shutdown() { if (shutdown) { @@ -240,12 +360,16 @@ } } + /** + * {@inheritDoc} + */ @Override public List<Runnable> shutdownNow() { shutdown(); List<Runnable> answer = new ArrayList<Runnable>(); IoSession session; + while ((session = waitingSessions.poll()) != null) { if (session == EXIT_SIGNAL) { waitingSessions.offer(EXIT_SIGNAL); @@ -253,55 +377,71 @@ continue; } - SessionBuffer buf = (SessionBuffer) 
session.getAttribute(BUFFER); - synchronized (buf.queue) { - for (Runnable task: buf.queue) { + Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE); + + synchronized (tasksQueue) { + + for (Runnable task: tasksQueue) { getQueueHandler().polled(this, (IoEvent) task); answer.add(task); } - buf.queue.clear(); + + tasksQueue.clear(); } } return answer; } + /** + * {@inheritDoc} + */ @Override public void execute(Runnable task) { if (shutdown) { rejectTask(task); } + // Check that it's a IoEvent task checkTaskType(task); - IoEvent e = (IoEvent) task; - IoSession s = e.getSession(); - SessionBuffer buf = getSessionBuffer(s); - Queue<Runnable> queue = buf.queue; + IoEvent event = (IoEvent) task; + IoSession session = event.getSession(); + + // Get the session's queue of events + Queue<Runnable> tasksQueue = getTasksQueue(session); boolean offerSession; - boolean offerEvent = queueHandler.accept(this, e); + boolean offerEvent = true; + + // propose the new event to the event queue handler. If we + // use a throttle queue handler, the message may be rejected + // if the maximum size has been reached. 
+ if (eventQueueHandler != null) { + offerEvent = eventQueueHandler.accept(this, event); + } + if (offerEvent) { - synchronized (queue) { - queue.offer(e); - if (buf.processingCompleted) { - buf.processingCompleted = false; - offerSession = true; - } else { - offerSession = false; - } + // Ok, the message has been accepted + synchronized (tasksQueue) { + offerSession = tasksQueue.isEmpty(); + + // Inject the event into the executor taskQueue + tasksQueue.offer(event); } } else { offerSession = false; } if (offerSession) { - waitingSessions.offer(s); + waitingSessions.offer(session); } addWorkerIfNecessary(); if (offerEvent) { - queueHandler.offered(this, e); + if (eventQueueHandler != null) { + eventQueueHandler.offered(this, event); + } } } @@ -315,6 +455,9 @@ } } + /** + * {@inheritDoc} + */ @Override public int getActiveCount() { synchronized (workers) { @@ -322,6 +465,9 @@ } } + /** + * {@inheritDoc} + */ @Override public long getCompletedTaskCount() { synchronized (workers) { @@ -334,11 +480,17 @@ } } + /** + * {@inheritDoc} + */ @Override public int getLargestPoolSize() { return largestPoolSize; } + /** + * {@inheritDoc} + */ @Override public int getPoolSize() { synchronized (workers) { @@ -346,11 +498,17 @@ } } + /** + * {@inheritDoc} + */ @Override public long getTaskCount() { return getCompletedTaskCount(); } + /** + * {@inheritDoc} + */ @Override public boolean isTerminating() { synchronized (workers) { @@ -358,11 +516,14 @@ } } + /** + * {@inheritDoc} + */ @Override public int prestartAllCoreThreads() { int answer = 0; synchronized (workers) { - for (int i = corePoolSize - workers.size() ; i > 0; i --) { + for (int i = super.getCorePoolSize() - workers.size() ; i > 0; i --) { addWorker(); answer ++; } @@ -370,10 +531,13 @@ return answer; } + /** + * {@inheritDoc} + */ @Override public boolean prestartCoreThread() { synchronized (workers) { - if (workers.size() < corePoolSize) { + if (workers.size() < super.getCorePoolSize()) { 
addWorker(); return true; } else { @@ -382,77 +546,92 @@ } } + /** + * {@inheritDoc} + */ @Override public BlockingQueue<Runnable> getQueue() { throw new UnsupportedOperationException(); } + /** + * {@inheritDoc} + */ @Override public void purge() { // Nothing to purge in this implementation. } + /** + * {@inheritDoc} + */ @Override public boolean remove(Runnable task) { checkTaskType(task); - IoEvent e = (IoEvent) task; - IoSession s = e.getSession(); - SessionBuffer buffer = (SessionBuffer) s.getAttribute(BUFFER); - if (buffer == null) { + IoEvent event = (IoEvent) task; + IoSession session = event.getSession(); + Queue<Runnable> tasksQueue = (Queue<Runnable>)session.getAttribute(TASKS_QUEUE); + + if (tasksQueue == null) { return false; } boolean removed; - synchronized (buffer.queue) { - removed = buffer.queue.remove(task); + + synchronized (tasksQueue) { + removed = tasksQueue.remove(task); } if (removed) { - getQueueHandler().polled(this, e); + getQueueHandler().polled(this, event); } return removed; } + /** + * {@inheritDoc} + */ @Override public int getCorePoolSize() { - return corePoolSize; + return super.getCorePoolSize(); } + /** + * {@inheritDoc} + */ @Override public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) { throw new IllegalArgumentException("corePoolSize: " + corePoolSize); } - if (corePoolSize > maximumPoolSize) { + if (corePoolSize > super.getMaximumPoolSize()) { throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); } synchronized (workers) { - if (this.corePoolSize > corePoolSize) { - for (int i = this.corePoolSize - corePoolSize; i > 0; i --) { + if (super.getCorePoolSize()> corePoolSize) { + for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i --) { removeWorker(); } } - this.corePoolSize = corePoolSize; + super.setCorePoolSize(corePoolSize); } } - private SessionBuffer getSessionBuffer(IoSession session) { - SessionBuffer buffer = (SessionBuffer) session.getAttribute(BUFFER); - 
if (buffer == null) { - buffer = new SessionBuffer(); - SessionBuffer oldBuffer = (SessionBuffer) session.setAttributeIfAbsent(BUFFER, buffer); - if (oldBuffer != null) { - buffer = oldBuffer; + private Queue<Runnable> getTasksQueue(IoSession session) { + Queue<Runnable> tasksQueue = (Queue<Runnable>) session.getAttribute(TASKS_QUEUE); + + if (tasksQueue == null) { + tasksQueue = new ConcurrentLinkedQueue<Runnable>(); + Queue<Runnable> oldTasksQueue = (Queue<Runnable>) session.setAttributeIfAbsent(TASKS_QUEUE, tasksQueue); + + if (oldTasksQueue != null) { + tasksQueue = oldTasksQueue; } } - return buffer; - } - - private static class SessionBuffer { - private final Queue<Runnable> queue = new CircularQueue<Runnable>(); - private boolean processingCompleted = true; + + return tasksQueue; } private class Worker implements Runnable { @@ -471,7 +650,7 @@ if (session == null) { synchronized (workers) { - if (workers.size() > corePoolSize) { + if (workers.size() > getCorePoolSize()) { // Remove now to prevent duplicate exit. workers.remove(this); break; @@ -485,7 +664,7 @@ try { if (session != null) { - runTasks(getSessionBuffer(session)); + runTasks(getTasksQueue(session)); } } finally { idleWorkers.incrementAndGet(); @@ -527,19 +706,25 @@ return session; } - private void runTasks(SessionBuffer buf) { + private void runTasks(Queue<Runnable> tasksQueue) { for (;;) { Runnable task; - synchronized (buf.queue) { - task = buf.queue.poll(); + + synchronized (tasksQueue) { + if ( tasksQueue.isEmpty()) { + break; + } + + task = tasksQueue.poll(); if (task == null) { - buf.processingCompleted = true; break; } } - queueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task); + if (eventQueueHandler != null) { + eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task); + } runTask(task); }