I think I am confusing things by describing the problem in words, so here is a plain ActiveMQ implementation that reproduces it. When I run this, I send one message and receive four instead of the expected two.
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.network.NetworkConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.lang.IllegalStateException; public class DuplicationPlainAMQ { public static final Logger LOG = LoggerFactory.getLogger(DuplicationPlainAMQ.class); public static final String HOST = "localhost"; public static final int PORT = 61616; public static final String TOPIC = "DTT"; public static void main(String[] args) throws Exception { try(Hub b = new Hub(); Spoke a = new Spoke("A"); Spoke c = new Spoke("C"); Spoke d = new Spoke("D")) { c.registerListener(); d.registerListener(); Thread.sleep(1000); // wait for broker and consumer info to propagate a.produce(); Thread.sleep(1000); // wait for messages to arrive } } private static class Spoke implements AutoCloseable, ExceptionListener { private final BrokerService embeddedBroker; private final String name; private final NetworkConnector networkConnector; private final Connection connection; private final Session session; private final Destination destination; private MessageConsumer consumer = null; private Spoke(String name) throws Exception { this.name = name; embeddedBroker = new BrokerService(); embeddedBroker.setBrokerName(name); embeddedBroker.start(); if(!embeddedBroker.waitUntilStarted()) { throw new IllegalStateException("Broker did not start"); } networkConnector = embeddedBroker .addNetworkConnector("static:(tcp://"+HOST+":"+PORT+")"); networkConnector.setName("connector"+name); networkConnector.setDuplex(true); networkConnector.setNetworkTTL(3); networkConnector.start(); ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://"+name); connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); destination = session.createTopic(TOPIC); } private void produce() throws JMSException { MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage("test"); producer.send(message); } private void registerListener() throws JMSException { consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { LOG.info(name+": message received with id "+message.getJMSMessageID()); } catch (JMSException e) { LOG.error(e.getLocalizedMessage(), e); } } }); } @Override public void close() throws Exception { if(consumer != null) { consumer.close(); } networkConnector.stop(); embeddedBroker.stop(); } @Override public void onException(JMSException exception) { LOG.error(exception.getLocalizedMessage(), exception); } } private static class Hub implements AutoCloseable { private final BrokerService broker; private final TransportConnector transportConnector; private Hub() throws Exception { broker = new BrokerService(); broker.setBrokerName("B"); broker.start(); if(!broker.waitUntilStarted()) { throw new IllegalStateException("Broker did not start"); } transportConnector = broker .addConnector("tcp://"+HOST+":"+PORT); transportConnector.setName("transport"); transportConnector.setBrokerService(broker); transportConnector.start(); } @Override public void close() throws Exception { transportConnector.stop(); broker.stop(); } } } -- View this message in context: http://activemq.2283324.n4.nabble.com/Duplicate-messages-received-with-ActiveMQ-5-13-2-tp4728627p4729100.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.