I might be able to help a little.
Here is a blog I am working on that has a Message Driven POJO through
Spring.
http://www.baselogic.com/blog/java/testing-activemq-virtualtopics-using-camel-and-junit
I have a unit test that starts an embedded AMQ broker, then starts a jms
listener for the MDP:
<jms:listener-container client-id="jmsContainer1"
transaction-manager="transactionManager">
<jms:listener id="jmsListener1"
destination="Consumer.1.VirtualTopic.Table.1"
ref="testClient1"
method="onMessage" />
</jms:listener-container>
Then I use Mocks to test I get the messages delivered.
Because as i look at your stack, it seems that the broker might not be
started, and/or your MDB has not successfully connected to your destination.
Maybe Spring can be an easier route?
---
Thank You…
Mick Knutson, President
BASE Logic, Inc.
Enterprise Architecture, Design, Mentoring & Agile Consulting
p. (866) BLiNC-411: (254-6241-1)
f. (415) 685-4233
Website: http://baselogic.com
Linked IN: http://linkedin.com/in/mickknutson
Vacation Rental: http://tahoe.baselogic.com
---
On Mon, Sep 7, 2009 at 12:08 AM, Neo Wang <[email protected]> wrote:
>
> Environment: JBoss 5.1GA, Active MQ 5.2 embeded broker configuration
>
> I want to send a message to a topic when receiving one message in a MDB,
> but
> it can't be successful all the time, any one can help me? I have struggled
> in it about 2 weeks.
>
> It is my sample code:
>
> package com.trading.platform.ejb;
>
> import javax.ejb.MessageDrivenBean;
> import javax.ejb.MessageDrivenContext;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Queue;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.TextMessage;
> import javax.jms.Topic;
> import javax.jms.TopicConnection;
> import javax.jms.TopicConnectionFactory;
> import javax.jms.TopicPublisher;
> import javax.jms.TopicSession;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> import com.trading.platform.context.ServerContext;
> import com.trading.platform.exception.InvokeException;
> import com.trading.platform.exception.SystemException;
> import com.trading.platform.handler.MessageHandler;
> import com.trading.platform.log.ILogger;
> import com.trading.platform.log.Logger;
>
> public class OrderResponseMDB implements MessageDrivenBean,
> MessageListener{
> private ILogger logger = Logger.getLogger(this.getClass());
> private MessageDrivenContext messageDrivenContext;
>
> private volatile TopicConnectionFactory topicConnnectionFactory;
> private volatile Topic topic;
>
> private void getInitialContext(){
> try {
> Context context = new InitialContext();
> topicConnnectionFactory =
>
> (TopicConnectionFactory)context.lookup("java:comp/env/jms/TopicConnectionFactory");
> //topic =
> (Topic)context.lookup("java:comp/env/jms/OrderBroadcastTopic");
> if(logger.isDebugEnabled())
> logger.debug("Platform:Publisher Topic
> Session connnection is created..." );
> } catch (NamingException e) {
> if(logger.isErrorEnabled())
> logger.error("NamingExcetion:" + e);
> throw new SystemException("System Error", e);
> }
> }
>
> public void onMessage(Message inMessage) {
>
> TopicConnection topicConn = null;
> try {
> if (inMessage instanceof TextMessage) {
> TextMessage txtmsg = (TextMessage)inMessage;
>
> System.out.println("MESSAGE BEAN3: Message received:
> index="+txtmsg.getText());
> topicConn =
> topicConnnectionFactory.createTopicConnection();
> topicConn.start();
> TopicSession topicSession =
> topicConn.createTopicSession(false,
> TopicSession.AUTO_ACKNOWLEDGE);
> topic =
> topicSession.createTopic("topic.orderbroadcast");
>
> TopicPublisher publisher =
> topicSession.createPublisher(topic);
> TextMessage newMsg =
> topicSession.createTextMessage("11111");
> publisher.send(newMsg);
>
> //publisher.close();
> //topicSession.close();
> topicConn.close();
> } else {
> System.out.println("WRONG MESSAGE TYPE received: " +
> inMessage.getClass().getName());
> }
> } catch (Throwable t) {
> System.out.println("error...");
> t.printStackTrace();
> messageDrivenContext.setRollbackOnly();
>
> try {
> if (topicConn != null) topicConn.close();
> System.out.println("MESSAGE BEAN: Rollback:
> index="+((TextMessage)inMessage).getText());
> } catch (Exception e) {
> System.out.println("MESSAGE BEAN: Unable to get
> index
> property because:"+e.getMessage());
> e.printStackTrace();
>
> }
> throw new RuntimeException(t.getMessage());
> }
> }
>
>
> public void ejbCreate(){
> System.out.println("ConnectionFactory created...");
> getInitialContext();
> }
>
> public void ejbRemove(){
>
> }
>
> public void setMessageDrivenContext(MessageDrivenContext
> messageDrivenContext){
> this.messageDrivenContext = messageDrivenContext;
> }
> }
>
> The following is the exception information:
>
> 14:59:09,601 WARN [Service] Async error occurred:
> javax.transaction.xa.XAException: Transaction
>
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> javax.transaction.xa.XAException: Transaction
>
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> at
>
> org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
> at
>
> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
> at
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
> at
>
> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
> at
>
> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
> at
>
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
> at
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
> at
>
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
> at
>
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
> at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> at
>
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> at
>
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
> at
>
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> at java.lang.Thread.run(Thread.java:595)
> 14:59:09,614 WARN [ActiveMQManagedConnection] Connection failed:
> javax.jms.JMSException: Transaction
>
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> 14:59:09,615 WARN [TxConnectionManager] Connection error occured:
>
> org.jboss.resource.connectionmanager.txconnectionmanager$txconnectioneventliste...@18936ae
> [state=NORMAL
> mc=org.apache.activemq.ra.activemqmanagedconnect...@1365339 handles=0
> lastUse=1252306749595 permit=false trackByTx=false
>
> mcp=org.jboss.resource.connectionmanager.jbossmanagedconnectionpool$onep...@191c3ce
>
> context=org.jboss.resource.connectionmanager.internalmanagedconnectionp...@ce1472
> xaresource=org.apache.activemq.ra.activemqmanagedconnectio...@164e48d
> txSync=null]
> javax.jms.JMSException: Transaction
>
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> at
>
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
> at
>
> org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1784)
> at
>
> org.apache.activemq.ActiveMQConnection$2$1.run(ActiveMQConnection.java:1705)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
> at java.lang.Thread.run(Thread.java:595)
> Caused by: javax.transaction.xa.XAException: Transaction
>
> 'XID:131075:312d613061383764343a313862333a34616134616561303a3662:613061383764343a313862333a34616134616561303a3663'
> has not been started.
> at
>
> org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:266)
> at
>
> org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:208)
> at
> org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:126)
> at
>
> org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
> at
>
> org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:133)
> at
>
> org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:455)
> at
> org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:639)
> at
>
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
> at
>
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
> at
>
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> at
>
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> at
>
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
> at
>
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
> at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> ... 1 more
>
>
>
> --
> View this message in context:
> http://www.nabble.com/How-to-send-one-message-to-a-topic-in-MDB--tp25325925p25325925.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>