Ok. Just found the cause. It's described in AMQ-776 (sorry I missed this one earlier when I was trolling around). I just needed to apply the DemandForwardingBridgeSupport.patch.
The subscriptions were not getting cleared on transport interrupted. Now they are and things seem to be working well.

By the way, is there any expectation of when this patch will be included in a stable release?

-----Original Message-----
From: Chris Hofstaedter [mailto:[EMAIL PROTECTED]
Sent: Friday, November 03, 2006 3:30 PM
To: [email protected]
Subject: Missing destination queue after broker restarted

The following applies to 4.0.1, 4.0.2, and head.

My topology is pretty simple in that I'm trying to set up a queue in which the producers all use embedded brokers and the consumer does not. I've done my test both collocated on the same machine and across the net. My test only includes a single producer at this point. "Log4Mobility" is the queue name.

Upon initial startup, everything works well. Specifically, on initial startup, the debug log on the broker indicates (I've replaced what I believe to be non-pertinent info with "..." for readability):

INFO TransportConnector - Connector default Started
INFO BrokerService - ActiveMQ JMS Message Broker (...) started
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1142 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1142 ...
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Connection
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Topic
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1144 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1144 ...
DEBUG AbstractRegion - Adding destination: queue://Log4Mobility

Then, while leaving the consumer and producer running, I shut down the broker. Upon subsequent startup of the broker, the debug log on the broker indicates:

INFO TransportConnector - Connector default Started
INFO BrokerService - ActiveMQ JMS Message Broker (...) started
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1165 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1165 ...
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Connection
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Topic
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1166 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1166 ...

It's virtually the same except that the message about actually adding the queue is notably missing. I've also confirmed through bstat that the queue is present before the restart but not present after the restart.

At this point, no messages flow. Oddly, the producer thinks it's forwarding the messages over the bridge as indicated by its debug log from the point at which it reestablishes its connection:

DEBUG FailoverTransport - Connection established
INFO DemandForwardingBridge - Outbound transport to Log4Mobility resumed
INFO DemandForwardingBridge - Network connection between vm://localhost#0 and tcp://10.47.20.123:61616(Log4Mobility) has been established.
DEBUG DemandForwardingBridge - Ignoring sub ConsumerInfo {commandId = 4, responseRequired = true, consumerId = ID:Hofstaedter-1141-1162581700794-1:6:1:1, destination = queue://Log4Mobility, prefetchSize = 1, maximumPendingMessageLimit = 0, browser = false, dispatchAsync = false, selector = null, subcriptionName = null, noLocal = false, exclusive = false, retroactive = false, priority = 0, brokerPath = [ID:Hofstaedter-1158-1162581728012-1:0], optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate = null} already subscribed to matching destination
DEBUG PrefetchSubscription - Prefetch limit.
DEBUG DemandForwardingBridge - Forwarding messages for static destination: queue://Log4Mobility

The fourth log message in the set immediately above (the "Ignoring sub ... already subscribed to matching destination" line) is interesting, and I'm unsure if it is related to the problem. I think it may be related, because everything picks back up again if I restart the producer.

I've seen messages from others on this list about failures on reconnect, but they are always accompanied by specific exceptions and stack traces. I see none of those, so I don't think it's the same thing. In fact, I'm fairly new to AMQ, so it could be as simple as my config or client code.
The config I'm using is:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util">
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
  <broker brokerName="Log4Mobility" persistent="true" useShutdownHook="false"
          useJmx="true" xmlns="http://activemq.org/config/1.0">
    <managementContext>
      <managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>
    </managementContext>
    <persistenceAdapter>
      <journaledJDBC journalLogFiles="5" dataDirectory="${activemq.home}/activemq-data"/>
    </persistenceAdapter>
    <transportConnectors>
      <transportConnector name="default" uri="tcp://10.47.20.123:61616"/>
    </transportConnectors>
  </broker>
</beans>

And finally, in case I'm performing some stupid code tricks, following is my producer and consumer code to establish my connections:

Producer:

m_broker = new BrokerService();
m_broker.addConnector("vm://localhost");
m_broker.setUseLoggingForShutdownErrors(false);
m_broker.setUseShutdownHook(true);
m_broker.setUseJmx(false);
m_broker.setPersistent(m_persistent);
DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector();
nc.setUri(new URI("static:" + m_url));
nc.setFailover(true);
nc.addStaticallyIncludedDestination(new ActiveMQQueue(m_queueName));
nc.setUserName(m_userName);
nc.setPassword(m_password);
m_broker.addNetworkConnector(nc);
m_broker.start();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(m_user, m_password, "vm://localhost");
m_connection = factory.createConnection();
m_connection.start();
m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
m_destination = m_session.createQueue(m_queueName);
m_producer = m_session.createProducer(m_destination);
if (m_persistent)
    m_producer.setDeliveryMode(DeliveryMode.PERSISTENT);
else
    m_producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Consumer:

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(m_url);
factory.setUserName(m_userName);
factory.setPassword(m_password);
m_connection = factory.createConnection();
m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
m_destination = m_session.createQueue(m_queueName + "?consumer.prefetchSize=1");
m_consumer = m_session.createConsumer(m_destination);
m_connection.setExceptionListener(this);
m_connection.start();
m_consumer.setMessageListener(this);

Thanks in advance for any help or insight anyone can provide.
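
For anyone else who hits the "Ignoring sub ... already subscribed to matching destination" message after a broker restart, here is a rough toy model of how I understand the failure, based only on the symptoms above and the description in AMQ-776. It is not ActiveMQ's code: ToyBridge, its field names, and the clearOnInterrupt flag are all made up for illustration. The assumption is that the bridge remembers which destinations it has already subscribed to, and that a stale entry surviving a transport interruption makes it skip the re-subscribe.

```java
import java.util.HashSet;
import java.util.Set;

public class BridgeSubscriptionSketch {

    /** Hypothetical stand-in for a demand-forwarding bridge; not the real class. */
    static class ToyBridge {
        // Destinations the bridge believes it is already subscribed to.
        private final Set<String> subscribedDestinations = new HashSet<>();
        // true models the patched behaviour, false the unpatched one (assumption).
        private final boolean clearOnInterrupt;

        ToyBridge(boolean clearOnInterrupt) {
            this.clearOnInterrupt = clearOnInterrupt;
        }

        /** Returns true if the subscription was registered, false if it was ignored. */
        boolean addSubscription(String destination) {
            // Set.add returns false when the entry already exists, which models
            // the "already subscribed to matching destination" case.
            return subscribedDestinations.add(destination);
        }

        /** Called when the connection to the remote broker drops. */
        void transportInterrupted() {
            if (clearOnInterrupt) {
                subscribedDestinations.clear();
            }
        }
    }

    /** Subscribe, lose the transport, subscribe again; reports whether the second subscribe took effect. */
    static boolean resubscribesAfterRestart(boolean clearOnInterrupt) {
        ToyBridge bridge = new ToyBridge(clearOnInterrupt);
        bridge.addSubscription("queue://Log4Mobility");
        bridge.transportInterrupted();
        return bridge.addSubscription("queue://Log4Mobility");
    }

    public static void main(String[] args) {
        System.out.println("unpatched re-subscribes: " + resubscribesAfterRestart(false));
        System.out.println("patched re-subscribes:   " + resubscribesAfterRestart(true));
    }
}
```

In this model the unpatched bridge ignores the second subscription, which would fit the restarted broker never logging "Adding destination: queue://Log4Mobility", while the patched one registers it again. It would also fit the observation that restarting the producer fixes things, since that throws the stale state away.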
