And more, here's the output (DemandForwardingBridge set to DEBUG):

 INFO | Successfully deleted temporary storage
 INFO | Using Persistence Adapter: MemoryPersistenceAdapter
 INFO | ActiveMQ 5.3.1 JMS Message Broker (one) is starting
 INFO | For help or more information please see: http://activemq.apache.org/
 INFO | Listening for connections at: tcp://SIM-APPLE.local:61616
 INFO | Connector tcp://SIM-APPLE.local:61616 Started
 INFO | Establishing network connection from vm://one to tcp://localhost:51515
 INFO | Connector vm://one Started
 INFO | Connector vm://one Stopped
 WARN | Could not start network bridge between: vm://one and:
tcp://localhost:51515 due to: java.net.ConnectException: Connection
refused
 INFO | Network Connector network1 Started
 INFO | ActiveMQ JMS Message Broker (one,
ID:SIM-APPLE.local-57893-1273267004592-0:0) started
 INFO | Successfully deleted temporary storage
 INFO | Using Persistence Adapter: MemoryPersistenceAdapter
 INFO | ActiveMQ 5.3.1 JMS Message Broker (two) is starting
 INFO | For help or more information please see: http://activemq.apache.org/
 INFO | Listening for connections at: tcp://SIM-APPLE.local:51515
 INFO | Connector tcp://SIM-APPLE.local:51515 Started
 INFO | Establishing network connection from vm://two to tcp://localhost:61616
 INFO | Connector vm://two Started
 INFO | Network Connector network1 Started
 INFO | ActiveMQ JMS Message Broker (two,
ID:SIM-APPLE.local-57893-1273267004592-0:1) started
DEBUG | two starting remote Bridge, localBroker=vm://two#2
DEBUG | counting down remoteBrokerNameKnownLatch with: BrokerInfo
{commandId = 0, responseRequired = false, brokerId =
ID:SIM-APPLE.local-57893-1273267004592-0:0, brokerURL =
tcp://SIM-APPLE.local:61616, slaveBroker = false, masterBroker =
false, faultTolerantConfiguration = false, networkConnection = false,
duplexConnection = false, peerBrokerInfos = [], brokerName = one,
connectionId = 0, brokerUploadUrl = null, networkProperties = null}
DEBUG | two starting local Bridge, localBroker=vm://two#2
 INFO | Network connection between vm://two#2 and
tcp://localhost/127.0.0.1:61616(one) has been established.
 INFO | Successfully connected to tcp://localhost:61616
 INFO | Successfully connected to tcp://localhost:51515
DEBUG | two bridging sub on vm://two#2 from one : ConsumerInfo
{commandId = 4, responseRequired = true, consumerId =
ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, destination =
queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit
= 0, browser = false, dispatchAsync = true, selector = null,
subscriptionName = null, noLocal = false, exclusive = false,
retroactive = false, priority = 0, brokerPath = null,
optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
= null}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 5,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:1, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005260, arrival = 0, brokerInTime = 1273267005261,
brokerOutTime = 1273267005272, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 6,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:2, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005271, arrival = 0, brokerInTime = 1273267005272,
brokerOutTime = 1273267005275, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 7,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:3, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005272, arrival = 0, brokerInTime = 1273267005272,
brokerOutTime = 1273267005282, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 8,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:4, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005273, arrival = 0, brokerInTime = 1273267005274,
brokerOutTime = 1273267005286, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 9,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:5, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005274, arrival = 0, brokerInTime = 1273267005275,
brokerOutTime = 1273267005302, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | two Ignoring sub from one as already subscribed to matching
destination : ConsumerInfo {commandId = 10, responseRequired = true,
consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:2:1,
destination = queue://testingqueue, prefetchSize = 1000,
maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true,
selector = null, subscriptionName = null, noLocal = false, exclusive =
false, retroactive = false, priority = 0, brokerPath = null,
optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
= null}
active consumer count: 2
concurrent consumer count: 1
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 10,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:6, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005275, arrival = 0, brokerInTime = 1273267005275,
brokerOutTime = 1273267005305, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 11,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:7, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005277, arrival = 0, brokerInTime = 1273267005280,
brokerOutTime = 1273267005306, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 12,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:8, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005283, arrival = 0, brokerInTime = 1273267005286,
brokerOutTime = 1273267005308, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 13,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:9, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005294, arrival = 0, brokerInTime = 1273267005294,
brokerOutTime = 1273267005310, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
DEBUG | two Ignoring sub from one as already subscribed to matching
destination : ConsumerInfo {commandId = 15, responseRequired = true,
consumerId = ID:SIM-APPLE.local-57893-1273267004592-4:0:3:1,
destination = queue://testingqueue, prefetchSize = 1000,
maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true,
selector = null, subscriptionName = null, noLocal = false, exclusive =
false, retroactive = false, priority = 0, brokerPath = null,
optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate
= null}
DEBUG | bridging two -> one: ActiveMQTextMessage {commandId = 14,
responseRequired = true, messageId =
ID:SIM-APPLE.local-57893-1273267004592-4:1:1:1:10, originalDestination
= null, originalTransactionId = null, producerId =
ID:SIM-APPLE.local-57893-1273267004592-3:0:1:1, destination =
queue://testingqueue, transactionId = null, expiration = 0, timestamp
= 1273267005300, arrival = 0, brokerInTime = 1273267005301,
brokerOutTime = 1273267005314, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID
= null, content = null, marshalledProperties = null, dataStructure =
null, redeliveryCounter = 0, size = 1040, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
text = Hello World!}
 INFO | Establishing network connection from vm://one to tcp://localhost:51515
 INFO | Connector vm://one Started
