Author: tabish
Date: Wed Mar 21 19:55:30 2012
New Revision: 1303544
URL: http://svn.apache.org/viewvc?rev=1303544&view=rev
Log:
fis for: https://issues.apache.org/jira/browse/AMQ-3718
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Wed Mar 21 19:55:30 2012
@@ -31,6 +31,8 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -2371,13 +2373,35 @@ public class BrokerService implements Se
protected synchronized ThreadPoolExecutor getExecutor() {
if (this.executor == null) {
- this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "Usage Async Task");
- thread.setDaemon(true);
- return thread;
- }
- });
+ this.executor = new ThreadPoolExecutor(1, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+
+ private long i = 0;
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ this.i++;
+ Thread thread = new Thread(runnable,
"BrokerService.worker." + this.i);
+ thread.setDaemon(true);
+ thread.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(final Thread t, final
Throwable e) {
+ LOG.error("Error in thread '{}'", t.getName(), e);
+ }
+ });
+ return thread;
+ }
+ }, new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Interrupted
waiting for BrokerService.worker");
+ }
+
+ throw new RejectedExecutionException("Timed Out while
attempting to enqueue Task.");
+ }
+ });
}
return this.executor;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java
Wed Mar 21 19:55:30 2012
@@ -48,9 +48,9 @@ public class TaskRunnerFactory implement
public TaskRunnerFactory() {
this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
}
-
+
private TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun) {
- this(name,priority,daemon,maxIterationsPerRun,false);
+ this(name,priority,daemon,maxIterationsPerRun,false);
}
public TaskRunnerFactory(String name, int priority, boolean daemon, int
maxIterationsPerRun, boolean dedicatedTaskRunner) {
@@ -92,7 +92,7 @@ public class TaskRunnerFactory implement
public void execute(Runnable runnable) {
execute(runnable, "ActiveMQ Task");
}
-
+
public void execute(Runnable runnable, String name) {
init();
if (executor != null) {
@@ -103,7 +103,7 @@ public class TaskRunnerFactory implement
}
protected ExecutorService createDefaultExecutor() {
- ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+ ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name + "-" +
id.incrementAndGet());
thread.setDaemon(daemon);
@@ -111,7 +111,6 @@ public class TaskRunnerFactory implement
return thread;
}
});
- // rc.allowCoreThreadTimeOut(true);
return rc;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java?rev=1303544&r1=1303543&r2=1303544&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/SelectorManager.java
Wed Mar 21 19:55:30 2012
@@ -29,11 +29,9 @@ import java.util.concurrent.TimeUnit;
/**
* The SelectorManager will manage one Selector and the thread that checks the
* selector.
- *
+ *
* We may need to consider running more than one thread to check the selector
if
* servicing the selector takes too long.
- *
- * @version $Rev: 46019 $ $Date: 2004-09-14 05:56:06 -0400 (Tue, 14 Sep 2004) $
*/
public final class SelectorManager {
@@ -43,28 +41,31 @@ public final class SelectorManager {
private Executor channelExecutor = selectorExecutor;
private LinkedList<SelectorWorker> freeWorkers = new
LinkedList<SelectorWorker>();
private int maxChannelsPerWorker = 1024;
-
+
protected ExecutorService createDefaultExecutor() {
- ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new
ThreadFactory() {
+ ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
+
+ private long i = 0;
+
public Thread newThread(Runnable runnable) {
- return new Thread(runnable, "ActiveMQ NIO Worker");
+ this.i++;
+ final Thread t = new Thread(runnable, "ActiveMQ NIO Worker " +
this.i);
+ return t;
}
});
- // rc.allowCoreThreadTimeOut(true);
+
return rc;
}
-
+
public static SelectorManager getInstance() {
return SINGLETON;
}
public interface Listener {
void onSelect(SelectorSelection selector);
-
void onError(SelectorSelection selection, Throwable error);
}
-
public synchronized SelectorSelection register(SocketChannel
socketChannel, Listener listener)
throws IOException {
@@ -78,7 +79,6 @@ public final class SelectorManager {
worker.retain();
selection = new SelectorSelection(worker, socketChannel,
listener);
}
-
} else {
// Worker starts /w retain count of 1
SelectorWorker worker = new SelectorWorker(this);
@@ -86,7 +86,7 @@ public final class SelectorManager {
selection = new SelectorSelection(worker, socketChannel,
listener);
}
}
-
+
return selection;
}
@@ -125,5 +125,4 @@ public final class SelectorManager {
public void setSelectorExecutor(Executor selectorExecutor) {
this.selectorExecutor = selectorExecutor;
}
-
}