[ 
https://issues.apache.org/jira/browse/AMQ-1509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13146959#comment-13146959
 ] 

Giovanni Toffetti commented on AMQ-1509:
----------------------------------------

Hi Dejan,

By chance I started looking at this issue again. I don't think the problem 
is fixed: as soon as there is more than one path between two brokers (each 
path being at least 2 hops long), message duplication occurs.

Here's a little example:

{code:title=FourBrokerTopicNetworkTest}

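import java.net.URI;
import java.util.HashMap;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import junit.framework.Test;

import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;

public class FourBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
                implements MessageListener {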
        protected static final int MESSAGE_COUNT = 5;
        public boolean dynamicOnly;

        public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
                addCombinationValues("dynamicOnly", new Object[] { true, false });
        }

        /**
         * A simple square topology:
         * BrokerA <-> BrokerB, BrokerA <-> BrokerC,
         * BrokerB <-> BrokerD, BrokerD <-> BrokerC
         */
        public void testSquareConnectedBrokerNetwork2() throws Exception {
                int networkTTL = 2;
                boolean conduitSubs = true;
                boolean dynamicOnly = false;

                // Setup broker networks
                bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
                                conduitSubs);

                bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
                                conduitSubs);

                startAllBrokers();

                // Setup destination
                Destination dest = createDestination("TEST.FOO", true);

                // Setup consumers
                MessageConsumer clientA = createConsumer("BrokerA", dest, "msgId > 0");
                MessageConsumer clientB = createConsumer("BrokerB", dest, "msgId > 0");
                MessageConsumer clientC = createConsumer("BrokerC", dest, "msgId > 0");
                MessageConsumer clientD = createConsumer("BrokerD", dest, "msgId > 0");
                // let consumers propagate around the network
                Thread.sleep(5000);
                
                clientD.setMessageListener(this);

                // Send messages
                String[] brokers = { "BrokerA", "BrokerB", "BrokerC", "BrokerD" };
                HashMap<String, Object> props = new HashMap<String, Object>();
                for (String broker : brokers) {
                        props.put("sender", broker);
                        for (int i = 1; i <= MESSAGE_COUNT; i++) {
                                props.put("msgId", i);
                                sendMessages(broker, dest, 1, props);
                        }
                }

                // Get message count
                MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
                MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
                MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
                MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

                msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                
                System.out.println(msgsA.toString());

                assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
        }

        /**
         * A simple square topology:
         * BrokerA <-> BrokerB, BrokerA <-> BrokerC,
         * BrokerB <-> BrokerD, BrokerD <-> BrokerC
         */
        public void testSquareConnectedBrokerNetwork() throws Exception {
                int networkTTL = 2;
                boolean conduitSubs = true;
                boolean dynamicOnly = false;

                // Setup broker networks
                bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
                                conduitSubs);

                bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
                                conduitSubs);
                bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
                                conduitSubs);

                startAllBrokers();

                // Setup destination
                Destination dest = createDestination("TEST.FOO", true);

                // Setup consumers
                MessageConsumer clientA = createConsumer("BrokerA", dest);
                MessageConsumer clientB = createConsumer("BrokerB", dest);
                MessageConsumer clientC = createConsumer("BrokerC", dest);
                MessageConsumer clientD = createConsumer("BrokerD", dest);
                // let consumers propagate around the network
                Thread.sleep(5000);

                // Send messages
                sendMessages("BrokerA", dest, MESSAGE_COUNT);
                sendMessages("BrokerB", dest, MESSAGE_COUNT);
                sendMessages("BrokerC", dest, MESSAGE_COUNT);
                sendMessages("BrokerD", dest, MESSAGE_COUNT);

                // Get message count
                MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
                MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
                MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
                MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

                msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
                msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);

                assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
                assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
        }

        public void setUp() throws Exception {
                super.setAutoFail(true);
                super.setUp();
                // Common broker options: non-persistent, JMX disabled
                String options = "?persistent=false&useJmx=false";
                createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
                createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
                createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
                createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
        }

        public static Test suite() {
                return suite(FourBrokerTopicNetworkTest.class);
        }

        @Override
        public void onMessage(Message message) {
                try {
                        System.err.println(message.getStringProperty("sender")
                                        + " msgID:" + message.getIntProperty("msgId"));
                } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                
        }
}

{code}

I don't know if there's anything wrong with this test, or if I should use 
different configurations of TTL, conduit, and dynamicOnly. I tested it with the 
latest AMQ I could get (5.5.1).

As you can see, 25 messages are delivered rather than the expected 20. The 
reason can be seen in the testSquareConnectedBrokerNetwork2 method: clientD 
prints every message coming from BrokerA twice, because both BrokerB and 
BrokerC forward it, along two different paths.
This is of course a major problem whenever a broker network has multiple 
paths, as message duplication becomes so severe that it basically kills the 
whole thing.
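
To make the arithmetic concrete, here is a minimal sketch that models the 
square topology as a plain graph and counts how many copies of a message 
reach each broker. It is only a toy model and does not use any ActiveMQ code: 
the class and method names are hypothetical, and it assumes that a message is 
forwarded to every neighbour not already on its path, up to networkTTL hops, 
with no duplicate detection.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (an assumption, not ActiveMQ code) of the square broker network.
class SquareNetworkDuplicates {
    // Bidirectional links: A-B, A-C, B-D, C-D.
    static final Map<String, List<String>> LINKS = new HashMap<>();
    static {
        LINKS.put("A", Arrays.asList("B", "C"));
        LINKS.put("B", Arrays.asList("A", "D"));
        LINKS.put("C", Arrays.asList("A", "D"));
        LINKS.put("D", Arrays.asList("B", "C"));
    }

    /** Copies of a single message published on origin that reach each broker. */
    static Map<String, Integer> deliveries(String origin, int ttl) {
        Map<String, Integer> copies = new TreeMap<>();
        forward(origin, new ArrayList<>(), ttl, copies);
        return copies;
    }

    // Deliver locally, then forward to every neighbour not yet on the path.
    private static void forward(String broker, List<String> path, int hopsLeft,
                                Map<String, Integer> copies) {
        copies.merge(broker, 1, Integer::sum);
        if (hopsLeft == 0) {
            return;
        }
        List<String> newPath = new ArrayList<>(path);
        newPath.add(broker);
        for (String next : LINKS.get(broker)) {
            if (!newPath.contains(next)) {
                forward(next, newPath, hopsLeft - 1, copies);
            }
        }
    }

    /** Total messages seen on one broker when every broker publishes the same number. */
    static int totalFor(String consumerBroker, int ttl, int messagesPerBroker) {
        int total = 0;
        for (String origin : LINKS.keySet()) {
            total += deliveries(origin, ttl).getOrDefault(consumerBroker, 0) * messagesPerBroker;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Copies per broker for one message from A: " + deliveries("A", 2));
        // prints "Copies per broker for one message from A: {A=1, B=1, C=1, D=2}"
        System.out.println("Messages seen on D: " + totalFor("D", 2, 5));
        // prints "Messages seen on D: 25"
    }
}
```

Under these assumptions the broker diagonally opposite the publisher gets two 
copies (one per path), which gives 5 + 5 + 5 + 10 = 25 messages per consumer 
instead of 20, matching what the test reports.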

Please let me know if the test is correct, as I'd like to have some more 
insight into why this is happening. My colleagues and I also have some ideas 
about the correct way to fix it.

Regards,

g
                
> Duplicate topic messages received with network of brokers and selectors
> -----------------------------------------------------------------------
>
>                 Key: AMQ-1509
>                 URL: https://issues.apache.org/jira/browse/AMQ-1509
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Transport
>    Affects Versions: 4.1.1
>            Reporter: Howard Orner
>            Assignee: Rob Davies
>             Fix For: 5.3.0
>
>         Attachments: ActiveMQActor.java
>
>
> If you create a network of two brokers, A and B, one publisher publishing to 
> A, and n (where n is > 1) receivers with selectors, each receiver receives n 
> messages for every 1 message sent.  The key here is to have a selector.  It 
> would appear that the conduitSubscriptions flag does not work when using 
> selectors.  The conduit does not properly reconcile consumers if they have 
> selectors.  A suggested solution would be that, rather than processing each 
> selector independently, the selectors should be OR'ed together, and if any 
> selector evaluates to true then a single message should be sent to the other 
> broker.
> In doing research, it would appear that this problem was introduced with bug 
> fix AMQ-810.  Another user reported it via email back to the assignee of 
> AMQ-810 and a short dialog transpired.  See 
> http://www.mail-archive.com/[email protected]/msg05198.html. 
>  
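
The OR'ing of selectors suggested in the description above could look roughly 
like the sketch below. This is only an illustration of the idea, not how 
ActiveMQ implements conduit subscriptions: the class and method names are 
hypothetical, and it assumes that a consumer without a selector matches every 
message, so the combined subscription then needs no selector at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of ActiveMQ) that merges consumer selectors.
class ConduitSelectorSketch {
    /**
     * Combines the selectors of all local consumers into one selector, so a
     * message is forwarded once if any of them matches. Returns null when
     * no filtering should be applied.
     */
    static String combine(List<String> selectors) {
        List<String> parts = new ArrayList<>();
        for (String selector : selectors) {
            if (selector == null || selector.trim().isEmpty()) {
                // A consumer without a selector wants everything.
                return null;
            }
            String wrapped = "(" + selector.trim() + ")";
            if (!parts.contains(wrapped)) {
                parts.add(wrapped); // skip identical selectors
            }
        }
        return parts.isEmpty() ? null : String.join(" OR ", parts);
    }

    public static void main(String[] args) {
        System.out.println(combine(Arrays.asList("msgId > 0", "sender = 'BrokerA'")));
        // prints "(msgId > 0) OR (sender = 'BrokerA')"
    }
}
```

With a single combined selector the remote broker would send at most one copy 
of each message over the bridge, and the local broker would still apply each 
consumer's own selector on delivery.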


        
