I think I am confusing things by describing the problem in words, so here
is a plain ActiveMQ implementation that reproduces it. When I run this, I
send one message and receive four copies instead of the expected two (one
for each of the two consuming spokes).

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.lang.IllegalStateException;

public class DuplicationPlainAMQ {

    public static final Logger LOG =
LoggerFactory.getLogger(DuplicationPlainAMQ.class);

    public static final String HOST = "localhost";
    public static final int PORT = 61616;
    public static final String TOPIC = "DTT";


    public static void main(String[] args) throws Exception {
        try(Hub b = new Hub();
            Spoke a = new Spoke("A");
            Spoke c = new Spoke("C");
            Spoke d = new Spoke("D")) {
            c.registerListener();
            d.registerListener();
            Thread.sleep(1000); // wait for broker and consumer info to
propagate
            a.produce();
            Thread.sleep(1000); // wait for messages to arrive
        }
    }

    private static class Spoke implements AutoCloseable, ExceptionListener {
        private final BrokerService embeddedBroker;
        private final String name;
        private final NetworkConnector networkConnector;
        private final Connection connection;
        private final Session session;
        private final Destination destination;
        private MessageConsumer consumer = null;

        private Spoke(String name) throws Exception {
            this.name = name;
            embeddedBroker = new BrokerService();
            embeddedBroker.setBrokerName(name);
            embeddedBroker.start();
            if(!embeddedBroker.waitUntilStarted()) {
                throw new IllegalStateException("Broker did not start");
            }
            networkConnector = embeddedBroker
                   
.addNetworkConnector("static:(tcp://"+HOST+":"+PORT+")");
            networkConnector.setName("connector"+name);
            networkConnector.setDuplex(true);
            networkConnector.setNetworkTTL(3);
            networkConnector.start();
            ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("vm://"+name);
            connection = connectionFactory.createConnection();
            connection.start();
            connection.setExceptionListener(this);
            session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(TOPIC);
        }

        private void produce() throws JMSException {
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            TextMessage message = session.createTextMessage("test");
            producer.send(message);
        }

        private void registerListener() throws JMSException {
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        LOG.info(name+": message received with id
"+message.getJMSMessageID());
                    } catch (JMSException e) {
                        LOG.error(e.getLocalizedMessage(), e);
                    }
                }
            });
        }

        @Override
        public void close() throws Exception {
            if(consumer != null) {
                consumer.close();
            }
            networkConnector.stop();
            embeddedBroker.stop();
        }

        @Override
        public void onException(JMSException exception) {
            LOG.error(exception.getLocalizedMessage(), exception);
        }
    }

    private static class Hub implements AutoCloseable {
        private final BrokerService broker;
        private final TransportConnector transportConnector;

        private Hub() throws Exception {
            broker = new BrokerService();
            broker.setBrokerName("B");
            broker.start();
            if(!broker.waitUntilStarted()) {
                throw new IllegalStateException("Broker did not start");
            }
            transportConnector = broker
                    .addConnector("tcp://"+HOST+":"+PORT);
            transportConnector.setName("transport");
            transportConnector.setBrokerService(broker);
            transportConnector.start();
        }

        @Override
        public void close() throws Exception {
            transportConnector.stop();
            broker.stop();
        }
    }
}




--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Duplicate-messages-received-with-ActiveMQ-5-13-2-tp4728627p4729100.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to