Hello there, I'm trying to *UNSUBSCRIBE durable subscribers from TOPICS*. My
app is a kind of social network: each user is a topic for other users. So,
each time a user does something, his friends are notified. Of course, a
subscriber may unsubscribe from a topic when he no longer wants to receive
notifications about a user. Each time I try to unsubscribe, I get an error
telling me: "javax.jms.JMSException: Durable consumer is in use". Here
are my 2 classes, the SENDER one and the RECEIVER one. Can someone tell me
what I'm doing wrong? I'm just using basic features of
ActiveMQ... Thanks.
*SENDER :*package com.citizenweb.classes;import
java.util.Date;import javax.jms.Connection;import
javax.jms.ConnectionFactory;import javax.jms.Destination;import
javax.jms.JMSException;import javax.jms.MessageFormatException;import
javax.jms.MessageProducer;import javax.jms.Session;import
javax.jms.TextMessage;import javax.jms.Topic;import
javax.jms.ObjectMessage;import org.apache.activemq.ActiveMQConnection;import
org.apache.activemq.ActiveMQConnectionFactory;import
org.apache.activemq.ActiveMQSession;import
com.citizenweb.interfaces.PostIF;import
com.citizenweb.interfaces.UserIF;

/**
 * Publishes notifications about a user to that user's JMS topic.
 *
 * <p>The topic name is {@code user.toString()}. A connection and session to
 * the broker at {@link ActiveMQConnection#DEFAULT_BROKER_URL} are opened
 * lazily on the first send and released by {@link #close()}.
 */
public class Sender {

    private ActiveMQConnectionFactory factory = null;
    private ActiveMQConnection connection = null;
    private ActiveMQSession session = null;
    private Destination destination = null;
    private MessageProducer producer = null;

    public Sender() {
    }

    /**
     * Opens and starts a connection to the default broker URL and creates a
     * non-transacted, auto-acknowledge session on it.
     *
     * <p>A {@link JMSException} is printed, not rethrown; in that case
     * {@code session} may still be {@code null} afterwards.
     */
    public void connect() {
        try {
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // TODO: restore ActiveMQ's security mechanism (trusted packages) in production
            factory.setTrustAllPackages(true);
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@code post} as an {@link ObjectMessage} to the topic named after {@code user}.
     *
     * @param user the user whose topic receives the message
     * @param post the payload placed in the object message
     */
    public void sendPost(UserIF user, PostIF post) {
        if (!ensureSession()) {
            return;
        }
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            try {
                ObjectMessage postMessage = session.createObjectMessage();
                postMessage.setObject(post);
                producer.send(postMessage);
                System.out.println("\n SENDER Object message sent");
            } finally {
                // One producer is created per send, so release it right away.
                producer.close();
                producer = null;
            }
        } catch (JMSException e) {
            // Also covers MessageFormatException, which is a JMSException subtype.
            e.printStackTrace();
        }
    }

