Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x ea01aeb65 -> f94f8f471


ARTEMIS-1078 Improving ActiveMQThreadPoolExecutor

This is now considering only threads waiting for the queue to get new tasks as 
idle.

The thread pool maintained a counter of active threads, but that counter was 
increased
too late in the beforeExecute method. Submitting a task created a new thread.
If now a second task was submitter before the new thread had started to execute 
it's task,
the second task was queued without creating a 2nd thread. So the second task 
was only
executed after the first task had been completed - even if the thread pool's
maximum number of thread had not been reached.

This fix now maintains the delta between the number those threads that are 
currently waiting
in the queue's poll or take methods as idle threads, and the number of queued 
tasks.
It creates new threads unless there are enough idle threads to pick up all 
queued tasks.

This closes #1144

(cherry picked from commit 5a31e7035354516664087a65ac97e1a6f4c13f14)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f94f8f47
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f94f8f47
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f94f8f47

Branch: refs/heads/1.x
Commit: f94f8f471853316892ea381e49163198f2a1ddfb
Parents: ea01aeb
Author: Bernd Gutjahr <bernd.gutj...@hpe.com>
Authored: Tue Mar 28 15:54:58 2017 +0200
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Fri Mar 31 19:02:24 2017 -0400

----------------------------------------------------------------------
 .../utils/ActiveMQThreadPoolExecutor.java       | 120 ++++++++++++++-----
 1 file changed, 91 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f94f8f47/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index a87b18a..c3b1988 100755
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -20,7 +20,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /*
  * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@@ -47,31 +46,110 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
 
       private ActiveMQThreadPoolExecutor executor = null;
 
+      // lock object to synchronize on
+      private final Object lock = new Object();
+
+      // keep track of the difference between the number of idle threads and
+      // the number of queued tasks. If the delta is > 0, we have more
+      // idle threads than queued tasks and can add more tasks into the queue.
+      // The delta is incremented if a thread becomes idle or if a task is 
taken from the queue.
+      // The delta is decremented if a thread leaves idle state or if a task 
is added to the queue.
+      private int threadTaskDelta = 0;
+
       public void setExecutor(ActiveMQThreadPoolExecutor executor) {
          this.executor = executor;
       }
 
       @Override
       public boolean offer(Runnable runnable) {
-         int poolSize = executor.getPoolSize();
+         boolean retval = false;
+
+         // Need to lock for 2 reasons:
+         // 1. to safely handle poll timeouts
+         // 2. to protect the delta from parallel updates
+         synchronized (lock) {
+            if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || 
(threadTaskDelta > 0)) {
+               // A new task will be added to the queue if the maximum number 
of threads has been reached
+               // or if the delta is > 0, which means that there are enough 
idle threads.
+
+               retval = super.offer(runnable);
+
+               // Only decrement the delta if the task has actually been added 
to the queue
+               if (retval)
+                  threadTaskDelta--;
+            }
+         }
 
-         // If the are less threads than the configured maximum, then the 
tasks is
-         // only queued if there are some idle threads that can run that tasks.
-         // We have to add the queue size, since some tasks might just have 
been queued
-         // but not yet taken by an idle thread.
-         if (poolSize < executor.getMaximumPoolSize() && (size() + 
executor.getActive()) >= poolSize)
-            return false;
+         return retval;
+      }
 
-         return super.offer(runnable);
+      @Override
+      public Runnable take() throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to take from the queue
+         synchronized (lock) {
+            threadTaskDelta++;
+         }
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.take();
+            return runnable;
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            // Only if no task had been taken, we have to decrement the delta.
+            if (runnable == null) {
+               synchronized (lock) {
+                  threadTaskDelta--;
+               }
+            }
+         }
+      }
+
+      @Override
+      public Runnable poll(long arg0, TimeUnit arg2) throws 
InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to poll from the queue
+         synchronized (lock) {
+            threadTaskDelta++;
+         }
+
+         Runnable runnable = null;
+         boolean timedOut = false;
+
+         try {
+            runnable = super.poll(arg0, arg2);
+            timedOut = (runnable == null);
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            if (runnable == null) {
+               synchronized (lock) {
+                  // If the poll called timed out, we check again within a 
synchronized block
+                  // to make sure all offer calls have been completed.
+                  // This is to handle a newly queued task if the timeout 
occurred while an offer call
+                  // added that task to the queue instead of creating a new 
thread.
+                  if (timedOut)
+                     runnable = super.poll();
+
+                  // Only if no task had been taken (either no timeout, or no 
task from after-timeout poll),
+                  // we have to decrement the delta.
+                  if (runnable == null)
+                     threadTaskDelta--;
+               }
+            }
+         }
+
+         return runnable;
       }
    }
 
    private int maxPoolSize;
 
-   // count the active threads with before-/afterExecute, since the 
.getActiveCount is not very
-   // efficient.
-   private final AtomicInteger active = new AtomicInteger(0);
-
    public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, 
TimeUnit keepUnits, ThreadFactory factory) {
       this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
    }
@@ -88,10 +166,6 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
       myQueue.setExecutor(this);
    }
 
-   private int getActive() {
-      return active.get();
-   }
-
    @Override
    public int getMaximumPoolSize() {
       return maxPoolSize;
@@ -101,16 +175,4 @@ public class ActiveMQThreadPoolExecutor extends 
ThreadPoolExecutor {
    public void setMaximumPoolSize(int maxSize) {
       maxPoolSize = maxSize;
    }
-
-   @Override
-   protected void beforeExecute(Thread thread, Runnable runnable) {
-      super.beforeExecute(thread, runnable);
-      active.incrementAndGet();
-   }
-
-   @Override
-   protected void afterExecute(Runnable runnable, Throwable throwable) {
-      active.decrementAndGet();
-      super.afterExecute(runnable, throwable);
-   }
 }

Reply via email to