Help!

I have a transactional JMS consumer, invoked by Camel for messages on
"testQueueA", that needs to produce/send thousands of messages to the JMS
queue "testQueueB".  Once enough messages have to be sent, the consumer
hangs partway through the send loop.  This happens when talking to ActiveMQ
via TCP.

I wrote a simplified test case, and one interesting thing I found was that
the "bigger" my messages are, the fewer get sent before everything hangs.
That suggests to me that some sort of buffer limit is being imposed.

Here's the code:

import java.util.logging.Logger;
import org.apache.camel.*;
import org.springframework.beans.factory.annotation.*;

public class ConsumerThatAlsoProduces {
    Logger logger = Logger.getLogger(getClass().getName());
    @Autowired
    ProducerTemplate producerTemplate;
    @Autowired
    @Qualifier("testQueueB")
    Endpoint testQueueB;
   
    public void onTestMessage(TestMessage msg) {
        logger.info("Received from A: " + msg);
        final int numToProduce = 20000;
        logger.info("Producing " + numToProduce + " messages on queueB");
        for (int k = 1; k <= numToProduce; ++k) {
            logger.info("Sending " + k + " of " + numToProduce);
            producerTemplate.sendBody(testQueueB, new TestMessage(k));
        }
    }
}

-----

public class TestMessage implements java.io.Serializable {
    int value;
    byte[] theBiggerThisIsTheFasterTheProblemHappens = new byte[32768];

    public TestMessage(int value) {
        this.value = value;
    }
    
    public String toString() {
        return "TestMessage{value=" + value + "}";
    }
}

When I force a thread dump, this is what I'm seeing for the thread of
interest:

"DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800 nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
        at java.io.DataOutputStream.write(DataOutputStream.java:90)
        - locked <0x000000010703aa28> (a java.io.DataOutputStream)
        at org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
        at org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
        at org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
        at org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
        at org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
        - locked <0x000000010701bb78> (a org.apache.activemq.openwire.OpenWireFormat)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
        - locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        - locked <0x0000000106fdd0e8> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
        at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
        at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
        at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
        - locked <0x0000000106fe9f00> (a java.lang.Object)
        at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
        - locked <0x0000000107095d88> (a org.apache.activemq.ActiveMQMessageProducer)
        at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
        at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
        at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
        at org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
        at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
        at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
        at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
        at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
        at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
        at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
        at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
        at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
        at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
        at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
        at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
        at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
        at org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
        at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
        at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
        at org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
        at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
        at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
        at org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
        at org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
        at java.lang.Thread.run(Thread.java:637)


I'm pretty sure the problem is that since my consumer is running inside a
JMS transaction, the messages it tries to send are being "buffered" by the
ActiveMQ server until the transaction commits.  Hence the catch-22: the
transaction won't commit until my consumer returns, and my consumer won't
return until all of the messages are sent.
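
To make the suspected circular wait concrete, here is a toy model in plain
Java.  It is only a sketch of the hypothesis above, not ActiveMQ's actual
implementation: the class name, the method name, and the 64 MiB byte budget
are all hypothetical.  It assumes sends accumulate against a fixed budget
that is only released at commit, and that commit only happens after the send
loop finishes.

```java
// Toy model only. Nothing here is an ActiveMQ API; the byte budget is a
// made-up number used to illustrate the suspected behaviour.
class TransactedSendModel {

    /**
     * Simulates sending numToProduce messages of payloadBytes each into a
     * buffer that holds at most bufferBytes and is only released on commit.
     * Returns how many sends were accepted before the next one would block.
     */
    static int sendUntilStalled(int numToProduce, int payloadBytes, long bufferBytes) {
        long buffered = 0;
        int accepted = 0;
        for (int k = 1; k <= numToProduce; ++k) {
            if (buffered + payloadBytes > bufferBytes) {
                // A real blocking send would wait here for space. Space is
                // only freed by commit, and commit is after this loop.
                return accepted;
            }
            buffered += payloadBytes;
            accepted++;
        }
        // "Commit": only reachable when every message fit in the budget.
        return accepted;
    }

    public static void main(String[] args) {
        long budget = 64L * 1024 * 1024; // hypothetical 64 MiB limit
        System.out.println(sendUntilStalled(20000, 32768, budget)); // prints 2048
        System.out.println(sendUntilStalled(20000, 65536, budget)); // prints 1024
        System.out.println(sendUntilStalled(1000, 32768, budget));  // prints 1000
    }
}
```

In this model, doubling the payload halves the number of messages accepted
before the stall, which matches the shape of what the test case shows, and a
small enough batch completes normally.  Whether the real limit sits in the
broker, the client, or the TCP socket is exactly what is unknown here.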

On this link http://activemq.apache.org/how-do-transactions-work.html I
noticed this comment:

"Now the operations carried out on a transacted session inside a
transaction, like a send message or acknowledge message, do not really
perform a real send or acknowledge until the commit occurs. So the Broker
explicitly handles these cases separately - essentially buffering up the
commands until the commit occurs when the messages are really sent or
acknowledged."

What I find interesting is that if I run an embedded ActiveMQ broker and
talk via vm://localhost instead of tcp://localhost:61616, the problem goes
away.  This ONLY happens when talking TCP.

FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
2.0.  Both straight out of the box.  Same results with both old & new
versions.

Here's my Spring context, FWIW:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:camel="http://activemq.apache.org/camel/schema/spring"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
       http://camel.apache.org/schema/spring
       http://camel.apache.org/schema/spring/camel-spring.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-2.5.xsd">

  <context:annotation-config/>

  <bean id="jmsConnectionFactory"
        class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
      </bean>
    </property>
    <property name="maxConnections">
      <value>5</value>
    </property>
  </bean>

  <bean id="jmsTransactionManager"
        class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
  </bean>

  <bean id="activemq"
        class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
  </bean>
  
  <camelContext id="camelContext"
                xmlns="http://camel.apache.org/schema/spring">
    <template id="producerTemplate"/>
    <endpoint id="testQueueA" uri="activemq:queue:test.queueA"/>
    <endpoint id="testQueueB" uri="activemq:queue:test.queueB"/>
    <route>
      <from uri="activemq:queue:test.queueA?concurrentConsumers=1"/>
      <transacted/>
      <bean ref="consumerThatAlsoProduces" method="onTestMessage"/>
    </route>
  </camelContext>
  
  <bean id="consumerThatAlsoProduces"
        class="ConsumerThatAlsoProduces"
        autowire="byName"/>
  
</beans>


Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on
how much you can send back to the server while still in a transaction?  Is
it a hard limit?  Configurable?  Have I screwed up my configuration?  Should
I be using some different method to have my consumer in turn produce its
messages in separate transactions/connections?
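
One shape the "separate transactions" idea could take is committing the
outgoing messages in fixed-size batches rather than all at once.  The sketch
below is plain Java with no JMS dependency: `TxSender` is a hypothetical
stand-in for a transacted session used only for sending, and
`sendInBatches` is an illustrative name, not a Camel or ActiveMQ API.  It
assumes the sends go through a session that is separate from the one
consuming from queue A.

```java
import java.util.ArrayList;
import java.util.List;

class BatchedProducer {

    /** Hypothetical stand-in for a transacted, send-only JMS session. */
    interface TxSender<T> {
        void send(T message);
        void commit();
    }

    /**
     * Sends every message, committing after each batchSize sends and once
     * more for any remainder. Returns the number of commits performed.
     */
    static <T> int sendInBatches(Iterable<T> messages, int batchSize, TxSender<T> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int pending = 0;
        int commits = 0;
        for (T message : messages) {
            sender.send(message);
            if (++pending == batchSize) {
                sender.commit(); // release whatever was held for this batch
                commits++;
                pending = 0;
            }
        }
        if (pending > 0) {
            sender.commit(); // flush the final partial batch
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> messages = new ArrayList<Integer>();
        for (int k = 1; k <= 10; ++k) {
            messages.add(k);
        }
        int commits = sendInBatches(messages, 4, new TxSender<Integer>() {
            public void send(Integer m) { System.out.println("send " + m); }
            public void commit() { System.out.println("commit"); }
        });
        System.out.println(commits + " commits");
    }
}
```

The trade-off is atomicity: if the consumer fails after some batches have
committed, those messages are already on queue B, and a redelivery of the
queue A message would produce them again.  The consumer of queue B would
need to tolerate duplicates.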

What I love about Camel and the use of ProducerTemplate/Endpoint is that it
seriously simplifies my code...but obviously I forfeit control over JMS
Connection/Session + Transaction management...which I like!  But I would
expect that a combo like Camel + ActiveMQ would get along nicely!

I'm attaching a zip of my test (minus the "lib" dir, which contains all the
typical stuff you'd expect in an ActiveMQ/Camel setup -- Nabble wouldn't let
me upload anything big enough to contain the lib dir).  Maybe somebody can
take a peek at my code/config and let me know what I'm doing wrong...or let
me know if this is a bug in ActiveMQ?

http://www.nabble.com/file/p23833387/camel-activemq-bug.tar.gz
camel-activemq-bug.tar.gz 
http://www.nabble.com/file/p23833387/camel-activemq-bug.zip
camel-activemq-bug.zip 

Help!  Thanks!!

P.S. -- I originally posted this in the Camel - Users forum, but a reply
there suggested that this is probably an ActiveMQ bug and it should be
posted here (apologies for the double post).

P.P.S. -- The use case behind this scenario is...when one "event" occurs, I
need to "alert" any number of my users (up to tens of thousands, or
eventually more) asynchronously.  We post the event to queue A, and the
consumer of queue A is responsible for posting the per-user alert requests
to queue B.  A consumer of queue B does the actual alert sends.  If anybody
can think of another way to accomplish this same goal without having a
"consumer that also produces thousands of messages," I'm open to hearing it!
-- 
View this message in context: 
http://www.nabble.com/BUG%3A-JMS-consumer-hangs-while-producing-profusely-tp23833387p23833387.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