    /**
     * Sends {@code info} as a {@link TextMessage} to the topic named after {@code user}.
     *
     * @param user the user whose topic receives the message
     * @param info the text of the notification
     */
    public void sendInformation(UserIF user, String info) {
        if (!ensureSession()) {
            return;
        }
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            try {
                TextMessage infoMessage = session.createTextMessage();
                infoMessage.setText(info);
                producer.send(infoMessage);
                System.out.println("\n SENDER Information message sent");
            } finally {
                // One producer is created per send, so release it right away.
                producer.close();
                producer = null;
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the session and the connection if they are open. Errors are
     * printed, not rethrown, and the fields are reset so that a later send
     * reconnects.
     */
    public void close() {
        try {
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            session = null;
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            connection = null;
        }
    }

    /** Connects on first use; returns false when no session could be created. */
    private boolean ensureSession() {
        if (session == null) {
            connect();
        }
        if (session == null) {
            // connect() already printed the cause; avoid a NullPointerException below.
            System.err.println("\n SENDER No JMS session available, message not sent");
            return false;
        }
        return true;
    }

    /** Builds the i-th demo user ("nom_i", "prenom_i", ...) with the current date. */
    private static UserIF sampleUser(int i) {
        return new User("nom_" + i, "prenom_" + i, "login_" + i, "password_" + i, new Date(), "mail_" + i);
    }

    /**
     * Demo entry point: sends one text notification on the topic of each of
     * three sample users, then closes the session and connection.
     *
     * @param args unused
     * @throws Exception declared for compatibility with existing callers
     */
    public static void main(String[] args) throws Exception {
        System.out.println("\n SENDER AFFECTATION DES NOMS");
        UserIF u1 = sampleUser(0);
        UserIF u2 = sampleUser(1);
        UserIF u3 = sampleUser(2);

        Sender sender = new Sender();
        try {
            sender.sendInformation(u1, "U1 notification");
            sender.sendInformation(u2, "U2 notification");
            sender.sendInformation(u3, "U3 notification");
            //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0);
            //sender.sendPost(user, post);
        } finally {
            // Release broker resources even if a send fails unexpectedly.
            sender.close();
        }
    }
}
*RECEIVER :*package
com.citizenweb.classes;import java.io.Serializable;import
java.util.ArrayList;import java.util.Date;import java.util.List;import
javax.jms.JMSException;import javax.jms.Message;import
javax.jms.MessageConsumer;import javax.jms.MessageListener;import
javax.jms.ObjectMessage;import javax.jms.Session;import
javax.jms.TextMessage;import javax.jms.Topic;import
org.apache.activemq.ActiveMQConnection;import
org.apache.activemq.ActiveMQConnectionFactory;import
org.apache.activemq.ActiveMQSession;import
org.apache.activemq.broker.region.Destination;import
com.citizenweb.interfaces.PostIF;import
com.citizenweb.interfaces.UserIF;import com.citizenweb.classes.Post;public
class Receiver implements MessageListener, Serializable {       private static
final long serialVersionUID = 1L;       private ActiveMQConnectionFactory 
factory
= null; private ActiveMQConnection connection = null;   private
ActiveMQSession session = null; private Topic destination = null;       private
MessageConsumer consumer = null;        UserIF userTopic = new User();  UserIF
userSubscriber = new User();    List listeMsg = new ArrayList();        public
Receiver(UserIF subscriber) {           this.userSubscriber = subscriber;       
}       public
void connect() {                try {                   factory = new
ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);               
        // TODO
Mécanisme de sécurité d'ActiveMQ à rétablir en production               
factory.setTrustAllPackages(true);                      connection = 
(ActiveMQConnection)
factory.createConnection();                     // ClientID :                   
//
https://qnalist.com/questions/2068823/create-durable-topic-subscriber           
connection.setClientID(userSubscriber.toString());                      
connection.start();             
session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);              } catch (JMSException e) {              
e.printStackTrace();            }       }       public void 
receiveMessage(UserIF topic) {              try {           
if (session == null) {                          connect();                      
}                       destination =
session.createTopic(topic.toString());                  String nomAbonnement =
topic.toString() + "->" + userSubscriber.toString();                    //String
nomAbonnement = userSubscriber.toString();                      consumer =
session.createDurableSubscriber(destination, nomAbonnement);            
consumer.setMessageListener(this);              } catch (JMSException e) {      
        
e.printStackTrace();            }       }       public void unsubscribe(UserIF 
topic) {         try {           
System.out.println("\n RECEIVER Désinscription du topic " +
topic.toString());                      //consumer.close();                     
String nomAbonnement =
topic.toString() + "->" + userSubscriber.toString();                    //String
nomAbonnement = userSubscriber.toString();                      
System.out.println("\n RECEIVER
Abonnement à clore = " + nomAbonnement);                
session.unsubscribe(nomAbonnement);                     System.out.println("\n 
RECEIVER " +
userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);           
}
catch (JMSException e) {                        e.printStackTrace();            
}       }       @Override       public void
onMessage(Message message) {            System.out.println("\n RECEIVER 
OnMessage
triggered for " + userSubscriber.toString());           listeMsg.add(message);  
System.out.println("\n RECEIVER Nombre de messages reçus par " +
userSubscriber + " = " + listeMsg.size());              String classe =
message.getClass().getSimpleName();             System.out.println("\n RECEIVER 
Classe
de message : " + classe);               try {                   if (message 
instanceof TextMessage) {                   
TextMessage text = (TextMessage) message;                               
System.out.println("\n RECEIVER
Information : " + text.getText());                      }                       
if (message instanceof
ObjectMessage) {                                System.out.println("\n RECEIVER 
ObjectMessage");                        
ObjectMessage oMessage = (ObjectMessage) message;                               
if
(oMessage.getObject() instanceof PostIF) {                                      
PostIF post = (PostIF)
oMessage.getObject();                                   String s = ((Post) 
post).getCorpsMessage();                             
System.out.println("\n RECEIVER Post : " + s);                          }       
                }               } catch
(JMSException e) {                      e.printStackTrace();            }       
}               public static void
main(String[] args) throws JMSException {               /*               * EACH 
USER IS A TOPIC FOR
OTHER USERS              * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO
SUBSCRIBERS             */                              //CREATE USER           
UserIF u1, u2, u3;              String[] nom = new
String[5];              String[] prenom = new String[5];                
String[] login = new
String[5];              String[] password = new String[5];              Date[] 
naiss = new Date[5];     
String[] mail = new String[5];          for (int i = 0; i < 5; i++) {           
        nom[i] =
"nom_" + i;                     prenom[i] = "prenom_" + i;                      
login[i] = "login_" + i;                
password[i] = "password_" + i;                  naiss[i] = new Date();          
        mail[i] = "mail_"
+ i;            }               u1 = new User(nom[0], prenom[0], login[0], 
password[0], naiss[0],
mail[0]);               u2 = new User(nom[1], prenom[1], login[1], password[1], 
naiss[1],
mail[1]);               u3 = new User(nom[2], prenom[2], login[2], password[2], 
naiss[2],
mail[2]);                               /*               * MAKE EACH USER A 
SUBSCRIBER           */             Receiver receiver1 =
new Receiver(u1);               Receiver receiver2 = new Receiver(u2);          
Receiver
receiver3 = new Receiver(u3);           /*               * PUT A MESSAGE 
LISTENER FOR EACH USER         
*/              receiver1.receiveMessage(u2);           
receiver1.receiveMessage(u3);   
receiver2.receiveMessage(u1);           receiver2.receiveMessage(u3);   
receiver3.receiveMessage(u1);           receiver3.receiveMessage(u2);           
                /*               * CALL
THE SENDER CLASS TO SEND MESSAGES                */             try {           
        Sender.main(args);              } catch
(Exception e1) {                        e1.printStackTrace();           }       
                        /*               * A SLEEP TO HAVE ENOUGH
TIME TO LOOK AT THE ACTIVEMQ CONSOLE             * CAN BE REMOVE                
 */             try {           
Thread.sleep(10000);            } catch (InterruptedException e) {              
        // TODO
Auto-generated catch block                      e.printStackTrace();            
        return;         }               /*               *
UNSUBSCRIBE SUBSCRIBERS FROM TOPICS              */             
receiver1.unsubscribe(u2);      
receiver1.unsubscribe(u3);              receiver2.unsubscribe(u1);      
receiver2.unsubscribe(u3);              receiver3.unsubscribe(u1);      
receiver3.unsubscribe(u2);      }}



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to