Well, I can post my code, which is very simple. I have just integrated
ActiveMQ with JBoss and then set up an MDB which listens to a queue and,
once it gets a message from the queue, tries to post it to a topic.



The code is:


package com.db.abmonitor.mdb;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.logging.Logger;

import javax.ejb.CreateException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.NamingException;
import java.rmi.RemoteException;

/**
 * @author  mailto:[EMAIL PROTECTED] Michael Gaffney  
 */

public class AutobahnMonitorMDB implements MessageDrivenBean,
MessageListener {

   // private static final String SENDER_NAME = "java:comp/env/ejb/Sender";
        private static Logger logger = 
Logger.getLogger(PublishMsgToTopic.class.getName());
    private MessageDrivenContext context;
    private PublishMsgToTopic processMSG;
    private int counter;
    private boolean verbose = true;
    private boolean transacted = false;
    
    
        private Connection connection;
        private Session session;
        private String url = "tcp://localhost:61616";
        private Topic topic;
        private MessageProducer publisher;

    public AutobahnMonitorMDB() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor MDB.");
        }
    }

    public void onMessage(Message message)  {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.onMessage");
        }
                        //if (message instanceof ObjectMessage) {

                                        //System.out.println("Received: " + 
message);

        
                try {
                        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(url);
                        connection = factory.createConnection();        
                        session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                        topic = session.createTopic("TOOL.TOPICTEST");
                        //control = session.createTopic("topictest.control");
                        
                        publisher = session.createProducer(topic);
                        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                        
                        
//session.createConsumer(control).setMessageListener(this);
                        connection.start();

                        //request shutdown
                        publisher.send(message);

                        //connection.stop();
                        //connection.close();
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }   
        
        
        
                                //processMSG = new PublishMsgToTopic(message);
                                //processMSG.run();
                        //} else {
                        //      if (verbose) {
                        //              System.out.println("Received: " + 
message);
                        //      }
                        //}
                /* Used for Hanlde message for Remote / Internal EJB
        try {
            handleMessage(message);
        } catch (JMSException e) {
            logger.error(e.toString(), e);
        } catch (NamingException e) {
            logger.error(e.toString(), e);
        } catch (RemoteException e) {
            logger.error(e.toString(), e);
        } catch (CreateException e) {
            logger.error(e.toString(), e);
        //} catch (SenderException e) {
        //    logger.error(e.toString(), e);
        }
        */
    }

        
    public void ejbRemove() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbRemove");
        }
    }

    public void setMessageDrivenContext(MessageDrivenContext
messageDrivenContext) {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.setMessageDrivenContext");
        }
        context = messageDrivenContext;
    }

    public void ejbCreate() {
        if (logger.isInfoEnabled()) {
            logger.info("Autobahn Monitor.ejbCreate");
        }
    }
    
    /* Old Routine used to EJB Sender Remote
    private void handleMessage(Message message) throws JMSException,
NamingException, RemoteException, CreateException {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            if (logger.isInfoEnabled()) {
                logger.info("Message received: " + textMessage.getText());
            }
           // send(textMessage.getText());
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Unknown message type received: " +
message.toString());
            }
           // send("Unknown message type: " + message.toString());
        }
    }
    */
    
        private void PublishToTopic(Message msg){
                //System.out.println("Received: " + msg);

                try {
                        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(url);
                        connection = factory.createConnection();        
                        session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                        topic = session.createTopic("TOOL.TOPICTEST");
                        //control = session.createTopic("topictest.control");
                        
                        publisher = session.createProducer(topic);
                        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                        
                        
//session.createConsumer(control).setMessageListener(this);
                        connection.start();

                        //request shutdown
                        publisher.send(msg);

                        //connection.stop();
                        //connection.close();
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }     
        }
}


James.Strachan wrote:
> 
> On 1/17/07, miniman <[EMAIL PROTECTED]> wrote:
>>
>> I am using apache-activemq-4.1.0-incubator
>>
>> Did these get fixed ?
> 
> Yes - though I'm not sure why you are getting your error. Any chance
> you could create a test case for us then we can fix it?
> 
> -- 
> 
> James
> -------
> http://radio.weblogs.com/0112098/
> 
> 

-- 
View this message in context: 
http://www.nabble.com/Trouble-with-posting-messages-to-topic-tf3022650.html#a8410729
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to