DEBUG | one starting remote Bridge, localBroker=vm://one#4
DEBUG | counting down remoteBrokerNameKnownLatch with: BrokerInfo
{commandId = 0, responseRequired = false, brokerId =
ID:SIM-APPLE.local-57893-1273267004592-0:1, brokerURL =
tcp://SIM-APPLE.local:51515, slaveBroker = false, masterBroker =
false, faultTolerantConfiguration = false, networkConnection = false,
duplexConnection = false, peerBrokerInfos = [], brokerName = two,
connectionId = 0, brokerUploadUrl = null, networkProperties = null}
DEBUG | one starting local Bridge, localBroker=vm://one#4
 INFO | Network connection between vm://one#4 and
tcp://localhost/127.0.0.1:51515(two) has been established.
DEBUG | one Ignoring sub from two, already routed through this broker
once : ConsumerInfo {commandId = 4, responseRequired = false,
consumerId = ID:SIM-APPLE.local-57893-1273267004592-3:1:1:1,
destination = queue://testingqueue, prefetchSize = 1,
maximumPendingMessageLimit = 0, browser = false, dispatchAsync = true,
selector = null, subscriptionName = null, noLocal = false, exclusive =
false, retroactive = false, priority = 0, brokerPath =
[ID:SIM-APPLE.local-57893-1273267004592-0:0], optimizedAcknowledge =
false, noRangeAcks = false, additionalPredicate =
org.apache.activemq.command.networkbridgefil...@73c6641}
active consumer count: 3
concurrent consumer count: 1
DEBUG | two remove request on vm://two#2 from one , consumer id:
ID:SIM-APPLE.local-57893-1273267004592-4:0:2:1, matching sub: null
DEBUG | two remove request on vm://two#2 from one , consumer id:
ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, matching sub:
org.apache.activemq.network.demandsubscript...@6632060c
DEBUG | two remove local subscription for remote
ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1
DEBUG | two removed sub on vm://two#2 from one :  ConsumerInfo
{commandId = 4, responseRequired = true, consumerId =
ID:SIM-APPLE.local-57893-1273267004592-4:0:1:1, destination =
queue://testingqueue, prefetchSize = 1000, maximumPendingMessageLimit
= 0, browser = false, dispatchAsync = true, selector = null,
subscriptionName = null, noLocal = false, exclusive = false,
retroactive = false, priority = 0, brokerPath =
[ID:SIM-APPLE.local-57893-1273267004592-0:0], optimizedAcknowledge =
false, noRangeAcks = false, additionalPredicate = null}
DEBUG | one remove request on vm://one#4 from two , consumer id:
ID:SIM-APPLE.local-57893-1273267004592-3:1:1:1, matching sub: null
 INFO | Kaha Store using data directory activemq-data/two/tmp_storage
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
count 90
 INFO | Kaha Store using data directory activemq-data/one/tmp_storage
DEBUG | two remove request on vm://two#2 from one , consumer id:
ID:SIM-APPLE.local-57893-1273267004592-4:0:3:1, matching sub: null
 INFO | ActiveMQ Message Broker (one,
ID:SIM-APPLE.local-57893-1273267004592-0:0) is shutting down
DEBUG |  stopping one bridge to two
 INFO | Connector vm://one Stopped
 INFO | one bridge to two stopped
 INFO | Network Connector network1 Stopped
 WARN | Network connection between vm://two#2 and
