And more, here's the output (DemandForwardingBridge set to DEBUG): INFO | Successfully deleted temporary storage INFO | Using Persistence Adapter: MemoryPersistenceAdapter INFO | ActiveMQ 5.3.1 JMS Message Broker (one) is starting INFO | For help or more information please see: http://activemq.apache.org/ INFO | Listening for connections at: tcp://SIM-APPLE.local:61616 INFO | Connector tcp://SIM-APPLE.local:61616 Started INFO | Establishing network connection from vm://one to tcp://localhost:51515 INFO | Connector vm://one Started INFO | Connector vm://one Stopped WARN | Could not start network bridge between: vm://one and: tcp://localhost:51515 due to: java.net.ConnectException: Connection refused INFO | Network Connector network1 Started INFO | ActiveMQ JMS Message Broker (one, ID:SIM-APPLE.local-57893-1273267004592-0:0) started INFO | Successfully deleted temporary storage INFO | Using Persistence Adapter: MemoryPersistenceAdapter INFO | ActiveMQ 5.3.1 JMS Message Broker (two) is starting INFO | For help or more information please see: http://activemq.apache.org/ INFO | Listening for connections at: tcp://SIM-APPLE.local:51515 INFO | Connector tcp://SIM-APPLE.local:51515 Started INFO | Establishing network connection from vm://two to tcp://localhost:61616 INFO | Connector vm://two Started INFO | Network Connector network1 Started INFO | ActiveMQ JMS Message Broker (two, ID:SIM-APPLE.local-57893-1273267004592-0:1) started DEBUG | two starting remote Bridge, localBroker=vm://two#2 DEBUG | counting down remoteBrokerNameKnownLatch with: BrokerInfo {commandId = 0, responseRequired = false, brokerId = ID:SIM-APPLE.local-57893-1273267004592-0:0, brokerURL = tcp://SIM-APPLE.local:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = one, connectionId = 0, brokerUploadUrl = null, networkProperties = null} DEBUG | two starting local Bridge, 
localBroker=vm://two#2 INFO | Network connection between vm://two#2 and tcp://localhost/127.0.0.1:61616(one) has been established. INFO | Successfully connected to tcp://localhost:61616 INFO | Successfully connected to tcp://localhost:51515 DEBUG | two bridging sub on vm://two#2 from one : ConsumerInfo {commandId = 4, responseRequired = true, consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, destination = queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005260, arrival = 0, brokerInTime = 1273267005261, brokerOutTime = 1273267005272, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 6, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005271, arrival = 0, brokerInTime = 1273267005272, 
brokerOutTime = 1273267005275, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 7, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:3, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005272, arrival = 0, brokerInTime = 1273267005272, brokerOutTime = 1273267005282, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 8, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:4, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005273, arrival = 0, brokerInTime = 1273267005274, brokerOutTime = 1273267005286, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = 
null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 9, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:5, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005274, arrival = 0, brokerInTime = 1273267005275, brokerOutTime = 1273267005302, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | two Ignoring sub from one as already subscribed to matching destination : ConsumerInfo {commandId = 10, responseRequired = true, consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:2:1, destination = queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null} active consumer count: 2 concurrent consumer count: 1 DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 10, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:6, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005275, arrival = 0, brokerInTime = 1273267005275, brokerOutTime = 1273267005305, correlationId = null, 
replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 11, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:7, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005277, arrival = 0, brokerInTime = 1273267005280, brokerOutTime = 1273267005306, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 12, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:8, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005283, arrival = 0, brokerInTime = 1273267005286, brokerOutTime = 1273267005308, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = 
false, droppable = false, text = Hello World!} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 13, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:9, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005294, arrival = 0, brokerInTime = 1273267005294, brokerOutTime = 1273267005310, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} DEBUG | two Ignoring sub from one as already subscribed to matching destination : ConsumerInfo {commandId = 15, responseRequired = true, consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:3:1, destination = queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = null, optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null} DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 14, responseRequired = true, messageId = ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:10, originalDestination = null, originalTransactionId = null, producerId = ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination = queue://testingqueue, transactionId = null, expiration = 0, timestamp = 1273267005300, arrival = 0, brokerInTime = 1273267005301, brokerOutTime = 1273267005314, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, 
targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1040, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, text = Hello World!} INFO | Establishing network connection from vm://one to tcp://localhost:51515 INFO | Connector vm://one Started DEBUG | one starting remote Bridge, localBroker=vm://one#4 DEBUG | counting down remoteBrokerNameKnownLatch with: BrokerInfo {commandId = 0, responseRequired = false, brokerId = ID:SIM-APPLE.local-57893-1273267004592-0:1, brokerURL = tcp://SIM-APPLE.local:51515, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = two, connectionId = 0, brokerUploadUrl = null, networkProperties = null} DEBUG | one starting local Bridge, localBroker=vm://one#4 INFO | Network connection between vm://one#4 and tcp://localhost/127.0.0.1:51515(two) has been established. 
DEBUG | one Ignoring sub from two, already routed through this broker once : ConsumerInfo {commandId = 4, responseRequired = false, consumerId = ID:SIM-APPLE.local-57893-1273267004592-3:1:1:1, destination = queue://testingqueue, prefetchSize = 1, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = [ID:SIM-APPLE.local-57893-1273267004592-0:0], optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = org.apache.activemq.command.networkbridgefil...@73c6641} active consumer count: 3 concurrent consumer count: 1 DEBUG | two remove request on vm://two#2 from one , consumer id: ID:SIM-APPLE.local-57893-1273267004592-4:0:2:1, matching sub: null DEBUG | two remove request on vm://two#2 from one , consumer id: ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, matching sub: org.apache.activemq.network.demandsubscript...@6632060c DEBUG | two remove local subscription for remote ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1 DEBUG | two removed sub on vm://two#2 from one : ConsumerInfo {commandId = 4, responseRequired = true, consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, destination = queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true, selector = null, subscriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = [ID:SIM-APPLE.local-57893-1273267004592-0:0], optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null} DEBUG | one remove request on vm://one#4 from two , consumer id: ID:SIM-APPLE.local-57893-1273267004592-3:1:1:1, matching sub: null INFO | Kaha Store using data directory activemq-data/two/tmp_storage count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 count 90 
count 90 INFO | Kaha Store using data directory activemq-data/one/tmp_storage DEBUG | two remove request on vm://two#2 from one , consumer id: ID:SIM-APPLE.local-57893-1273267004592-4:0:3:1, matching sub: null INFO | ActiveMQ Message Broker (one, ID:SIM-APPLE.local-57893-1273267004592-0:0) is shutting down DEBUG | stopping one bridge to two INFO | Connector vm://one Stopped INFO | one bridge to two stopped INFO | Network Connector network1 Stopped WARN | Network connection between vm://two#2 and tcp://localhost/127.0.0.1:61616 shutdown due to a remote error: java.io.EOFException INFO | Connector tcp://SIM-APPLE.local:61616 Stopped DEBUG | The remote Exception was: java.io.EOFException java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:375) at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186) at java.lang.Thread.run(Thread.java:637) DEBUG | stopping two bridge to one DEBUG | Caught exception sending shutdown org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616 at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:235) at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83) at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.network.DemandForwardingBridgeSupport$5.run(DemandForwardingBridgeSupport.java:381) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:637) INFO | Connector vm://two Stopped INFO | two bridge to one stopped INFO | Establishing network connection from vm://two to tcp://localhost:61616 INFO | Connector vm://two Started INFO | Connector vm://two Stopped WARN | Could not start network bridge between: vm://two and: tcp://localhost:61616 due to: java.net.ConnectException: Connection refused INFO | ActiveMQ JMS Message Broker (one, ID:SIM-APPLE.local-57893-1273267004592-0:0) stopped INFO | ActiveMQ Message Broker (two, ID:SIM-APPLE.local-57893-1273267004592-0:1) is shutting down INFO | Network Connector network1 Stopped INFO | Connector tcp://SIM-APPLE.local:51515 Stopped INFO | ActiveMQ JMS Message Broker (two, ID:SIM-APPLE.local-57893-1273267004592-0:1) stopped
On Fri, May 7, 2010 at 10:20 PM, Jamie McCrindle <jamiemccrin...@gmail.com> wrote: > Oh, this is using: > > ActiveMQ 5.3.1 > Spring 2.5.6 > > On Fri, May 7, 2010 at 10:19 PM, Jamie McCrindle > <jamiemccrin...@gmail.com> wrote: >> Greetings all, >> >> After some weeks of scratching my head, I _believe_ I have found the >> magic combination that appears to be causing messages to become stuck >> in our network of brokers. It's so convoluted that it's entirely >> likely that the error isn't what I think it is but I have managed to >> create a test case that mirrors the behaviour we're seeing live. And >> it goes something like this: >> >> We have two brokers in a network of brokers. Producers were publishing >> to a queue on one of the brokers and consumers reading off the queue >> in the other broker. After a short while, messages would suddenly pile >> up on the 'producer' broker and not get read off on the 'consumer' >> broker. >> >> Pause for lots of random testing... >> >> It appeared that the network bridge subscription from the 'consumer' >> broker was disappearing from the 'producer' broker, causing the pile >> up. >> >> More testing later... >> >> And it looks like if we have a DefaultMessageListenerContainer with >> the following configuration: >> >> maxMessagesPerTask: 1 >> cacheLevel: CONSUMER >> maxConcurrentConsumers: 3 (more than 1, basically) >> concurrentConsumers: 1 >> sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE >> >> When Spring scales back the dynamic number of consumers, the network >> subscription appears to get lost and messages pile up on the producer >> side. >> >> Workaround: >> >> Use concurrentConsumers instead of maxConcurrentConsumers so that >> there is a static number of consumers (not setting maxMessagesPerTask >> also seems to work). 
>> >> I'll add this to Jira if you'd like but the test case to replicate >> the behaviour is as follows: >> >> >> package org.example.activemq; >> >> import java.util.concurrent.Callable; >> import java.util.concurrent.CountDownLatch; >> import java.util.concurrent.ExecutorService; >> import java.util.concurrent.Executors; >> import java.util.concurrent.TimeUnit; >> >> import javax.jms.JMSException; >> import javax.jms.Message; >> import javax.jms.MessageListener; >> import javax.jms.Session; >> import javax.jms.TextMessage; >> >> import junit.framework.TestCase; >> >> import org.apache.activemq.ActiveMQConnectionFactory; >> import org.apache.activemq.broker.BrokerService; >> import org.apache.activemq.command.ActiveMQQueue; >> import org.apache.activemq.network.NetworkConnector; >> import org.apache.activemq.pool.PooledConnectionFactory; >> import org.apache.activemq.store.memory.MemoryPersistenceAdapter; >> import org.springframework.jms.core.JmsTemplate; >> import org.springframework.jms.core.MessageCreator; >> import org.springframework.jms.listener.DefaultMessageListenerContainer; >> >> public class NetworkTest extends TestCase { >> >> public void testNetworkOfBrokers() throws Exception { >> BrokerService brokerService1 = null; >> BrokerService brokerService2 = null; >> >> final int total = 100; >> final CountDownLatch latch = new CountDownLatch(total); >> >> try { >> >> { >> brokerService1 = new BrokerService(); >> brokerService1.setBrokerName("one"); >> brokerService1.setUseJmx(false); >> brokerService1.setPersistenceAdapter(new >> MemoryPersistenceAdapter()); >> brokerService1.addConnector("tcp://0.0.0.0:61616"); >> NetworkConnector network1 = >> brokerService1.addNetworkConnector("static:(tcp://localhost:51515)"); >> network1.setName("network1"); >> network1.setDynamicOnly(true); >> network1.setNetworkTTL(3); >> network1.setPrefetchSize(1); >> brokerService1.start(); >> } >> >> { >> brokerService2 = new BrokerService(); >> 
brokerService2.setBrokerName("two"); >> brokerService2.setUseJmx(false); >> brokerService2.setPersistenceAdapter(new >> MemoryPersistenceAdapter()); >> brokerService2.addConnector("tcp://0.0.0.0:51515"); >> NetworkConnector network2 = >> brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); >> network2.setName("network1"); >> network2.setDynamicOnly(true); >> network2.setNetworkTTL(3); >> network2.setPrefetchSize(1); >> brokerService2.start(); >> } >> >> ExecutorService pool = Executors.newSingleThreadExecutor(); >> >> ActiveMQConnectionFactory connectionFactory1 = new >> ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false"); >> >> >> final DefaultMessageListenerContainer container = new >> DefaultMessageListenerContainer(); >> container.setConnectionFactory(connectionFactory1); >> container.setMaxConcurrentConsumers(10); >> container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); >> >> container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); >> container.setDestination(new ActiveMQQueue("testingqueue")); >> container.setMessageListener(new MessageListener() { >> public void onMessage(Message message) { >> latch.countDown(); >> } >> }); >> container.setMaxMessagesPerTask(1); >> container.afterPropertiesSet(); >> container.start(); >> >> pool.submit(new Callable<Object>() { >> public Object call() throws Exception { >> try { >> final int batch = 10; >> ActiveMQConnectionFactory connectionFactory2 = new >> ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false"); >> PooledConnectionFactory pooledConnectionFactory = >> new PooledConnectionFactory(connectionFactory2); >> JmsTemplate template = new >> JmsTemplate(pooledConnectionFactory); >> ActiveMQQueue queue = new ActiveMQQueue("testingqueue"); >> for(int b = 0; b < batch; b++) { >> for(int i = 0; i < (total / batch); i++) { >> template.send(queue, new MessageCreator() { >> public Message 
createMessage(Session >> session) throws JMSException { >> TextMessage message = >> session.createTextMessage(); >> message.setText("Hello World!"); >> return message; >> } >> }); >> } >> // give spring time to scale back again >> while(container.getActiveConsumerCount() > 1) { >> System.out.println("active consumer count: >> " + container.getActiveConsumerCount()); >> System.out.println("concurrent consumer >> count: " + container.getConcurrentConsumers()); >> Thread.sleep(1000); >> } >> } >> pooledConnectionFactory.stop(); >> } catch(Throwable t) { >> t.printStackTrace(); >> } >> return null; >> } >> }); >> >> pool.shutdown(); >> pool.awaitTermination(10, TimeUnit.SECONDS); >> >> int count = 0; >> >> // give it 20 seconds >> while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) { >> System.out.println("count " + latch.getCount()); >> } >> >> >> container.destroy(); >> >> } finally { >> try { if(brokerService1 != null) { brokerService1.stop(); >> }} catch(Throwable t) { t.printStackTrace(); } >> try { if(brokerService2 != null) { brokerService2.stop(); >> }} catch(Throwable t) { t.printStackTrace(); } >> } >> >> if(latch.getCount() > 0) { >> fail("latch should have gone down to 0 but was " + >> latch.getCount()); >> } >> >> } >> >> } >> >