ARTEMIS-127 Fix some concurrency idioms for ActimeMQ Tests

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ae6a2b87
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ae6a2b87
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ae6a2b87

Branch: refs/heads/master
Commit: ae6a2b87eaa9325d2437db2ccd5152f6db0d2ad3
Parents: 64ecb95
Author: Thiago Kronig <thiagokro...@gmail.com>
Authored: Wed Jun 10 20:37:58 2015 -0300
Committer: Thiago Kronig <thiagokro...@gmail.com>
Committed: Fri Jun 12 00:30:23 2015 -0300

----------------------------------------------------------------------
 .../JmsCreateConsumerInOnMessageTest.java       |  6 ++--
 .../activemq/JmsMultipleClientsTestSupport.java |  4 +--
 .../activemq/LargeMessageTestSupport.java       |  8 ++---
 .../activemq/OnePrefetchAsyncConsumerTest.java  |  2 +-
 .../region/QueueResendDuringShutdownTest.java   |  4 +--
 .../org/apache/activemq/bugs/AMQ2149Test.java   |  2 +-
 .../org/apache/activemq/bugs/AMQ4607Test.java   |  2 +-
 .../org/apache/activemq/bugs/CraigsBugTest.java |  7 +++--
 .../activemq/bugs/amq1974/TryJmsClient.java     |  5 ++-
 .../activemq/bugs/amq1974/TryJmsManager.java    |  5 ++-
 .../apache/activemq/spring/ConsumerBean.java    | 33 +++++++++++---------
 .../apache/activemq/spring/SpringConsumer.java  |  8 ++---
 .../activemq/store/kahadb/plist/PListTest.java  |  2 +-
 .../activemq/streams/JMSInputStreamTest.java    |  2 +-
 .../activemq/transport/TopicClusterTest.java    |  4 +--
 .../transport/failover/AMQ1925Test.java         | 27 +++++-----------
 .../activemq/transport/udp/UdpTestSupport.java  |  6 ++--
 .../ConcurrentProducerDurableConsumerTest.java  | 14 +++------
 .../ConcurrentProducerQueueConsumerTest.java    | 14 +++------
 .../usecases/MultiBrokersMultiClientsTest.java  |  2 +-
 .../usecases/NoDuplicateOnTopicNetworkTest.java |  2 +-
 .../usecases/ReliableReconnectTest.java         |  8 ++---
 .../VerifyNetworkConsumersDisconnectTest.java   |  2 +-
 .../org/apache/activemq/util/MessageIdList.java | 32 ++++++++++++-------
 24 files changed, 95 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
index 7a219e2..c0a4f5f 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java
@@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends 
TestSupport implements Mes
     private MessageConsumer testConsumer;
     private MessageProducer producer;
     private Topic topic;
