Dear all, 

I use a BrokerService embedded in my Java application, together with Advisory
Messages, to listen for consumers joining and leaving (so I know everything
about each consumer: consumer-id, client-id, ...). But I don't know how to
close/disable/stop a consumer from the embedded broker service or from an
Advisory Message. Please guide me if you know a way. Thank you very much :)

                // Bring up an embedded broker with a single TCP connector and JMX enabled.
                // The URI is held in one place so the connector and the printout agree.
                final String connectorUri = "tcp://localhost:61616";

                BrokerService brokerService = new BrokerService();
                brokerService.addConnector(connectorUri);
                brokerService.setUseJmx(true);
                brokerService.start();

                // Report that the broker is up and where clients can reach it.
                System.out.println("Broker started.......");
                System.out.println(connectorUri);
===
package com.fis.activemq.pubsub;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveInfo;

public class AdvisoryTest {
        static MessageListener listener = new MessageListener() {
                @Override
                public void onMessage(Message message) {
                        System.out.println("message: " + message);
                        if (message instanceof ActiveMQMessage) {
                                ActiveMQMessage activeMessage = 
(ActiveMQMessage) message;
                                Object command = 
activeMessage.getDataStructure();
                                if (command instanceof ConsumerInfo) {
                                        ConsumerInfo consumerInfo = 
(ConsumerInfo)command;
                                
System.out.println("consumerid="+((ConsumerInfo)command).getConsumerId());
                                        System.out.println("A consumer 
subscribed to a topic or queue: " +
command);
                                } else if (command instanceof RemoveInfo) {
                                        RemoveInfo removeInfo = (RemoveInfo) 
command;
                                        if (removeInfo.isConsumerRemove()) {
                                                
System.out.println("ObjectId="+removeInfo.getObjectId());
                                                System.out.println("A consumer 
unsubscribed from a topic or
queue"+command);
                                        } else {
                                                System.out.println("RemoveInfo, 
a connection was closed: " + command);
                                        }
                                } else if (command instanceof ConnectionInfo) {
                                        System.out.println("ConnectionInfo, a 
new connection was made: " +
command);
                                } else {
                                        System.out.println("Unknown command: " 
+ command);
                                }
                        }
                }
        };

        public static void main(String[] args) {
                try {
                        // ActiveMQConnectionFactory connectionFactory = new
                        // ActiveMQConnectionFactory(
                        // "user", "password", 
ActiveMQConnection.DEFAULT_BROKER_URL);
                        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
                        Connection connection = 
connectionFactory.createConnection();
                        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                        connection.start();
                        //Destination destinationAdvisory =
session.createTopic("ActiveMQ.Advisory..>");
                        //Destination consumerTopicAdvisoryDest =
session.createTopic("ActiveMQ.Advisory.Consumer.Topic.>");
                        //Destination advisoryAll =
session.createTopic("ActiveMQ.Advisory.Topic.*");
                        Topic advisoryAll = 
session.createTopic("ActiveMQ.Advisory.Consumer.>");
                        //Topic advisoryAll = 
session.createTopic("ActiveMQ.Advisory.>");
                        
//                      MessageConsumer consumerAdvisory =
session.createConsumer(consumerTopicAdvisoryDest);
//                      consumerAdvisory.setMessageListener(listener);
                        //
                        MessageConsumer consumerAdvisoryAll =
session.createConsumer(advisoryAll);
                        consumerAdvisoryAll.setMessageListener(listener);
                } catch (Exception ex) {
                        ex.printStackTrace();
                }
        }
}




--
View this message in context: 
http://activemq.2283324.n4.nabble.com/How-to-close-consumer-from-broker-service-embedded-tp4681015.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to