https://issues.apache.org/jira/browse/AMQ-6086 - a start exception can prevent 
stop() from completing fully; stop() must be more selective about which components it lazily creates.

(cherry picked from commit 35df815fb86d201e63b4f0f2d53bee3bae5c0752)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2ffd7498
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2ffd7498
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2ffd7498

Branch: refs/heads/activemq-5.13.x
Commit: 2ffd7498a835be85b08e5ca9966a26a2d4ac0cd3
Parents: f3aedc7
Author: gtully <gary.tu...@gmail.com>
Authored: Mon Dec 14 15:39:37 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Mon Dec 14 19:09:15 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   | 13 ++--
 .../StartAndConcurrentStopBrokerTest.java       | 75 ++++++++++++++++++--
 2 files changed, 77 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2ffd7498/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 62af182..d028078 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -785,9 +785,7 @@ public class BrokerService implements Service {
             return;
         }
 
-        if (started.get()) {
-            setStartException(new BrokerStoppedException("Stop invoked"));
-        }
+        setStartException(new BrokerStoppedException("Stop invoked"));
         MDC.put("activemq.broker", brokerName);
 
         if (systemExitOnShutdown) {
@@ -836,7 +834,7 @@ public class BrokerService implements Service {
             stopper.stop(getPersistenceAdapter());
             persistenceAdapter = null;
             if (isUseJmx()) {
-                stopper.stop(getManagementContext());
+                stopper.stop(managementContext);
                 managementContext = null;
             }
             // Clear SelectorParser cache to free memory
@@ -1229,8 +1227,7 @@ public class BrokerService implements Service {
     }
 
     public synchronized PersistenceAdapter getPersistenceAdapter() throws 
IOException {
-        if (persistenceAdapter == null) {
-            checkStartException();
+        if (persistenceAdapter == null && !hasStartException()) {
             persistenceAdapter = createPersistenceAdapter();
             configureService(persistenceAdapter);
             this.persistenceAdapter = 
registerPersistenceAdapterMBean(persistenceAdapter);
@@ -1330,6 +1327,10 @@ public class BrokerService implements Service {
         }
     }
 
+    synchronized private boolean hasStartException() {
+        return startException != null;
+    }
+
     synchronized private void setStartException(Throwable t) {
         startException = t;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ffd7498/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
index b2ad1cd..4d194ca 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
@@ -23,6 +23,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import javax.management.Attribute;
 import javax.management.AttributeList;
 import javax.management.AttributeNotFoundException;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 
@@ -61,7 +64,11 @@ public class StartAndConcurrentStopBrokerTest {
     @Test(timeout = 30000)
     public void testConcurrentStop() throws Exception {
 
+        final AtomicReference<Throwable> error = new 
AtomicReference<Throwable>();
         final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
+        final CountDownLatch gotPaMBean = new CountDownLatch(1);
+        final AtomicBoolean checkPaMBean = new AtomicBoolean(false);
+
         final HashMap mbeans = new HashMap();
         final MBeanServer mBeanServer = new MBeanServer() {
             @Override
@@ -87,7 +94,7 @@ public class StartAndConcurrentStopBrokerTest {
             @Override
             public ObjectInstance registerMBean(Object object, ObjectName 
name) throws InstanceAlreadyExistsException, MBeanRegistrationException, 
NotCompliantMBeanException {
                 if (mbeans.containsKey(name)) {
-                    throw new InstanceAlreadyExistsException("Got one 
already");
+                    throw new InstanceAlreadyExistsException("Got one already: 
" + name);
                 }
                 LOG.info("register:" + name);
 
@@ -95,8 +102,16 @@ public class StartAndConcurrentStopBrokerTest {
                     if (name.compareTo(new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
                         gotBrokerMbean.countDown();
                     }
+
+                    if (checkPaMBean.get()) {
+                        if (new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,service=PersistenceAdapter,instanceName=*").apply(name))
 {
+                            gotPaMBean.countDown();
+                        }
+                    }
+
                 } catch (Exception e) {
                     e.printStackTrace();
+                    error.set(e);
                 }
                 mbeans.put(name, object);
                 return new ObjectInstance(name, object.getClass().getName());
@@ -261,6 +276,7 @@ public class StartAndConcurrentStopBrokerTest {
 
 
         final BrokerService broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
 
         ExecutorService executor = Executors.newFixedThreadPool(4);
         executor.execute(new Runnable() {
@@ -272,6 +288,7 @@ public class StartAndConcurrentStopBrokerTest {
                 } catch (BrokerStoppedException expected) {
                 } catch (Exception e) {
                     e.printStackTrace();
+                    error.set(e);
                 }
             }
         });
@@ -285,6 +302,7 @@ public class StartAndConcurrentStopBrokerTest {
                     broker.stop();
                 } catch (Exception e) {
                     e.printStackTrace();
+                    error.set(e);
                 }
             }
         });
@@ -292,10 +310,57 @@ public class StartAndConcurrentStopBrokerTest {
         executor.shutdown();
         assertTrue("stop tasks done", executor.awaitTermination(20, 
TimeUnit.SECONDS));
 
-        BrokerService second = new BrokerService();
-        second.getManagementContext().setMBeanServer(mBeanServer);
-        second.start();
-        second.stop();
+        BrokerService sanityBroker = new BrokerService();
+        sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
+        sanityBroker.start();
+        sanityBroker.stop();
+
+        assertNull("No error", error.get());
+
+        // again, after Persistence adapter mbean
+        final BrokerService brokerTwo = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        checkPaMBean.set(true);
+        executor = Executors.newFixedThreadPool(4);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    
brokerTwo.getManagementContext().setMBeanServer(mBeanServer);
+                    brokerTwo.start();
+                } catch (BrokerStoppedException expected) {
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    error.set(e);
+                }
+            }
+        });
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    assertTrue("broker has registered persistence adapter 
mbean", gotPaMBean.await(10, TimeUnit.SECONDS));
+                    brokerTwo.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    error.set(e);
+                }
+            }
+        });
+
+        executor.shutdown();
+        assertTrue("stop tasks done", executor.awaitTermination(20, 
TimeUnit.SECONDS));
+
+        assertTrue("broker has registered persistence adapter mbean", 
gotPaMBean.await(0, TimeUnit.SECONDS));
+
+        sanityBroker = new BrokerService();
+        sanityBroker.getManagementContext().setMBeanServer(mBeanServer);
+        sanityBroker.start();
+        sanityBroker.stop();
+
+        assertNull("No error", error.get());
 
     }
 

Reply via email to