Well, I can post my code, which is very simple. I have just integrated ActiveMQ with JBoss and then set up an MDB which listens to a queue; once it gets a message from the queue, it tries to post it to a topic.
code is : package com.db.abmonitor.mdb; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jboss.logging.Logger; import javax.ejb.CreateException; import javax.ejb.MessageDrivenBean; import javax.ejb.MessageDrivenContext; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.NamingException; import java.rmi.RemoteException; /** * @author mailto:[EMAIL PROTECTED] Michael Gaffney */ public class AutobahnMonitorMDB implements MessageDrivenBean, MessageListener { // private static final String SENDER_NAME = "java:comp/env/ejb/Sender"; private static Logger logger = Logger.getLogger(PublishMsgToTopic.class.getName()); private MessageDrivenContext context; private PublishMsgToTopic processMSG; private int counter; private boolean verbose = true; private boolean transacted = false; private Connection connection; private Session session; private String url = "tcp://localhost:61616"; private Topic topic; private MessageProducer publisher; public AutobahnMonitorMDB() { if (logger.isInfoEnabled()) { logger.info("Autobahn Monitor MDB."); } } public void onMessage(Message message) { if (logger.isInfoEnabled()) { logger.info("Autobahn Monitor.onMessage"); } //if (message instanceof ObjectMessage) { //System.out.println("Received: " + message); try { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); connection = factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic("TOOL.TOPICTEST"); //control = session.createTopic("topictest.control"); publisher = session.createProducer(topic); 
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //session.createConsumer(control).setMessageListener(this); connection.start(); //request shutdown publisher.send(message); //connection.stop(); //connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } //processMSG = new PublishMsgToTopic(message); //processMSG.run(); //} else { // if (verbose) { // System.out.println("Received: " + message); // } //} /* Used for Hanlde message for Remote / Internal EJB try { handleMessage(message); } catch (JMSException e) { logger.error(e.toString(), e); } catch (NamingException e) { logger.error(e.toString(), e); } catch (RemoteException e) { logger.error(e.toString(), e); } catch (CreateException e) { logger.error(e.toString(), e); //} catch (SenderException e) { // logger.error(e.toString(), e); } */ } public void ejbRemove() { if (logger.isInfoEnabled()) { logger.info("Autobahn Monitor.ejbRemove"); } } public void setMessageDrivenContext(MessageDrivenContext messageDrivenContext) { if (logger.isInfoEnabled()) { logger.info("Autobahn Monitor.setMessageDrivenContext"); } context = messageDrivenContext; } public void ejbCreate() { if (logger.isInfoEnabled()) { logger.info("Autobahn Monitor.ejbCreate"); } } /* Old Routine used to EJB Sender Remote private void handleMessage(Message message) throws JMSException, NamingException, RemoteException, CreateException { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; if (logger.isInfoEnabled()) { logger.info("Message received: " + textMessage.getText()); } // send(textMessage.getText()); } else { if (logger.isInfoEnabled()) { logger.info("Unknown message type received: " + message.toString()); } // send("Unknown message type: " + message.toString()); } } */ private void PublishToTopic(Message msg){ //System.out.println("Received: " + msg); try { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); connection = 
factory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic("TOOL.TOPICTEST"); //control = session.createTopic("topictest.control"); publisher = session.createProducer(topic); publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //session.createConsumer(control).setMessageListener(this); connection.start(); //request shutdown publisher.send(msg); //connection.stop(); //connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } James.Strachan wrote: > > On 1/17/07, miniman <[EMAIL PROTECTED]> wrote: >> >> I am using apache-activemq-4.1.0-incubator >> >> Did these get fixed ? > > Yes - though I'm not sure why you are getting your error. Any chance > you could create a test case for us then we can fix it? > > -- > > James > ------- > http://radio.weblogs.com/0112098/ > > -- View this message in context: http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8410729 Sent from the ActiveMQ - User mailing list archive at Nabble.com.
