Hello there,I'm trying to *UNSUBSCRIBE durable subscribers from TOPICS*.My app is a kind of social network : each user is a topic for other users. So, each time a user is doing something, his friends are notified.Of course, a subscriber may unsubscribe from a topic, wanting to receive notifications about a user no more.Each time I'm trying to unsubscribe, I've got an error telling me that : "javax.jms.JMSException: Durable consumer is in use"Here are my 2 classes, the SENDER one and the RECEIVER one.Can someone tell me what I'm doing wrong ??I'm just using basic features of ActiveMQ...Thanx.*SENDER :*package com.citizenweb.classes;import java.util.Date;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageFormatException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.ObjectMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import com.citizenweb.interfaces.PostIF;import com.citizenweb.interfaces.UserIF;public class Sender { private ActiveMQConnectionFactory factory = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private Destination destination = null; private MessageProducer producer = null; public Sender() { } public void connect(){ try{ factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production factory.setTrustAllPackages(true); connection = (ActiveMQConnection) factory.createConnection(); connection.start(); session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e){ e.printStackTrace(); } } public void sendPost(UserIF user,PostIF post) { if(session==null){connect();} try { destination = session.createTopic(user.toString()); producer = session.createProducer(destination); ObjectMessage postMessage = session.createObjectMessage(); postMessage.setObject(post); producer.send(postMessage); System.out.println("\n SENDER Object message sent"); /*QueueBrowser qb = session.createBrowser((Queue) destination); Enumeration<?> msgs = qb.getEnumeration(); if ( !msgs.hasMoreElements() ) { System.out.println("No messages in queue"); } else { while (msgs.hasMoreElements()) { Message tempMsg = (Message)msgs.nextElement(); System.out.println("Message: " + tempMsg); } }*/ } catch (MessageFormatException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } public void sendInformation(UserIF user,String info){ if(session==null){connect();} try { destination = session.createTopic(user.toString()); producer = session.createProducer(destination); TextMessage infoMessage = session.createTextMessage(); infoMessage.setText(info); producer.send(infoMessage); System.out.println("\n SENDER Information message sent"); } catch (JMSException e) { e.printStackTrace(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { UserIF u1, u2, u3; String[] nom = new String[5]; String[] prenom = new String[5]; String[] login = new String[5]; String[] password = new String[5]; Date[] naiss = new Date[5]; String[] mail = new String[5]; for (int i = 0; i < 5; i++) { nom[i] = "nom_" + i; prenom[i] = "prenom_" + i; login[i] = "login_" + i; password[i] = "password_" + i; naiss[i] = new Date(); mail[i] = "mail_" + i; } System.out.println("\n SENDER AFFECTATION DES NOMS"); u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); Sender sender = new Sender(); sender.sendInformation(u1, "U1 notification"); sender.sendInformation(u2, "U2 notification"); sender.sendInformation(u3, "U3 notification"); //PostIF post = new Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0); //sender.sendPost(user, post); sender.session.close(); sender.connection.close(); }}*RECEIVER :*package com.citizenweb.classes;import java.io.Serializable;import java.util.ArrayList;import java.util.Date;import java.util.List;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.ActiveMQSession;import org.apache.activemq.broker.region.Destination;import com.citizenweb.interfaces.PostIF;import com.citizenweb.interfaces.UserIF;import com.citizenweb.classes.Post;public class Receiver implements MessageListener, Serializable { private static final long serialVersionUID = 1L; private ActiveMQConnectionFactory factory = null; private ActiveMQConnection connection = null; private ActiveMQSession session = null; private Topic destination = null; private MessageConsumer consumer = null; UserIF userTopic = new User(); UserIF userSubscriber = new User(); List listeMsg = new ArrayList(); public Receiver(UserIF subscriber) { this.userSubscriber = subscriber; } public void connect() { try { factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en production factory.setTrustAllPackages(true); connection = (ActiveMQConnection) factory.createConnection(); // ClientID : // https://qnalist.com/questions/2068823/create-durable-topic-subscriber connection.setClientID(userSubscriber.toString()); connection.start(); session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void receiveMessage(UserIF topic) { try { if (session == null) { connect(); } destination = session.createTopic(topic.toString()); String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); //String nomAbonnement = userSubscriber.toString(); consumer = session.createDurableSubscriber(destination, nomAbonnement); consumer.setMessageListener(this); } catch (JMSException e) { e.printStackTrace(); } } public void unsubscribe(UserIF topic) { try { System.out.println("\n RECEIVER Désinscription du topic " + topic.toString()); //consumer.close(); String nomAbonnement = topic.toString() + "->" + userSubscriber.toString(); //String nomAbonnement = userSubscriber.toString(); System.out.println("\n RECEIVER Abonnement à clore = " + nomAbonnement); session.unsubscribe(nomAbonnement); System.out.println("\n RECEIVER " + userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement); } catch (JMSException e) { e.printStackTrace(); } } @Override public void onMessage(Message message) { System.out.println("\n RECEIVER OnMessage triggered for " + userSubscriber.toString()); listeMsg.add(message); System.out.println("\n RECEIVER Nombre de messages reçus par " + userSubscriber + " = " + listeMsg.size()); String classe = message.getClass().getSimpleName(); System.out.println("\n RECEIVER Classe de message : " + classe); try { if (message instanceof TextMessage) { TextMessage text = (TextMessage) message; System.out.println("\n RECEIVER Information : " + text.getText()); } if (message instanceof ObjectMessage) { System.out.println("\n RECEIVER ObjectMessage"); ObjectMessage oMessage = (ObjectMessage) message; if (oMessage.getObject() instanceof PostIF) { PostIF post = (PostIF) oMessage.getObject(); String s = ((Post) post).getCorpsMessage(); System.out.println("\n RECEIVER Post : " + s); } } } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) throws JMSException { /* * EACH USER IS A TOPIC FOR OTHER USERS * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS */ //CREATE USER UserIF u1, u2, u3; String[] nom = new String[5]; String[] prenom = new String[5]; String[] login = new String[5]; String[] password = new String[5]; Date[] naiss = new Date[5]; String[] mail = new String[5]; for (int i = 0; i < 5; i++) { nom[i] = "nom_" + i; prenom[i] = "prenom_" + i; login[i] = "login_" + i; password[i] = "password_" + i; naiss[i] = new Date(); mail[i] = "mail_" + i; } u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); /* * MAKE EACH USER A SUBSCRIBER */ Receiver receiver1 = new Receiver(u1); Receiver receiver2 = new Receiver(u2); Receiver receiver3 = new Receiver(u3); /* * PUT A MESSAGE LISTENER FOR EACH USER */ receiver1.receiveMessage(u2); receiver1.receiveMessage(u3); receiver2.receiveMessage(u1); receiver2.receiveMessage(u3); receiver3.receiveMessage(u1); receiver3.receiveMessage(u2); /* * CALL THE SENDER CLASS TO SEND MESSAGES */ try { Sender.main(args); } catch (Exception e1) { e1.printStackTrace(); } /* * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE * CAN BE REMOVE */ try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); return; } /* * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS */ receiver1.unsubscribe(u2); receiver1.unsubscribe(u3); receiver2.unsubscribe(u1); receiver2.unsubscribe(u3); receiver3.unsubscribe(u1); receiver3.unsubscribe(u2); }}
-- View this message in context: http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.