ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact of changes
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/91db0807 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/91db0807 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/91db0807 Branch: refs/heads/master Commit: 91db08072b221885f246a9db70abf3ee0bdf170d Parents: 0fadc68 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Wed Nov 8 09:16:59 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Nov 9 11:58:36 2017 -0500 ---------------------------------------------------------------------- .../artemis/utils/actors/ArtemisExecutor.java | 10 +- .../artemis/utils/actors/HandlerBase.java | 47 ++++++ .../artemis/utils/actors/ProcessorBase.java | 153 +++++++++++++------ .../utils/actors/OrderedExecutorSanityTest.java | 69 ++++++++- .../core/ServerSessionPacketHandler.java | 73 +++------ .../artemis/tests/util/ActiveMQTestBase.java | 2 +- .../tests/integration/client/ConsumerTest.java | 6 +- 7 files changed, 260 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java index 5e72ef2..8efb3d3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.utils.actors; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -40,9 +42,15 @@ public interface ArtemisExecutor extends Executor { /** It will wait the current execution (if there is one) to finish * but will not complete any further executions */ - default void shutdownNow() { + default List<Runnable> shutdownNow() { + return Collections.emptyList(); } + + default void shutdown() { + } + + /** * This will verify if the executor is flushed with no wait (or very minimal wait if not the {@link org.apache.activemq.artemis.utils.actors.OrderedExecutor} * @return http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java new file mode 100644 index 0000000..6bfbcb4 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq.artemis.utils.actors; + +/** + * This abstract class will encapsulate + * ThreadLocals to determine when a class is a handler. + * This is because some functionality has to be avoided if inHandler(). + * + */ +public abstract class HandlerBase { + + //marker instance used to recognize if a thread is performing a packet handling + private static final Object DUMMY = Boolean.TRUE; + + // this cannot be static as the Actor will be used within another executor. For that reason + // each instance will have its own ThreadLocal. + // ... a thread that has its thread-local map populated with DUMMY while performing a handler + private final ThreadLocal<Object> inHandler = new ThreadLocal<>(); + + protected void enter() { + assert inHandler.get() == null : "should be null"; + inHandler.set(DUMMY); + } + + public boolean inHandler() { + final Object dummy = inHandler.get(); + return dummy != null; + } + + protected void leave() { + assert inHandler.get() != null : "marker not set"; + inHandler.set(null); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java index 73dbf2f..1c77a52 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java @@ -17,17 +17,24 @@ package org.apache.activemq.artemis.utils.actors; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.LockSupport; -public abstract class ProcessorBase<T> { +import org.jboss.logging.Logger; - private static final int STATE_NOT_RUNNING = 0; - private static final int STATE_RUNNING = 1; - private static final int STATE_FORCED_SHUTDOWN = 2; +public abstract class ProcessorBase<T> extends HandlerBase { + + private static final Logger logger = Logger.getLogger(ProcessorBase.class); + + public static final int STATE_NOT_RUNNING = 0; + public static final int STATE_RUNNING = 1; + public static final int STATE_FORCED_SHUTDOWN = 2; protected final Queue<T> tasks = new ConcurrentLinkedQueue<>(); @@ -41,6 +48,8 @@ public abstract class ProcessorBase<T> { private volatile boolean requestedShutdown = false; + private volatile boolean started = true; + private static final AtomicIntegerFieldUpdater<ProcessorBase> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state"); private final class ExecutorTask implements Runnable { @@ -50,19 +59,23 @@ public abstract class ProcessorBase<T> { do { //if there is no thread active and is not already dead then we run if (stateUpdater.compareAndSet(ProcessorBase.this, STATE_NOT_RUNNING, STATE_RUNNING)) { + enter(); try { T task = tasks.poll(); //while the queue is not empty we process in order - while (task != null) { + while (task != null && !requestedShutdown) { //just drain the tasks if has been requested a shutdown to help the shutdown process - if (!requestedShutdown) { - doTask(task); + if (requestedShutdown) { + tasks.add(task); + break; } + doTask(task); task = tasks.poll(); } } finally { + leave(); //set state back to not running. - stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING); + stateUpdater.compareAndSet(ProcessorBase.this, STATE_RUNNING, STATE_NOT_RUNNING); } } else { return; @@ -75,31 +88,57 @@ public abstract class ProcessorBase<T> { } } - /** It will wait the current execution (if there is one) to finish - * but will not complete any further executions */ - public void shutdownNow() { + /** + * It will shutdown and wait 30 seconds for timeout. + */ + public void shutdown() { + shutdown(30, TimeUnit.SECONDS); + } + + public void shutdown(long timeout, TimeUnit unit) { + started = false; + + if (!inHandler()) { + // if it's in handler.. we just return + flush(timeout, unit); + } + } + + /** + * It will wait the current execution (if there is one) to finish + * but will not complete any further executions + */ + public List<T> shutdownNow() { //alert anyone that has been requested (at least) an immediate shutdown requestedShutdown = true; - //it could take a very long time depending on the current executing task - do { - //alert the ExecutorTask (if is running) to just drain the current backlog of tasks - final int startState = stateUpdater.get(this); - if (startState == STATE_FORCED_SHUTDOWN) { - //another thread has completed a forced shutdown - return; - } - if (startState == STATE_RUNNING) { - //wait 100 ms to avoid burning CPU while waiting and - //give other threads a chance to make progress - LockSupport.parkNanos(100_000_000L); + started = false; + + if (inHandler()) { + stateUpdater.set(this, STATE_FORCED_SHUTDOWN); + } else { + //it could take a very long time depending on the current executing task + do { + //alert the ExecutorTask (if is running) to just drain the current backlog of tasks + final int startState = stateUpdater.get(this); + if (startState == STATE_FORCED_SHUTDOWN) { + //another thread has completed a forced shutdown + break; + } + if (startState == STATE_RUNNING) { + //wait 100 ms to avoid burning CPU while waiting and + //give other threads a chance to make progress + LockSupport.parkNanos(100_000_000L); + } } + while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN)); + //this could happen just one time: the forced shutdown state is the last one and + //can be set by just one caller. + //As noted on the execute method there is a small chance that some tasks would be enqueued } - while (!stateUpdater.compareAndSet(this, STATE_NOT_RUNNING, STATE_FORCED_SHUTDOWN)); - //this could happen just one time: the forced shutdown state is the last one and - //can be set by just one caller. - //As noted on the execute method there is a small chance that some tasks would be enqueued + ArrayList<T> returnList = new ArrayList<>(tasks); tasks.clear(); - //we can report the killed tasks somehow: ExecutorService do the same on shutdownNow + + return returnList; } protected abstract void doTask(T task); @@ -112,26 +151,48 @@ public abstract class ProcessorBase<T> { return stateUpdater.get(this) == STATE_NOT_RUNNING; } - protected void task(T command) { - if (stateUpdater.get(this) != STATE_FORCED_SHUTDOWN) { - //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks - tasks.add(command); - //cache locally the state to avoid multiple volatile loads - final int state = stateUpdater.get(this); - if (state == STATE_FORCED_SHUTDOWN) { - //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add - tasks.clear(); - } else if (state == STATE_NOT_RUNNING) { - //startPoller could be deleted but is maintained because is inherited - delegate.execute(task); + /** + * WARNING: This will only flush when all the activity is suspended. + * don't expect success on this call if another thread keeps feeding the queue + * this is only valid on situations where you are not feeding the queue, + * like in shutdown and failover situations. + */ + public final boolean flush(long timeout, TimeUnit unit) { + if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + // quick test, most of the time it will be empty anyways + return true; + } + + long timeLimit = System.currentTimeMillis() + unit.toMillis(timeout); + try { + while (stateUpdater.get(this) == STATE_RUNNING && timeLimit > System.currentTimeMillis()) { + + if (tasks.isEmpty()) { + return true; + } + + Thread.sleep(10); } + } catch (InterruptedException e) { + // ignored } + + return stateUpdater.get(this) == STATE_NOT_RUNNING; } - protected void startPoller() { - if (stateUpdater.get(this) == STATE_NOT_RUNNING) { - //note that this can result in multiple tasks being queued - //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored + protected void task(T command) { + if (!started) { + logger.debug("Ordered executor has been shutdown at", new Exception("debug")); + } + //The shutdown process could finish right after the above check: shutdownNow can drain the remaining tasks + tasks.add(command); + //cache locally the state to avoid multiple volatile loads + final int state = stateUpdater.get(this); + if (state == STATE_FORCED_SHUTDOWN) { + //help the GC by draining any task just submitted: it help to cover the case of a shutdownNow finished before tasks.add + tasks.clear(); + } else if (state == STATE_NOT_RUNNING) { + //startPoller could be deleted but is maintained because is inherited delegate.execute(task); } } @@ -146,4 +207,8 @@ public abstract class ProcessorBase<T> { return tasks.size(); } + public final int status() { + return stateUpdater.get(this); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java index 9446f50..4e2bbba 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Test; @@ -70,7 +71,7 @@ public class OrderedExecutorSanityTest { //from now on new tasks won't be executed final CountDownLatch afterDeatchExecution = new CountDownLatch(1); executor.execute(afterDeatchExecution::countDown); - Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(1, TimeUnit.SECONDS)); + Assert.assertFalse("After shutdownNow no new tasks can be executed", afterDeatchExecution.await(100, TimeUnit.MILLISECONDS)); //to avoid memory leaks the executor must take care of the new submitted tasks immediatly Assert.assertEquals("Any new task submitted after death must be collected", 0, executor.remaining()); } finally { @@ -78,4 +79,70 @@ public class OrderedExecutorSanityTest { } } + + + @Test + public void shutdownWithin() throws InterruptedException { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + final OrderedExecutor executor = new OrderedExecutor(executorService); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger numberOfTasks = new AtomicInteger(0); + final CountDownLatch ran = new CountDownLatch(1); + + executor.execute(() -> { + try { + latch.await(1, TimeUnit.MINUTES); + numberOfTasks.set(executor.shutdownNow().size()); + ran.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + + for (int i = 0; i < 100; i++) { + executor.execute(() -> System.out.println("Dont worry, this will never happen")); + } + + latch.countDown(); + ran.await(1, TimeUnit.SECONDS); + Assert.assertEquals(100, numberOfTasks.get()); + + Assert.assertEquals(ProcessorBase.STATE_FORCED_SHUTDOWN, executor.status()); + Assert.assertEquals(0, executor.remaining()); + } finally { + executorService.shutdown(); + } + } + + + @Test + public void testMeasure() throws InterruptedException { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + try { + final OrderedExecutor executor = new OrderedExecutor(executorService); + int MAX_LOOP = 1_000_000; + + // extend the number for longer numbers + int runs = 10; + + for (int i = 0; i < runs; i++) { + long start = System.nanoTime(); + final CountDownLatch executed = new CountDownLatch(MAX_LOOP); + for (int l = 0; l < MAX_LOOP; l++) { + executor.execute(executed::countDown); + } + Assert.assertTrue(executed.await(1, TimeUnit.MINUTES)); + long end = System.nanoTime(); + + long elapsed = (end - start); + + System.out.println("execution " + i + " in " + TimeUnit.NANOSECONDS.toMillis(elapsed) + " milliseconds"); + } + } finally { + executorService.shutdown(); + } + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index f78f43f..e1e1b68 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -159,11 +159,6 @@ public class ServerSessionPacketHandler implements ChannelHandler { private final boolean direct; - //marker instance used to recognize if a thread is performing a packet handling - private static final Object DUMMY = Boolean.TRUE; - - //a thread that has its thread-local map populated with DUMMY is performing a packet handling - private static final ThreadLocal<Object> inHandler = new ThreadLocal<>(); public ServerSessionPacketHandler(final ActiveMQServer server, final CoreProtocolManager manager, @@ -231,26 +226,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { ActiveMQServerLogger.LOGGER.clearingUpSession(session.getName()); } - private static void onStartMessagePacketHandler() { - assert inHandler.get() == null : "recursion on packet handling is not supported"; - inHandler.set(DUMMY); - } - - private static boolean inHandler() { - final Object dummy = inHandler.get(); - //sanity check: can't exist a thread using a marker different from DUMMY - assert ((dummy != null && dummy == DUMMY) || dummy == null) : "wrong marker"; - return dummy != null; - } - - private static void onExitMessagePacketHandler() { - assert inHandler.get() != null : "marker not set"; - inHandler.set(null); - } - public void closeExecutors() { - packetActor.shutdownNow(); - callExecutor.shutdownNow(); + packetActor.shutdown(); + callExecutor.shutdown(); } public void close() { @@ -280,33 +258,28 @@ public class ServerSessionPacketHandler implements ChannelHandler { if (logger.isTraceEnabled()) { logger.trace("ServerSessionPacketHandler::handlePacket," + packet); } - onStartMessagePacketHandler(); - try { - final byte type = packet.getType(); - switch (type) { - case SESS_SEND: { - onSessionSend(packet); - break; - } - case SESS_ACKNOWLEDGE: { - onSessionAcknowledge(packet); - break; - } - case SESS_PRODUCER_REQUEST_CREDITS: { - onSessionRequestProducerCredits(packet); - break; - } - case SESS_FLOWTOKEN: { - onSessionConsumerFlowCredit(packet); - break; - } - default: - // separating a method for everything else as JIT was faster this way - slowPacketHandler(packet); - break; + final byte type = packet.getType(); + switch (type) { + case SESS_SEND: { + onSessionSend(packet); + break; } - } finally { - onExitMessagePacketHandler(); + case SESS_ACKNOWLEDGE: { + onSessionAcknowledge(packet); + break; + } + case SESS_PRODUCER_REQUEST_CREDITS: { + onSessionRequestProducerCredits(packet); + break; + } + case SESS_FLOWTOKEN: { + onSessionConsumerFlowCredit(packet); + break; + } + default: + // separating a method for everything else as JIT was faster this way + slowPacketHandler(packet); + break; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index a63eec7..2d6f003 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -302,7 +302,7 @@ public abstract class ActiveMQTestBase extends Assert { //clean up pools before failing if (!exceptions.isEmpty()) { for (Exception exception : exceptions) { - exception.printStackTrace(); + exception.printStackTrace(System.out); } fail("Client Session Factories still trying to reconnect, see above to see where created"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 9c05114..ef53344 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -330,9 +330,9 @@ public class ConsumerTest extends ActiveMQTestBase { connection.close(); } - assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue"))); - assertNull(server.locateQueue(SimpleString.toSimpleString("queue"))); - assertEquals(0, server.getTotalMessageCount()); + Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString("queue")) == null); + Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString("queue")) == null); + Wait.assertEquals(0, server::getTotalMessageCount); } @Test