Help! I have a transactional JMS consumer invoked by Camel for messages on "testQueueA" -- it needs to produce/send thousands of messages to the JMS queue "testQueueB". Depending upon how many messages need to be sent, this scenario hangs. This happens when talking to ActiveMQ via TCP.
I wrote a simplified test case, and one interesting thing I found was that the "bigger" my messages were, the fewer would be sent before everything hangs. It implied to me that there was some sort of buffer limit being imposed. Here's the code: import java.util.logging.Logger; import org.apache.camel.*; import org.springframework.beans.factory.annotation.*; public class ConsumerThatAlsoProduces { Logger logger = Logger.getLogger(getClass().getName()); @Autowired ProducerTemplate producerTemplate; @Autowired @Qualifier("testQueueB") Endpoint testQueueB; public void onTestMessage(TestMessage msg) { logger.info("Received from A: " + msg); final int numToProduce = 20000; logger.info("Producing " + numToProduce + " messages on queueB"); for (int k = 1; k <= numToProduce; ++k) { logger.info("Sending " + k + " of " + numToProduce); producerTemplate.sendBody(testQueueB, new TestMessage(k)); } } } ----- public class TestMessage implements java.io.Serializable { int value; byte[] theBiggerThisIsTheFasterTheProblemHappens = new byte[32768]; public TestMessage(int value) { this.value = value; } public String toString() { return "TestMessage{value=" + value + "}"; } } When I force a thread dump, this is what I'm seeing for the thread of interest: "DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800 nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0] java.lang.Thread.State: RUNNABLE at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96) at java.io.DataOutputStream.write(DataOutputStream.java:90) - locked <0x000000010703aa28> (a java.io.DataOutputStream) at org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432) at org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173) at org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90) at org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90) at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240) - locked <0x000000010701bb78> (a org.apache.activemq.openwire.OpenWireFormat) at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166) at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237) - locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean) at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83) at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) - locked <0x0000000106fdd0e8> (a java.lang.Object) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225) at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219) at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676) - locked <0x0000000106fe9f00> (a java.lang.Object) at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231) at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74) - locked <0x0000000107095d88> (a org.apache.activemq.ActiveMQMessageProducer) at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59) at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597) at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237) at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574) at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551) at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471) at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548) at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301) at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165) at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151) at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136) at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150) at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86) at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98) at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111) at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173) at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95) at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111) at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61) at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61) at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52) at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147) at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110) at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128) at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80) at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52) at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52) at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147) at org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54) at org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48) at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76) at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543) at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482) at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982) at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881) at java.lang.Thread.run(Thread.java:637) I'm pretty sure the problem is that since my consumer is running inside a JMS transaction, the messages it tries to send are being "buffered" by the ActiveMQ server until the transaction commits. Thus the catch...the transaction won't commit until my consumer returns, which won't happen until all of the messages are sent. On this link http://activemq.apache.org/how-do-transactions-work.html I noticed this comment: "Now the operations carried out on a transacted session inside a transaction, like a send message or acknowledge message, do not really perform a real send or acknowledge until the commit occurs. So the Broker explicitly handles these cases separately - essentially buffering up the commands until the commit occurs when the messages are really sent or acknowledged." What I find interesting is that if I run an embedded ActiveMQ broker and talk via vm://localhost instead of tcp://localhost:61616, the problem goes away. This ONLY happens when talking TCP. FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel 2.0. Both straight out of the box. Same results with both old & new versions. Here's my Spring context, FWIW: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:camel="http://activemq.apache.org/camel/schema/spring" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <context:annotation-config/> <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> <property name="maxConnections"> <value>5</value> </property> </bean> <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> <property name="connectionFactory" ref="jmsConnectionFactory"/> <property name="transacted" value="true"/> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> <camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring"> <template id="producerTemplate"/> <endpoint id="testQueueA" uri="activemq:queue:test.queueA"/> <endpoint id="testQueueB" uri="activemq:queue:test.queueB"/> <route> <from uri="activemq:queue:test.queueA?concurrentConsumers=1"/> <transacted/> <bean ref="consumerThatAlsoProduces" method="onTestMessage"/> </route> </camelContext> <bean id="consumerThatAlsoProduces" class="ConsumerThatAlsoProduces" autowire="byName"/> </beans> Is this a bug in ActiveMQ? Does it really impose some weenie-small limit on how much you can send back to the server while still in a transaction? Is it a hard limit? Configurable? Have I screwed up my configuration? Should I be using some different method to have my consumer in turn produce its messages in separate transactions/connections? What I love about Camel and the use of ProducerTemplate/Endpoint is that it seriously simplifies my code...but obviously I forfeit control over JMS Connection/Session + Transaction management...which I like! But I would expect that a combo like Camel + ActiveMQ would get along nicely! I'm attaching a zip of my test (minus the "lib" dir, which contains all the typical stuff you'd expect in an ActiveMQ/Camel setup -- Nabble wouldn't let me upload anything big enough to contain the lib dir). Maybe somebody can take a peek at my code/config and let me know what I'm doing wrong...or let me know if this is a bug in ActiveMQ? http://www.nabble.com/file/p23833387/camel-activemq-bug.tar.gz camel-activemq-bug.tar.gz http://www.nabble.com/file/p23833387/camel-activemq-bug.zip camel-activemq-bug.zip Help! Thanks!! P.S. -- I originally posted this in the Camel - Users forum, but a reply there suggested that this is probably an ActiveMQ bug and it should be posted here (apologizes for the double post). P.P.S. -- The use case behind this scenario is...when one "event" occurs, I need to "alert" any number of my users (up to tens of thousands, or eventually more) asynchronously. We post the event to queue A, and the consumer of queue A is responsible for posting the per-user alert requests to queue B. A consumer of queue B does the actual alert sends. If anybody can think of another way to accomplish this same goal without having a "consumer that also produces thousands of messages," I'm open to hearing it! -- View this message in context: http://www.nabble.com/BUG%3A-JMS-consumer-hangs-while-producing-profusely-tp23833387p23833387.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.