tcp://localhost/127.0.0.1:61616 shutdown due to a remote error:
java.io.EOFException
 INFO | Connector tcp://SIM-APPLE.local:61616 Stopped
DEBUG | The remote Exception was: java.io.EOFException
java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at 
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
        at 
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:211)
        at 
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
        at 
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186)
        at java.lang.Thread.run(Thread.java:637)
DEBUG |  stopping two bridge to one
DEBUG | Caught exception sending shutdown
org.apache.activemq.transport.InactivityIOException: Channel was
inactive for too long: localhost/127.0.0.1:61616
        at 
org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:235)
        at 
org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
        at 
org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at 
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        at 
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at 
org.apache.activemq.network.DemandForwardingBridgeSupport$5.run(DemandForwardingBridgeSupport.java:381)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:637)
 INFO | Connector vm://two Stopped
 INFO | two bridge to one stopped
 INFO | Establishing network connection from vm://two to tcp://localhost:61616
 INFO | Connector vm://two Started
 INFO | Connector vm://two Stopped
 WARN | Could not start network bridge between: vm://two and:
tcp://localhost:61616 due to: java.net.ConnectException: Connection
refused
 INFO | ActiveMQ JMS Message Broker (one,
ID:SIM-APPLE.local-57893-1273267004592-0:0) stopped
 INFO | ActiveMQ Message Broker (two,
ID:SIM-APPLE.local-57893-1273267004592-0:1) is shutting down
 INFO | Network Connector network1 Stopped
 INFO | Connector tcp://SIM-APPLE.local:51515 Stopped
 INFO | ActiveMQ JMS Message Broker (two,
ID:SIM-APPLE.local-57893-1273267004592-0:1) stopped


