Author: kfujino
Date: Wed Jun 19 09:59:56 2013
New Revision: 1494527
URL: http://svn.apache.org/r1494527
Log:
Use Tribes' TaskQueue as the executor's workQueue, replacing the plain
LinkedBlockingQueue, to ensure that the executor's maxThreads setting works correctly.
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
Modified:
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494527&r1=1494526&r2=1494527&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
(original)
+++
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
Wed Jun 19 09:59:56 2013
@@ -16,8 +16,7 @@
*/
package org.apache.catalina.tribes.group.interceptors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.catalina.tribes.util.TcclThreadFactory;
/**
@@ -40,12 +40,10 @@ import org.apache.catalina.tribes.util.T
public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
protected final AtomicLong currentSize = new AtomicLong(0);
- protected ThreadPoolExecutor executor = null;
+ protected ExecutorService executor = null;
protected int maxThreads = 10;
protected int maxSpareThreads = 2;
protected long keepAliveTime = 5000;
- protected final LinkedBlockingQueue<Runnable> runnablequeue =
- new LinkedBlockingQueue<>();
@Override
public long getCurrentSize() {
@@ -84,9 +82,8 @@ public class MessageDispatch15Intercepto
@Override
public void startQueue() {
if ( run ) return;
- executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads,
- keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue,
- new TcclThreadFactory());
+ executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads,
+ keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory());
run = true;
}
@@ -95,7 +92,6 @@ public class MessageDispatch15Intercepto
run = false;
executor.shutdownNow();
setAndGetCurrentSize(0);
- runnablequeue.clear();
}
public long getKeepAliveTime() {
Modified: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494527&r1=1494526&r2=1494527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed
Jun 19 09:59:56 2013
@@ -17,8 +17,11 @@
package org.apache.catalina.tribes.util;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -27,18 +30,52 @@ public class ExecutorFactory {
public static ExecutorService newThreadPool(int minThreads, int
maxThreads, long maxIdleTime, TimeUnit unit) {
TaskQueue taskqueue = new TaskQueue();
- ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads,
maxThreads, maxIdleTime, unit,taskqueue);
+ ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads,
maxThreads, maxIdleTime, unit,taskqueue);
taskqueue.setParent(service);
return service;
}
public static ExecutorService newThreadPool(int minThreads, int
maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) {
TaskQueue taskqueue = new TaskQueue();
- ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads,
maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
+ ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads,
maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
taskqueue.setParent(service);
return service;
}
+ // ---------------------------------------------- TribesThreadPoolExecutor
Inner Class
+ private static class TribesThreadPoolExecutor extends ThreadPoolExecutor {
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, handler);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory);
+ }
+
+ public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ super.execute(command);
+ } catch (RejectedExecutionException rx) {
+ if (super.getQueue() instanceof TaskQueue) {
+ TaskQueue queue = (TaskQueue)super.getQueue();
+ if (!queue.force(command)) {
+ throw new RejectedExecutionException("Queue capacity
is full.");
+ }
+ }
+ }
+ }
+ }
+
// ---------------------------------------------- TaskQueue Inner Class
private static class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
@@ -53,6 +90,11 @@ public class ExecutorFactory {
parent = tp;
}
+ public boolean force(Runnable o) {
+ if ( parent.isShutdown() ) throw new
RejectedExecutionException("Executor not running, can't force a command into
the queue");
+ return super.offer(o); //forces the item onto the queue, to be
used if the task is rejected
+ }
+
@Override
public boolean offer(Runnable o) {
//we can't do any checks
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]