Author: rmannibucau
Date: Wed Jul 23 17:08:37 2014
New Revision: 1612877
URL: http://svn.apache.org/r1612877
Log: TOMEE-1276 TimerExecutor shouldn't kill the executor each time
Modified: tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java Modified: tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java?rev=1612877&r1=1612876&r2=1612877&view=diff ============================================================================== --- tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java (original) +++ tomee/tomee/trunk/container/openejb-core/src/main/java/org/apache/openejb/core/timer/DefaultTimerThreadPoolAdapter.java Wed Jul 23 17:08:37 2014 @@ -24,10 +24,13 @@ import org.apache.openejb.util.ExecutorB import org.apache.openejb.util.LogCategory; import org.apache.openejb.util.Logger; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * @version $Rev$ $Date$ @@ -68,6 +71,7 @@ public class DefaultTimerThreadPoolAdapt // that specifically and have it explicitly created somewhere public static final class TimerExecutor { private final Executor executor; + private final AtomicInteger references = new AtomicInteger(0); private TimerExecutor(final Executor executor) { if (executor == null) { @@ -75,6 +79,15 @@ public class DefaultTimerThreadPoolAdapt } this.executor = executor; } + + public TimerExecutor incr() { + references.incrementAndGet(); + return this; + } + + public boolean decr() { + return references.decrementAndGet() == 0; + } } @Override @@ -128,14 +141,15 @@ public class DefaultTimerThreadPoolAdapt final TimerExecutor timerExecutor = 
SystemInstance.get().getComponent(TimerExecutor.class); if (timerExecutor != null) { - this.executor = timerExecutor.executor; + this.executor = timerExecutor.incr().executor; } else { this.executor = new ExecutorBuilder() .size(threadCount) .prefix("EjbTimerPool") .build(SystemInstance.get().getOptions()); - SystemInstance.get().setComponent(TimerExecutor.class, new TimerExecutor(this.executor)); + final TimerExecutor value = new TimerExecutor(this.executor).incr(); + SystemInstance.get().setComponent(TimerExecutor.class, value); } this.threadPoolExecutorUsed = this.executor instanceof ThreadPoolExecutor; @@ -157,19 +171,48 @@ public class DefaultTimerThreadPoolAdapt } @Override - public void shutdown(final boolean arg0) { + public synchronized void shutdown(final boolean waitForJobsToComplete) { if (threadPoolExecutorUsed) { - final ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; - tpe.shutdown(); - if (arg0) { - final int timeout = SystemInstance.get().getOptions().get(OPENEJB_EJB_TIMER_POOL_AWAIT_SECONDS, 5); - try { - tpe.awaitTermination(timeout, TimeUnit.SECONDS); - } catch (final InterruptedException e) { - logger.error(e.getMessage(), e); + final SystemInstance systemInstance = SystemInstance.get(); + final TimerExecutor te = systemInstance.getComponent(TimerExecutor.class); + if (te != null) { + if (te.executor == executor) { + if (te.decr()) { + doShutdownExecutor(waitForJobsToComplete); + systemInstance.removeComponent(TimerExecutor.class); + } else { // flush jobs, maybe not all dedicated to this threadpool if shared but shouldn't be an issue + final ThreadPoolExecutor tpe = ThreadPoolExecutor.class.cast(executor); + if (waitForJobsToComplete) { + final Collection<Runnable> jobs = new ArrayList<>(); + tpe.getQueue().drainTo(jobs); + for (final Runnable r : jobs) { + try { + r.run(); + } catch (final Exception e) { + logger.warning(e.getMessage(), e); + } + } + } + } + } else { + doShutdownExecutor(waitForJobsToComplete); } + } else { + 
doShutdownExecutor(waitForJobsToComplete); + } + } + } + + private void doShutdownExecutor(final boolean waitJobs) { + final ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor; + tpe.shutdown(); + if (waitJobs) { + final int timeout = SystemInstance.get().getOptions().get(OPENEJB_EJB_TIMER_POOL_AWAIT_SECONDS, 5); + try { + tpe.awaitTermination(timeout, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + logger.error(e.getMessage(), e); } - SystemInstance.get().removeComponent(TimerExecutor.class); } }