On Fri, May 7, 2010 at 10:20 PM, Jamie McCrindle
<jamiemccrin...@gmail.com> wrote:
> Oh, this is using:
>
> ActiveMQ 5.3.1
> Spring 2.5.6
>
> On Fri, May 7, 2010 at 10:19 PM, Jamie McCrindle
> <jamiemccrin...@gmail.com> wrote:
>> Greetings all,
>>
>> After some weeks of scratching my head, I _believe_ I have found the
>> magic combination that appears to be causing messages to become stuck
>> in our network of brokers. It's so convoluted that it's entirely
>> likely that the error isn't what I think it is but I have managed to
>> create a test case that mirrors the behaviour we're seeing live. And
>> it goes something like this:
>>
>> We have two brokers in a network of brokers. Producers were publishing
>> to a queue on one of the brokers and consumers reading off the queue
>> in the other broker. After a short while, messages would suddenly pile
>> up on the 'producer' broker and not get read off on the 'consumer'
>> broker.
>>
>> Pause for lots of random testing...
>>
>> It appeared that the network bridge subscription from the 'consumer'
>> broker was disappearing from the 'producer' broker, causing the pile
>> up.
>>
>> More testing later...
>>
>> And it looks like if we have a DefaultMessageListenerContainer with
>> the following configuration:
>>
>> maxMessagesPerTask: 1
>> cacheLevel: CONSUMER
>> maxConcurrentConsumers: 3 (more than 1, basically)
>> concurrentConsumers: 1
>> sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE
>>
>> When Spring scales back the dynamic number of consumers, the network
>> subscription appears to get lost and messages pile up on the producer
>> side.
>>
>> Workaround:
>>
>> Use concurrentConsumers instead of maxConcurrentConsumers so that
>> there is a static number of consumers (not setting maxMessagesPerTask
>> also seems to work).
>>
>> I'll add this to Jira if you'd like but the test case to replicate
>> the behaviour is as follows:
>>
>>
>> package org.example.activemq;
>>
>> import java.util.concurrent.Callable;
>> import java.util.concurrent.CountDownLatch;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.TimeUnit;
>>
>> import javax.jms.JMSException;
>> import javax.jms.Message;
>> import javax.jms.MessageListener;
>> import javax.jms.Session;
>> import javax.jms.TextMessage;
>>
>> import junit.framework.TestCase;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.activemq.broker.BrokerService;
>> import org.apache.activemq.command.ActiveMQQueue;
>> import org.apache.activemq.network.NetworkConnector;
>> import org.apache.activemq.pool.PooledConnectionFactory;
>> import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
>> import org.springframework.jms.core.JmsTemplate;
>> import org.springframework.jms.core.MessageCreator;
>> import org.springframework.jms.listener.DefaultMessageListenerContainer;
>>
>> public class NetworkTest extends TestCase {
>>
>>    public void testNetworkOfBrokers() throws Exception {
>>        BrokerService brokerService1 = null;
>>        BrokerService brokerService2 = null;
>>
>>        final int total = 100;
>>        final CountDownLatch latch = new CountDownLatch(total);
>>
>>        try {
>>
>>        {
>>            brokerService1 = new BrokerService();
>>            brokerService1.setBrokerName("one");
>>            brokerService1.setUseJmx(false);
>>            brokerService1.setPersistenceAdapter(new
>> MemoryPersistenceAdapter());
>>            brokerService1.addConnector("tcp://0.0.0.0:61616");
>>            NetworkConnector network1 =
>> brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
>>            network1.setName("network1");
>>            network1.setDynamicOnly(true);
>>            network1.setNetworkTTL(3);
>>            network1.setPrefetchSize(1);
>>            brokerService1.start();
>>        }
>>
>>        {
>>            brokerService2 = new BrokerService();
>>            brokerService2.setBrokerName("two");
>>            brokerService2.setUseJmx(false);
>>            brokerService2.setPersistenceAdapter(new
>> MemoryPersistenceAdapter());
>>            brokerService2.addConnector("tcp://0.0.0.0:51515");
>>            NetworkConnector network2 =
>> brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
>>            network2.setName("network1");
>>            network2.setDynamicOnly(true);
>>            network2.setNetworkTTL(3);
>>            network2.setPrefetchSize(1);
>>            brokerService2.start();
>>        }
>>
>>        ExecutorService pool = Executors.newSingleThreadExecutor();
>>
>>        ActiveMQConnectionFactory connectionFactory1 = new
>> ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");
>>
>>
>>        final DefaultMessageListenerContainer container = new
>> DefaultMessageListenerContainer();
>>        container.setConnectionFactory(connectionFactory1);
>>        container.setMaxConcurrentConsumers(10);
>>        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
>>        
>> container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
>>        container.setDestination(new ActiveMQQueue("testingqueue"));
>>        container.setMessageListener(new MessageListener() {
>>            public void onMessage(Message message) {
>>                latch.countDown();
>>            }
>>        });
>>        container.setMaxMessagesPerTask(1);
>>        container.afterPropertiesSet();
>>        container.start();
>>
>>        pool.submit(new Callable<Object>() {
>>            public Object call() throws Exception {
>>                try {
>>                    final int batch = 10;
>>                    ActiveMQConnectionFactory connectionFactory2 = new
>> ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");
>>                    PooledConnectionFactory pooledConnectionFactory =
>> new PooledConnectionFactory(connectionFactory2);
>>                    JmsTemplate template = new
>> JmsTemplate(pooledConnectionFactory);
>>                    ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
>>                    for(int b = 0; b < batch; b++) {
>>                        for(int i = 0; i < (total / batch); i++) {
>>                            template.send(queue, new MessageCreator() {
>>                                public Message createMessage(Session
>> session) throws JMSException {
>>                                    TextMessage message =
>> session.createTextMessage();
>>                                    message.setText("Hello World!");
>>                                    return message;
>>                                }
>>                            });
>>                        }
>>                        // give spring time to scale back again
>>                        while(container.getActiveConsumerCount() > 1) {
>>                            System.out.println("active consumer count:
>> " + container.getActiveConsumerCount());
>>                            System.out.println("concurrent consumer
>> count: " + container.getConcurrentConsumers());
>>                            Thread.sleep(1000);
>>                        }
>>                    }
>>                    pooledConnectionFactory.stop();
>>                } catch(Throwable t) {
>>                    t.printStackTrace();
>>                }
>>                return null;
>>            }
>>        });
>>
>>        pool.shutdown();
>>        pool.awaitTermination(10, TimeUnit.SECONDS);
>>
>>        int count = 0;
>>
>>        // give it 20 seconds
>>        while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
>>            System.out.println("count " + latch.getCount());
>>        }
>>
>>
>>        container.destroy();
>>
>>        } finally {
>>            try { if(brokerService1 != null) { brokerService1.stop();
>> }} catch(Throwable t) { t.printStackTrace(); }
>>            try { if(brokerService2 != null) { brokerService2.stop();
>> }} catch(Throwable t) { t.printStackTrace(); }
>>        }
>>
>>        if(latch.getCount() > 0) {
>>            fail("latch should have gone down to 0 but was " +
>> latch.getCount());
>>        }
>>
>>    }
>>
>> }
>>
>

Reply via email to