-    private Object lock = new Object();
+    private final Object lock = new Object();
 
     /*
      * @see junit.framework.TestCase#setUp()
@@ -71,8 +71,8 @@ public class JmsCreateConsumerInOnMessageTest extends 
TestSupport implements Mes
     public void testCreateConsumer() throws Exception {
         Message msg = super.createMessage();
         producer.send(msg);
-        if (testConsumer == null) {
-            synchronized (lock) {
+        synchronized (lock) {
+            while(testConsumer == null) {
                 lock.wait(3000);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
index 5eaab8d..5c73a6e 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java
@@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport {
     protected List<Connection> connections = Collections.synchronizedList(new 
ArrayList<Connection>());
     protected MessageIdList allMessagesList = new MessageIdList();
 
-    private AtomicInteger producerLock;
-
     protected void startProducers(Destination dest, int msgCount) throws 
Exception {
         startProducers(createConnectionFactory(), dest, msgCount);
     }
@@ -92,7 +90,7 @@ public class JmsMultipleClientsTestSupport {
     protected void startProducers(final ConnectionFactory factory, final 
Destination dest, final int msgCount) throws Exception {
         // Use concurrent send
         if (useConcurrentSend) {
-            producerLock = new AtomicInteger(producerCount);
+            final AtomicInteger producerLock = new 
AtomicInteger(producerCount);
 
             for (int i = 0; i < producerCount; i++) {
                 Thread t = new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
index fc77218..d1ab8a5 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
@@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends 
ClientTestSupport implements Messag
     protected int deliveryMode = DeliveryMode.PERSISTENT;
     protected IdGenerator idGen = new IdGenerator();
     protected boolean validMessageConsumption = true;
-    protected AtomicInteger messageCount = new AtomicInteger(0);
+    protected final AtomicInteger messageCount = new AtomicInteger(0);
 
     protected int prefetchValue = 10000000;
 
@@ -182,9 +182,9 @@ public class LargeMessageTestSupport extends 
ClientTestSupport implements Messag
             producer.send(msg);
         }
         long now = System.currentTimeMillis();
-        while (now + 60000 > System.currentTimeMillis() && messageCount.get() 
< MESSAGE_COUNT) {
-            LOG.info("message count = " + messageCount);
-            synchronized (messageCount) {
+        synchronized (messageCount) {
+            while (now + 60000 > System.currentTimeMillis() && 
messageCount.get() < MESSAGE_COUNT) {
+                LOG.info("message count = " + messageCount);
                 messageCount.wait(1000);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
index 0851198..26c6bf1 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java
@@ -154,7 +154,7 @@ public class OnePrefetchAsyncConsumerTest extends 
EmbeddedBrokerTestSupport {
     }
 
     private class TestServerSession implements ServerSession {
-        TestServerSessionPool pool;
+        final TestServerSessionPool pool;
         Session session;
 
         public TestServerSession(TestServerSessionPool pool) throws 
JMSException {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
index 0439fa8..c7154a9 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java
@@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest {
     private Connection                  producerConnection;
     private Queue                       queue;
 
-    private Object                      messageReceiveSync = new Object();
+    private final Object                messageReceiveSync = new Object();
     private int                         receiveCount;
 
     @Before
@@ -239,7 +239,7 @@ public class QueueResendDuringShutdownTest {
     protected void  waitForMessage (long delayMs) {
         try {
             synchronized ( this.messageReceiveSync ) {
-                if ( this.receiveCount == 0 ) {
+                while ( this.receiveCount == 0 ) {
                     this.messageReceiveSync.wait(delayMs);
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index b2eba61..c28d3ad 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -563,7 +563,7 @@ public class AMQ2149Test {
 }
 
 class TeardownTask implements Callable<Boolean> {
-    private Object brokerLock;
+    private final Object brokerLock;
     private BrokerService broker;
 
     public TeardownTask(Object brokerLock, BrokerService broker) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
index 265b692..b567c93 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java
@@ -49,7 +49,7 @@ public class AMQ4607Test extends 
JmsMultipleBrokersTestSupport implements Uncaug
 
     public boolean duplex = true;
     protected Map<String, MessageConsumer> consumerMap;
-    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, 
Throwable>();
+    final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, 
Throwable>();
 
     private void assertNoUnhandeledExceptions() {
         for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
index f956da6..d71a9e4 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
@@ -25,6 +25,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.command.ActiveMQQueue;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class CraigsBugTest extends EmbeddedBrokerTestSupport {
 
     private String connectionUri;
@@ -49,9 +52,7 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport {
         conn.start();
 
         try {
-            synchronized (this) {
-                wait(3000);
-            }
+            new CountDownLatch(1).await(3, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
index c8b4503..1f25109 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
@@ -25,6 +25,7 @@ import 
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
 import javax.jms.*;
 import java.io.File;
 import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
 
 public class TryJmsClient
 {
@@ -59,9 +60,7 @@ public class TryJmsClient
 
         startMessageSend();
 
-        synchronized(this) {
-            this.wait();
-        }
+        new CountDownLatch(1).await();
     }
 
     private void startUsageMonitor(final BrokerService brokerService) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
index 3f58987..c8eb7b3 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
@@ -25,6 +25,7 @@ import 
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
 import javax.jms.*;
 import java.io.File;
 import java.net.URISyntaxException;
+import java.util.concurrent.CountDownLatch;
 
 public class TryJmsManager {
 
@@ -59,9 +60,7 @@ public class TryJmsManager {
 
         startMessageConsumer();
 
-        synchronized(this) {
-            this.wait();
-        }
+        new CountDownLatch(1).await();
     }
 
     private void startUsageMonitor(final BrokerService brokerService) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
index 8f22c33..4e1ab59 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java
@@ -73,14 +73,19 @@ public class ConsumerBean extends Assert implements 
MessageListener {
 
         long start = System.currentTimeMillis();
 
-        try {
-            if (hasReceivedMessage()) {
-                synchronized (messages) {
+        synchronized(messages)
+        {
+            try
+            {
+                while (hasReceivedMessage())
+                {
                     messages.wait(4000);
                 }
             }
-        } catch (InterruptedException e) {
-            LOG.info("Caught: " + e);
+            catch (InterruptedException e)
+            {
+                LOG.info("Caught: " + e);
+            }
         }
         long end = System.currentTimeMillis() - start;
 
@@ -101,18 +106,18 @@ public class ConsumerBean extends Assert implements 
MessageListener {
         LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to 
arrive");
         long start = System.currentTimeMillis();
         long endTime = start + maxWaitTime;
-        while (maxRemainingMessageCount > 0) {
-            try {
-                synchronized (messages) {
+        synchronized (messages) {
+            while (maxRemainingMessageCount > 0) {
+                try {
                     messages.wait(1000);
+                    if (hasReceivedMessages(messageCount) || 
System.currentTimeMillis() > endTime) {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    LOG.info("Caught: " + e);
                 }
-                if (hasReceivedMessages(messageCount) || 
System.currentTimeMillis() > endTime) {
-                    break;
-                }
-            } catch (InterruptedException e) {
-                LOG.info("Caught: " + e);
+                maxRemainingMessageCount = Math.max(0, messageCount - 
messages.size());
             }
-            maxRemainingMessageCount = Math.max(0, messageCount - 
messages.size());
         }
         long end = System.currentTimeMillis() - start;
         LOG.info("End of wait for " + end + " millis");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
index 118e036..ed0a48a 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java
@@ -43,13 +43,13 @@ public class SpringConsumer extends ConsumerBean implements 
MessageListener {
 
         try {
             ConnectionFactory factory = template.getConnectionFactory();
-            connection = factory.createConnection();
+            final Connection c = connection = factory.createConnection();
 
             // we might be a reusable connection in spring
             // so lets only set the client ID once if its not set
-            synchronized (connection) {
-                if (connection.getClientID() == null) {
-                    connection.setClientID(myId);
+            synchronized (c) {
+                if (c.getClientID() == null) {
+                    c.setClientID(myId);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
index 555503e..71e4618 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
@@ -617,7 +617,7 @@ public class PListTest {
         }
     }
 
-    Map<PList, Object> locks = new HashMap<PList, Object>();
+    final Map<PList, Object> locks = new HashMap<PList, Object>();
 
     private Object plistLocks(PList plist) {
         Object lock = null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
index f392662..b07c8cc 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java
@@ -250,7 +250,7 @@ public class JMSInputStreamTest extends JmsTestSupport {
         }
         out.flush();
         synchronized (complete) {
-            if (!complete.get()) {
+            while (!complete.get()) {
                 complete.wait(30000);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
index 4db7c23..26c215a 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java
@@ -55,7 +55,7 @@ public class TopicClusterTest extends TestCase implements 
MessageListener {
     
     protected Destination destination;
     protected boolean topic = true;
-    protected AtomicInteger receivedMessageCount = new AtomicInteger(0);
+    protected final AtomicInteger receivedMessageCount = new AtomicInteger(0);
     protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
     protected MessageProducer[] producers;
     protected Connection[] connections;
@@ -166,7 +166,7 @@ public class TopicClusterTest extends TestCase implements 
MessageListener {
             }
         }
         synchronized (receivedMessageCount) {
-            if (receivedMessageCount.get() < expectedReceiveCount()) {
+            while (receivedMessageCount.get() < expectedReceiveCount()) {
                 receivedMessageCount.wait(20000);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
index dfb5dfd..d03dbcd 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java
@@ -21,6 +21,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
@@ -73,14 +74,12 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
 
                // The runnable is likely to interrupt during the 
session#commit, since
                // this takes the longest
-               final Object starter = new Object();
+               final CountDownLatch starter = new CountDownLatch(1);
                final AtomicBoolean restarted = new AtomicBoolean();
                new Thread(new Runnable() {
                        public void run() {
                                try {
-                                       synchronized (starter) {
-                                               starter.wait();
-                                       }
+                                       starter.await();
 
                                        // Simulate broker failure & restart
                                        bs.stop();
@@ -97,9 +96,6 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
                        }
                }).start();
 
-               synchronized (starter) {
-                       starter.notifyAll();
-               }
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                        Message message = consumer.receive(500);
                        assertNotNull("No Message " + i + " found", message);
@@ -108,9 +104,7 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
                                assertFalse("Timing problem, restarted too 
soon", restarted
                                                .get());
                        if (i == 10) {
-                               synchronized (starter) {
-                                       starter.notifyAll();
-                               }
+                               starter.countDown();
                        }
                        if (i > MESSAGE_COUNT - 100) {
                                assertTrue("Timing problem, restarted too 
late", restarted
@@ -143,14 +137,12 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
 
                // The runnable is likely to interrupt during the 
session#commit, since
                // this takes the longest
-               final Object starter = new Object();
+               final CountDownLatch starter = new CountDownLatch(1);
                final AtomicBoolean restarted = new AtomicBoolean();
                new Thread(new Runnable() {
                        public void run() {
                                try {
-                                       synchronized (starter) {
-                                               starter.wait();
-                                       }
+                                       starter.await();
 
                                        // Simulate broker failure & restart
                                        bs.stop();
@@ -167,9 +159,6 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
                        }
                }).start();
 
-               synchronized (starter) {
-                       starter.notifyAll();
-               }
                Collection<Integer> results = new 
ArrayList<Integer>(MESSAGE_COUNT);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                        Message message1 = consumer1.receive(20);
@@ -191,9 +180,7 @@ public class AMQ1925Test extends TestCase implements 
ExceptionListener {
                                assertFalse("Timing problem, restarted too 
soon", restarted
                                                .get());
                        if (i == 10) {
-                               synchronized (starter) {
-                                       starter.notifyAll();
-                               }
+                               starter.countDown();
                        }
                        if (i > MESSAGE_COUNT - 50) {
                                assertTrue("Timing problem, restarted too 
late", restarted

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
index 1d770de..7defe95 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
@@ -46,7 +46,7 @@ public abstract class UdpTestSupport extends TestCase 
implements TransportListen
     protected Transport producer;
     protected Transport consumer;
 
-    protected Object lock = new Object();
+    protected final Object lock = new Object();
     protected Command receivedCommand;
     protected TransportServer server;
     protected boolean large;
@@ -251,10 +251,10 @@ public abstract class UdpTestSupport extends TestCase 
implements TransportListen
         Command answer = null;
         synchronized (lock) {
             answer = receivedCommand;
-            if (answer == null) {
+            while (answer == null) {
                 lock.wait(waitForCommandTimeout);
+                answer = receivedCommand;
             }
-            answer = receivedCommand;
         }
 
         assertNotNull("Should have received a Command by now!", answer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
index e1035a6..0e71dfe 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
@@ -107,7 +107,7 @@ public class ConcurrentProducerDurableConsumerTest extends 
TestSupport {
 
         // periodically start a durable sub that has a backlog
         final int consumersToActivate = 5;
-        final Object addConsumerSignal = new Object();
+        final CountDownLatch addConsumerSignal = new CountDownLatch(1);
         Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
@@ -120,9 +120,7 @@ public class ConcurrentProducerDurableConsumerTest extends 
TestSupport {
                     MessageConsumer consumer = null;
                     for (int i = 0; i < consumersToActivate; i++) {
                         LOG.info("Waiting for add signal from producer...");
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.wait(30 * 60 * 1000);
-                        }
+                        addConsumerSignal.await(30, TimeUnit.MINUTES);
                         TimedMessageListener listener = new 
TimedMessageListener();
                         consumer = 
createDurableSubscriber(factory.createConnection(), destination, "consumer" + 
(i + 1));
                         LOG.info("Created consumer " + consumer);
@@ -254,7 +252,7 @@ public class ConcurrentProducerDurableConsumerTest extends 
TestSupport {
                                      final int numIterations,
                                      Session session,
                                      MessageProducer producer,
-                                     Object addConsumerSignal) throws 
Exception {
+                                     CountDownLatch addConsumerSignal) throws 
Exception {
         long start;
         long count = 0;
         double batchMax = 0, max = 0, sum = 0;
@@ -269,10 +267,8 @@ public class ConcurrentProducerDurableConsumerTest extends 
TestSupport {
                 max = Math.max(max, (System.currentTimeMillis() - 
singleSendstart));
                 if (++count % 500 == 0) {
                     if (addConsumerSignal != null) {
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.notifyAll();
-                            LOG.info("Signalled add consumer");
-                        }
+                        addConsumerSignal.countDown();
+                        LOG.info("Signalled add consumer");
                     }
                 };
                 if (count % 5000 == 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
index 34807c6..931fb55 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java
@@ -95,7 +95,7 @@ public class ConcurrentProducerQueueConsumerTest extends 
TestSupport
 
         // periodically start a queue consumer
         final int consumersToActivate = 5;
-        final Object addConsumerSignal = new Object();
+        final CountDownLatch addConsumerSignal = new CountDownLatch(1);
         Executors.newCachedThreadPool(new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
@@ -108,9 +108,7 @@ public class ConcurrentProducerQueueConsumerTest extends 
TestSupport
                     MessageConsumer consumer = null;
                     for (int i = 0; i < consumersToActivate; i++) {
                         LOG.info("Waiting for add signal from producer...");
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.wait(30 * 60 * 1000);
-                        }
+                        addConsumerSignal.await(30, TimeUnit.MINUTES);
                         TimedMessageListener listener = new 
TimedMessageListener();
                         consumer = createConsumer(factory.createConnection(), 
destination);
                         LOG.info("Created consumer " + consumer);
@@ -241,7 +239,7 @@ public class ConcurrentProducerQueueConsumerTest extends 
TestSupport
                                      final int numIterations,
                                      Session session,
                                      MessageProducer producer,
-                                     Object addConsumerSignal) throws 
Exception {
+                                     CountDownLatch addConsumerSignal) throws 
Exception {
         long start;
         long count = 0;
         double batchMax = 0, max = 0, sum = 0;
@@ -257,10 +255,8 @@ public class ConcurrentProducerQueueConsumerTest extends 
TestSupport
                 max = Math.max(max, (System.currentTimeMillis() - 
singleSendstart));
                 if (++count % 500 == 0) {
                     if (addConsumerSignal != null) {
-                        synchronized (addConsumerSignal) {
-                            addConsumerSignal.notifyAll();
-                            LOG.info("Signalled add consumer");
-                        }
+                        addConsumerSignal.countDown();
+                        LOG.info("Signalled add consumer");
                     }
                 }
                 ;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
index bd5c4c8..df02d9e 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java
@@ -44,7 +44,7 @@ public class MultiBrokersMultiClientsTest extends 
JmsMultipleBrokersTestSupport
     private static final Logger LOG = 
LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class);
 
     protected Map<String, MessageConsumer> consumerMap;
-    Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, 
Throwable>();
+    final Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, 
Throwable>();
 
     public void testTopicAllConnected() throws Exception {
         bridgeAllBrokers();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
index 2aa614d..6d28f34 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
@@ -265,7 +265,7 @@ public class NoDuplicateOnTopicNetworkTest extends 
CombinationTestSupport {
         private MessageConsumer consumer;
         private final String durableID = "DURABLE_ID";
 
-        private List<String> receivedStrings = 
Collections.synchronizedList(new ArrayList<String>());
+        private final List<String> receivedStrings = 
Collections.synchronizedList(new ArrayList<String>());
         private int numMessages = 10;
         private CountDownLatch recievedLatch = new CountDownLatch(numMessages);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
index 05fd5f8..9dc0032 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
@@ -47,8 +47,8 @@ public class ReliableReconnectTest extends 
org.apache.activemq.TestSupport {
     protected int deliveryMode = DeliveryMode.PERSISTENT;
     protected String consumerClientId;
     protected Destination destination;
-    protected AtomicBoolean closeBroker = new AtomicBoolean(false);
-    protected AtomicInteger messagesReceived = new AtomicInteger(0);
+    protected final AtomicBoolean closeBroker = new AtomicBoolean(false);
+    protected final AtomicInteger messagesReceived = new AtomicInteger(0);
     protected BrokerService broker;
     protected int firstBatch = MESSAGE_COUNT / 10;
     private IdGenerator idGen = new IdGenerator();
@@ -159,7 +159,7 @@ public class ReliableReconnectTest extends 
org.apache.activemq.TestSupport {
         connection.close();
         spawnConsumer();
         synchronized (closeBroker) {
-            if (!closeBroker.get()) {
+            while (!closeBroker.get()) {
                 closeBroker.wait();
             }
         }
@@ -168,7 +168,7 @@ public class ReliableReconnectTest extends 
org.apache.activemq.TestSupport {
         startBroker(false);
         // System.err.println("Started Broker again");
         synchronized (messagesReceived) {
-            if (messagesReceived.get() < MESSAGE_COUNT) {
+            while (messagesReceived.get() < MESSAGE_COUNT) {
                 messagesReceived.wait(60000);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
index 9eeb28c..eb5a3e2 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java
@@ -51,7 +51,7 @@ public class VerifyNetworkConsumersDisconnectTest extends 
JmsMultipleBrokersTest
     public static final int TIMEOUT = 30000;
 
     protected Map<String, MessageConsumer> consumerMap;
-    Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, 
Throwable>();
+    final Map<Thread, Throwable> unhandledExceptions = new HashMap<Thread, 
Throwable>();
 
     private void assertNoUnhandledExceptions() {
         for( Entry<Thread, Throwable> e: unhandledExceptions.entrySet()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ae6a2b87/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
index 7140a86..c644c67 100644
--- 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java
@@ -140,20 +140,28 @@ public class MessageIdList extends Assert implements 
MessageListener {
 
         long start = System.currentTimeMillis();
 
-        for (int i = 0; i < messageCount; i++) {
-            try {
-                if (hasReceivedMessages(messageCount)) {
-                    break;
-                }
-                long duration = System.currentTimeMillis() - start;
-                if (duration >= maximumDuration) {
-                    break;
-                }
-                synchronized (semaphore) {
+        synchronized (semaphore)
+        {
+            for (int i = 0; i < messageCount; i++)
+            {
+                try
+                {
+                    if (hasReceivedMessages(messageCount))
+                    {
+                        break;
+                    }
+                    long duration = System.currentTimeMillis() - start;
+                    if (duration >= maximumDuration)
+                    {
+                        break;
+                    }
+
                     semaphore.wait(maximumDuration - duration);
                 }
-            } catch (InterruptedException e) {
-                LOG.info("Caught: " + e);
+                catch (InterruptedException e)
+                {
+                    LOG.info("Caught: " + e);
+                }
             }
         }
         long end = System.currentTimeMillis() - start;

Reply via email to