Author: dblevins
Date: Tue Aug 21 20:28:29 2012
New Revision: 1375764
URL: http://svn.apache.org/viewvc?rev=1375764&view=rev
Log:
TOMEE-398 - Unified Executor configuration options (@Asynchronous, @Timeout)
Added:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
(original)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/async/AsynchronousPool.java
Tue Aug 21 20:28:29 2012
@@ -18,25 +18,19 @@ package org.apache.openejb.async;
import org.apache.openejb.AppContext;
import org.apache.openejb.core.ThreadContext;
-import org.apache.openejb.loader.Options;
import org.apache.openejb.util.DaemonThreadFactory;
-import org.apache.openejb.util.Duration;
+import org.apache.openejb.util.ExecutorBuilder;
import javax.ejb.EJBException;
import javax.ejb.NoSuchEJBException;
import java.rmi.NoSuchObjectException;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -50,61 +44,19 @@ public class AsynchronousPool {
private final BlockingQueue<Runnable> blockingQueue;
private final ExecutorService executor;
- public AsynchronousPool(String id, int corePoolSize, int maximumPoolSize,
Duration keepAliveTime, BlockingQueue<Runnable> blockingQueue) {
- blockingQueue = new LinkedBlockingQueue<Runnable>();
- final TimeUnit unit = (keepAliveTime.getUnit() != null) ?
keepAliveTime.getUnit() : TimeUnit.SECONDS;
- this.blockingQueue = blockingQueue;
- this.executor = new ThreadPoolExecutor(
- corePoolSize,
- maximumPoolSize,
- keepAliveTime.getTime(),
- unit, blockingQueue, new DaemonThreadFactory("@Asynchronous",
id.trim()));
+ public AsynchronousPool(ThreadPoolExecutor threadPoolExecutor) {
+ this.blockingQueue = threadPoolExecutor.getQueue();
+ this.executor = threadPoolExecutor;
}
public static AsynchronousPool create(AppContext appContext) {
- final Options options = appContext.getOptions();
- final String id = appContext.getId();
- final int corePoolSize = options.get("AsynchronousPool.CorePoolSize",
10);
- final int maximumPoolSize =
Math.max(options.get("AsynchronousPool.MaximumPoolSize", 20), corePoolSize);
- final Duration keepAliveTime =
options.get("AsynchronousPool.KeepAliveTime", new Duration(60,
TimeUnit.SECONDS));
- final BlockingQueue queue = options.get("AsynchronousPool.QueueType",
QueueType.LINKED).create(options);
+ final ExecutorBuilder builder = new ExecutorBuilder()
+ .prefix("AsynchronousPool")
+ .size(10)
+ .threadFactory(new DaemonThreadFactory("@Asynchronous",
appContext.getId()));
- return new AsynchronousPool(id, corePoolSize, maximumPoolSize,
keepAliveTime, queue);
- }
-
- private static enum QueueType {
- ARRAY,
- DELAY,
- LINKED,
- PRIORITY,
- SYNCHRONOUS;
-
- public BlockingQueue create(Options options) {
- switch (this) {
- case ARRAY: {
- return new
ArrayBlockingQueue(options.get("AsynchronousPool.QueueSize", 100));
- }
- case DELAY: {
- return new DelayQueue();
- }
- case LINKED: {
- return new
LinkedBlockingQueue(options.get("AsynchronousPool.QueueSize",
Integer.MAX_VALUE));
- }
- case PRIORITY: {
- return new PriorityBlockingQueue();
- }
- case SYNCHRONOUS: {
- return new
SynchronousQueue(options.get("AsynchronousPool.QueueFair", false));
- }
- default: {
- // The Options class will throw an error if the user
supplies an unknown enum string
- // The only way we can reach this is if we add a new
QueueType element and forget to
- // implement it in the above switch statement.
- throw new IllegalArgumentException("Unknown QueueType
type: " + this);
- }
- }
- }
+ return new AsynchronousPool(builder.build(appContext.getOptions()));
}
public Object invoke(Callable<Object> callable, boolean isVoid) throws
Throwable {
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
(original)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java
Tue Aug 21 20:28:29 2012
@@ -17,19 +17,21 @@
package org.apache.openejb.core.timer;
+import org.apache.openejb.loader.Options;
import org.apache.openejb.loader.SystemInstance;
import org.apache.openejb.util.DaemonThreadFactory;
+import org.apache.openejb.util.ExecutorBuilder;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
import org.quartz.SchedulerConfigException;
import org.quartz.spi.ThreadPool;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* @version $Rev$ $Date$
@@ -62,41 +64,17 @@ public class DefaultTimerThreadPoolAdapt
private final boolean threadPoolExecutorUsed;
public DefaultTimerThreadPoolAdapter() {
- this.executor = SystemInstance.get().getComponent(Executor.class);
+ final TimerExecutor timerExecutor =
SystemInstance.get().getComponent(TimerExecutor.class);
- if (this.executor == null) {
-
- int size =
Integer.parseInt(SystemInstance.get().getProperty(OPENEJB_TIMER_POOL_SIZE,
"3"));
- if (size < 3) {
- size = 3;
- }
-
- this.executor = new ThreadPoolExecutor(size
- , size
- , 60L
- , TimeUnit.SECONDS
- , new LinkedBlockingQueue<Runnable>(size)
- , new
DaemonThreadFactory(DefaultTimerThreadPoolAdapter.class)
- , new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(final Runnable r, final
ThreadPoolExecutor tpe) {
-
- if (null == r || null == tpe || tpe.isShutdown() ||
tpe.isTerminated() || tpe.isTerminating()) {
- return;
- }
+ if (timerExecutor != null) {
+ this.executor = timerExecutor.executor;
+ } else {
+ this.executor = new ExecutorBuilder()
+ .size(3)
+ .prefix("EjbTimerPool")
+ .build(SystemInstance.get().getOptions());
- try {
- if (!tpe.getQueue().offer(r, 30, TimeUnit.SECONDS)) {
- throw new RejectedExecutionException("Timeout
waiting for executor slot");
- }
- } catch (InterruptedException e) {
- throw new RejectedExecutionException("Interrupted
waiting for executor slot");
- }
- }
- }
- );
- ((ThreadPoolExecutor) this.executor).allowCoreThreadTimeOut(true);
- SystemInstance.get().setComponent(Executor.class, this.executor);
+ SystemInstance.get().setComponent(TimerExecutor.class, new
TimerExecutor(this.executor));
}
this.threadPoolExecutorUsed = (this.executor instanceof
ThreadPoolExecutor);
@@ -106,6 +84,21 @@ public class DefaultTimerThreadPoolAdapt
}
}
+ /**
+  * Opaque holder for the {@link Executor} used by EJB Timers.
+  *
+  * This is to prevent other parts of the code becoming dependent
+  * on the executor produced for EJB Timers: both the wrapped executor
+  * and the constructor are private.
+  *
+  * If we want to share an Executor across the whole system
+  * for {@code @Asynchronous} and {@code @Remote} execution we should design
+  * that specifically and have it explicitly created somewhere
+  */
+ public static final class TimerExecutor {
+     private final Executor executor;
+
+     /**
+      * @param executor the executor to wrap
+      * @throws IllegalArgumentException if {@code executor} is null
+      */
+     private TimerExecutor(Executor executor) {
+         if (executor == null) {
+             throw new IllegalArgumentException("executor cannot be null");
+         }
+         this.executor = executor;
+     }
+ }
+
@Override
public int blockForAvailableThreads() {
if (this.threadPoolExecutorUsed) {
@@ -186,4 +179,5 @@ public class DefaultTimerThreadPoolAdapt
public void setThreadPriority(final int threadPriority) {
this.threadPriority = threadPriority;
}
+
}
Modified:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java?rev=1375764&r1=1375763&r2=1375764&view=diff
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
(original)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/core/timer/EjbTimerServiceImpl.java
Tue Aug 21 20:28:29 2012
@@ -56,6 +56,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
+import java.util.Map;
import java.util.Properties;
@@ -63,7 +64,6 @@ public class EjbTimerServiceImpl impleme
private static final long serialVersionUID = 1L;
private static final Logger log = Logger.getInstance(LogCategory.TIMER,
"org.apache.openejb.util.resources");
- public static final String QUARTZ_THREAD_POOL_ADAPTER =
"openejb.org.quartz.threadPool.class";
public static final String QUARTZ_MAKE_SCHEDULER_THREAD_DAEMON =
"org.quartz.scheduler.makeSchedulerThreadDaemon";
public static final String OPENEJB_TIMEOUT_JOB_NAME =
"OPENEJB_TIMEOUT_JOB";
@@ -130,26 +130,19 @@ public class EjbTimerServiceImpl impleme
}
final Properties properties = new Properties();
- properties.putAll(SystemInstance.get().getProperties());
-
properties.putAll(deployment.getModuleContext().getAppContext().getProperties());
- properties.putAll(deployment.getModuleContext().getProperties());
- properties.putAll(deployment.getProperties());
-
- boolean newInstance = false;
- for (String key : properties.stringPropertyNames()) {
- if (key.startsWith("org.quartz.")) { // custom config -> don't use
default scheduler
- newInstance = true;
- break;
- }
- }
+ putAll(properties, SystemInstance.get().getProperties());
+ putAll(properties,
deployment.getModuleContext().getAppContext().getProperties());
+ putAll(properties, deployment.getModuleContext().getProperties());
+ putAll(properties, deployment.getProperties());
+
+ // custom config -> don't use default scheduler
+ boolean newInstance = properties.size() > 0;
final SystemInstance systemInstance = SystemInstance.get();
final String defaultThreadPool =
DefaultTimerThreadPoolAdapter.class.getName();
if
(!properties.containsKey(StdSchedulerFactory.PROP_THREAD_POOL_CLASS)) {
- properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
- systemInstance.hasProperty(QUARTZ_THREAD_POOL_ADAPTER) ?
-
systemInstance.getOptions().get(QUARTZ_THREAD_POOL_ADAPTER,
SimpleThreadPool.class.getName()) : defaultThreadPool);
+ properties.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
defaultThreadPool);
}
if
(!properties.containsKey(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME)) {
properties.put(StdSchedulerFactory.PROP_SCHED_INSTANCE_NAME,
"OpenEJB-TimerService-Scheduler");
@@ -206,6 +199,15 @@ public class EjbTimerServiceImpl impleme
return thisScheduler;
}
+ /**
+  * Copies the Quartz settings from one set of properties into another.
+  * Only entries of {@code b} whose key, rendered as a string, starts with
+  * {@code org.quartz.} are put into {@code a}; every other entry is ignored.
+  *
+  * @param a destination that receives the matching entries
+  * @param b source whose entries are filtered
+  */
+ private static void putAll(Properties a, final Properties b) {
+     for (final Map.Entry<Object, Object> candidate : b.entrySet()) {
+         final Object name = candidate.getKey();
+         if (!name.toString().startsWith("org.quartz.")) {
+             continue;
+         }
+         a.put(name, candidate.getValue());
+     }
+ }
+
public void shutdownMe() {
cleanTimerData();
shutdownMyScheduler();
Added:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java?rev=1375764&view=auto
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
(added)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/ExecutorBuilder.java
Tue Aug 21 20:28:29 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import org.apache.openejb.loader.Options;
+import org.apache.openejb.util.executor.OfferRejectedExecutionHandler;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Fluent builder for {@link ThreadPoolExecutor} instances whose settings can be
+ * overridden through {@link Options}.
+ *
+ * <p>Option names are formed from the configured prefix. The options read by
+ * {@link #build(Options)} are:</p>
+ * <ul>
+ * <li>{@code <prefix>.CorePoolSize} - defaults to the builder's {@link #size(int) size}</li>
+ * <li>{@code <prefix>.MaximumPoolSize} - defaults to the core pool size and is never smaller than it</li>
+ * <li>{@code <prefix>.QueueSize} - defaults to the maximum pool size</li>
+ * <li>{@code <prefix>.KeepAliveTime} - defaults to 60 seconds</li>
+ * <li>{@code <prefix>.AllowCoreThreadTimeOut} - defaults to {@code true}</li>
+ * <li>{@code <prefix>.QueueType} - defaults to LINKED, or SYNCHRONOUS when the queue size is 0</li>
+ * <li>{@code <prefix>.QueueFair} - defaults to {@code false}; only read for SYNCHRONOUS queues</li>
+ * <li>{@code <prefix>.OfferTimeout} - defaults to 30 seconds; only read when no
+ * {@link RejectedExecutionHandler} was supplied</li>
+ * </ul>
+ *
+ * @version $Rev$ $Date$
+ */
+public class ExecutorBuilder {
+
+    private int size = 10;
+    private String prefix = "Pool";
+    private ThreadFactory threadFactory;
+    private RejectedExecutionHandler rejectedExecutionHandler;
+
+    public ExecutorBuilder() {
+    }
+
+    /**
+     * @param size core pool size to use when {@code <prefix>.CorePoolSize} is not configured
+     * @return this builder
+     */
+    public ExecutorBuilder size(int size) {
+        this.size = size;
+        return this;
+    }
+
+    /**
+     * @param prefix prefix of every option name looked up by {@link #build(Options)}
+     * @return this builder
+     */
+    public ExecutorBuilder prefix(String prefix) {
+        this.prefix = prefix;
+        return this;
+    }
+
+    /**
+     * @param threadFactory factory for the pool's threads; when left unset a
+     *                      {@code DaemonThreadFactory} named after the prefix is used
+     * @return this builder
+     */
+    public ExecutorBuilder threadFactory(ThreadFactory threadFactory) {
+        this.threadFactory = threadFactory;
+        return this;
+    }
+
+    /**
+     * @param rejectedExecutionHandler handler for rejected tasks; when left unset an
+     *                                 {@code OfferRejectedExecutionHandler} is used
+     * @return this builder
+     */
+    public ExecutorBuilder rejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
+        this.rejectedExecutionHandler = rejectedExecutionHandler;
+        return this;
+    }
+
+    /**
+     * Creates the executor, letting the given options override the builder's defaults.
+     *
+     * @param options source of the {@code <prefix>.*} settings
+     * @return a newly created executor
+     * @throws IllegalArgumentException if an ARRAY or LINKED queue is requested with a
+     *                                  queue size smaller than one
+     */
+    public ThreadPoolExecutor build(Options options) {
+        final int corePoolSize = options.get(prefix + ".CorePoolSize", size);
+
+        // Default setting is for a fixed pool size, MaximumPoolSize==CorePoolSize
+        final int maximumPoolSize = Math.max(options.get(prefix + ".MaximumPoolSize", corePoolSize), corePoolSize);
+
+        // Default QueueSize is bounded using the MaximumPoolSize
+        final int queueSize = options.get(prefix + ".QueueSize", maximumPoolSize);
+
+        // Keep inactive threads alive for 60 seconds by default
+        final Duration keepAliveTime = options.get(prefix + ".KeepAliveTime", new Duration(60, TimeUnit.SECONDS));
+
+        // All threads can be timed out by default
+        final boolean allowCoreThreadTimeout = options.get(prefix + ".AllowCoreThreadTimeOut", true);
+
+        // A QueueSize of 0 cannot back a LINKED queue, so in that case
+        // the default QueueType becomes SYNCHRONOUS
+        final QueueType defaultQueueType = (queueSize == 0) ? QueueType.SYNCHRONOUS : QueueType.LINKED;
+        final BlockingQueue<Runnable> queue = options.get(prefix + ".QueueType", defaultQueueType).create(options, prefix, queueSize);
+
+        ThreadFactory factory = this.threadFactory;
+        if (factory == null) {
+            factory = new DaemonThreadFactory(prefix);
+        }
+
+        RejectedExecutionHandler handler = this.rejectedExecutionHandler;
+        if (handler == null) {
+            final Duration duration = options.get(prefix + ".OfferTimeout", new Duration(30, TimeUnit.SECONDS));
+            handler = new OfferRejectedExecutionHandler(duration);
+        }
+
+        // A Duration without a unit is interpreted as seconds
+        final TimeUnit keepAliveUnit = keepAliveTime.getUnit() != null ? keepAliveTime.getUnit() : TimeUnit.SECONDS;
+
+        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize
+                , maximumPoolSize
+                , keepAliveTime.getTime()
+                , keepAliveUnit
+                , queue
+                , factory
+                , handler
+        );
+
+        threadPoolExecutor.allowCoreThreadTimeOut(allowCoreThreadTimeout);
+
+        return threadPoolExecutor;
+    }
+
+    /**
+     * The kinds of work queue that can be selected with {@code <prefix>.QueueType}.
+     *
+     * @version $Rev$ $Date$
+     */
+    public static enum QueueType {
+        ARRAY,
+        LINKED,
+        PRIORITY,
+        SYNCHRONOUS;
+
+        /**
+         * Creates a new queue of this type.
+         *
+         * @param options   only consulted for {@code <prefix>.QueueFair} when creating a SYNCHRONOUS queue
+         * @param prefix    prefix of the option names
+         * @param queueSize capacity for ARRAY and LINKED queues; ignored by the other types
+         * @return a new, empty queue
+         * @throws IllegalArgumentException if an ARRAY or LINKED queue is requested with a
+         *                                  capacity smaller than one
+         */
+        public BlockingQueue<Runnable> create(Options options, final String prefix, final int queueSize) {
+            switch (this) {
+                case ARRAY: {
+                    requireCapacity(prefix, queueSize);
+                    return new ArrayBlockingQueue<Runnable>(queueSize);
+                }
+                case LINKED: {
+                    requireCapacity(prefix, queueSize);
+                    return new LinkedBlockingQueue<Runnable>(queueSize);
+                }
+                case PRIORITY: {
+                    return new PriorityBlockingQueue<Runnable>();
+                }
+                case SYNCHRONOUS: {
+                    return new SynchronousQueue<Runnable>(options.get(prefix + ".QueueFair", false));
+                }
+                default: {
+                    // The Options class will throw an error if the user supplies an unknown enum string
+                    // The only way we can reach this is if we add a new QueueType element and forget to
+                    // implement it in the above switch statement.
+                    throw new IllegalArgumentException("Unknown QueueType type: " + this);
+                }
+            }
+        }
+
+        // The JDK queue constructors reject such capacities with a message-less
+        // IllegalArgumentException; fail with the same type but name the option at fault.
+        private void requireCapacity(final String prefix, final int queueSize) {
+            if (queueSize < 1) {
+                throw new IllegalArgumentException(prefix + ".QueueSize must be greater than zero for QueueType " + this + ": " + queueSize);
+            }
+        }
+    }
+}
Added:
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java?rev=1375764&view=auto
==============================================================================
---
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
(added)
+++
openejb/trunk/openejb/container/openejb-core/src/main/java/org/apache/openejb/util/executor/OfferRejectedExecutionHandler.java
Tue Aug 21 20:28:29 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.util.executor;
+
+import org.apache.openejb.util.Duration;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link RejectedExecutionHandler} that, rather than failing immediately, offers the
+ * rejected task to the executor's queue again and waits up to a configured timeout
+ * for a slot to become free.
+ *
+ * <p>Tasks rejected by an executor that is shut down, terminating or terminated are
+ * silently discarded.</p>
+ *
+ * @version $Rev$ $Date$
+ */
+public class OfferRejectedExecutionHandler implements RejectedExecutionHandler {
+
+    private final long timeout;
+    private final TimeUnit unit;
+
+    /**
+     * @param duration how long to wait for a queue slot; a duration without a unit is
+     *                 interpreted as seconds
+     * @throws IllegalArgumentException if the duration's time is not greater than zero
+     */
+    public OfferRejectedExecutionHandler(Duration duration) {
+        this(duration.getTime(), duration.getUnit() == null ? TimeUnit.SECONDS : duration.getUnit());
+    }
+
+    /**
+     * @param timeout  how long to wait for a queue slot, in units of {@code timeUnit}
+     * @param timeUnit unit of {@code timeout}
+     * @throws IllegalArgumentException if {@code timeout} is not greater than zero or
+     *                                  {@code timeUnit} is null
+     */
+    public OfferRejectedExecutionHandler(long timeout, TimeUnit timeUnit) {
+        if (timeout <= 0) {
+            throw new IllegalArgumentException("timeout must be greater than zero");
+        }
+        if (timeUnit == null) {
+            throw new IllegalArgumentException("TimeUnit must not be null");
+        }
+
+        this.timeout = timeout;
+        this.unit = timeUnit;
+    }
+
+    /**
+     * Offers the rejected task to the executor's queue, blocking up to the configured timeout.
+     *
+     * @throws RejectedExecutionException if no slot became free within the timeout, or if the
+     *                                    calling thread was interrupted while waiting (the
+     *                                    thread's interrupt status is restored in that case)
+     */
+    @Override
+    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor tpe) {
+
+        // Nothing useful can be done with a task once the executor is going away
+        if (null == r || null == tpe || tpe.isShutdown() || tpe.isTerminated() || tpe.isTerminating()) {
+            return;
+        }
+
+        try {
+            if (!tpe.getQueue().offer(r, timeout, unit)) {
+                throw new RejectedExecutionException("Timeout waiting for executor slot: waited " + timeout + " " + unit.toString().toLowerCase());
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so callers further up the stack can observe it
+            Thread.currentThread().interrupt();
+            throw new RejectedExecutionException("Interrupted waiting for executor slot", e);
+        }
+    }
+}