Dealing with expected IBM JDK thread and refactoring the Thread check as a Rule
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e56ca95f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e56ca95f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e56ca95f Branch: refs/heads/master Commit: e56ca95fdc9f813b339e6cb182a94f833161202f Parents: 4e5ec13 Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Mon Jan 11 16:11:19 2016 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Jan 11 20:02:46 2016 -0500 ---------------------------------------------------------------------- .../artemis/tests/util/ActiveMQTestBase.java | 130 +------------ .../artemis/tests/util/ThreadLeakCheckRule.java | 189 +++++++++++++++++++ ...MDBMultipleHandlersServerDisconnectTest.java | 2 +- .../broadcast/JGroupsBroadcastTest.java | 12 +- 4 files changed, 200 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 4e0c35f..bd2a156 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -143,6 +143,9 @@ import org.junit.runner.Description; */ public abstract class ActiveMQTestBase extends Assert { + @Rule + public ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule(); + public static final String TARGET_TMP = "./target/tmp"; public static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName(); public static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName(); @@ -173,10 +176,8 @@ public abstract class ActiveMQTestBase extends Assert { private final Collection<ActiveMQComponent> otherComponents = new HashSet<>(); private final Set<ExecutorService> executorSet = new HashSet<>(); - private boolean checkThread = true; private String testDir; private int sendMsgCount = 0; - private Map<Thread, StackTraceElement[]> previousThreads; @Rule public TestName name = new TestName(); @@ -306,46 +307,6 @@ public abstract class ActiveMQTestBase extends Assert { } } - if (checkThread) { - StringBuffer buffer = null; - - boolean failed = true; - - boolean failedOnce = false; - - long timeout = System.currentTimeMillis() + 60000; - while (failed && timeout > System.currentTimeMillis()) { - buffer = new StringBuffer(); - - failed = checkThread(buffer); - - if (failed) { - failedOnce = true; - forceGC(); - Thread.sleep(500); - log.info("There are still threads running, trying again"); - System.out.println(buffer); - } - } - - if (failed) { - logAndSystemOut("Thread leaked on test " + this.getClass().getName() + "::" + this.getName() + "\n" + - buffer); - logAndSystemOut("Thread leakage! Failure!!!"); - - fail("Thread leaked"); - } - else if (failedOnce) { - System.out.println("******************** Threads cleared after retries ********************"); - System.out.println(); - } - - - } - else { - checkThread = true; - } - if (Thread.currentThread().getContextClassLoader() == null) { Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader()); fail("Thread Context ClassLoader was set to null at some point before this test. We will set to this.getClass().getClassLoader(), but you are supposed to fix your tests"); @@ -370,8 +331,6 @@ public abstract class ActiveMQTestBase extends Assert { // checkFreePort(TransportConstants.DEFAULT_PORT); - previousThreads = Thread.getAllStackTraces(); - logAndSystemOut("#test " + getName()); } @@ -401,7 +360,7 @@ public abstract class ActiveMQTestBase extends Assert { } protected void disableCheckThread() { - checkThread = false; + leakCheckRule.disable(); } protected String getName() { @@ -1960,87 +1919,6 @@ public abstract class ActiveMQTestBase extends Assert { } } - /** - * @param buffer - * @return - */ - private boolean checkThread(StringBuffer buffer) { - boolean failedThread = false; - - Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces(); - - if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) { - - buffer.append("*********************************************************************************\n"); - buffer.append("LEAKING THREADS\n"); - - for (Thread aliveThread : postThreads.keySet()) { - if (!isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) { - failedThread = true; - buffer.append("=============================================================================\n"); - buffer.append("Thread " + aliveThread + " is still alive with the following stackTrace:\n"); - StackTraceElement[] elements = postThreads.get(aliveThread); - for (StackTraceElement el : elements) { - buffer.append(el + "\n"); - } - } - - } - buffer.append("*********************************************************************************\n"); - - } - return failedThread; - } - - /** - * if it's an expected thread... we will just move along ignoring it - * - * @param thread - * @return - */ - private boolean isExpectedThread(Thread thread) { - final String threadName = thread.getName(); - final ThreadGroup group = thread.getThreadGroup(); - final boolean isSystemThread = group != null && "system".equals(group.getName()); - final String javaVendor = System.getProperty("java.vendor"); - - if (threadName.contains("SunPKCS11")) { - return true; - } - else if (threadName.contains("Attach Listener")) { - return true; - } - else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) { - return true; - } - else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) { - return true; - } - else if (threadName.contains("globalEventExecutor")) { - return true; - } - else if (threadName.contains("threadDeathWatcher")) { - return true; - } - else if (threadName.contains("netty-threads")) { - // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay - // if the EventLoop's are still busy. - return true; - } - else if (threadName.contains("threadDeathWatcher")) { - //another netty thread - return true; - } - else { - for (StackTraceElement element : thread.getStackTrace()) { - if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) { - return true; - } - } - return false; - } - } - private void checkFilesUsage() { long timeout = System.currentTimeMillis() + 15000; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java new file mode 100644 index 0000000..3f371a8 --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.util; + +import java.util.Map; + +import org.junit.Assert; +import org.junit.rules.ExternalResource; + +/** + * This is useful to make sure you won't have leaking threads between tests + */ +public class ThreadLeakCheckRule extends ExternalResource { + + boolean enabled = true; + + private Map<Thread, StackTraceElement[]> previousThreads; + + public void disable() { + enabled = false; + } + + /** + * Override to set up your specific external resource. + * + * @throws if setup fails (which will disable {@code after} + */ + protected void before() throws Throwable { + // do nothing + + previousThreads = Thread.getAllStackTraces(); + + } + + /** + * Override to tear down your specific external resource. + */ + protected void after() { + if (enabled) { + StringBuffer buffer = null; + + boolean failed = true; + + boolean failedOnce = false; + + long timeout = System.currentTimeMillis() + 60000; + while (failed && timeout > System.currentTimeMillis()) { + buffer = new StringBuffer(); + + failed = checkThread(buffer); + + if (failed) { + failedOnce = true; + ActiveMQTestBase.forceGC(); + try { + Thread.sleep(500); + } + catch (Throwable e) { + } + + System.out.println("There are still threads running, trying again"); + System.out.println(buffer); + } + } + + if (failed) { + System.out.println("Thread leaked on test \n" + + buffer); + System.out.println("Thread leakage! Failure!!!"); + + Assert.fail("Thread leaked"); + } + else if (failedOnce) { + System.out.println("******************** Threads cleared after retries ********************"); + System.out.println(); + } + + + } + else { + enabled = true; + } + + } + + + + /** + * @param buffer + * @return + */ + private boolean checkThread(StringBuffer buffer) { + boolean failedThread = false; + + Map<Thread, StackTraceElement[]> postThreads = Thread.getAllStackTraces(); + + if (postThreads != null && previousThreads != null && postThreads.size() > previousThreads.size()) { + + buffer.append("*********************************************************************************\n"); + buffer.append("LEAKING THREADS\n"); + + for (Thread aliveThread : postThreads.keySet()) { + if (!isExpectedThread(aliveThread) && !previousThreads.containsKey(aliveThread)) { + failedThread = true; + buffer.append("=============================================================================\n"); + buffer.append("Thread " + aliveThread + " is still alive with the following stackTrace:\n"); + StackTraceElement[] elements = postThreads.get(aliveThread); + for (StackTraceElement el : elements) { + buffer.append(el + "\n"); + } + } + + } + buffer.append("*********************************************************************************\n"); + + } + return failedThread; + } + + + /** + * if it's an expected thread... we will just move along ignoring it + * + * @param thread + * @return + */ + private boolean isExpectedThread(Thread thread) { + final String threadName = thread.getName(); + final ThreadGroup group = thread.getThreadGroup(); + final boolean isSystemThread = group != null && "system".equals(group.getName()); + final String javaVendor = System.getProperty("java.vendor"); + + if (threadName.contains("SunPKCS11")) { + return true; + } + else if (threadName.contains("Attach Listener")) { + return true; + } + else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) { + return true; + } + else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("ClassCache Reaper")) { + return true; + } + else if (javaVendor.contains("IBM") && threadName.equals("MemoryPoolMXBean notification dispatcher")) { + return true; + } + else if (threadName.contains("globalEventExecutor")) { + return true; + } + else if (threadName.contains("threadDeathWatcher")) { + return true; + } + else if (threadName.contains("netty-threads")) { + // This is ok as we use EventLoopGroup.shutdownGracefully() which will shutdown things with a bit of delay + // if the EventLoop's are still busy. + return true; + } + else if (threadName.contains("threadDeathWatcher")) { + //another netty thread + return true; + } + else { + for (StackTraceElement element : thread.getStackTrace()) { + if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) { + return true; + } + } + return false; + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java index 8077a33..5c4e854 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/ra/MDBMultipleHandlersServerDisconnectTest.java @@ -345,7 +345,7 @@ public class MDBMultipleHandlersServerDisconnectTest extends ActiveMQRATestBase System.out.println(writer.toString()); } - Assert.assertFalse(failed); + Assert.assertFalse(writer.toString(), failed); System.out.println("Received " + NUMBER_OF_MESSAGES + " messages"); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e56ca95f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java index cae7437..53a6783 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java @@ -19,18 +19,19 @@ package org.apache.activemq.artemis.tests.integration.broadcast; import org.apache.activemq.artemis.api.core.BroadcastEndpoint; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; import org.jgroups.JChannel; import org.jgroups.conf.PlainConfigurator; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; -public class JGroupsBroadcastTest extends ActiveMQTestBase { - - private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enab led=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=5 00;ack_on_delivery=false;timeout=60000)"; - +public class JGroupsBroadcastTest { + @Rule + public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule(); + private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enab led=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" + "xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=5 00;ack_on_delivery=false;timeout=60000)"; @Test public void testRefCount() throws Exception { @@ -83,7 +84,6 @@ public class JGroupsBroadcastTest extends ActiveMQTestBase { channelEndpoint1.openClient(); - } catch (Exception e) { e.printStackTrace();